Skip to content

Commit

Permalink
bulker: improved test
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Sep 11, 2024
1 parent ce7cf6f commit cc4460c
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 34 deletions.
4 changes: 1 addition & 3 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje
if !ps.schemaOptions.IsEmpty() {
notFlatteningKeys = types2.NewSet[string]()
for _, field := range ps.schemaOptions.Fields {
if field.Type == types.JSON {
notFlatteningKeys.Put(field.Name)
}
notFlatteningKeys.Put(field.Name)
}
}
batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.nameTransformer, ps.omitNils, ps.sqlAdapter.StringifyObjects(), notFlatteningKeys)
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/test_data/types_json.ndjson
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_timestamp": "2022-08-18T14:17:22.375Z", "id":1, "name": "a", "json1": {"nested": 1}, "json2": {"nested": 1}, "array1": ["1","2","3"]}
{"_timestamp": "2022-08-18T14:17:22.375Z", "id":2, "name": "b", "json1": {"nested": {"nested": 2}}, "json2": {"nested": {"nested": 2}}, "array1": [1,2,3]}
{"_timestamp": "2022-08-18T14:17:22.375Z", "id":2, "name": "b", "json1": {"nested2": {"nested": 2}}, "json2": {"nested": {"nested": 2}}, "array1": [1,2,3]}
{"_timestamp": "2022-08-18T14:17:22.375Z", "id":3, "name": "c", "json1": {"nested": 1}, "json2": {"nested": 1},"array1": [{"nested": 1},{"nested": 2},{"nested": 3}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"_timestamp": "2022-08-18T14:17:22.375Z", "id":5, "name": "e", "json1": {"nested": 5}, "array1": ["1","2","3"]}
157 changes: 127 additions & 30 deletions bulkerlib/implementations/sql/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,13 @@ func TestJSONTypes(t *testing.T) {
{"json1_nested", types2.SQLColumn{Type: "bigint"}},
{"json2", types2.SQLColumn{Type: "jsonb"}},
{"array1", types2.SQLColumn{Type: "jsonb"}},
{"json1_nested_nested", types2.SQLColumn{Type: "bigint"}},
{"json1_nested2_nested", types2.SQLColumn{Type: "bigint"}},
}),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[\"1\", \"2\", \"3\"]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\": {\"nested\": 2}}", "array1": "[1, 2, 3]", "json1_nested_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[{\"nested\": 1}, {\"nested\": 2}, {\"nested\": 3}]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[\"1\", \"2\", \"3\"]", "json1_nested2_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\": {\"nested\": 2}}", "array1": "[1, 2, 3]", "json1_nested2_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[{\"nested\": 1}, {\"nested\": 2}, {\"nested\": 3}]", "json1_nested2_nested": nil},
},
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Expand All @@ -446,7 +446,7 @@ func TestJSONTypes(t *testing.T) {
{Name: "json1_nested", Type: types2.INT64},
{Name: "json2", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
{Name: "json1_nested_nested", Type: types2.INT64},
{Name: "json1_nested2_nested", Type: types2.INT64},
},
})},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId}),
Expand All @@ -465,13 +465,13 @@ func TestJSONTypes(t *testing.T) {
{"json1_nested", types2.SQLColumn{Type: "bigint"}},
{"json2", types2.SQLColumn{Type: "json"}},
{"array1", types2.SQLColumn{Type: "json"}},
{"json1_nested_nested", types2.SQLColumn{Type: "bigint"}},
{"json1_nested2_nested", types2.SQLColumn{Type: "bigint"}},
}),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[\"1\", \"2\", \"3\"]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\": {\"nested\": 2}}", "array1": "[1, 2, 3]", "json1_nested_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[{\"nested\": 1}, {\"nested\": 2}, {\"nested\": 3}]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[\"1\", \"2\", \"3\"]", "json1_nested2_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\": {\"nested\": 2}}", "array1": "[1, 2, 3]", "json1_nested2_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\": 1}", "array1": "[{\"nested\": 1}, {\"nested\": 2}, {\"nested\": 3}]", "json1_nested2_nested": nil},
},
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Expand All @@ -482,7 +482,7 @@ func TestJSONTypes(t *testing.T) {
{Name: "json1_nested", Type: types2.INT64},
{Name: "json2", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
{Name: "json1_nested_nested", Type: types2.INT64},
{Name: "json1_nested2_nested", Type: types2.INT64},
},
})},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{MySQLBulkerTypeId}),
Expand All @@ -501,13 +501,13 @@ func TestJSONTypes(t *testing.T) {
{"json1_nested", types2.SQLColumn{Type: "bigint"}},
{"json2", types2.SQLColumn{Type: "super"}},
{"array1", types2.SQLColumn{Type: "super"}},
{"json1_nested_nested", types2.SQLColumn{Type: "bigint"}},
{"json1_nested2_nested", types2.SQLColumn{Type: "bigint"}},
}),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested2_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested2_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested2_nested": nil},
},
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Expand All @@ -518,7 +518,7 @@ func TestJSONTypes(t *testing.T) {
{Name: "json1_nested", Type: types2.INT64},
{Name: "json2", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
{Name: "json1_nested_nested", Type: types2.INT64},
{Name: "json1_nested2_nested", Type: types2.INT64},
},
})},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{RedshiftBulkerTypeId, RedshiftBulkerTypeId + "_serverless"}),
Expand All @@ -537,13 +537,13 @@ func TestJSONTypes(t *testing.T) {
{"json1_nested", types2.SQLColumn{Type: "Int64"}},
{"json2", types2.SQLColumn{Type: "String"}},
{"array1", types2.SQLColumn{Type: "String"}},
{"json1_nested_nested", types2.SQLColumn{Type: "Int64"}},
{"json1_nested2_nested", types2.SQLColumn{Type: "Int64"}},
}),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested_nested": 0},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": 0, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested_nested": 0},
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested2_nested": 0},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": 0, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested2_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested2_nested": 0},
},
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Expand All @@ -554,7 +554,7 @@ func TestJSONTypes(t *testing.T) {
{Name: "json1_nested", Type: types2.INT64},
{Name: "json2", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
{Name: "json1_nested_nested", Type: types2.INT64},
{Name: "json1_nested2_nested", Type: types2.INT64},
},
})},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{ClickHouseBulkerTypeId}),
Expand All @@ -573,13 +573,13 @@ func TestJSONTypes(t *testing.T) {
{"json1_nested", types2.SQLColumn{Type: "NUMBER(38,0)"}},
{"json2", types2.SQLColumn{Type: "VARCHAR(16777216)"}},
{"array1", types2.SQLColumn{Type: "VARCHAR(16777216)"}},
{"json1_nested_nested", types2.SQLColumn{Type: "NUMBER(38,0)"}},
{"json1_nested2_nested", types2.SQLColumn{Type: "NUMBER(38,0)"}},
}),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested2_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested2_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested2_nested": nil},
},
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Expand All @@ -590,7 +590,7 @@ func TestJSONTypes(t *testing.T) {
{Name: "json1_nested", Type: types2.INT64},
{Name: "json2", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
{Name: "json1_nested_nested", Type: types2.INT64},
{Name: "json1_nested2_nested", Type: types2.INT64},
},
})},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{SnowflakeBulkerTypeId}),
Expand All @@ -609,13 +609,13 @@ func TestJSONTypes(t *testing.T) {
{"json1_nested", types2.SQLColumn{Type: "INTEGER"}},
{"json2", types2.SQLColumn{Type: "JSON"}},
{"array1", types2.SQLColumn{Type: "JSON"}},
{"json1_nested_nested", types2.SQLColumn{Type: "INTEGER"}},
{"json1_nested2_nested", types2.SQLColumn{Type: "INTEGER"}},
}),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested_nested": nil},
{"_timestamp": constantTime, "id": 1, "name": "a", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[\"1\",\"2\",\"3\"]", "json1_nested2_nested": nil},
{"_timestamp": constantTime, "id": 2, "name": "b", "json1_nested": nil, "json2": "{\"nested\":{\"nested\":2}}", "array1": "[1,2,3]", "json1_nested2_nested": 2},
{"_timestamp": constantTime, "id": 3, "name": "c", "json1_nested": 1, "json2": "{\"nested\":1}", "array1": "[{\"nested\":1},{\"nested\":2},{\"nested\":3}]", "json1_nested2_nested": nil},
},
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Expand All @@ -626,7 +626,7 @@ func TestJSONTypes(t *testing.T) {
{Name: "json1_nested", Type: types2.INT64},
{Name: "json2", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
{Name: "json1_nested_nested", Type: types2.INT64},
{Name: "json1_nested2_nested", Type: types2.INT64},
},
})},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{BigqueryBulkerTypeId}),
Expand Down Expand Up @@ -715,3 +715,100 @@ func TestTransactionalJSON(t *testing.T) {
sequentialGroup.Add(1)
}
}

