From 1035364269ab3d63ed40e57f02eb78777e94b0a4 Mon Sep 17 00:00:00 2001 From: Michael Dales Date: Tue, 29 Oct 2024 22:35:57 +0000 Subject: [PATCH] Fixes an issue with late close of file handles. Although in the command running loop we defer'd the close of stdout, so we technically were safe, because that was in a loop the defer wasn't closed until a lot later than necessary, and on large batches can cause ulimit to be exceeded. This fix moves the command exection to its own function which fixes the late defer, and also makes the code more easy to read. --- littlejohn.go | 116 ++++++++++++++++++++++++++------------------------ 1 file changed, 60 insertions(+), 56 deletions(-) diff --git a/littlejohn.go b/littlejohn.go index 81230e6..094d021 100644 --- a/littlejohn.go +++ b/littlejohn.go @@ -11,15 +11,15 @@ import ( "strings" "sync" - "github.com/jawher/mow.cli" + cli "github.com/jawher/mow.cli" ) func openCSV(filename string) (*csv.Reader, []string, error) { file, err := os.Open(filename) - if err != nil { + if err != nil { return nil, nil, err - } - reader := csv.NewReader(file) + } + reader := csv.NewReader(file) raw_argnames, err := reader.Read() if err != nil { @@ -35,7 +35,53 @@ func openCSV(filename string) (*csv.Reader, []string, error) { return reader, argnames, nil } -func runner(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error { +func run_command(command string, fullarglist []string, output_channel chan string) { + + cmd := &exec.Cmd{ + Path: command, + Args: fullarglist, + Stderr: os.Stderr, + } + stdout, err := cmd.StdoutPipe() + if err != nil { + log.Printf("Failed to get output pipe for %v: %v", fullarglist, err) + return + } + defer stdout.Close() + + err = cmd.Start() + if err != nil { + log.Printf("Failed to run %v: %v", fullarglist, err) + return + } + + buf := bufio.NewReader(stdout) + count := 0 + for { + line, _, err := buf.ReadLine() + if err != nil { + if err != io.EOF { + log.Printf("Error reading process %v: %v", fullarglist, err) + } + break + } + result := fullarglist[0] + for _, arg := range fullarglist[1:] { + result = fmt.Sprintf("%v, %v", result, arg) + } + result = fmt.Sprintf("%v, %d, %v", result, count, string(line)) + output_channel <- result + count += 1 + } + + err = cmd.Wait() + if err != nil { + log.Printf("Failed to wait %v: %v", fullarglist, err) + return + } +} + +func cmd_main(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error { argreader, argnames, err := openCSV(csvfile) if err != nil { @@ -54,8 +100,8 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda output_channel := make(chan string, 1) go func() { - for ;; { - res := <- output_channel + for { + res := <-output_channel output.Write([]byte(res + "\n")) } }() @@ -68,7 +114,7 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda wg.Add(1) go func() { for { - args := <- distribution + args := <-distribution if len(args) == 0 { break } @@ -78,11 +124,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda continue } - fullarglist := make([]string, 1, (len(args) * 2) + len(fixedargs) + 1) + fullarglist := make([]string, 1, (len(args)*2)+len(fixedargs)+1) fullarglist[0] = command - for _, arg := range fixedargs { - fullarglist = append(fullarglist, arg) - } + fullarglist = append(fullarglist, fixedargs...) for argindex := 0; argindex < len(args); argindex += 1 { fullarglist = append(fullarglist, argnames[argindex]) fullarglist = append(fullarglist, args[argindex]) @@ -92,49 +136,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda fmt.Printf("%s\n", strings.Join(fullarglist, " ")) continue } + fmt.Printf("%s\n", strings.Join(fullarglist, " ")) - cmd := &exec.Cmd{ - Path: command, - Args: fullarglist, - Stderr: os.Stderr, - } - stdout, err := cmd.StdoutPipe() - if err != nil { - log.Printf("Failed to get output pipe for %v: %v", fullarglist, err) - continue - } - defer stdout.Close() - - err = cmd.Start() - if err != nil { - log.Printf("Failed to run %v: %v", fullarglist, err) - continue - } - - buf := bufio.NewReader(stdout) - count := 0 - for ;; { - line, _, err := buf.ReadLine() - if err != nil { - if err != io.EOF { - log.Printf("Error reading process %v: %v", fullarglist, err) - } - break - } - result := fullarglist[0] - for _, arg := range fullarglist[1:] { - result = fmt.Sprintf("%v, %v", result, arg) - } - result = fmt.Sprintf("%v, %d, %v", result, count, string(line)) - output_channel <- result - count += 1 - } - - err = cmd.Wait() - if err != nil { - log.Printf("Failed to wait %v: %v", fullarglist, err) - continue - } + run_command(command, fullarglist, output_channel) } wg.Done() }() @@ -172,8 +176,8 @@ func main() { args := app.StringsArg("ARGS", nil, "Fixed arguments to all child programs") app.Action = func() { - runner(*command, *parallelism, *dryrun, *csv, *args, *output) + cmd_main(*command, *parallelism, *dryrun, *csv, *args, *output) } app.Run(os.Args) -} \ No newline at end of file +}