Commit
Add support for parsing empty arrays from parquet into JSON.
nicktobey committed Jan 16, 2025
1 parent 6c7a3c9 commit 63eddeb
Showing 4 changed files with 50 additions and 6 deletions.
36 changes: 30 additions & 6 deletions go/libraries/doltcore/table/typed/parquet/reader.go
@@ -50,7 +50,10 @@ type ParquetReader struct {
 	fileData map[string][]interface{}
 	// rLevels indicate whether a value in a column is a repeat of a repeated type.
 	// We only include these for repeated fields.
 	rLevels map[string][]int32
+	// dLevels are used for interpreting null values by indicating the deepest level in
+	// a nested field that's defined.
+	dLevels map[string][]int32
 	columnName []string
 }
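
For readers new to Parquet's record shredding: each leaf entry of a repeated column arrives as a (value, rLevel, dLevel) triple, and the definition level is what distinguishes a NULL column value from an empty list and from a null element inside a list. The sketch below is only an illustration with assumed level numbers (a three-level list layout, so 3 is the deepest level), not code from this commit:

```go
package main

import "fmt"

func main() {
	// Illustrative leaf entries for four values of an optional list column
	// such as `embeddings`. The dLevel numbers assume a three-level list
	// layout; the exact maximum depends on how the writer nests the schema.
	rows := []struct {
		json    string
		vals    []interface{}
		rLevels []int32
		dLevels []int32
	}{
		{"NULL", []interface{}{nil}, []int32{0}, []int32{0}},                     // column value is NULL
		{"[]", []interface{}{nil}, []int32{0}, []int32{1}},                       // list exists but is empty
		{"[1]", []interface{}{int64(1)}, []int32{0}, []int32{3}},                 // one fully defined element
		{"[4,null]", []interface{}{int64(4), nil}, []int32{0, 1}, []int32{3, 2}}, // second element is null
	}
	for _, r := range rows {
		fmt.Printf("%-9s -> vals=%v rLevels=%v dLevels=%v\n", r.json, r.vals, r.rLevels, r.dLevels)
	}
}
```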

@@ -82,6 +85,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
 	// TODO : need to solve for getting single row data in readRow (storing all columns data in memory right now)
 	data := make(map[string][]interface{})
 	rLevels := make(map[string][]int32)
+	dLevels := make(map[string][]int32)
 	rowReadCounters := make(map[string]int)
 	var colName []string
 	for _, col := range columns {
@@ -96,14 +100,15 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
 			}
 			return nil, fmt.Errorf("cannot read column: %s Column not found", col.Name)
 		}
-		colData, rLevel, _, cErr := pr.ReadColumnByPath(resolvedColumnName, num)
+		colData, rLevel, dLevel, cErr := pr.ReadColumnByPath(resolvedColumnName, num)
 		if cErr != nil {
 			return nil, fmt.Errorf("cannot read column: %s", cErr.Error())
 		}
 		data[col.Name] = colData
 		if isRepeated {
 			rLevels[col.Name] = rLevel
 		}
+		dLevels[col.Name] = dLevel
 		rowReadCounters[col.Name] = 0
 		colName = append(colName, col.Name)
 	}
@@ -118,6 +123,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch
 		rowReadCounters: rowReadCounters,
 		fileData: data,
 		rLevels: rLevels,
+		dLevels: dLevels,
 		columnName: colName,
 	}, nil
 }
@@ -229,19 +235,37 @@ func (pr *ParquetReader) ReadSqlRow(ctx context.Context) (sql.Row, error) {
 		}
 		var val interface{}
 		rLevels, isRepeated := pr.rLevels[col.Name]
-		if !isRepeated {
-			val = readVal()
-		} else {
+		dLevels, _ := pr.dLevels[col.Name]
+		readVals := func() (val interface{}) {
 			var vals []interface{}
 			for {
+				dLevel := dLevels[rowReadCounter]
 				subVal := readVal()
+				if subVal == nil {
+					// dLevels tells us how to interpret this nil value:
+					// 0 -> the column value is NULL
+					// 1 -> the column exists but is empty
+					// 2 -> the column contains an empty value
+					// 3+ -> the column contains a non-empty value
+					switch dLevel {
+					case 0:
+						return nil
+					case 1:
+						return []interface{}{}
+					}
+				}
 				vals = append(vals, subVal)
 				// an rLevel of 0 marks the start of a new record.
 				if rowReadCounter >= len(rLevels) || rLevels[rowReadCounter] == 0 {
 					break
 				}
 			}
-			val = vals
+			return vals
 		}
+		if !isRepeated {
+			val = readVal()
+		} else {
+			val = readVals()
+		}
 
 		pr.rowReadCounters[col.Name] = rowReadCounter
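
Outside the reader, the effect of the new readVals logic can be seen with a minimal, self-contained sketch. The helper name decodeRepeatedColumn and the flat slices are assumptions for illustration, not the Dolt API; the level numbers follow the 0/1/2/3+ cases documented in the diff above:

```go
package main

import "fmt"

// decodeRepeatedColumn reassembles one row's value from a repeated column's
// leaf entries, starting at *cursor: nil for a NULL column value, an empty
// slice for an empty list, or a slice of elements (possibly containing nil).
func decodeRepeatedColumn(vals []interface{}, rLevels, dLevels []int32, cursor *int) interface{} {
	var out []interface{}
	for {
		dLevel := dLevels[*cursor]
		v := vals[*cursor]
		*cursor++
		if v == nil {
			switch dLevel {
			case 0:
				return nil // the whole column value is NULL
			case 1:
				return []interface{}{} // the list exists but is empty
			}
			// deeper levels mean a null element inside a non-empty list
		}
		out = append(out, v)
		// an rLevel of 0 on the next entry marks the start of the next row
		if *cursor >= len(rLevels) || rLevels[*cursor] == 0 {
			return out
		}
	}
}

func main() {
	// Illustrative leaf entries for the rows [], [1], NULL, [2,3], [4,null], []
	vals := []interface{}{nil, int64(1), nil, int64(2), int64(3), int64(4), nil, nil}
	dLevels := []int32{1, 3, 0, 3, 3, 3, 2, 1}
	rLevels := []int32{0, 0, 0, 0, 1, 0, 1, 0}
	cursor := 0
	for cursor < len(vals) {
		fmt.Println(decodeRepeatedColumn(vals, rLevels, dLevels, &cursor))
	}
}
```

Run as-is this prints [], [1], <nil>, [2 3], [4 <nil>], and [] — the same shapes the new bats test asserts on after import.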
Binary file added: integration-tests/bats/helper/parquet/sequences.parquet (contents not shown).
1 change: 1 addition & 0 deletions integration-tests/bats/helper/parquet/sequences.sql
@@ -0,0 +1 @@
+create table sequences(`pk` int primary key, `name` varchar(20), `embeddings` json);
19 changes: 19 additions & 0 deletions integration-tests/bats/import-create-tables.bats
@@ -930,4 +930,23 @@ DELIM
     [[ "$output" =~ "text" ]] || false
     [[ "$output" =~ "hello foo" ]] || false
     [[ "$output" =~ "hello world" ]] || false
 }
+
@test "import-create-tables: import sequences as JSON arrays" {
# The file strings.parquet uses a different name for the root column than the one generated by `dolt table export`,
# but Dolt should still be able to import it.
run dolt table import -c -s `batshelper parquet/sequences.sql` sequences `batshelper parquet/sequences.parquet`
[ "$status" -eq 0 ]

dolt sql -r csv -q "select * from sequences;"
run dolt sql -r csv -q "select * from sequences;"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 7 ]
[[ "$output" =~ '1,empty,[]' ]] || false
[[ "$output" =~ "2,single,[1]" ]] || false
[[ "$output" =~ "3,null," ]] || false
[[ "$output" =~ '4,double,"[2,3]"' ]] || false
[[ "$output" =~ '5,contains null,"[4,null]"' ]] || false
[[ "$output" =~ '6,empty,[]' ]] || false

}
