Skip to content

Commit

Permalink
YQ 2837.memory (#41)
Browse files Browse the repository at this point in the history
* Preallocate arrow builders

* Avoid using reflection for variable size estimation
  • Loading branch information
vitalyisaev2 authored Feb 2, 2024
1 parent c7a847b commit 0927edb
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (cb *columnarBufferArrowIPCStreamingDefault[T]) addRow(transformer RowTrans

// ToResponse returns all the accumulated data and clears buffer
func (cb *columnarBufferArrowIPCStreamingDefault[T]) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
// chunk consists of columns
chunk := make([]arrow.Array, 0, len(cb.builders))

// prepare arrow record
Expand Down
85 changes: 82 additions & 3 deletions app/server/paging/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (sp *sizePattern[T]) estimate(acceptors []T) (uint64, error) {
sizeTotal := sp.fixedSizeTotal

for _, ix := range sp.varyingSizeIx {
sizeVariable, _, err := sizeOfValue(acceptors[ix])
sizeVariable, _, err := sizeOfValueBloated(acceptors[ix])
if err != nil {
return 0, fmt.Errorf("size of value #%d: %w", ix, err)
}
Expand All @@ -49,7 +49,7 @@ func newSizePattern[T Acceptor](acceptors []T) (*sizePattern[T], error) {
sp := &sizePattern[T]{}

for i, acceptor := range acceptors {
size, kind, err := sizeOfValue(acceptor)
size, kind, err := sizeOfValueBloated(acceptor)
if err != nil {
return nil, fmt.Errorf("estimate size of value #%d: %w", i, err)
}
Expand All @@ -67,8 +67,10 @@ func newSizePattern[T Acceptor](acceptors []T) (*sizePattern[T], error) {
return sp, nil
}

// TODO: should empty []byte and string values be charged for as well? (at least 24 bytes each)
//
//nolint:gocyclo
func sizeOfValue(v any) (uint64, acceptorKind, error) {
func sizeOfValueReflection(v any) (uint64, acceptorKind, error) {
reflected := reflect.ValueOf(v)

// for nil values
Expand Down Expand Up @@ -128,3 +130,80 @@ func sizeOfValue(v any) (uint64, acceptorKind, error) {
return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported)
}
}

// sizeOfValueBloated estimates the size in bytes of a single value without
// using reflection: the concrete type is resolved with a plain type switch.
//
// Supported inputs are the primitive Go types, time.Time, []byte and string
// (each as a value, a pointer, or a pointer to a pointer), plus pointers to a
// fixed set of pgtype values.
//
// It returns the estimated size together with the kind of the value:
// fixedSize when the estimate depends only on the type, variableSize when it
// depends on the content. For fixed-size types pointers are never
// dereferenced, so nil pointers yield the same estimate as non-nil ones.
// For variable-size types a nil pointer at any level is estimated as 0.
//
// Any other type (including an untyped nil) results in an error wrapping
// common.ErrDataTypeNotSupported.
//
// TODO: should empty []byte and string values be charged for as well? (at least 24 bytes each)
//
//nolint:gocyclo
func sizeOfValueBloated(v any) (uint64, acceptorKind, error) {
	switch t := v.(type) {
	case bool, *bool, **bool:
		return 1, fixedSize, nil
	case int8, *int8, **int8,
		uint8, *uint8, **uint8:
		return 1, fixedSize, nil
	case int16, *int16, **int16,
		uint16, *uint16, **uint16:
		return 2, fixedSize, nil
	case int32, *int32, **int32,
		uint32, *uint32, **uint32,
		float32, *float32, **float32:
		return 4, fixedSize, nil
	case int64, *int64, **int64,
		uint64, *uint64, **uint64,
		float64, *float64, **float64:
		return 8, fixedSize, nil
	case time.Time, *time.Time, **time.Time:
		// time.Time and all its derivatives consist of two 8-byte ints:
		// https://cs.opensource.google/go/go/+/refs/tags/go1.21.4:src/time/time.go;l=141-142
		// Location is ignored.
		return 16, fixedSize, nil
	case []byte:
		return uint64(len(t)), variableSize, nil
	case *[]byte:
		if t == nil {
			return 0, variableSize, nil
		}

		return uint64(len(*t)), variableSize, nil
	case **[]byte:
		if t == nil || *t == nil {
			return 0, variableSize, nil
		}

		return uint64(len(**t)), variableSize, nil
	case string:
		return uint64(len(t)), variableSize, nil
	case *string:
		if t == nil {
			return 0, variableSize, nil
		}

		return uint64(len(*t)), variableSize, nil
	case **string:
		if t == nil || *t == nil {
			return 0, variableSize, nil
		}

		return uint64(len(**t)), variableSize, nil
	case *pgtype.Bool:
		return 1, fixedSize, nil
	case *pgtype.Int2:
		return 2, fixedSize, nil
	case *pgtype.Int4:
		return 4, fixedSize, nil
	case *pgtype.Int8:
		return 8, fixedSize, nil
	case *pgtype.Float4:
		return 4, fixedSize, nil
	case *pgtype.Float8:
		return 8, fixedSize, nil
	case *pgtype.Text:
		// A typed nil pointer must not be dereferenced; treat it like a nil
		// *string, i.e. as an empty variable-size value.
		if t == nil {
			return 0, variableSize, nil
		}

		return uint64(len(t.String)), variableSize, nil
	case *pgtype.Date:
		// Same estimate as the one used for time.Time above.
		return 16, fixedSize, nil
	case *pgtype.Timestamp:
		// Same estimate as the one used for time.Time above.
		return 16, fixedSize, nil
	default:
		return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported)
	}
}
76 changes: 52 additions & 24 deletions app/server/paging/size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,52 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/ydb-platform/fq-connector-go/library/go/ptr"
)

// sizeFn is the common signature of the value size estimators under test:
// it takes an arbitrary value and returns a size, an acceptor kind and an error.
type sizeFn func(any) (uint64, acceptorKind, error)

// sizeFns maps the name of every size estimator implementation to the
// function itself, so that tests and benchmarks can iterate over all of them.
var sizeFns = map[string]sizeFn{
"sizeOfValueReflection": sizeOfValueReflection,
"sizeOfValueBloated": sizeOfValueBloated,
}

// testCaseSize is a single size estimation test case for values of type Type.
type testCaseSize[Type any] struct {
// value is the sample handed to the size function under test.
value Type
// expectedSize is the size the function is required to return for value.
expectedSize uint64
// expectedKind is the acceptor kind the function is required to return for value.
expectedKind acceptorKind
}

// execute runs every size function registered in sizeFns as a separate
// sub-test against tc.value, a pointer to it and a pointer to that pointer.
// For each level of indirection the function must succeed and return
// tc.expectedSize and tc.expectedKind.
func (tc testCaseSize[Type]) execute(t *testing.T) {
	// String() is used instead of Name() because Name() is empty for unnamed
	// types (e.g. slices), which would leave the sub-test without a type label.
	typeName := reflect.TypeOf(tc.value).String()

	for fnName, fn := range sizeFns {
		// Shadow the loop variables so the closure below owns its own copies.
		fnName, fn := fnName, fn

		t.Run(fnName+"_"+typeName, func(t *testing.T) {
			x0 := tc.value
			x1 := &x0
			x2 := &x1

			// The slice index equals the number of pointer indirections.
			for level, v := range []any{x0, x1, x2} {
				size, kind, err := fn(v)
				require.NoError(t, err, "indirection level %d", level)
				// testify expects (expected, actual) in this order.
				require.Equal(t, tc.expectedSize, size, "indirection level %d", level)
				require.Equal(t, tc.expectedKind, kind, "indirection level %d", level)
			}
		})
	}
}

func TestSize(t *testing.T) {
Expand Down Expand Up @@ -68,3 +81,18 @@ func TestSize(t *testing.T) {
tc.execute(t)
}
}

func BenchmarkSizeOfValue(b *testing.B) {
for fnName, fn := range sizeFns {
b.Run(fnName, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _, _ = fn(true)
_, _, _ = fn(ptr.Bool(true))
_, _, _ = fn(int64(123))
_, _, _ = fn(ptr.Int64(123))
_, _, _ = fn(string("abcde"))
_, _, _ = fn(ptr.String("abcde"))
}
})
}
}
7 changes: 7 additions & 0 deletions common/arrow_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor
builder = array.NewFloat64Builder(arrowAllocator)
case Ydb.Type_STRING:
builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary)
// TODO: find more reasonable constant, maybe make dependency on paging settings
builder.(*array.BinaryBuilder).ReserveData(1 << 20)
case Ydb.Type_UTF8:
// TODO: what about LargeString?
// https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_STRINGE
builder = array.NewStringBuilder(arrowAllocator)
// TODO: find more reasonable constant, maybe make dependency on paging settings
builder.(*array.StringBuilder).ReserveData(1 << 20)
case Ydb.Type_DATE:
builder = array.NewUint16Builder(arrowAllocator)
case Ydb.Type_DATETIME:
Expand All @@ -164,6 +168,9 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor
return nil, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)
}

// TODO: find more reasonable constant, maybe make dependency on paging settings
builder.Reserve(1 << 15)

return builder, nil
}

Expand Down

0 comments on commit 0927edb

Please sign in to comment.