func TestTransactionalJSONnString(t *testing.T) {
t.Parallel()
tests := []bulkerTestConfig{
{
//deletes any table leftovers from previous tests
name: "dummy_test_table_cleanup",
tableName: "transactional_json_string_test",
modes: []bulker.BulkMode{bulker.Batch},
dataFile: "test_data/empty.ndjson",
configIds: allBulkerConfigs,
},
{
name: "added_columns_first_run",
tableName: "transactional_json_string_test",
modes: []bulker.BulkMode{bulker.Batch},
leaveResultingTable: true,
dataFile: "test_data/types_json_part1.ndjson",
expectedTable: ExpectedTable{
Columns: justColumns("_timestamp", "id", "name", "json1", "array1"),
},
expectedRowsCount: 2,
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Fields: []types2.SchemaField{
{Name: "_timestamp", Type: types2.TIMESTAMP},
{Name: "id", Type: types2.INT64},
{Name: "name", Type: types2.STRING},
{Name: "json1", Type: types2.STRING},
{Name: "array1", Type: types2.STRING},
},
})},
configIds: allBulkerConfigs,
},
{
name: "added_columns_second_run",
tableName: "transactional_json_string_test",
modes: []bulker.BulkMode{bulker.Batch},
leaveResultingTable: true,
dataFile: "test_data/types_json_part2.ndjson",
expectedTable: ExpectedTable{
Columns: justColumns("_timestamp", "id", "name", "json1", "array1"),
},
expectedRowsCount: 4,
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Fields: []types2.SchemaField{
{Name: "_timestamp", Type: types2.TIMESTAMP},
{Name: "id", Type: types2.INT64},
{Name: "name", Type: types2.STRING},
{Name: "json1", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
},
})},
configIds: allBulkerConfigs,
},
{
name: "added_columns_third_run",
tableName: "transactional_json_string_test",
modes: []bulker.BulkMode{bulker.Batch},
leaveResultingTable: true,
dataFile: "test_data/types_json_part3.ndjson",
expectedTable: ExpectedTable{
Columns: justColumns("_timestamp", "id", "name", "json1", "array1"),
},
expectedRowsCount: 5,
streamOptions: []bulker.StreamOption{bulker.WithSchema(types2.Schema{
Name: "d",
Fields: []types2.SchemaField{
{Name: "_timestamp", Type: types2.TIMESTAMP},
{Name: "id", Type: types2.INT64},
{Name: "name", Type: types2.STRING},
{Name: "json1", Type: types2.JSON},
{Name: "array1", Type: types2.JSON},
},
})},
configIds: allBulkerConfigs,
},
{
name: "dummy_test_table_cleanup",
tableName: "transactional_json_string_test",
modes: []bulker.BulkMode{bulker.Batch},
dataFile: "test_data/empty.ndjson",
configIds: allBulkerConfigs,
},
}
sequentialGroup := sync.WaitGroup{}
sequentialGroup.Add(1)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
runTestConfig(t, tt, testStream)
sequentialGroup.Done()
})
sequentialGroup.Wait()
sequentialGroup.Add(1)
}
}

0 comments on commit cc4460c

Please sign in to comment.