Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes an issue with late close of file handles. #1

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 60 additions & 56 deletions littlejohn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"strings"
"sync"

"github.com/jawher/mow.cli"
cli "github.com/jawher/mow.cli"
)

func openCSV(filename string) (*csv.Reader, []string, error) {
file, err := os.Open(filename)
if err != nil {
if err != nil {
return nil, nil, err
}
reader := csv.NewReader(file)
}
reader := csv.NewReader(file)

raw_argnames, err := reader.Read()
if err != nil {
Expand All @@ -35,7 +35,53 @@ func openCSV(filename string) (*csv.Reader, []string, error) {
return reader, argnames, nil
}

func runner(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error {
func run_command(command string, fullarglist []string, output_channel chan string) {

cmd := &exec.Cmd{
Path: command,
Args: fullarglist,
Stderr: os.Stderr,
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get output pipe for %v: %v", fullarglist, err)
return
}
defer stdout.Close()

err = cmd.Start()
if err != nil {
log.Printf("Failed to run %v: %v", fullarglist, err)
return
}

buf := bufio.NewReader(stdout)
count := 0
for {
line, _, err := buf.ReadLine()
if err != nil {
if err != io.EOF {
log.Printf("Error reading process %v: %v", fullarglist, err)
}
break
}
result := fullarglist[0]
for _, arg := range fullarglist[1:] {
result = fmt.Sprintf("%v, %v", result, arg)
}
result = fmt.Sprintf("%v, %d, %v", result, count, string(line))
output_channel <- result
count += 1
}

err = cmd.Wait()
if err != nil {
log.Printf("Failed to wait %v: %v", fullarglist, err)
return
}
}

func cmd_main(command string, parallelism int, dryrun bool, csvfile string, fixedargs []string, outputfile string) error {

argreader, argnames, err := openCSV(csvfile)
if err != nil {
Expand All @@ -54,8 +100,8 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda

output_channel := make(chan string, 1)
go func() {
for ;; {
res := <- output_channel
for {
res := <-output_channel
output.Write([]byte(res + "\n"))
}
}()
Expand All @@ -68,7 +114,7 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
wg.Add(1)
go func() {
for {
args := <- distribution
args := <-distribution
if len(args) == 0 {
break
}
Expand All @@ -78,11 +124,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
continue
}

fullarglist := make([]string, 1, (len(args) * 2) + len(fixedargs) + 1)
fullarglist := make([]string, 1, (len(args)*2)+len(fixedargs)+1)
fullarglist[0] = command
for _, arg := range fixedargs {
fullarglist = append(fullarglist, arg)
}
fullarglist = append(fullarglist, fixedargs...)
for argindex := 0; argindex < len(args); argindex += 1 {
fullarglist = append(fullarglist, argnames[argindex])
fullarglist = append(fullarglist, args[argindex])
Expand All @@ -92,49 +136,9 @@ func runner(command string, parallelism int, dryrun bool, csvfile string, fixeda
fmt.Printf("%s\n", strings.Join(fullarglist, " "))
continue
}
fmt.Printf("%s\n", strings.Join(fullarglist, " "))

cmd := &exec.Cmd{
Path: command,
Args: fullarglist,
Stderr: os.Stderr,
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Failed to get output pipe for %v: %v", fullarglist, err)
continue
}
defer stdout.Close()

err = cmd.Start()
if err != nil {
log.Printf("Failed to run %v: %v", fullarglist, err)
continue
}

buf := bufio.NewReader(stdout)
count := 0
for ;; {
line, _, err := buf.ReadLine()
if err != nil {
if err != io.EOF {
log.Printf("Error reading process %v: %v", fullarglist, err)
}
break
}
result := fullarglist[0]
for _, arg := range fullarglist[1:] {
result = fmt.Sprintf("%v, %v", result, arg)
}
result = fmt.Sprintf("%v, %d, %v", result, count, string(line))
output_channel <- result
count += 1
}

err = cmd.Wait()
if err != nil {
log.Printf("Failed to wait %v: %v", fullarglist, err)
continue
}
run_command(command, fullarglist, output_channel)
}
wg.Done()
}()
Expand Down Expand Up @@ -172,8 +176,8 @@ func main() {
args := app.StringsArg("ARGS", nil, "Fixed arguments to all child programs")

app.Action = func() {
runner(*command, *parallelism, *dryrun, *csv, *args, *output)
cmd_main(*command, *parallelism, *dryrun, *csv, *args, *output)
}

app.Run(os.Args)
}
}
Loading