-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcsvstutter.go
103 lines (98 loc) · 2.21 KB
/
csvstutter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package multicorecsv
import (
"bytes"
"encoding/csv"
"io"
)
// Reader hands out CSV records that are parsed in the background by the
// goroutines NewReader starts. Records are obtained with Read; Close shuts
// the background work down.
type Reader struct {
	// reader is the encoding/csv parser NewReader builds over the caller's input.
	reader *csv.Reader
	// writer is a csv.Writer over an in-memory buffer that NewReader creates.
	// It is not referenced by the rest of the code visible here.
	writer *csv.Writer

	// toWrite carries batches of parsed records from the parsing goroutine to
	// the forwarding goroutine.
	toWrite chan [][]string
	// toRead carries single records to Read.
	toRead chan []string
	// done is closed by Close to stop the goroutines; it is also the channel
	// on which a read error reaches Read.
	done chan error
}
// NewReader starts a background pipeline that parses CSV records from rdr and
// makes them available through Read.
//
// Parsing uses encoding/csv with LazyQuotes enabled and the per-record field
// count check disabled. Records are moved in batches of size records, and size
// is also the capacity of the internal channels. A size below 1 is treated
// as 1.
//
// Two goroutines are started: one parses and batches records, the other
// forwards them one at a time to Read. A read error other than io.EOF is
// reported on the done channel only after every record parsed before it has
// been handed to the forwarding channel. Both goroutines stop once Close is
// called; callers that stop reading early must call Close to release them.
func NewReader(rdr io.Reader, size int) *Reader {
	if size < 1 {
		// make panics on a negative capacity, and a batch size of zero would
		// never fill, buffering the whole input before delivering anything.
		size = 1
	}

	reader := csv.NewReader(rdr)
	reader.LazyQuotes = true
	reader.FieldsPerRecord = -1 // disables the field count check
	r := &Reader{
		reader:  reader,
		done:    make(chan error),
		toWrite: make(chan [][]string, size),
		writer:  csv.NewWriter(&bytes.Buffer{}),
		toRead:  make(chan []string, size),
	}

	// readErr is written by the parsing goroutine before it closes r.toWrite
	// and read by the forwarding goroutine only after it has observed that
	// close, so the channel close orders the two accesses.
	var readErr error

	// Forwarding goroutine: unpacks batches into single records for Read and,
	// once everything has been forwarded, reports the read error if any.
	go func() {
		defer close(r.toRead)
		for batch := range r.toWrite {
			for _, rec := range batch {
				select {
				case r.toRead <- rec:
				case <-r.done:
					// Only this goroutine ever sends on done, and only after
					// this loop, so a receive here means Close was called.
					return
				}
			}
		}
		if readErr == nil {
			return
		}
		// Close closes the very channel the error travels on, so the send
		// below can panic if Close wins the race. That panic only means
		// nobody is listening any more, so it is deliberately discarded.
		defer func() { _ = recover() }()
		select {
		case <-r.done:
		case r.done <- readErr:
		}
	}()

	// Parsing goroutine: reads records and hands them over in batches.
	go func() {
		defer close(r.toWrite)
		batch := make([][]string, 0, size)

		// flush hands the current batch to the forwarding goroutine. It
		// reports false when Close was called instead.
		flush := func() bool {
			select {
			case <-r.done:
				return false
			case r.toWrite <- batch:
				return true
			}
		}

		for {
			rec, err := reader.Read()
			if err != nil {
				if err != io.EOF {
					readErr = err
				}
				// Deliver what was parsed before the input ended or failed.
				if len(batch) > 0 {
					flush()
				}
				return
			}
			batch = append(batch, rec)
			if len(batch) == size {
				if !flush() {
					return
				}
				// The forwarding goroutine now owns the old slice.
				batch = make([][]string, 0, size)
			}
		}
	}()

	return r
}
// Close tells the goroutines started by NewReader to stop by closing the done
// channel. A read error that was still waiting to be delivered is discarded.
//
// Calling Close again after it has returned is a no-op. Close must not be
// called from several goroutines at the same time.
func (reader *Reader) Close() {
	select {
	case _, open := <-reader.done:
		if !open {
			// Already closed by an earlier call; closing again would panic.
			return
		}
		// An undelivered read error was received; drop it and shut down.
	default:
	}
	close(reader.done)
}
// Read blocks until a record arrives on the record channel or a value arrives
// on the done channel, and returns whichever comes first.
//
// A record is returned with a nil error. An error received from done is
// returned with a nil record. Once either channel has been closed (the
// record channel at end of input, done by Close), Read can return (nil, nil).
func (reader *Reader) Read() ([]string, error) {
	var (
		record []string
		err    error
	)
	select {
	case record = <-reader.toRead:
	case err = <-reader.done:
	}
	return record, err
}