From 63eddeb1f3e813d2b9c6a5310c94458e03571363 Mon Sep 17 00:00:00 2001 From: Nick Tobey Date: Wed, 15 Jan 2025 16:10:48 -0800 Subject: [PATCH] Add support for parsing empty arrays from parquet into JSON. --- .../doltcore/table/typed/parquet/reader.go | 36 +++++++++++++++--- .../bats/helper/parquet/sequences.parquet | Bin 0 -> 2070 bytes .../bats/helper/parquet/sequences.sql | 1 + .../bats/import-create-tables.bats | 19 +++++++++ 4 files changed, 50 insertions(+), 6 deletions(-) create mode 100644 integration-tests/bats/helper/parquet/sequences.parquet create mode 100644 integration-tests/bats/helper/parquet/sequences.sql diff --git a/go/libraries/doltcore/table/typed/parquet/reader.go b/go/libraries/doltcore/table/typed/parquet/reader.go index 4ba3f0d68e7..e5e2946f0c9 100644 --- a/go/libraries/doltcore/table/typed/parquet/reader.go +++ b/go/libraries/doltcore/table/typed/parquet/reader.go @@ -50,7 +50,10 @@ type ParquetReader struct { fileData map[string][]interface{} // rLevels indicate whether a value in a column is a repeat of a repeated type. // We only include these for repeated fields. - rLevels map[string][]int32 + rLevels map[string][]int32 + // dLevels are used for interpreting null values by indicating the deepest level in + // a nested field that's defined. + dLevels map[string][]int32 columnName []string } @@ -82,6 +85,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch // TODO : need to solve for getting single row data in readRow (storing all columns data in memory right now) data := make(map[string][]interface{}) rLevels := make(map[string][]int32) + dLevels := make(map[string][]int32) rowReadCounters := make(map[string]int) var colName []string for _, col := range columns { @@ -96,7 +100,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch } return nil, fmt.Errorf("cannot read column: %s Column not found", col.Name) } - colData, rLevel, _, cErr := pr.ReadColumnByPath(resolvedColumnName, num) + colData, rLevel, dLevel, cErr := pr.ReadColumnByPath(resolvedColumnName, num) if cErr != nil { return nil, fmt.Errorf("cannot read column: %s", cErr.Error()) } @@ -104,6 +108,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch if isRepeated { rLevels[col.Name] = rLevel } + dLevels[col.Name] = dLevel rowReadCounters[col.Name] = 0 colName = append(colName, col.Name) } @@ -118,6 +123,7 @@ func NewParquetReader(vrw types.ValueReadWriter, fr source.ParquetFile, sche sch rowReadCounters: rowReadCounters, fileData: data, rLevels: rLevels, + dLevels: dLevels, columnName: colName, }, nil } @@ -229,19 +235,37 @@ func (pr *ParquetReader) ReadSqlRow(ctx context.Context) (sql.Row, error) { } var val interface{} rLevels, isRepeated := pr.rLevels[col.Name] - if !isRepeated { - val = readVal() - } else { + dLevels, _ := pr.dLevels[col.Name] + readVals := func() (val interface{}) { var vals []interface{} for { + dLevel := dLevels[rowReadCounter] subVal := readVal() + if subVal == nil { + // dLevels tells us how to interpret this nil value: + // 0 -> the column value is NULL + // 1 -> the column exists but is empty + // 2 -> the column contains an empty value + // 3+ -> the column contains a non-empty value + switch dLevel { + case 0: + return nil + case 1: + return []interface{}{} + } + } vals = append(vals, subVal) // an rLevel of 0 marks the start of a new record. if rowReadCounter >= len(rLevels) || rLevels[rowReadCounter] == 0 { break } } - val = vals + return vals + } + if !isRepeated { + val = readVal() + } else { + val = readVals() } pr.rowReadCounters[col.Name] = rowReadCounter diff --git a/integration-tests/bats/helper/parquet/sequences.parquet b/integration-tests/bats/helper/parquet/sequences.parquet new file mode 100644 index 0000000000000000000000000000000000000000..efb895103fde7d0ca174b2ad260456ffb691a28b GIT binary patch literal 2070 zcmc&$&2Jh<6rWudym7EZqt0rnI3T1TADje|K$D0n%whxUV#nYb@Lt+wcfo6B7s4*4 zZd9qq9{QK`+(XYj^w5t>|AroW$f>HT9{Oeg+eCJxO0D_?GjHF=?|t%dKw5b$;ZJbA zf#Vq6K`18)=$3%QTWb=EY{*F3kV{cCr`Y1g2rKx#E$~R-7>=b>d8tulVPT`7&12w9qy{|Rt%3+g~vL~tbFLI|EI z_8WMe*Ht`WRXE-_JZH%Uu0NoT1e`ymRJhZ%N7Ej4Hb7vF{Lpm$Ak8#ymRdxcvAM6h zv0@00S1PNb>^IRAjxQ>ceA9DyZNCf3UqbdfDf?|Cbu-qr#guWWfn{Dy8lk8ly?!2D zt0Po~&_HsTUno(AoHIDP1#MW4XQ@%;IrsXxB5yAEMEOz*+IPyVcaeh#JAWRDkzY~v z2RVh;=cPPZcf6is+t9v1qHYkbIn?nSKZNQMFg7;1uh+7_u%(#Cx{7~8)a%Mk(gIRr zfpy||CXT@{?}}_e5RL=G`*{~NQV^mbs#kKtB91jmOf0E9KeiF;;IhmUDe+_~S^6&* z?_VDM|ImKr&!`g5%F1uicM(c%skcw2g8_`ZZ(5G}qwp%@`u$PnS^8C`@0j6q;spF* zJmi-ho{t^iz$z?0&TOSKA1@_slTMw?=OALAh*j$#oIr>h%rCc&J1xSl0`%#~ygA@S z4u{UmspDIa1`tx?q}6)Sc{blP^{40qMxT-jA*2a!MjsOLm}mo1C1gM<11{6F75$6} zcP{l!P_t(sj>7diAu|FBxH$;ignsnOhSFpYD*JMMYLnSMGN z0Ph){YR>NNQtMnV>h-|2YJvO0)!x^=R?g{al-cPXo!Z;SIo+KNbo%sCPK}dZy-jsL z7wDDU>2W8|?E!aa*XhhS1kB3rXLg;%wA@C8xZOvsf>k?$7{qOR)xho?x&1@H_1fXF z*B%-*IyD~cEpjmEakrJXJZAr9PDapsSk3L9Hs6lBlJ7pXx^3F?_95r|(|$L{S_l(i z{nJLQKdjND{t*oA2{1c*yq|1(SNjjc1A;jcVs!GZ(P<4?pDn<8c>u1(3;%6CG;KLg z=D~;!Q5)N}b_h(3%?}{q=2Rj37yi|^`15|xU79R!ZVJcI0