bulker: redshift: use SUPER type for JSON objects
absorbb committed Sep 10, 2024
1 parent 127ad32 commit 1fe4111
Showing 4 changed files with 91 additions and 8 deletions.
4 changes: 1 addition & 3 deletions bulkerlib/implementations/sql/abstract.go
@@ -108,9 +108,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje
 	if !ps.schemaOptions.IsEmpty() {
 		notFlatteningKeys = types2.NewSet[string]()
 		for _, field := range ps.schemaOptions.Fields {
-			if field.Type == types.JSON {
-				notFlatteningKeys.Put(field.Name)
-			}
+			notFlatteningKeys.Put(field.Name)
 		}
 	}
 	batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.nameTransformer, ps.omitNils, ps.sqlAdapter.StringifyObjects(), notFlatteningKeys)
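Note: with this change every field declared in the stream schema is kept out of flattening, not only JSON-typed ones. A minimal, hypothetical sketch of how such a keep-list affects flattening — flatten, keep and the underscore joining are assumptions for illustration, not the bulker implementation:

package main

import "fmt"

// flatten expands nested maps into underscore-joined keys, except keys listed
// in keep, which stay intact (e.g. to land in a single SUPER/JSON column).
func flatten(prefix string, in map[string]any, keep map[string]bool, out map[string]any) {
	for k, v := range in {
		key := k
		if prefix != "" {
			key = prefix + "_" + k
		}
		if nested, ok := v.(map[string]any); ok && !keep[key] {
			flatten(key, nested, keep, out)
			continue
		}
		out[key] = v
	}
}

func main() {
	event := map[string]any{
		"id":    1,
		"json1": map[string]any{"nested": 42},
	}
	out := map[string]any{}
	// Previously only JSON-typed schema fields went into the keep set;
	// with this commit every declared field name does.
	flatten("", event, map[string]bool{"json1": true}, out)
	fmt.Println(out) // map[id:1 json1:map[nested:42]]
}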
14 changes: 12 additions & 2 deletions bulkerlib/implementations/sql/redshift.go
@@ -53,7 +53,7 @@ var (
 		types2.FLOAT64:   {"double precision"},
 		types2.TIMESTAMP: {"timestamp with time zone", "timestamp", "timestamp without time zone"},
 		types2.BOOL:      {"boolean"},
-		types2.JSON:      {"character varying(65535)"},
+		types2.JSON:      {"super"},
 		types2.UNKNOWN:   {"character varying(65535)"},
 	}
 )
@@ -85,11 +85,21 @@ func NewRedshift(bulkerConfig bulker.Config) (bulker.Bulker, error) {
 	if err != nil {
 		return nil, err
 	}
+	typecastFunc := func(placeholder string, column types2.SQLColumn) string {
+		if column.DataType == types2.JSON || column.Type == "super" {
+			return fmt.Sprintf("JSON_PARSE(%s)", placeholder)
+		}
+		if column.Override {
+			return placeholder + "::" + column.Type
+		}
+		return placeholder
+	}
 	r := &Redshift{Postgres: postgres.(*Postgres), s3Config: &config.S3OptionConfig}
 	r.batchFileFormat = types2.FileFormatCSV
 	r.batchFileCompression = types2.FileCompressionGZIP
 	r._columnDDLFunc = redshiftColumnDDL
-	r.typesMapping, r.reverseTypesMapping = InitTypes(redshiftTypes, false)
+	r.typecastFunc = typecastFunc
+	r.typesMapping, r.reverseTypesMapping = InitTypes(redshiftTypes, true)
 	r.tableHelper = NewTableHelper(127, '"')
 	r.temporaryTables = true
 	r.renameToSchemaless = true
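For illustration, a self-contained sketch of what the typecastFunc above does to value placeholders. SQLColumn and DataType here are simplified stand-ins for the types2 package, and the placeholder strings are arbitrary:

package main

import "fmt"

type DataType int

const (
	UNKNOWN DataType = iota
	JSON
)

// SQLColumn is a simplified stand-in for types2.SQLColumn.
type SQLColumn struct {
	Type     string
	DataType DataType
	Override bool
}

// typecast mirrors the closure added above: SUPER/JSON columns are wrapped in
// JSON_PARSE so Redshift stores parsed semi-structured data, overridden columns
// get an explicit cast, and everything else is passed through unchanged.
func typecast(placeholder string, column SQLColumn) string {
	if column.DataType == JSON || column.Type == "super" {
		return fmt.Sprintf("JSON_PARSE(%s)", placeholder)
	}
	if column.Override {
		return placeholder + "::" + column.Type
	}
	return placeholder
}

func main() {
	fmt.Println(typecast("$1", SQLColumn{Type: "super", DataType: JSON}))    // JSON_PARSE($1)
	fmt.Println(typecast("$2", SQLColumn{Type: "bigint", Override: true}))   // $2::bigint
	fmt.Println(typecast("$3", SQLColumn{Type: "character varying(65535)"})) // $3
}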
79 changes: 77 additions & 2 deletions bulkerlib/implementations/sql/types_test.go
@@ -499,8 +499,8 @@ func TestJSONTypes(t *testing.T) {
 			{"id", types2.SQLColumn{Type: "bigint"}},
 			{"name", types2.SQLColumn{Type: "character varying(65535)"}},
 			{"json1_nested", types2.SQLColumn{Type: "bigint"}},
-			{"json2", types2.SQLColumn{Type: "character varying(65535)"}},
-			{"array1", types2.SQLColumn{Type: "character varying(65535)"}},
+			{"json2", types2.SQLColumn{Type: "super"}},
+			{"array1", types2.SQLColumn{Type: "super"}},
 			{"json1_nested_nested", types2.SQLColumn{Type: "bigint"}},
 		}),
 	},
@@ -715,3 +715,78 @@ func TestTransactionalJSON(t *testing.T) {
 		sequentialGroup.Add(1)
 	}
 }
+
+func TestTransactionalJSONnString(t *testing.T) {
+	t.Parallel()
+	tests := []bulkerTestConfig{
+		{
+			//deletes any table leftovers from previous tests
+			name:      "dummy_test_table_cleanup",
+			tableName: "transactional_json_string_test",
+			modes:     []bulker.BulkMode{bulker.Batch},
+			dataFile:  "test_data/empty.ndjson",
+			configIds: allBulkerConfigs,
+		},
+		{
+			name:                "added_columns_first_run",
+			tableName:           "transactional_json_string_test",
+			modes:               []bulker.BulkMode{bulker.Batch},
+			leaveResultingTable: true,
+			dataFile:            "test_data/types_json_part1.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "id", "name", "json1", "array1"),
+			},
+			expectedRowsCount: 2,
+			streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
+				Name: "d",
+				Fields: []types2.SchemaField{
+					{Name: "_timestamp", Type: types2.TIMESTAMP},
+					{Name: "id", Type: types2.INT64},
+					{Name: "name", Type: types2.STRING},
+					{Name: "json1", Type: types2.STRING},
+					{Name: "array1", Type: types2.STRING},
+				},
+			})},
+			configIds: allBulkerConfigs,
+		},
+		{
+			name:                "added_columns_second_run",
+			tableName:           "transactional_json_string_test",
+			modes:               []bulker.BulkMode{bulker.Batch},
+			leaveResultingTable: true,
+			dataFile:            "test_data/types_json_part2.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "id", "name", "json1", "array1"),
+			},
+			expectedRowsCount: 4,
+			streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
+				Name: "d",
+				Fields: []types2.SchemaField{
+					{Name: "_timestamp", Type: types2.TIMESTAMP},
+					{Name: "id", Type: types2.INT64},
+					{Name: "name", Type: types2.STRING},
+					{Name: "json1", Type: types2.JSON},
+					{Name: "array1", Type: types2.JSON},
+				},
+			})},
+			configIds: allBulkerConfigs,
+		},
+		{
+			name:      "dummy_test_table_cleanup",
+			tableName: "transactional_json_string_test",
+			modes:     []bulker.BulkMode{bulker.Batch},
+			dataFile:  "test_data/empty.ndjson",
+			configIds: allBulkerConfigs,
+		},
+	}
+	sequentialGroup := sync.WaitGroup{}
+	sequentialGroup.Add(1)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			runTestConfig(t, tt, testStream)
+			sequentialGroup.Done()
+		})
+		sequentialGroup.Wait()
+		sequentialGroup.Add(1)
+	}
+}
2 changes: 1 addition & 1 deletion bulkerlib/types/marshaller.go
Expand Up @@ -158,7 +158,7 @@ func (cm *CSVMarshaller) Marshal(object ...Object) error {
v := obj.GetN(field)
strValue := ""
if v == nil {
strValue = "\\N"
strValue = ""
} else {
switch v := v.(type) {
case string:
Expand Down
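As a side note, a minimal sketch (not bulker code) of how this nil handling shows up in the generated batch file: the Postgres-style \N marker is replaced by an empty CSV field, which the loader can then map to NULL (Redshift's COPY has EMPTYASNULL / NULL AS options for this):

package main

import (
	"encoding/csv"
	"os"
)

func main() {
	w := csv.NewWriter(os.Stdout)
	// Before this change a nil value was written as the literal \N marker:
	_ = w.Write([]string{"1", "\\N", "alice"}) // 1,\N,alice
	// After the change a nil value becomes an empty field:
	_ = w.Write([]string{"1", "", "alice"}) // 1,,alice
	w.Flush()
}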
