From 30004c54781ab13a2f0049fca7589374e6fcaf01 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Thu, 13 Feb 2025 16:47:02 +0400 Subject: [PATCH] bulker: additional performance tweaks --- bulkerlib/implementations/flattener.go | 24 ++++++++++--------- bulkerlib/implementations/sql/abstract.go | 5 ++-- bulkerlib/implementations/sql/batch_header.go | 12 ++++++---- bulkerlib/implementations/sql/bigquery.go | 3 ++- bulkerlib/implementations/sql/clickhouse.go | 2 +- bulkerlib/implementations/sql/mysql.go | 2 +- bulkerlib/implementations/sql/postgres.go | 2 +- .../implementations/sql/sql_adapter_base.go | 3 ++- bulkerlib/implementations/sql/table_helper.go | 5 ++-- .../implementations/sql/type_resolver.go | 23 ++++++++++-------- bulkerlib/types/datatype.go | 8 +++---- jitsubase/types/orderedmap.go | 13 ++++++---- 12 files changed, 57 insertions(+), 45 deletions(-) diff --git a/bulkerlib/implementations/flattener.go b/bulkerlib/implementations/flattener.go index d36399a1..16c56c2a 100644 --- a/bulkerlib/implementations/flattener.go +++ b/bulkerlib/implementations/flattener.go @@ -56,19 +56,21 @@ func (f *FlattenerImpl) FlattenObject(object types.Object, notFlatteningKeys typ // recursive function for flatten key (if value is inner object -> recursion call) // Reformat key func (f *FlattenerImpl) flatten(key string, value types.Object, destination types.Object, notFlatteningKeys types2.Set[string]) error { - if _, ok := notFlatteningKeys[key]; ok { - if f.stringifyObjects { - // if there is sql type hint for nested object - we don't flatten it. - // Instead, we marshal it to json string hoping that database cast function will do the job - b, err := jsonorder.MarshalToString(value) - if err != nil { - return fmt.Errorf("error marshaling json object with key %s: %v", key, err) + if notFlatteningKeys != nil { + if _, ok := notFlatteningKeys[key]; ok { + if f.stringifyObjects { + // if there is sql type hint for nested object - we don't flatten it. + // Instead, we marshal it to json string hoping that database cast function will do the job + b, err := jsonorder.MarshalToString(value) + if err != nil { + return fmt.Errorf("error marshaling json object with key %s: %v", key, err) + } + destination.Set(key, b) + } else { + destination.Set(key, types.ObjectToMap(value)) } - destination.Set(key, b) - } else { - destination.Set(key, types.ObjectToMap(value)) + return nil } - return nil } for el := value.Front(); el != nil; el = el.Next() { var newKey string diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go index 7e8cddd0..ed7849fd 100644 --- a/bulkerlib/implementations/sql/abstract.go +++ b/bulkerlib/implementations/sql/abstract.go @@ -175,12 +175,13 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable, columnsAdded := 0 current := currentTable.Columns unmappedObj := map[string]any{} + exists := existingTable.Exists() for el := desiredTable.Columns.Front(); el != nil; el = el.Next() { name := el.Key newCol := el.Value var existingCol types.SQLColumn ok := false - if existingTable.Exists() { + if exists { existingCol, ok = existingTable.Columns.Get(name) } if !ok { @@ -261,7 +262,7 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable, if len(unmappedObj) > 0 { var existingCol types.SQLColumn ok := false - if existingTable.Exists() { + if exists { existingCol, ok = existingTable.Columns.Get(ps.unmappedDataColumn) } if !ok { diff --git a/bulkerlib/implementations/sql/batch_header.go b/bulkerlib/implementations/sql/batch_header.go index dd73045d..006958e3 100644 --- a/bulkerlib/implementations/sql/batch_header.go +++ b/bulkerlib/implementations/sql/batch_header.go @@ -2,10 +2,9 @@ package sql import ( types2 "github.com/jitsucom/bulker/bulkerlib/types" - "github.com/jitsucom/bulker/jitsubase/types" ) -type Fields = *types.OrderedMap[string, Field] +type Fields = []Field // TypesHeader is the schema result of parsing JSON objects type TypesHeader struct { @@ -16,25 +15,28 @@ type TypesHeader struct { // Exists returns true if there is at least one field func (bh *TypesHeader) Exists() bool { - return bh != nil && bh.Fields.Len() > 0 + return bh != nil && len(bh.Fields) > 0 } // Field is a data type holder with sql type suggestion type Field struct { + Name string dataType *types2.DataType suggestedType *types2.SQLColumn } // NewField returns Field instance -func NewField(t types2.DataType) Field { +func NewField(name string, t types2.DataType) Field { return Field{ + Name: name, dataType: &t, } } // NewFieldWithSQLType returns Field instance with configured suggested sql types -func NewFieldWithSQLType(t types2.DataType, suggestedType *types2.SQLColumn) Field { +func NewFieldWithSQLType(name string, t types2.DataType, suggestedType *types2.SQLColumn) Field { return Field{ + Name: name, dataType: &t, suggestedType: suggestedType, } diff --git a/bulkerlib/implementations/sql/bigquery.go b/bulkerlib/implementations/sql/bigquery.go index 72210ef7..7f381571 100644 --- a/bulkerlib/implementations/sql/bigquery.go +++ b/bulkerlib/implementations/sql/bigquery.go @@ -978,7 +978,8 @@ func (bq *BigQuery) toWhenConditions(conditions *WhenConditions) (string, []bigq queryConditions = append(queryConditions, bq.quotedColumnName(condition.Field)+" "+condition.Clause) default: queryConditions = append(queryConditions, bq.quotedColumnName(condition.Field)+" "+condition.Clause+" @when_"+condition.Field) - values = append(values, bigquery.QueryParameter{Name: "when_" + condition.Field, Value: types2.ReformatValue(condition.Value)}) + v, _ := types2.ReformatValue(condition.Value) + values = append(values, bigquery.QueryParameter{Name: "when_" + condition.Field, Value: v}) } } diff --git a/bulkerlib/implementations/sql/clickhouse.go b/bulkerlib/implementations/sql/clickhouse.go index 473b91a8..e7f45b9b 100644 --- a/bulkerlib/implementations/sql/clickhouse.go +++ b/bulkerlib/implementations/sql/clickhouse.go @@ -952,7 +952,7 @@ func (ch *ClickHouse) localTableName(tableName string) string { } func convertType(value any, column types.SQLColumn) (any, error) { - v := types.ReformatValue(value) + v, _ := types.ReformatValue(value) lt := strings.ToLower(column.Type) switch lt { case "float64": diff --git a/bulkerlib/implementations/sql/mysql.go b/bulkerlib/implementations/sql/mysql.go index 4c669833..de246434 100644 --- a/bulkerlib/implementations/sql/mysql.go +++ b/bulkerlib/implementations/sql/mysql.go @@ -307,7 +307,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L targetTable.Columns.ForEachIndexed(func(i int, name string, col types2.SQLColumn) { val, ok := object[name] if ok { - val = types2.ReformatValue(val) + val, _ = types2.ReformatValue(val) } args[i] = m.valueMappingFunction(val, ok, col) }) diff --git a/bulkerlib/implementations/sql/postgres.go b/bulkerlib/implementations/sql/postgres.go index 072fd285..5690a455 100644 --- a/bulkerlib/implementations/sql/postgres.go +++ b/bulkerlib/implementations/sql/postgres.go @@ -373,7 +373,7 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource targetTable.Columns.ForEachIndexed(func(i int, v string, col types2.SQLColumn) { val, ok := object[v] if ok { - val = types2.ReformatValue(val) + val, _ = types2.ReformatValue(val) } args[i] = p.valueMappingFunction(val, ok, col) }) diff --git a/bulkerlib/implementations/sql/sql_adapter_base.go b/bulkerlib/implementations/sql/sql_adapter_base.go index 1e8ad9d7..5ba19721 100644 --- a/bulkerlib/implementations/sql/sql_adapter_base.go +++ b/bulkerlib/implementations/sql/sql_adapter_base.go @@ -767,7 +767,8 @@ func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramEx queryConditions = append(queryConditions, b.quotedColumnName(condition.Field)+" "+condition.Clause) default: queryConditions = append(queryConditions, b.quotedColumnName(condition.Field)+" "+condition.Clause+" "+paramExpression(i+valuesShift+1, condition.Field)) - values = append(values, types2.ReformatValue(condition.Value)) + v, _ := types2.ReformatValue(condition.Value) + values = append(values, v) } } diff --git a/bulkerlib/implementations/sql/table_helper.go b/bulkerlib/implementations/sql/table_helper.go index 03903991..104551b1 100644 --- a/bulkerlib/implementations/sql/table_helper.go +++ b/bulkerlib/implementations/sql/table_helper.go @@ -85,9 +85,8 @@ func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesH table.PrimaryKeyName = sqlAdapter.BuildConstraintName(table.Name) } - for el := batchHeader.Fields.Front(); el != nil; el = el.Next() { - fieldName := el.Key - field := el.Value + for _, field := range batchHeader.Fields { + fieldName := field.Name colName := th.ColumnName(fieldName) if colName != fieldName { object.Rename(fieldName, colName) diff --git a/bulkerlib/implementations/sql/type_resolver.go b/bulkerlib/implementations/sql/type_resolver.go index bae23622..75703e19 100644 --- a/bulkerlib/implementations/sql/type_resolver.go +++ b/bulkerlib/implementations/sql/type_resolver.go @@ -3,7 +3,6 @@ package sql import ( "fmt" types2 "github.com/jitsucom/bulker/bulkerlib/types" - "github.com/jitsucom/bulker/jitsubase/types" ) var DefaultTypeResolver = NewTypeResolver() @@ -24,9 +23,7 @@ func NewDummyTypeResolver() *DummyTypeResolver { // Resolve return one dummy field and types.Fields becomes not empty. (it is used in Facebook destination) func (dtr *DummyTypeResolver) Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error) { - fields := types.NewOrderedMap[string, Field]() - fields.Set("dummy", NewField(types2.UNKNOWN)) - return fields, nil + return []Field{NewField("dummy", types2.UNKNOWN)}, nil } // TypeResolverImpl resolves types based on converter.go rules @@ -43,12 +40,14 @@ func NewTypeResolver() *TypeResolverImpl { // reformat from json.Number into int64 or float64 and put back // reformat from string with timestamp into time.Time and put back func (tr *TypeResolverImpl) Resolve(object types2.Object, sqlTypeHints types2.SQLTypes) (Fields, error) { - fields := types.NewOrderedMap[string, Field]() + fields := make(Fields, 0, object.Len()) //apply default typecast and define column types for el := object.Front(); el != nil; el = el.Next() { - v := types2.ReformatValue(el.Value) + v, ok := types2.ReformatValue(el.Value) k := el.Key - el.Value = v + if ok { + el.Value = v + } //value type resultColumnType, err := types2.TypeFromValue(v) if err != nil { @@ -65,10 +64,14 @@ func (tr *TypeResolverImpl) Resolve(object types2.Object, sqlTypeHints types2.SQ // resultColumnType = defaultType // object[k] = converted //} - if sqlType, ok := sqlTypeHints[k]; ok { - fields.Set(k, NewFieldWithSQLType(resultColumnType, &sqlType)) + if sqlTypeHints == nil { + fields = append(fields, NewField(k, resultColumnType)) } else { - fields.Set(k, NewField(resultColumnType)) + if sqlType, ok := sqlTypeHints[k]; ok { + fields = append(fields, NewFieldWithSQLType(k, resultColumnType, &sqlType)) + } else { + fields = append(fields, NewField(k, resultColumnType)) + } } } diff --git a/bulkerlib/types/datatype.go b/bulkerlib/types/datatype.go index 5c3f8698..125e79aa 100644 --- a/bulkerlib/types/datatype.go +++ b/bulkerlib/types/datatype.go @@ -118,16 +118,16 @@ func StringFromType(dataType DataType) (string, error) { // we have to check does json number have dot in string representation // // if have -> return float64 otherwise int64 -func ReformatValue(v any) any { +func ReformatValue(v any) (any, bool) { n, ok := ReformatNumberValue(v) if ok { - return n + return n, true } ts, ok := ReformatTimeValue(v, false) if ok { - return ts + return ts, true } else { - return v + return v, false } } diff --git a/jitsubase/types/orderedmap.go b/jitsubase/types/orderedmap.go index d4b46cb8..4d3175fa 100644 --- a/jitsubase/types/orderedmap.go +++ b/jitsubase/types/orderedmap.go @@ -35,13 +35,13 @@ func (m *OrderedMap[K, V]) Get(key K) (value V, ok bool) { // GetN returns the value for a key. If the key does not exist, it returns the zero value for type V, such as 0 for int or an empty string for string. func (m *OrderedMap[K, V]) GetN(key K) V { - var empty V v, ok := m.kv[key] if ok { return v.Value + } else { + var empty V + return empty } - - return empty } // GetS returns the value for a key if it exists and has string type. Otherwise, it returns an empty string. @@ -87,6 +87,9 @@ func (m *OrderedMap[K, V]) GetPathS(path K) string { } func (m *OrderedMap[K, V]) GetPathN(path []K) any { + if len(path) == 1 { + return m.GetN(path[0]) + } obj := m for i, key := range path { if i == len(path)-1 { @@ -110,9 +113,9 @@ func (m *OrderedMap[K, V]) GetPathN(path []K) any { // will be returned. The returned value will be false if the value was replaced // (even if the value was the same). func (m *OrderedMap[K, V]) Set(key K, value V) bool { - _, alreadyExist := m.kv[key] + v, alreadyExist := m.kv[key] if alreadyExist { - m.kv[key].Value = value + v.Value = value return false }