Skip to content

Commit

Permalink
New features: BulkUpsert request size limit and '.others' column for …
Browse files Browse the repository at this point in the history
…extra data as JSON
  • Loading branch information
Maksim Zinal committed May 1, 2024
1 parent a86b559 commit 745b841
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 36 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
github.com/stretchr/testify v1.8.3
github.com/ydb-platform/ydb-go-sdk/v3 v3.66.0
github.com/ydb-platform/ydb-go-sdk/v3 v3.66.1
github.com/ydb-platform/ydb-go-yc v0.12.1
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,8 @@ github.com/ydb-platform/ydb-go-sdk/v3 v3.44.0/go.mod h1:oSLwnuilwIpaF5bJJMAofnGg
github.com/ydb-platform/ydb-go-sdk/v3 v3.47.3/go.mod h1:bWnOIcUHd7+Sl7DN+yhyY1H/I61z53GczvwJgXMgvj0=
github.com/ydb-platform/ydb-go-sdk/v3 v3.66.0 h1:ffZ1Kh60rA62BVHVafH/xrIn5yrlJ/ZDvIoVg7mhVGo=
github.com/ydb-platform/ydb-go-sdk/v3 v3.66.0/go.mod h1:hGX4CijskNnUTRgLlqMvZdrBQc1ALZgAnKHytF5nmj4=
github.com/ydb-platform/ydb-go-sdk/v3 v3.66.1 h1:K6TNapbuMFTFFMil26rP3fMgZmuLE9jP8curXRyKlZs=
github.com/ydb-platform/ydb-go-sdk/v3 v3.66.1/go.mod h1:hGX4CijskNnUTRgLlqMvZdrBQc1ALZgAnKHytF5nmj4=
github.com/ydb-platform/ydb-go-yc v0.12.1 h1:qw3Fa+T81+Kpu5Io2vYHJOwcrYrVjgJlT6t/0dOXJrA=
github.com/ydb-platform/ydb-go-yc v0.12.1/go.mod h1:t/ZA4ECdgPWjAb4jyDe8AzQZB5dhpGbi3iCahFaNwBY=
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1 h1:9E5q8Nsy2RiJMZDNVy0A3KUrIMBPakJ2VgloeWbcI84=
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (

KeyTimestamp = ".timestamp"
KeyInput = ".input"
KeyOthers = ".others"
)

