Skip to content

Commit

Permalink
Merge pull request #1 from quantifyearth/mwd-release-stdout-sooner
Browse files Browse the repository at this point in the history
Fixes an issue with late close of file handles.
  • Loading branch information
mdales authored Oct 29, 2024
2 parents 6806371 + 1035364 commit e123d99
Showing 1 changed file with 60 additions and 56 deletions.
116 changes: 60 additions & 56 deletions littlejohn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"strings"
"sync"

"github.com/jawher/mow.cli"
cli "github.com/jawher/mow.cli"
)

func openCSV(filename string) (*csv.Reader, []string, error) {
file, err := os.Open(filename)
if err != nil {
if err != nil {
return nil, nil, err
}
reader := csv.NewReader(file)
}
reader := csv.NewReader(file)

raw_argnames, err := reader.Read()
if err != nil {
Expand All @@ -35,7 +35,53 @@ func openCSV(filename string) (*csv.Reader, []string, error) {
return reader, argnames, nil
}

func runner(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error {
func run_command(command string, fullarglist []string, output_channel chan string) {

cmd := &exec.Cmd{
Path: command,
Args: fullarglist,
Stderr: os.Stderr,
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get output pipe for %v: %v", fullarglist, err)
return
}
defer stdout.Close()

err = cmd.Start()
if err != nil {
log.Printf("Failed to run %v: %v", fullarglist, err)
return
}

buf := bufio.NewReader(stdout)
count := 0
for {
line, _, err := buf.ReadLine()
if err != nil {
if err != io.EOF {
log.Printf("Error reading process %v: %v", fullarglist, err)
}
break
}
result := fullarglist[0]
for _, arg := range fullarglist[1:] {
result = fmt.Sprintf("%v, %v", result, arg)
}
result = fmt.Sprintf("%v, %d, %v", result, count, string(line))
output_channel <- result
count += 1
}

err = cmd.Wait()
if err != nil {
log.Printf("Failed to wait %v: %v", fullarglist, err)
return
}
}

func cmd_main(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error {

argreader, argnames, err := openCSV(csvfile)
if err != nil {
Expand All @@ -54,8 +100,8 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda

output_channel := make(chan string, 1)
go func() {
for ;; {
res := <- output_channel
for {
res := <-output_channel
output.Write([]byte(res + "\n"))
}
}()
Expand All @@ -68,7 +114,7 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
wg.Add(1)
go func() {
for {
args := <- distribution
args := <-distribution
if len(args) == 0 {
break
}
Expand All @@ -78,11 +124,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
continue
}

fullarglist := make([]string, 1, (len(args) * 2) + len(fixedargs) + 1)
fullarglist := make([]string, 1, (len(args)*2)+len(fixedargs)+1)
fullarglist[0] = command
for _, arg := range fixedargs {
fullarglist = append(fullarglist, arg)
}
fullarglist = append(fullarglist, fixedargs...)
for argindex := 0; argindex < len(args); argindex += 1 {
fullarglist = append(fullarglist, argnames[argindex])
fullarglist = append(fullarglist, args[argindex])
Expand All @@ -92,49 +136,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
fmt.Printf("%s\n", strings.Join(fullarglist, " "))
continue
}
fmt.Printf("%s\n", strings.Join(fullarglist, " "))

cmd := &exec.Cmd{
Path: command,
Args: fullarglist,
Stderr: os.Stderr,
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get output pipe for %v: %v", fullarglist, err)
continue
}
defer stdout.Close()

err = cmd.Start()
if err != nil {
log.Printf("Failed to run %v: %v", fullarglist, err)
continue
}

buf := bufio.NewReader(stdout)
count := 0
for ;; {
line, _, err := buf.ReadLine()
if err != nil {
if err != io.EOF {
log.Printf("Error reading process %v: %v", fullarglist, err)
}
break
}
result := fullarglist[0]
for _, arg := range fullarglist[1:] {
result = fmt.Sprintf("%v, %v", result, arg)
}
result = fmt.Sprintf("%v, %d, %v", result, count, string(line))
output_channel <- result
count += 1
}

err = cmd.Wait()
if err != nil {
log.Printf("Failed to wait %v: %v", fullarglist, err)
continue
}
run_command(command, fullarglist, output_channel)
}
wg.Done()
}()
Expand Down Expand Up @@ -172,8 +176,8 @@ func main() {
args := app.StringsArg("ARGS", nil, "Fixed arguments to all child programs")

app.Action = func() {
runner(*command, *parallelism, *dryrun, *csv, *args, *output)
cmd_main(*command, *parallelism, *dryrun, *csv, *args, *output)
}

app.Run(os.Args)
}
}

0 comments on commit e123d99

Please sign in to comment.