Skip to content

Commit

Permalink
Add header option to csv module parse function and Parser class
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiade committed Jan 29, 2025
1 parent 5c25c64 commit 1c41c35
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 5 deletions.
17 changes: 14 additions & 3 deletions internal/js/modules/k6/experimental/csv/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,10 @@ func (p *Parser) Next() *sobek.Promise {
promise, resolve, reject := promises.New(p.vu)

go func() {
// var records []string
var record any
var done bool
var err error

// records, err = p.reader.Read()
record, err = p.reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
Expand All @@ -211,7 +209,6 @@ func (p *Parser) Next() *sobek.Promise {

p.currentLine.Add(1)

// resolve(parseResult{Done: done, Value: records})
resolve(parseResult{Done: done, Value: record})
}()

Expand Down Expand Up @@ -246,13 +243,23 @@ type options struct {

// ToLine indicates the line at which to stop reading the CSV file (inclusive).
ToLine null.Int `js:"toLine"`

// Header indicates whether the first line of the CSV file should be used as the header, and whether
// the lines should be returned as objects with the header as keys.
//
// If the header is enabled, the first line of the CSV file will be skipped, and the option is thus
// equivalent to setting `skipFirstLine` to `true`.
//
// The header option is incompatible with a FromLine option value set to a value greater than 0
Header null.Bool `js:"header"`
}

// newDefaultParserOptions creates a new options instance with default values.
func newDefaultParserOptions() options {
return options{
Delimiter: ',',
SkipFirstLine: false,
Header: null.BoolFrom(false),
}
}

Expand Down Expand Up @@ -287,6 +294,10 @@ func newParserOptionsFrom(obj *sobek.Object) (options, error) {
options.ToLine = null.IntFrom(v.ToInteger())
}

if v := obj.Get("header"); v != nil {
options.Header = null.BoolFrom(v.ToBoolean())
}

if options.FromLine.Valid && options.ToLine.Valid && options.FromLine.Int64 >= options.ToLine.Int64 {
return options, fmt.Errorf("fromLine must be less than or equal to toLine")
}
Expand Down
108 changes: 108 additions & 0 deletions internal/js/modules/k6/experimental/csv/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@ func TestParserConstructor(t *testing.T) {
require.NoError(t, err)
})

t.Run("constructing a parser with both header and skipFirstLine options should fail", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { delimiter: ';', skipFirstLine: true, header: true });
`, testFilePath)))

require.Error(t, err)
})

t.Run("constructing a parser with both the header option and fromLine option greater than 0 should fail", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { delimiter: ';', fromLine: 1, header: true });
`, testFilePath)))

require.Error(t, err)
})

t.Run("constructing a parser without providing a file instance should fail", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -354,6 +392,44 @@ func TestParserNext(t *testing.T) {
require.NoError(t, err)
})

t.Run("next with header option should return records as objects and succeed", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { header: true });
let gotParsedCount = 0;
let { done, value } = await parser.next();
while (!done) {
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
throw new Error("Expected record to be an object, but got " + typeof value);
}
if (Object.keys(value).length !== 6) {
throw new Error("Expected record to have 6 fields, but got " + Object.keys(value).length);
}
gotParsedCount++;
({ done, value } = await parser.next());
}
if (gotParsedCount !== 10) {
throw new Error("Expected to parse 10 records, but got " + gotParsedCount);
}
`, testFilePath)))
require.NoError(t, err)
})

t.Run("calling next on a parser that has reached EOF should return done=true and no value", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -527,6 +603,38 @@ func TestParse(t *testing.T) {

require.NoError(t, err)
})

t.Run("parse respects the header option, returns records as objects and succeeds", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const csvRecords = await csv.parse(file, { header: true });
if (csvRecords.length !== 10) {
throw new Error("Expected 11 records, but got " + csvRecords.length);
}
for (const record of csvRecords) {
if (typeof record !== 'object' || record === null || Array.isArray(record)) {
throw new Error("Expected record to be an object, but got " + typeof record);
}
if (Object.keys(record).length !== 6) {
throw new Error("Expected record to have 6 fields, but got " + Object.keys(record).length);
}
}
`, testFilePath)))
require.NoError(t, err)
})
}