type credentialsDescription struct {
Expand Down
223 changes: 189 additions & 34 deletions internal/storage/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path"
"reflect"
"strings"
"time"

ydb "github.com/ydb-platform/ydb-go-sdk/v3"
Expand Down Expand Up @@ -69,10 +70,11 @@ func New(cfg *config.Config) (*YDB, error) {
}

const (
textType = "Text"
bytesType = "Bytes"
jsonType = "Json"
timestampType = "Timestamp"
textType = "Text"
bytesType = "Bytes"
jsonType = "Json"
jsonDocumentType = "JsonDocument"
timestampType = "Timestamp"
)

func (s *YDB) resolveFieldMapping(ctx context.Context) error {
Expand Down Expand Up @@ -114,98 +116,231 @@ func (s *YDB) resolveFieldMapping(ctx context.Context) error {
return nil
}

func type2Type(t types.Type, v interface{}) (types.Value, error) {
func type2Type(t types.Type, v interface{}) (types.Value, int, error) {
optional, columnType := convertTypeIfOptional(t)
columnTypeYql := yqlType(columnType)

if v == nil {
if optional {
switch columnTypeYql {
case timestampType:
return types.NullableTimestampValue(nil), 64, nil
case bytesType:
return types.NullableBytesValue(nil), 16, nil
case textType:
return types.NullableTextValue(nil), 16, nil
case jsonType:
return types.NullableJSONValue(nil), 16, nil
case jsonDocumentType:
return types.NullableJSONDocumentValue(nil), 16, nil
}
} else {
switch columnTypeYql {
case timestampType:
return types.TimestampValueFromTime(time.UnixMicro(0)), 64, nil
case bytesType:
return types.BytesValue(make([]byte, 0)), 16, nil
case textType:
return types.TextValue(""), 16, nil
case jsonType:
return types.JSONValue("{}"), 32, nil
case jsonDocumentType:
return types.JSONDocumentValue("{}"), 32, nil
}
}
return nil, -1, fmt.Errorf("not supported conversion from NULL to '%s' (%s)", columnTypeYql, t)
}

switch v := v.(type) {
case time.Time:
switch columnTypeYql {
case timestampType:
return convertValueIfOptional(optional, types.TimestampValueFromTime(v)), nil
return convertValueIfOptional(optional, types.TimestampValueFromTime(v)), 32, nil
default:
return nil, fmt.Errorf("not supported conversion (time) from '%s' to '%s' (%s)", v, columnTypeYql, t)
return nil, -1, fmt.Errorf("not supported conversion (time) from '%s' to '%s' (%s)", v, columnTypeYql, t)
}
case []byte:
switch columnTypeYql {
case bytesType:
return convertValueIfOptional(optional, types.BytesValue(v)), nil
return convertValueIfOptional(optional, types.BytesValue(v)), 8 + len(v), nil
case textType:
return convertValueIfOptional(optional, types.TextValue(string(v))), nil
return convertValueIfOptional(optional, types.TextValue(string(v))), 8 + len(v), nil
case timestampType:
return convertTimestamp(optional, string(v)), 64, nil
default:
return nil, fmt.Errorf("not supported conversion (bytes) from '%s' to '%s' (%s)", v, columnTypeYql, t)
return nil, -1, fmt.Errorf("not supported conversion (bytes) from '%s' to '%s' (%s)", v, columnTypeYql, t)
}
case string:
switch columnTypeYql {
case bytesType:
return convertValueIfOptional(optional, types.BytesValueFromString(v)), nil
return convertValueIfOptional(optional, types.BytesValueFromString(v)), 8 + len(v), nil
case textType:
return convertValueIfOptional(optional, types.TextValue(v)), nil
return convertValueIfOptional(optional, types.TextValue(v)), 8 + len(v), nil
case timestampType:
return convertTimestamp(optional, v), 64, nil
default:
return nil, fmt.Errorf("not supported conversion (string) from '%s' to '%s' (%s)", v, columnTypeYql, t)
return nil, -1, fmt.Errorf("not supported conversion (string) from '%s' to '%s' (%s)", v, columnTypeYql, t)
}
case map[interface{}]interface{}:
j, err := json.Marshal(convertByteFieldsToString(v))
if err != nil {
return nil, fmt.Errorf("failed to marshal json value: %w. Value: %#v", err, v)
return nil, -1, fmt.Errorf("failed to marshal json value: %w. Value: %#v", err, v)
}

switch columnTypeYql {
case bytesType:
return convertValueIfOptional(optional, types.BytesValue(j)), nil
return convertValueIfOptional(optional, types.BytesValue(j)), 8 + len(j), nil
case textType:
return convertValueIfOptional(optional, types.TextValue(string(j))), nil
return convertValueIfOptional(optional, types.TextValue(string(j))), 8 + len(j), nil
case jsonType:
return convertValueIfOptional(optional, types.JSONValueFromBytes(j)), nil
return convertValueIfOptional(optional, types.JSONValue(string(j))), 8 + len(j), nil
case jsonDocumentType:
return convertValueIfOptional(optional, types.JSONDocumentValue(string(j))), 8 + len(j), nil
case timestampType:
return convertTimestamp(optional, string(j)), 64, nil
default:
return nil, fmt.Errorf("not supported conversion (map) '%s' to '%s' (%s)", v, columnTypeYql, t)
return nil, -1, fmt.Errorf("not supported conversion (map) '%s' to '%s' (%s)", v, columnTypeYql, t)
}
default:
return nil, fmt.Errorf("not supported source type '%s', type: %s", v, reflect.TypeOf(v))
return nil, -1, fmt.Errorf("not supported source type '%s', type: %s", v, reflect.TypeOf(v))
}
}

func (s *YDB) Write(events []*model.Event) error {
func (s *YDB) BuildColumnUsageMap() map[string]string {
m := make(map[string]string)
for k, v := range s.fieldMapping {

Check failure on line 211 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

rangeValCopy: each iteration copies 48 bytes (consider pointers or indexing) (gocritic)
if !strings.HasPrefix(k, ".") {
m[k] = v.Name
}
}
return m
}

func (s *YDB) ConvertRows(events []*model.Event) ([]types.Value, int, error) {
rows := make([]types.Value, 0, len(events))
maxrowbytes := 1
colCount := len(s.fieldMapping)

var othersValue map[interface{}]interface{}
othersColumn, othersUsed := s.fieldMapping[config.KeyOthers]

for _, event := range events {
columns := make([]types.StructValueOption, 0, len(event.Message)+2) //nolint:gomnd
if othersUsed {
othersValue = make(map[interface{}]interface{})
}
rowbytes := 128
columns := make([]types.StructValueOption, 0, colCount) //nolint:gomnd

v, err := type2Type(s.fieldMapping[config.KeyTimestamp].Type, event.Timestamp)
v, vlen, err := type2Type(s.fieldMapping[config.KeyTimestamp].Type, event.Timestamp)
if err != nil {
return err
return nil, -1, err
}
columns = append(columns, types.StructFieldValue(s.fieldMapping[config.KeyTimestamp].Name, v))
rowbytes += 64 + vlen

Check failure on line 239 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 64, in <operation> detected (gomnd)
rowbytes += len(s.fieldMapping[config.KeyTimestamp].Name)

v, err = type2Type(s.fieldMapping[config.KeyInput].Type, event.Metadata)
v, vlen, err = type2Type(s.fieldMapping[config.KeyInput].Type, event.Metadata)
if err != nil {
return err
return nil, -1, err
}
columns = append(columns, types.StructFieldValue(s.fieldMapping[config.KeyInput].Name, v))
rowbytes += 64 + vlen

Check failure on line 247 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 64, in <operation> detected (gomnd)
rowbytes += len(s.fieldMapping[config.KeyInput].Name)

columnUsageMap := s.BuildColumnUsageMap()

for field, value := range event.Message {
column, exists := s.fieldMapping[field]
if !exists {
log.Warn(fmt.Sprintf("column for message key: %s (value: %s) not found, skip", field, value))

if othersUsed {
othersValue[field] = value
} else {
log.Debug(fmt.Sprintf("column for message key: %s (value: %v) not found, skipped", field, value))
}
continue
}

v, err := type2Type(column.Type, value)
v, vlen, err := type2Type(column.Type, value)
if err != nil {
return err
log.Warn(fmt.Sprintf("failed to convert column for message key: %s (value: %v), skipped. %v", field, value, err))
continue
}
columns = append(columns, types.StructFieldValue(s.fieldMapping[field].Name, v))
rowbytes += 64 + vlen

Check failure on line 269 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 64, in <operation> detected (gomnd)
rowbytes += len(s.fieldMapping[field].Name)
delete(columnUsageMap, field)
}

if len(columnUsageMap) > 0 {
// some columns were not included
for cname := range columnUsageMap {
cdef := s.fieldMapping[cname]
v, vlen, err = type2Type(cdef.Type, nil)
if err != nil {
return nil, -1, err
}
columns = append(columns, types.StructFieldValue(cdef.Name, v))
rowbytes += 64 + vlen

Check failure on line 283 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 64, in <operation> detected (gomnd)
rowbytes += len(cdef.Name)
}
}

if othersUsed {
v, vlen, err := type2Type(othersColumn.Type, othersValue)
if err != nil {
return nil, -1, err
}
columns = append(columns, types.StructFieldValue(othersColumn.Name, v))
rowbytes += 64 + vlen

Check failure on line 294 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 64, in <operation> detected (gomnd)
rowbytes += len(othersColumn.Name)
}

rows = append(rows, types.StructValue(columns...))
if rowbytes > maxrowbytes {
maxrowbytes = rowbytes
}
}

err := s.db.Table().Do(context.Background(),
func(ctx context.Context, sess table.Session) error {
return sess.BulkUpsert(ctx, path.Join(s.db.Name(), s.cfg.TablePath), types.ListValue(rows...))
},
)
return rows, maxrowbytes, nil
}

func (s *YDB) Write(events []*model.Event) error {
// convert the input events to the database rows
rows, maxrowbytes, err := s.ConvertRows(events)
if err != nil {
return err
}
sz := len(events)
// split the rows into portions having size of no more than 30 megabytes
portion := (30 * 1024 * 1024) / maxrowbytes

Check failure on line 315 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 1024, in <operation> detected (gomnd)
if portion < 1 {
portion = 1
}
if portion > sz {
portion = sz
}
log.Debug(fmt.Sprintf("Got events block of size %d with portion %d and %d max bytes per row...", sz, portion, maxrowbytes))

Check failure on line 322 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

line is 124 characters (lll)
position := 0
for position < sz {
finish := position + portion
if finish > sz {
finish = sz
}
part := rows[position:finish]
log.Debug(fmt.Sprintf("...Processing positions [%d:%d], size %d", position, finish, len(part)))
err = s.db.Table().Do(context.Background(),
func(ctx context.Context, sess table.Session) error {
return sess.BulkUpsert(ctx, path.Join(s.db.Name(), s.cfg.TablePath), types.ListValue(part...))
},
)
if err != nil {
log.Debug(fmt.Sprintf("...BulkUpsert failed: %v", err))
//break

Check failure on line 338 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
} else {
log.Debug("...BulkUpsert succeeded")
}
position = finish
}

if ydb.IsOperationErrorSchemeError(err) {
log.Warn("Detected scheme error, trying to resolve field mapping from table description")
Expand Down Expand Up @@ -269,6 +404,26 @@ func convertValueIfOptional(optional bool, v types.Value) types.Value {
return v
}

func convertTimestamp(optional bool, v string) types.Value {
var err error
if len(v) == 24 {

Check failure on line 409 in internal/storage/ydb.go

View workflow job for this annotation

GitHub Actions / golangci-lint

mnd: Magic number: 24, in <condition> detected (gomnd)
var tv time.Time
tv, err = time.Parse(time.RFC3339, v)
if err == nil {
return convertValueIfOptional(optional, types.TimestampValueFromTime(tv))
}
}
if err == nil {
log.Warn(fmt.Sprintf("failed to parse value [%s] as timestamp - unknown format", v))
} else {
log.Warn(fmt.Sprintf("failed to parse value [%s] as timestamp - %s", v, err))
}
if optional {
return types.NullValue(types.TypeTimestamp)
}
return convertValueIfOptional(optional, types.TimestampValueFromTime(time.Now()))
}

// pointer returns a pointer to a newly allocated copy of v.
// Each call yields a distinct pointer, so writes through the result never
// affect the caller's original value.
func pointer[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}
2 changes: 1 addition & 1 deletion internal/storage/ydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestType2TypeOk(t *testing.T) {
tc := tc

t.Run(tc.name, func(t *testing.T) {
actual, err := type2Type(tc.column, tc.value)
actual, _, err := type2Type(tc.column, tc.value)

require.NoError(t, err)
require.Equal(t, tc.expected, actual)
Expand Down

0 comments on commit 745b841

Please sign in to comment.