diff --git a/app/server/paging/columnar_buffer_arrow_ipc_streaming_default.go b/app/server/paging/columnar_buffer_arrow_ipc_streaming_default.go index 4ebebfcf..3a255e84 100644 --- a/app/server/paging/columnar_buffer_arrow_ipc_streaming_default.go +++ b/app/server/paging/columnar_buffer_arrow_ipc_streaming_default.go @@ -41,6 +41,7 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) addRow(transformer RowTrans // ToResponse returns all the accumulated data and clears buffer func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) { + // chunk consists of columns chunk := make([]arrow.Array, 0, len(cb.builders)) // prepare arrow record diff --git a/app/server/paging/size.go b/app/server/paging/size.go index 88347ee4..108338ce 100644 --- a/app/server/paging/size.go +++ b/app/server/paging/size.go @@ -34,7 +34,7 @@ func (sp *sizePattern[T]) estimate(acceptors []T) (uint64, error) { sizeTotal := sp.fixedSizeTotal for _, ix := range sp.varyingSizeIx { - sizeVariable, _, err := sizeOfValue(acceptors[ix]) + sizeVariable, _, err := sizeOfValueBloated(acceptors[ix]) if err != nil { return 0, fmt.Errorf("size of value #%d: %w", ix, err) } @@ -49,7 +49,7 @@ func newSizePattern[T Acceptor](acceptors []T) (*sizePattern[T], error) { sp := &sizePattern[T]{} for i, acceptor := range acceptors { - size, kind, err := sizeOfValue(acceptor) + size, kind, err := sizeOfValueBloated(acceptor) if err != nil { return nil, fmt.Errorf("estimate size of value #%d: %w", i, err) } @@ -67,8 +67,10 @@ func newSizePattern[T Acceptor](acceptors []T) (*sizePattern[T], error) { return sp, nil } +// TODO: take money for empty []byte and string? at least 24 bytes +// //nolint:gocyclo -func sizeOfValue(v any) (uint64, acceptorKind, error) { +func sizeOfValueReflection(v any) (uint64, acceptorKind, error) { reflected := reflect.ValueOf(v) // for nil values @@ -128,3 +130,80 @@ func sizeOfValue(v any) (uint64, acceptorKind, error) { return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported) } } + +// TODO: take money for empty []byte and string? at least 24 bytes +// +//nolint:gocyclo +func sizeOfValueBloated(v any) (uint64, acceptorKind, error) { + switch t := v.(type) { + case bool, *bool, **bool: + return 1, fixedSize, nil + case int8, *int8, **int8, + uint8, *uint8, **uint8: + return 1, fixedSize, nil + case int16, *int16, **int16, + uint16, *uint16, **uint16: + return 2, fixedSize, nil + case int32, *int32, **int32, + uint32, *uint32, **uint32, + float32, *float32, **float32: + return 4, fixedSize, nil + case int64, *int64, **int64, + uint64, *uint64, **uint64, + float64, *float64, **float64: + return 8, fixedSize, nil + case time.Time, *time.Time, **time.Time: + // time.Time and all its derivatives consist of two 8-byte ints: + // https://cs.opensource.google/go/go/+/refs/tags/go1.21.4:src/time/time.go;l=141-142 + // Location is ignored. + return 16, fixedSize, nil + case []byte: + return uint64(len(t)), variableSize, nil + case *[]byte: + if t == nil { + return 0, variableSize, nil + } + + return uint64(len(*t)), variableSize, nil + case **[]byte: + if t == nil || *t == nil { + return 0, variableSize, nil + } + + return uint64(len(**t)), variableSize, nil + case string: + return uint64(len(t)), variableSize, nil + case *string: + if t == nil { + return 0, variableSize, nil + } + + return uint64(len(*t)), variableSize, nil + case **string: + if t == nil || *t == nil { + return 0, variableSize, nil + } + + return uint64(len(**t)), variableSize, nil + case *pgtype.Bool: + return 1, fixedSize, nil + case *pgtype.Int2: + return 2, fixedSize, nil + case *pgtype.Int4: + return 4, fixedSize, nil + case *pgtype.Int8: + return 8, fixedSize, nil + case *pgtype.Float4: + return 4, fixedSize, nil + case *pgtype.Float8: + return 8, fixedSize, nil + case *pgtype.Text: + return uint64(len(t.String)), variableSize, nil + case *pgtype.Date: + return 16, fixedSize, nil + case *pgtype.Timestamp: + return 16, fixedSize, nil + default: + return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported) + } +} diff --git a/app/server/paging/size_test.go b/app/server/paging/size_test.go index 91c79b3b..28efe54e 100644 --- a/app/server/paging/size_test.go +++ b/app/server/paging/size_test.go @@ -6,8 +6,17 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/ydb-platform/fq-connector-go/library/go/ptr" ) +type sizeFn func(any) (uint64, acceptorKind, error) + +var sizeFns = map[string]sizeFn{ + "sizeOfValueReflection": sizeOfValueReflection, + "sizeOfValueBloated": sizeOfValueBloated, +} + type testCaseSize[Type any] struct { value Type expectedSize uint64 @@ -15,30 +24,34 @@ type testCaseSize[Type any] struct { } func (tc testCaseSize[Type]) execute(t *testing.T) { - name := reflect.TypeOf(tc.value).Name() - - t.Run(name, func(t *testing.T) { - x0 := tc.value - x1 := new(Type) - *x1 = x0 - x2 := new(*Type) - *x2 = x1 - - size0, kind0, err := sizeOfValue(x0) - require.NoError(t, err) - require.Equal(t, size0, tc.expectedSize) - require.Equal(t, kind0, tc.expectedKind) - - size1, kind1, err := sizeOfValue(x1) - require.NoError(t, err) - require.Equal(t, size1, tc.expectedSize) - require.Equal(t, kind1, tc.expectedKind) - - size2, kind2, err := sizeOfValue(x2) - require.NoError(t, err) - require.Equal(t, size2, tc.expectedSize) - require.Equal(t, kind2, tc.expectedKind) - }) + typeName := reflect.TypeOf(tc.value).Name() + + for fnName, fn := range sizeFns { + fnName, fn := fnName, fn + + t.Run(fnName+"_"+typeName, func(t *testing.T) { + x0 := tc.value + x1 := new(Type) + *x1 = x0 + x2 := new(*Type) + *x2 = x1 + + size0, kind0, err := fn(x0) + require.NoError(t, err) + require.Equal(t, size0, tc.expectedSize) + require.Equal(t, kind0, tc.expectedKind) + + size1, kind1, err := fn(x1) + require.NoError(t, err) + require.Equal(t, size1, tc.expectedSize) + require.Equal(t, kind1, tc.expectedKind) + + size2, kind2, err := fn(x2) + require.NoError(t, err) + require.Equal(t, size2, tc.expectedSize) + require.Equal(t, kind2, tc.expectedKind) + }) + } } func TestSize(t *testing.T) { @@ -68,3 +81,18 @@ func TestSize(t *testing.T) { tc.execute(t) } } + +func BenchmarkSizeOfValue(b *testing.B) { + for fnName, fn := range sizeFns { + b.Run(fnName, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _, _ = fn(true) + _, _, _ = fn(ptr.Bool(true)) + _, _, _ = fn(int64(123)) + _, _, _ = fn(ptr.Int64(123)) + _, _, _ = fn(string("abcde")) + _, _, _ = fn(ptr.String("abcde")) + } + }) + } +} diff --git a/common/arrow_helpers.go b/common/arrow_helpers.go index cfa00452..5b263dae 100644 --- a/common/arrow_helpers.go +++ b/common/arrow_helpers.go @@ -150,10 +150,14 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor builder = array.NewFloat64Builder(arrowAllocator) case Ydb.Type_STRING: builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary) + // TODO: find more reasonable constant, maybe make dependency on paging settings + builder.(*array.BinaryBuilder).ReserveData(1 << 20) case Ydb.Type_UTF8: // TODO: what about LargeString? // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_STRINGE builder = array.NewStringBuilder(arrowAllocator) + // TODO: find more reasonable constant, maybe make dependency on paging settings + builder.(*array.StringBuilder).ReserveData(1 << 20) case Ydb.Type_DATE: builder = array.NewUint16Builder(arrowAllocator) case Ydb.Type_DATETIME: @@ -164,6 +168,9 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor return nil, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported) } + // TODO: find more reasonable constant, maybe make dependency on paging settings + builder.Reserve(1 << 15) + return builder, nil }