Skip to content

Commit

Permalink
Fixes an issue with late close of file handles.
Browse files Browse the repository at this point in the history
Although in the command running loop we defer'd the close of stdout, so
we technically were safe, because that was in a loop the defer wasn't
closed until a lot later than necessary, and on large batches can cause
ulimit to be exceeded.

This fix moves the command exection to its own function which fixes the
late defer, and also makes the code more easy to read.
  • Loading branch information
mdales committed Oct 29, 2024
1 parent 5143aef commit 1035364
Showing 1 changed file with 60 additions and 56 deletions.
116 changes: 60 additions & 56 deletions littlejohn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"strings"
"sync"

"github.com/jawher/mow.cli"
cli "github.com/jawher/mow.cli"
)

func openCSV(filename string) (*csv.Reader, []string, error) {
file, err := os.Open(filename)
if err != nil {
if err != nil {
return nil, nil, err
}
reader := csv.NewReader(file)
}
reader := csv.NewReader(file)

raw_argnames, err := reader.Read()
if err != nil {
Expand All @@ -35,7 +35,53 @@ func openCSV(filename string) (*csv.Reader, []string, error) {
return reader, argnames, nil
}

func runner(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error {
func run_command(command string, fullarglist []string, output_channel chan string) {

cmd := &exec.Cmd{
Path: command,
Args: fullarglist,
Stderr: os.Stderr,
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get output pipe for %v: %v", fullarglist, err)
return
}
defer stdout.Close()

err = cmd.Start()
if err != nil {
log.Printf("Failed to run %v: %v", fullarglist, err)
return
}

buf := bufio.NewReader(stdout)
count := 0
for {
line, _, err := buf.ReadLine()
if err != nil {
if err != io.EOF {
log.Printf("Error reading process %v: %v", fullarglist, err)
}
break
}
result := fullarglist[0]
for _, arg := range fullarglist[1:] {
result = fmt.Sprintf("%v, %v", result, arg)
}
result = fmt.Sprintf("%v, %d, %v", result, count, string(line))
output_channel <- result
count += 1
}

err = cmd.Wait()
if err != nil {
log.Printf("Failed to wait %v: %v", fullarglist, err)
return
}
}

func cmd_main(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error {

argreader, argnames, err := openCSV(csvfile)
if err != nil {
Expand All @@ -54,8 +100,8 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda

output_channel := make(chan string, 1)
go func() {
for ;; {
res := <- output_channel
for {
res := <-output_channel
output.Write([]byte(res + "\n"))
}
}()
Expand All @@ -68,7 +114,7 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
wg.Add(1)
go func() {
for {
args := <- distribution
args := <-distribution
if len(args) == 0 {
break
}
Expand All @@ -78,11 +124,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
continue
}

fullarglist := make([]string, 1, (len(args) * 2) + len(fixedargs) + 1)
fullarglist := make([]string, 1, (len(args)*2)+len(fixedargs)+1)
fullarglist[0] = command
for _, arg := range fixedargs {
fullarglist = append(fullarglist, arg)
}
fullarglist = append(fullarglist, fixedargs...)
for argindex := 0; argindex < len(args); argindex += 1 {
fullarglist = append(fullarglist, argnames[argindex])
fullarglist = append(fullarglist, args[argindex])
Expand All @@ -92,49 +136,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
fmt.Printf("%s\n", strings.Join(fullarglist, " "))
continue
}
fmt.Printf("%s\n", strings.Join(fullarglist, " "))

cmd := &exec.Cmd{
Path: command,
Args: fullarglist,
Stderr: os.Stderr,
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get output pipe for %v: %v", fullarglist, err)
continue
}
defer stdout.Close()

err = cmd.Start()
if err != nil {
log.Printf("Failed to run %v: %v", fullarglist, err)
continue
}

buf := bufio.NewReader(stdout)
count := 0
for ;; {
line, _, err := buf.ReadLine()
if err != nil {
if err != io.EOF {
log.Printf("Error reading process %v: %v", fullarglist, err)
}
break
}
result := fullarglist[0]
for _, arg := range fullarglist[1:] {
result = fmt.Sprintf("%v, %v", result, arg)
}
result = fmt.Sprintf("%v, %d, %v", result, count, string(line))
output_channel <- result
count += 1
}

err = cmd.Wait()
if err != nil {
log.Printf("Failed to wait %v: %v", fullarglist, err)
continue
}
run_command(command, fullarglist, output_channel)
}
wg.Done()
}()
Expand Down Expand Up @@ -172,8 +176,8 @@ func main() {
args := app.StringsArg("ARGS", nil, "Fixed arguments to all child programs")

app.Action = func() {
runner(*command, *parallelism, *dryrun, *csv, *args, *output)
cmd_main(*command, *parallelism, *dryrun, *csv, *args, *output)
}

app.Run(os.Args)
}
}

0 comments on commit 1035364

Please sign in to comment.