const initGlobals = `
Expand Down
52 changes: 51 additions & 1 deletion internal/js/modules/k6/experimental/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"sync/atomic"

"go.k6.io/k6/internal/js/modules/k6/data"
)

// Reader is a CSV reader.
Expand All @@ -18,6 +20,9 @@ type Reader struct {

// options holds the reader's options.
options options

// header stores the column names when header option is enabled.
header []string
}

// NewReaderFrom creates a new CSV reader from the provided io.Reader.
Expand Down Expand Up @@ -52,8 +57,29 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
skipFirstLineSet = options.SkipFirstLine
fromLineIsPositive = fromLineSet && options.FromLine.Int64 >= 0
toLineIsPositive = toLineSet && options.ToLine.Int64 >= 0
headerEnabled = options.Header.Valid && options.Header.Bool
)

// If header is enabled and we're skipping the first line, return an error.
if headerEnabled && skipFirstLineSet {
return nil, fmt.Errorf("the 'header' option cannot be enabled when 'skipFirstLine' is true")
}

// If header is enabled and fromLine is set to a value greater than 0, return an error.
if headerEnabled && fromLineSet && options.FromLine.Int64 > 0 {
return nil, fmt.Errorf("the 'header' option cannot be enabled when 'fromLine' is set to a value greater than 0")
}

// If header is enabled, read the first line and store it as the header.
if headerEnabled {
header, err := csvParser.Read()
if err != nil {
return nil, fmt.Errorf("failed to read the first line; reason: %w", err)
}
reader.header = header
reader.currentLine.Add(1)
}

// If set, the fromLine option should either be greater than or equal to 0.
if fromLineSet && !fromLineIsPositive {
return nil, fmt.Errorf("the 'fromLine' option must be greater than or equal to 0; got %d", options.FromLine.Int64)
Expand Down Expand Up @@ -96,7 +122,13 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
return reader, nil
}

// func (r *Reader) Read() ([]string, error) {
// The csv module's read must implement the RecordReader interface.
var _ data.RecordReader = (*Reader)(nil)

// Read reads a record from the CSV file.
//
// If the `header` option is enabled, it will return a map of the record.
// Otherwise, it will return the record as a slice of strings.
func (r *Reader) Read() (any, error) {
toLineSet := r.options.ToLine.Valid

Expand All @@ -112,5 +144,23 @@ func (r *Reader) Read() (any, error) {

r.currentLine.Add(1)

// If header option is enabled, return a map of the record.
if r.options.Header.Valid && r.options.Header.Bool {
if r.header == nil {
return nil, fmt.Errorf("the 'header' option is enabled, but no header was found")
}

if len(record) != len(r.header) {
return nil, fmt.Errorf("record length (%d) doesn't match header length (%d)", len(record), len(r.header))
}

recordMap := make(map[string]string)
for i, value := range record {
recordMap[r.header[i]] = value
}

return recordMap, nil
}

return record, nil
}
65 changes: 64 additions & 1 deletion internal/js/modules/k6/experimental/csv/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,39 @@ func TestNewReaderFrom(t *testing.T) {
require.Error(t, err)
})

t.Run("instantiating a new reader with header option enabled and skipFirstLine option enabled should fail", func(t *testing.T) {
t.Parallel()

_, err := NewReaderFrom(
strings.NewReader("lastname,firstname,composer,born,died,dates\n"),
options{Header: null.NewBool(true, true), SkipFirstLine: true},
)
require.Error(t, err)
})

t.Run("instantiating a new reader with header option enabled and fromLine option set to a value greater than 0 should fail", func(t *testing.T) {
t.Parallel()

_, err := NewReaderFrom(
strings.NewReader("lastname,firstname,composer,born,died,dates\n"),
options{Header: null.NewBool(true, true), FromLine: null.NewInt(1, true)},
)
require.Error(t, err)
})

t.Run("instantiating a new reader with header option enabled and compatible options should succeed", func(t *testing.T) {
t.Parallel()

r, err := NewReaderFrom(
strings.NewReader("lastname,firstname,composer,born,died,dates\n"),
options{Header: null.NewBool(true, true)},
)
require.NoError(t, err)
assert.NotNil(t, r.header)
assert.Equal(t, []string{"lastname", "firstname", "composer", "born", "died", "dates"}, r.header)
assert.Equal(t, r.currentLine.Load(), int64(1))
})

t.Run("skipFirstLine option skips first line and succeeds", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -110,7 +143,7 @@ func TestNewReaderFrom(t *testing.T) {
func TestReader_Read(t *testing.T) {
t.Parallel()

t.Run("default behavior should return all lines and succeed", func(t *testing.T) {
t.Run("default behavior should return all lines as slices of strings and succeed", func(t *testing.T) {
t.Parallel()

const csvTestData = "lastname,firstname,composer,born,died,dates\n" +
Expand Down Expand Up @@ -138,6 +171,36 @@ func TestReader_Read(t *testing.T) {
require.ErrorIs(t, err, io.EOF)
})

t.Run("header option should lead to lines being returned as maps and succeed", func(t *testing.T) {
t.Parallel()

const csvTestData = "lastname,firstname,composer,born,died,dates\n" +
"Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757\n"

r, err := NewReaderFrom(
strings.NewReader(csvTestData),
options{Header: null.NewBool(true, true)},
)
require.NoError(t, err)

// Header line, if present should be ignored and records should be returned as maps
firstRecord, err := r.Read()
require.NoError(t, err)
assert.Equal(t, map[string]string{
"lastname": "Scarlatti",
"firstname": "Domenico",
"composer": "Domenico Scarlatti",
"born": "1685",
"died": "1757",
"dates": "1685–1757",
}, firstRecord)

// As we've reached EOF, we should get EOF
_, err = r.Read()
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)
})

t.Run("toLine option returns EOF when toLine option is reached and succeeds", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 1c41c35

Please sign in to comment.