Skip to content

Commit

Permalink
bulker: additional performance tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Feb 13, 2025
1 parent f513d70 commit 30004c5
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 45 deletions.
24 changes: 13 additions & 11 deletions bulkerlib/implementations/flattener.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,21 @@ func (f *FlattenerImpl) FlattenObject(object types.Object, notFlatteningKeys typ
// flatten is a recursive function for flattening a key (if the value is an inner object, it calls itself recursively).
// It also reformats the key.
func (f *FlattenerImpl) flatten(key string, value types.Object, destination types.Object, notFlatteningKeys types2.Set[string]) error {
if _, ok := notFlatteningKeys[key]; ok {
if f.stringifyObjects {
// if there is sql type hint for nested object - we don't flatten it.
// Instead, we marshal it to json string hoping that database cast function will do the job
b, err := jsonorder.MarshalToString(value)
if err != nil {
return fmt.Errorf("error marshaling json object with key %s: %v", key, err)
if notFlatteningKeys != nil {
if _, ok := notFlatteningKeys[key]; ok {
if f.stringifyObjects {
// if there is sql type hint for nested object - we don't flatten it.
// Instead, we marshal it to json string hoping that database cast function will do the job
b, err := jsonorder.MarshalToString(value)
if err != nil {
return fmt.Errorf("error marshaling json object with key %s: %v", key, err)
}
destination.Set(key, b)
} else {
destination.Set(key, types.ObjectToMap(value))
}
destination.Set(key, b)
} else {
destination.Set(key, types.ObjectToMap(value))
return nil
}
return nil
}
for el := value.Front(); el != nil; el = el.Next() {
var newKey string
Expand Down
5 changes: 3 additions & 2 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,13 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
columnsAdded := 0
current := currentTable.Columns
unmappedObj := map[string]any{}
exists := existingTable.Exists()
for el := desiredTable.Columns.Front(); el != nil; el = el.Next() {
name := el.Key
newCol := el.Value
var existingCol types.SQLColumn
ok := false
if existingTable.Exists() {
if exists {
existingCol, ok = existingTable.Columns.Get(name)
}
if !ok {
Expand Down Expand Up @@ -261,7 +262,7 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
if len(unmappedObj) > 0 {
var existingCol types.SQLColumn
ok := false
if existingTable.Exists() {
if exists {
existingCol, ok = existingTable.Columns.Get(ps.unmappedDataColumn)
}
if !ok {
Expand Down
12 changes: 7 additions & 5 deletions bulkerlib/implementations/sql/batch_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package sql

import (
types2 "github.com/jitsucom/bulker/bulkerlib/types"
"github.com/jitsucom/bulker/jitsubase/types"
)

type Fields = *types.OrderedMap[string, Field]
type Fields = []Field

// TypesHeader is the schema result of parsing JSON objects
type TypesHeader struct {
Expand All @@ -16,25 +15,28 @@ type TypesHeader struct {

// Exists returns true if there is at least one field
func (bh *TypesHeader) Exists() bool {
return bh != nil && bh.Fields.Len() > 0
return bh != nil && len(bh.Fields) > 0
}

// Field is a data type holder with sql type suggestion.
// Instances are built with NewField or NewFieldWithSQLType.
type Field struct {
// Name is the field name passed to the constructor.
Name string
// dataType points to a copy of the DataType passed to the constructor.
dataType *types2.DataType
// suggestedType is the suggested SQL column type. NewField leaves it unset (nil);
// NewFieldWithSQLType stores the pointer it is given.
suggestedType *types2.SQLColumn
}

// NewField returns Field instance
func NewField(t types2.DataType) Field {
func NewField(name string, t types2.DataType) Field {
return Field{
Name: name,
dataType: &t,
}
}

// NewFieldWithSQLType returns Field instance with configured suggested sql types
func NewFieldWithSQLType(t types2.DataType, suggestedType *types2.SQLColumn) Field {
func NewFieldWithSQLType(name string, t types2.DataType, suggestedType *types2.SQLColumn) Field {
return Field{
Name: name,
dataType: &t,
suggestedType: suggestedType,
}
Expand Down
3 changes: 2 additions & 1 deletion bulkerlib/implementations/sql/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,8 @@ func (bq *BigQuery) toWhenConditions(conditions *WhenConditions) (string, []bigq
queryConditions = append(queryConditions, bq.quotedColumnName(condition.Field)+" "+condition.Clause)
default:
queryConditions = append(queryConditions, bq.quotedColumnName(condition.Field)+" "+condition.Clause+" @when_"+condition.Field)
values = append(values, bigquery.QueryParameter{Name: "when_" + condition.Field, Value: types2.ReformatValue(condition.Value)})
v, _ := types2.ReformatValue(condition.Value)
values = append(values, bigquery.QueryParameter{Name: "when_" + condition.Field, Value: v})
}
}

Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func (ch *ClickHouse) localTableName(tableName string) string {
}

func convertType(value any, column types.SQLColumn) (any, error) {
v := types.ReformatValue(value)
v, _ := types.ReformatValue(value)
lt := strings.ToLower(column.Type)
switch lt {
case "float64":
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L
targetTable.Columns.ForEachIndexed(func(i int, name string, col types2.SQLColumn) {
val, ok := object[name]
if ok {
val = types2.ReformatValue(val)
val, _ = types2.ReformatValue(val)
}
args[i] = m.valueMappingFunction(val, ok, col)
})
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource
targetTable.Columns.ForEachIndexed(func(i int, v string, col types2.SQLColumn) {
val, ok := object[v]
if ok {
val = types2.ReformatValue(val)
val, _ = types2.ReformatValue(val)
}
args[i] = p.valueMappingFunction(val, ok, col)
})
Expand Down
3 changes: 2 additions & 1 deletion bulkerlib/implementations/sql/sql_adapter_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,8 @@ func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramEx
queryConditions = append(queryConditions, b.quotedColumnName(condition.Field)+" "+condition.Clause)
default:
queryConditions = append(queryConditions, b.quotedColumnName(condition.Field)+" "+condition.Clause+" "+paramExpression(i+valuesShift+1, condition.Field))
values = append(values, types2.ReformatValue(condition.Value))
v, _ := types2.ReformatValue(condition.Value)
values = append(values, v)
}
}

Expand Down
5 changes: 2 additions & 3 deletions bulkerlib/implementations/sql/table_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesH
table.PrimaryKeyName = sqlAdapter.BuildConstraintName(table.Name)
}

for el := batchHeader.Fields.Front(); el != nil; el = el.Next() {
fieldName := el.Key
field := el.Value
for _, field := range batchHeader.Fields {
fieldName := field.Name
colName := th.ColumnName(fieldName)
if colName != fieldName {
object.Rename(fieldName, colName)
Expand Down
23 changes: 13 additions & 10 deletions bulkerlib/implementations/sql/type_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sql
import (
"fmt"
types2 "github.com/jitsucom/bulker/bulkerlib/types"
"github.com/jitsucom/bulker/jitsubase/types"
)

var DefaultTypeResolver = NewTypeResolver()
Expand All @@ -24,9 +23,7 @@ func NewDummyTypeResolver() *DummyTypeResolver {

// Resolve returns a single dummy field so that the resulting Fields is never empty (it is used in the Facebook destination).
func (dtr *DummyTypeResolver) Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error) {
fields := types.NewOrderedMap[string, Field]()
fields.Set("dummy", NewField(types2.UNKNOWN))
return fields, nil
return []Field{NewField("dummy", types2.UNKNOWN)}, nil
}

// TypeResolverImpl resolves types based on converter.go rules
Expand All @@ -43,12 +40,14 @@ func NewTypeResolver() *TypeResolverImpl {
// reformat from json.Number into int64 or float64 and put back
// reformat from string with timestamp into time.Time and put back
func (tr *TypeResolverImpl) Resolve(object types2.Object, sqlTypeHints types2.SQLTypes) (Fields, error) {
fields := types.NewOrderedMap[string, Field]()
fields := make(Fields, 0, object.Len())
//apply default typecast and define column types
for el := object.Front(); el != nil; el = el.Next() {
v := types2.ReformatValue(el.Value)
v, ok := types2.ReformatValue(el.Value)
k := el.Key
el.Value = v
if ok {
el.Value = v
}
//value type
resultColumnType, err := types2.TypeFromValue(v)
if err != nil {
Expand All @@ -65,10 +64,14 @@ func (tr *TypeResolverImpl) Resolve(object types2.Object, sqlTypeHints types2.SQ
// resultColumnType = defaultType
// object[k] = converted
//}
if sqlType, ok := sqlTypeHints[k]; ok {
fields.Set(k, NewFieldWithSQLType(resultColumnType, &sqlType))
if sqlTypeHints == nil {
fields = append(fields, NewField(k, resultColumnType))
} else {
fields.Set(k, NewField(resultColumnType))
if sqlType, ok := sqlTypeHints[k]; ok {
fields = append(fields, NewFieldWithSQLType(k, resultColumnType, &sqlType))
} else {
fields = append(fields, NewField(k, resultColumnType))
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions bulkerlib/types/datatype.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@ func StringFromType(dataType DataType) (string, error) {
// we have to check whether the json number has a dot in its string representation
//
// if it does -> return float64, otherwise int64
func ReformatValue(v any) any {
func ReformatValue(v any) (any, bool) {
n, ok := ReformatNumberValue(v)
if ok {
return n
return n, true
}
ts, ok := ReformatTimeValue(v, false)
if ok {
return ts
return ts, true
} else {
return v
return v, false
}
}

Expand Down
13 changes: 8 additions & 5 deletions jitsubase/types/orderedmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func (m *OrderedMap[K, V]) Get(key K) (value V, ok bool) {

// GetN returns the value for a key. If the key does not exist, it returns the zero value for type V, such as 0 for int or an empty string for string.
func (m *OrderedMap[K, V]) GetN(key K) V {
var empty V
v, ok := m.kv[key]
if ok {
return v.Value
} else {
var empty V
return empty
}

return empty
}

// GetS returns the value for a key if it exists and has string type. Otherwise, it returns an empty string.
Expand Down Expand Up @@ -87,6 +87,9 @@ func (m *OrderedMap[K, V]) GetPathS(path K) string {
}

func (m *OrderedMap[K, V]) GetPathN(path []K) any {
if len(path) == 1 {
return m.GetN(path[0])
}
obj := m
for i, key := range path {
if i == len(path)-1 {
Expand All @@ -110,9 +113,9 @@ func (m *OrderedMap[K, V]) GetPathN(path []K) any {
// will be returned. The returned value will be false if the value was replaced
// (even if the value was the same).
func (m *OrderedMap[K, V]) Set(key K, value V) bool {
_, alreadyExist := m.kv[key]
v, alreadyExist := m.kv[key]
if alreadyExist {
m.kv[key].Value = value
v.Value = value
return false
}

Expand Down

0 comments on commit 30004c5

Please sign in to comment.