From dddf6549d59efe204e6bb5ea4c3bea2cf3cfa10f Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Wed, 31 Jan 2024 14:20:50 +0300 Subject: [PATCH 1/3] Optimize DateToString converter --- app/bench/analyze.py | 68 ++++++++++---- .../rdbms/clickhouse/type_mapper.go | 8 +- .../rdbms/postgresql/type_mapper.go | 53 ++++++++++- .../datasource/rdbms/ydb/type_mapper.go | 91 ++++++++++++++----- app/server/service_pprof.go | 2 +- app/server/utils/converters.go | 74 ++++++++++++++- app/server/utils/converters_test.go | 65 +++++++++++++ 7 files changed, 305 insertions(+), 56 deletions(-) create mode 100644 app/server/utils/converters_test.go diff --git a/app/bench/analyze.py b/app/bench/analyze.py index 6b5a6172..cc6f6945 100755 --- a/app/bench/analyze.py +++ b/app/bench/analyze.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 - +from typing import Dict from pathlib import Path import json +import matplotlib import matplotlib.pyplot as plt import pandas as pd @@ -15,39 +16,66 @@ def read_report(path: Path) -> pd.Series: "bytes_internal_rate": data["bytes_internal_rate"], "bytes_arrow_rate": data["bytes_arrow_rate"], "rows_rate": data["rows_rate"], + "cpu_utilization": data["cpu_utilization"], } ) -def make_dataframe(result_dir: Path) -> pd.DataFrame: - series = [read_report(path) for path in result_dir.glob("*.json")] - return pd.DataFrame(series) +def make_dataframe(result_dirs: Dict[str, Path]) -> pd.DataFrame: + dfs = [] + for key, result_dir in result_dirs.items(): + series = [read_report(path) for path in result_dir.glob("*.json")] + df = pd.DataFrame(series).sort_values("columns") + df["key"] = key + dfs.append(df) + + return pd.concat(dfs) + +def draw_subplot( + df_: pd.DataFrame, label: str, y_column: str, ax: matplotlib.figure.Figure +) -> matplotlib.figure.Figure: + ax.set_ylabel(label) + ax.set_xlabel("Number of columns to SELECT") -def draw_plot(df: pd.DataFrame, result_dir: Path) -> pd.Series: - fig, ax1 = plt.subplots() + keys = { + "baseline": "red", + "optimized": "blue", + } - ax1.set_xlabel("Number of columns in SELECT") - ax1.set_ylabel("Throughput, MB/sec", color="red") - ax1.scatter(df["columns"], df["bytes_internal_rate"], color="red") - ax1.tick_params(axis="y", labelcolor="red") + for key, color in keys.items(): + df = df_.loc[df_["key"] == key] + ax.plot(df["columns"], df[y_column], color=color, label=key) - ax2 = ax1.twinx() + return ax - ax2.set_ylabel("Throughput, rows/sec", color="blue") - ax2.scatter(df["columns"], df["rows_rate"], color="blue") - ax2.tick_params(axis="y", labelcolor="blue") - fig.savefig(result_dir.joinpath("report.png")) +def draw_plot(df: pd.DataFrame) -> pd.Series: + fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(12, 4)) + fig.subplots_adjust(bottom=0.25, wspace=0.5) + draw_subplot(df, "Throughput, MB/sec", "bytes_internal_rate", axes[0]) + draw_subplot(df, "Throughput, rows/sec", "rows_rate", axes[1]) + ax = draw_subplot(df, "CPU Utilization, %", "cpu_utilization", axes[2]) + + handles, labels = ax.get_legend_handles_labels() + fig.legend(handles, labels, loc="lower right") + fig.suptitle("Reading TPC-H S-10 Lineitem from PostgreSQL", fontsize=14) + + fig.savefig("report.png") def main(): - result_dir = Path( - "/home/vitalyisaev/projects/fq-connector-go/scripts/bench/postgresql/results/columns/" - ) - df = make_dataframe(result_dir) + result_dirs = { + "baseline": Path( + "/home/vitalyisaev/projects/fq-connector-go/scripts/bench/postgresql/results/columns_baseline/" + ), + "optimized": Path( + "/home/vitalyisaev/projects/fq-connector-go/scripts/bench/postgresql/results/columns/" + ), + } + df = make_dataframe(result_dirs) print(df) - draw_plot(df, result_dir) + draw_plot(df) if __name__ == "__main__": diff --git a/app/server/datasource/rdbms/clickhouse/type_mapper.go b/app/server/datasource/rdbms/clickhouse/type_mapper.go index 693e0d32..ec97b84d 100644 --- a/app/server/datasource/rdbms/clickhouse/type_mapper.go +++ b/app/server/datasource/rdbms/clickhouse/type_mapper.go @@ -281,7 +281,7 @@ var ( maxClickHouseDatetime64 = time.Date(2299, time.December, 31, 23, 59, 59, 99999999, time.UTC) ) -func saturateDateTime(in, min, max time.Time) time.Time { +func saturateDateTime(in, min, max time.Time) *time.Time { if in.Before(min) { in = min } @@ -290,19 +290,19 @@ func saturateDateTime(in, min, max time.Time) time.Time { in = max } - return in + return &in } type dateToStringConverter struct{} func (dateToStringConverter) Convert(in time.Time) (string, error) { - return utils.DateToStringConverter{}.Convert(saturateDateTime(in, minClickHouseDate, maxClickHouseDate)) + return utils.DateToStringConverterV2{}.Convert(saturateDateTime(in, minClickHouseDate, maxClickHouseDate)) } type date32ToStringConverter struct{} func (date32ToStringConverter) Convert(in time.Time) (string, error) { - return utils.DateToStringConverter{}.Convert(saturateDateTime(in, minClickHouseDate32, maxClickHouseDate32)) + return utils.DateToStringConverterV2{}.Convert(saturateDateTime(in, minClickHouseDate32, maxClickHouseDate32)) } type dateTimeToStringConverter struct{} diff --git a/app/server/datasource/rdbms/postgresql/type_mapper.go b/app/server/datasource/rdbms/postgresql/type_mapper.go index c377a106..38f9ad28 100644 --- a/app/server/datasource/rdbms/postgresql/type_mapper.go +++ b/app/server/datasource/rdbms/postgresql/type_mapper.go @@ -158,8 +158,9 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransfo appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Date) - return appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, utils.DateToStringConverter]( - cast.Time, builder, cast.Valid) + return appendValuePtrToArrowBuilder[ + time.Time, string, *array.StringBuilder, utils.DateToStringConverterV2]( + &cast.Time, builder, cast.Valid) }) case Ydb.Type_DATE: appenders = append(appenders, func(acceptor any, builder array.Builder) error { @@ -184,11 +185,11 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransfo appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Timestamp) - return appendValueToArrowBuilder[ + return appendValuePtrToArrowBuilder[ time.Time, string, *array.StringBuilder, - utils.TimestampToStringConverter](cast.Time, builder, cast.Valid) + utils.TimestampToStringConverter](&cast.Time, builder, cast.Valid) }) case Ydb.Type_TIMESTAMP: appenders = append(appenders, func(acceptor any, builder array.Builder) error { @@ -208,7 +209,12 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransfo return paging.NewRowTransformer[any](acceptors, appenders, nil), nil } -func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], CONV utils.ValueConverter[IN, OUT]]( +func appendValueToArrowBuilder[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], + CONV utils.ValueConverter[IN, OUT], +]( value any, builder array.Builder, valid bool, @@ -240,4 +246,41 @@ func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB com return nil } +func appendValuePtrToArrowBuilder[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], + CONV utils.ValuePtrConverter[IN, OUT], +]( + value any, + builder array.Builder, + valid bool, +) error { + if !valid { + builder.AppendNull() + + return nil + } + + cast := value.(*IN) + + var converter CONV + + out, err := converter.Convert(cast) + if err != nil { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { + // TODO: logger ? + builder.AppendNull() + + return nil + } + + return fmt.Errorf("convert value: %w", err) + } + + builder.(AB).Append(out) + + return nil +} + func NewTypeMapper() datasource.TypeMapper { return typeMapper{} } diff --git a/app/server/datasource/rdbms/ydb/type_mapper.go b/app/server/datasource/rdbms/ydb/type_mapper.go index 22e3fdf0..83d4bd8f 100644 --- a/app/server/datasource/rdbms/ydb/type_mapper.go +++ b/app/server/datasource/rdbms/ydb/type_mapper.go @@ -108,19 +108,24 @@ func makePrimitiveTypeFromString(typeName string, rules *api_service_protos.TTyp } } -func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], CONV utils.ValueConverter[IN, OUT]]( +func appendToBuilderWithValueConverter[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], + CONV utils.ValueConverter[IN, OUT], +]( acceptor any, builder array.Builder, ) error { - cast := acceptor.(**IN) + doublePtr := acceptor.(**IN) - if *cast == nil { + if *doublePtr == nil { builder.AppendNull() return nil } - value := **cast + value := **doublePtr var converter CONV @@ -142,6 +147,44 @@ func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB com return nil } +func appendToBuilderWithValuePtrConverter[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], + CONV utils.ValuePtrConverter[IN, OUT], +]( + acceptor any, + builder array.Builder, +) error { + doublePtr := acceptor.(**IN) + + ptr := *doublePtr + if ptr == nil { + builder.AppendNull() + + return nil + } + + var converter CONV + + out, err := converter.Convert(ptr) + if err != nil { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { + // TODO: write warning to logger + builder.AppendNull() + + return nil + } + + return fmt.Errorf("convert value %v: %w", ptr, err) + } + + //nolint:forcetypeassert + builder.(AB).Append(out) + + return nil +} + func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(typeNames)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(typeNames)) @@ -175,37 +218,37 @@ func makeAcceptorAndAppenderFromSQLType( ) (any, func(acceptor any, builder array.Builder) error, error) { switch typeName { case BoolType: - return new(*bool), appendValueToArrowBuilder[bool, uint8, *array.Uint8Builder, utils.BoolConverter], nil + return new(*bool), appendToBuilderWithValueConverter[bool, uint8, *array.Uint8Builder, utils.BoolConverter], nil case Int8Type: - return new(*int8), appendValueToArrowBuilder[int8, int8, *array.Int8Builder, utils.Int8Converter], nil + return new(*int8), appendToBuilderWithValueConverter[int8, int8, *array.Int8Builder, utils.Int8Converter], nil case Int16Type: - return new(*int16), appendValueToArrowBuilder[int16, int16, *array.Int16Builder, utils.Int16Converter], nil + return new(*int16), appendToBuilderWithValueConverter[int16, int16, *array.Int16Builder, utils.Int16Converter], nil case Int32Type: - return new(*int32), appendValueToArrowBuilder[int32, int32, *array.Int32Builder, utils.Int32Converter], nil + return new(*int32), appendToBuilderWithValueConverter[int32, int32, *array.Int32Builder, utils.Int32Converter], nil case Int64Type: - return new(*int64), appendValueToArrowBuilder[int64, int64, *array.Int64Builder, utils.Int64Converter], nil + return new(*int64), appendToBuilderWithValueConverter[int64, int64, *array.Int64Builder, utils.Int64Converter], nil case Uint8Type: - return new(*uint8), appendValueToArrowBuilder[uint8, uint8, *array.Uint8Builder, utils.Uint8Converter], nil + return new(*uint8), appendToBuilderWithValueConverter[uint8, uint8, *array.Uint8Builder, utils.Uint8Converter], nil case Uint16Type: - return new(*uint16), appendValueToArrowBuilder[uint16, uint16, *array.Uint16Builder, utils.Uint16Converter], nil + return new(*uint16), appendToBuilderWithValueConverter[uint16, uint16, *array.Uint16Builder, utils.Uint16Converter], nil case Uint32Type: - return new(*uint32), appendValueToArrowBuilder[uint32, uint32, *array.Uint32Builder, utils.Uint32Converter], nil + return new(*uint32), appendToBuilderWithValueConverter[uint32, uint32, *array.Uint32Builder, utils.Uint32Converter], nil case Uint64Type: - return new(*uint64), appendValueToArrowBuilder[uint64, uint64, *array.Uint64Builder, utils.Uint64Converter], nil + return new(*uint64), appendToBuilderWithValueConverter[uint64, uint64, *array.Uint64Builder, utils.Uint64Converter], nil case FloatType: - return new(*float32), appendValueToArrowBuilder[float32, float32, *array.Float32Builder, utils.Float32Converter], nil + return new(*float32), appendToBuilderWithValueConverter[float32, float32, *array.Float32Builder, utils.Float32Converter], nil case DoubleType: - return new(*float64), appendValueToArrowBuilder[float64, float64, *array.Float64Builder, utils.Float64Converter], nil + return new(*float64), appendToBuilderWithValueConverter[float64, float64, *array.Float64Builder, utils.Float64Converter], nil case StringType: - return new(*[]byte), appendValueToArrowBuilder[[]byte, []byte, *array.BinaryBuilder, utils.BytesConverter], nil + return new(*[]byte), appendToBuilderWithValueConverter[[]byte, []byte, *array.BinaryBuilder, utils.BytesConverter], nil case Utf8Type: - return new(*string), appendValueToArrowBuilder[string, string, *array.StringBuilder, utils.StringConverter], nil + return new(*string), appendToBuilderWithValueConverter[string, string, *array.StringBuilder, utils.StringConverter], nil case DateType: switch ydbTypeID { case Ydb.Type_DATE: - return new(*time.Time), appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder, utils.DateConverter], nil + return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint16, *array.Uint16Builder, utils.DateConverter], nil case Ydb.Type_UTF8: - return new(*time.Time), appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, utils.DateToStringConverter], nil + return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder, utils.DateToStringConverterV2], nil default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -213,9 +256,11 @@ func makeAcceptorAndAppenderFromSQLType( case DatetimeType: switch ydbTypeID { case Ydb.Type_DATETIME: - return new(*time.Time), appendValueToArrowBuilder[time.Time, uint32, *array.Uint32Builder, utils.DatetimeConverter], nil + return new(*time.Time), appendToBuilderWithValueConverter[ + time.Time, uint32, *array.Uint32Builder, utils.DatetimeConverter], nil case Ydb.Type_UTF8: - return new(*time.Time), appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, utils.DatetimeToStringConverter], nil + return new(*time.Time), appendToBuilderWithValuePtrConverter[ + time.Time, string, *array.StringBuilder, utils.DatetimeToStringConverter], nil default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -223,10 +268,10 @@ func makeAcceptorAndAppenderFromSQLType( case TimestampType: switch ydbTypeID { case Ydb.Type_TIMESTAMP: - return new(*time.Time), appendValueToArrowBuilder[time.Time, uint64, *array.Uint64Builder, utils.TimestampConverter], nil + return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint64, *array.Uint64Builder, utils.TimestampConverter], nil case Ydb.Type_UTF8: return new(*time.Time), - appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, utils.TimestampToStringConverter], nil + appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder, utils.TimestampToStringConverter], nil default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) diff --git a/app/server/service_pprof.go b/app/server/service_pprof.go index da4f6ad5..f2671e7f 100644 --- a/app/server/service_pprof.go +++ b/app/server/service_pprof.go @@ -35,7 +35,7 @@ func (s *servicePprof) stop() { defer cancel() err := s.httpServer.Shutdown(ctx) - if err != nil { + if err != nil && err != ctx.Err() { s.logger.Error("shutdown http server", zap.Error(err)) } } diff --git a/app/server/utils/converters.go b/app/server/utils/converters.go index a8e25bbc..94b2f4ec 100644 --- a/app/server/utils/converters.go +++ b/app/server/utils/converters.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "time" + "unsafe" "github.com/ydb-platform/fq-connector-go/common" ) @@ -11,6 +12,10 @@ type ValueConverter[IN common.ValueType, OUT common.ValueType] interface { Convert(in IN) (OUT, error) } +type ValuePtrConverter[IN common.ValueType, OUT common.ValueType] interface { + Convert(in *IN) (OUT, error) +} + type BoolConverter struct{} func (BoolConverter) Convert(in bool) (uint8, error) { @@ -89,10 +94,73 @@ func (DateConverter) Convert(in time.Time) (uint16, error) { type DateToStringConverter struct{} -func (DateToStringConverter) Convert(in time.Time) (string, error) { +func (DateToStringConverter) Convert(in *time.Time) (string, error) { return in.Format("2006-01-02"), nil } +func absInt(x int) int { + if x < 0 { + return -x + } + + return x +} + +//go:linkname decomposeDate time.(*Time).date +func decomposeDate(*time.Time, bool) (year int, month int, day int, dayOfYear int) + +//go:linkname formatBits strconv.formatBits +func formatBits([]byte, uint64, int, bool, bool) (b []byte, s string) + +type DateToStringConverterV2 struct{} + +func (DateToStringConverterV2) Convert(in *time.Time) (string, error) { + buf := make([]byte, 0, 11) + + year, month, day, _ := decomposeDate(in, true) + + // year + + if year < 0 { + buf = append(buf, byte('-')) + } + + absYear := absInt(year) + + switch { + case absYear < 10: + buf = append(buf, []byte("000")...) + case absYear < 100: + buf = append(buf, []byte("00")...) + case absYear < 1000: + buf = append(buf, byte('0')) + } + + buf, _ = formatBits(buf, uint64(absYear), 10, false, true) + + // month + + buf = append(buf, byte('-')) + if month < 10 { + buf = append(buf, byte('0')) + } + + buf, _ = formatBits(buf, uint64(month), 10, false, true) + + // day + + buf = append(buf, byte('-')) + if day < 10 { + buf = append(buf, byte('0')) + } + + buf, _ = formatBits(buf, uint64(day), 10, false, true) + + p := unsafe.SliceData(buf) + + return unsafe.String(p, len(buf)), nil +} + type DatetimeConverter struct{} func (DatetimeConverter) Convert(in time.Time) (uint32, error) { @@ -107,7 +175,7 @@ func (DatetimeConverter) Convert(in time.Time) (uint32, error) { type DatetimeToStringConverter struct{} -func (DatetimeToStringConverter) Convert(in time.Time) (string, error) { +func (DatetimeToStringConverter) Convert(in *time.Time) (string, error) { return in.UTC().Format("2006-01-02T15:04:05Z"), nil } @@ -125,7 +193,7 @@ func (TimestampConverter) Convert(in time.Time) (uint64, error) { type TimestampToStringConverter struct{} -func (TimestampToStringConverter) Convert(in time.Time) (string, error) { +func (TimestampToStringConverter) Convert(in *time.Time) (string, error) { // Using accuracy of 9 decimal places is enough for supported data sources // Max accuracy of date/time formats: // PostgreSQL - 1 microsecond (10^-6 s) diff --git a/app/server/utils/converters_test.go b/app/server/utils/converters_test.go new file mode 100644 index 00000000..045e09fc --- /dev/null +++ b/app/server/utils/converters_test.go @@ -0,0 +1,65 @@ +package utils + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDateToStringConverter(t *testing.T) { + type testCase struct { + in time.Time + } + + testCases := []testCase{ + { + in: time.Date(1950, 5, 27, 0, 0, 0, 0, time.UTC), + }, + { + in: time.Date(-1, 1, 1, 0, 0, 0, 0, time.UTC), + }, + } + + const format = "2006-01-02" + + var converter DateToStringConverterV2 + + for _, tc := range testCases { + tc := tc + t.Run(tc.in.Format(format), func(t *testing.T) { + actual, err := converter.Convert(&tc.in) + require.NoError(t, err) + // check equivalence + require.Equal(t, tc.in.Format(format), actual) + }) + } +} + +func BenchmarkDateToStringConverter(b *testing.B) { + t := time.Now() + + b.Run("V1", func(b *testing.B) { + var converter DateToStringConverter + + for i := 0; i < b.N; i++ { + out, err := converter.Convert(&t) + if err != nil { + b.Fatal(err) + } + _ = out + } + }) + + b.Run("V2", func(b *testing.B) { + var converter DateToStringConverterV2 + + for i := 0; i < b.N; i++ { + out, err := converter.Convert(&t) + if err != nil { + b.Fatal(err) + } + _ = out + } + }) +} From ad40b42cb538bdb5e4821c3d00fa147fcff2431c Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 1 Feb 2024 14:48:04 +0300 Subject: [PATCH 2/3] Configurable converters (special flag for unsafe converters) --- .gitignore | 2 + api/common/data_source.pb.go | 4 +- api/common/endpoint.pb.go | 4 +- api/service/connector.pb.go | 4 +- api/service/connector_grpc.pb.go | 21 +- api/service/protos/connector.pb.go | 4 +- app/bench/test_case_runner.go | 5 + app/config/client.pb.go | 4 +- app/config/server.pb.go | 239 ++++++++++++------ app/config/server.proto | 11 +- app/server/config.go | 18 ++ app/server/conversion/converters_default.go | 121 +++++++++ .../{utils => conversion}/converters_test.go | 12 +- app/server/conversion/converters_unsafe.go | 79 ++++++ app/server/conversion/factory.go | 13 + app/server/conversion/interface.go | 38 +++ app/server/data_source_collection.go | 4 +- .../rdbms/clickhouse/connection_manager.go | 7 +- .../rdbms/clickhouse/type_mapper.go | 99 +++++--- app/server/datasource/rdbms/data_source.go | 26 +- .../datasource/rdbms/data_source_factory.go | 20 +- .../datasource/rdbms/data_source_test.go | 7 +- .../rdbms/postgresql/connection_manager.go | 5 +- .../rdbms/postgresql/type_mapper.go | 49 ++-- app/server/datasource/rdbms/utils/sql.go | 3 +- app/server/datasource/rdbms/utils/sql_mock.go | 3 +- .../rdbms/ydb/connection_manager.go | 5 +- .../datasource/rdbms/ydb/type_mapper.go | 139 +++++----- app/server/embedded.go | 12 + app/server/service_connector.go | 2 + app/server/streaming/streamer_test.go | 5 +- app/server/utils/converters.go | 203 --------------- example.conf | 4 + scripts/bench/postgresql_datetime.txt | 43 ++++ tests/suite/suite.go | 19 +- 35 files changed, 747 insertions(+), 487 deletions(-) create mode 100644 app/server/conversion/converters_default.go rename app/server/{utils => conversion}/converters_test.go (80%) create mode 100644 app/server/conversion/converters_unsafe.go create mode 100644 app/server/conversion/factory.go create mode 100644 app/server/conversion/interface.go delete mode 100644 app/server/utils/converters.go create mode 100644 scripts/bench/postgresql_datetime.txt diff --git a/.gitignore b/.gitignore index ca8cb737..5504b8b2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ fq-connector-go-tests coverage* scripts/bench/postgresql __pycache__ +*.png +*.test diff --git a/api/common/data_source.pb.go b/api/common/data_source.pb.go index cc44a66f..5df8540f 100644 --- a/api/common/data_source.pb.go +++ b/api/common/data_source.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.2 +// protoc-gen-go v1.28.1 +// protoc v3.12.4 // source: ydb/library/yql/providers/generic/connector/api/common/data_source.proto package common diff --git a/api/common/endpoint.pb.go b/api/common/endpoint.pb.go index d548c938..75c8a32f 100644 --- a/api/common/endpoint.pb.go +++ b/api/common/endpoint.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.2 +// protoc-gen-go v1.28.1 +// protoc v3.12.4 // source: ydb/library/yql/providers/generic/connector/api/common/endpoint.proto package common diff --git a/api/service/connector.pb.go b/api/service/connector.pb.go index 759e889c..e46bf487 100644 --- a/api/service/connector.pb.go +++ b/api/service/connector.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.2 +// protoc-gen-go v1.28.1 +// protoc v3.12.4 // source: ydb/library/yql/providers/generic/connector/api/service/connector.proto package service diff --git a/api/service/connector_grpc.pb.go b/api/service/connector_grpc.pb.go index 58e80fef..72d569d6 100644 --- a/api/service/connector_grpc.pb.go +++ b/api/service/connector_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.2 +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.12.4 // source: ydb/library/yql/providers/generic/connector/api/service/connector.proto package service @@ -19,13 +19,6 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 -const ( - Connector_ListTables_FullMethodName = "/NYql.NConnector.NApi.Connector/ListTables" - Connector_DescribeTable_FullMethodName = "/NYql.NConnector.NApi.Connector/DescribeTable" - Connector_ListSplits_FullMethodName = "/NYql.NConnector.NApi.Connector/ListSplits" - Connector_ReadSplits_FullMethodName = "/NYql.NConnector.NApi.Connector/ReadSplits" -) - // ConnectorClient is the client API for Connector service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -50,7 +43,7 @@ func NewConnectorClient(cc grpc.ClientConnInterface) ConnectorClient { } func (c *connectorClient) ListTables(ctx context.Context, in *protos.TListTablesRequest, opts ...grpc.CallOption) (Connector_ListTablesClient, error) { - stream, err := c.cc.NewStream(ctx, &Connector_ServiceDesc.Streams[0], Connector_ListTables_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &Connector_ServiceDesc.Streams[0], "/NYql.NConnector.NApi.Connector/ListTables", opts...) if err != nil { return nil, err } @@ -83,7 +76,7 @@ func (x *connectorListTablesClient) Recv() (*protos.TListTablesResponse, error) func (c *connectorClient) DescribeTable(ctx context.Context, in *protos.TDescribeTableRequest, opts ...grpc.CallOption) (*protos.TDescribeTableResponse, error) { out := new(protos.TDescribeTableResponse) - err := c.cc.Invoke(ctx, Connector_DescribeTable_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/NYql.NConnector.NApi.Connector/DescribeTable", in, out, opts...) if err != nil { return nil, err } @@ -91,7 +84,7 @@ func (c *connectorClient) DescribeTable(ctx context.Context, in *protos.TDescrib } func (c *connectorClient) ListSplits(ctx context.Context, in *protos.TListSplitsRequest, opts ...grpc.CallOption) (Connector_ListSplitsClient, error) { - stream, err := c.cc.NewStream(ctx, &Connector_ServiceDesc.Streams[1], Connector_ListSplits_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &Connector_ServiceDesc.Streams[1], "/NYql.NConnector.NApi.Connector/ListSplits", opts...) if err != nil { return nil, err } @@ -123,7 +116,7 @@ func (x *connectorListSplitsClient) Recv() (*protos.TListSplitsResponse, error) } func (c *connectorClient) ReadSplits(ctx context.Context, in *protos.TReadSplitsRequest, opts ...grpc.CallOption) (Connector_ReadSplitsClient, error) { - stream, err := c.cc.NewStream(ctx, &Connector_ServiceDesc.Streams[2], Connector_ReadSplits_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &Connector_ServiceDesc.Streams[2], "/NYql.NConnector.NApi.Connector/ReadSplits", opts...) if err != nil { return nil, err } @@ -230,7 +223,7 @@ func _Connector_DescribeTable_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Connector_DescribeTable_FullMethodName, + FullMethod: "/NYql.NConnector.NApi.Connector/DescribeTable", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ConnectorServer).DescribeTable(ctx, req.(*protos.TDescribeTableRequest)) diff --git a/api/service/protos/connector.pb.go b/api/service/protos/connector.pb.go index ce4e8cae..a2a188ed 100644 --- a/api/service/protos/connector.pb.go +++ b/api/service/protos/connector.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.2 +// protoc-gen-go v1.28.1 +// protoc v3.12.4 // source: ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto package protos diff --git a/app/bench/test_case_runner.go b/app/bench/test_case_runner.go index 4a990113..688b24fe 100644 --- a/app/bench/test_case_runner.go +++ b/app/bench/test_case_runner.go @@ -36,6 +36,11 @@ func newTestCaseRunner( server.WithPprofServerConfig(&config.TPprofServerConfig{ Endpoint: &api_common.TEndpoint{Host: "localhost", Port: 50052}, }), + server.WithConversionConfig( + &config.TConversionConfig{ + UseUnsafeConverters: true, + }, + ), ) if err != nil { diff --git a/app/config/client.pb.go b/app/config/client.pb.go index 87034002..1ee6adf3 100644 --- a/app/config/client.pb.go +++ b/app/config/client.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.2 +// protoc-gen-go v1.28.1 +// protoc v3.12.4 // source: app/config/client.proto package config diff --git a/app/config/server.pb.go b/app/config/server.pb.go index 4fb05746..7f651a6e 100644 --- a/app/config/server.pb.go +++ b/app/config/server.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.2 +// protoc-gen-go v1.28.1 +// protoc v3.12.4 // source: app/config/server.proto package config @@ -88,9 +88,9 @@ type TServerConfig struct { // TODO: remove it after YQ-2057 // - // Deprecated: Marked as deprecated in app/config/server.proto. + // Deprecated: Do not use. Endpoint *common.TEndpoint `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` - // Deprecated: Marked as deprecated in app/config/server.proto. + // Deprecated: Do not use. Tls *TServerTLSConfig `protobuf:"bytes,2,opt,name=tls,proto3" json:"tls,omitempty"` ConnectorServer *TConnectorServerConfig `protobuf:"bytes,5,opt,name=connector_server,json=connectorServer,proto3" json:"connector_server,omitempty"` // This is a rough restriction for YQ memory consumption until @@ -104,8 +104,10 @@ type TServerConfig struct { PprofServer *TPprofServerConfig `protobuf:"bytes,6,opt,name=pprof_server,json=pprofServer,proto3" json:"pprof_server,omitempty"` // Metrics server config MetricsServer *TMetricsServerConfig `protobuf:"bytes,7,opt,name=metrics_server,json=metricsServer,proto3" json:"metrics_server,omitempty"` - // Paging settings + // Paging config Paging *TPagingConfig `protobuf:"bytes,8,opt,name=paging,proto3" json:"paging,omitempty"` + // Data types conversion config + Conversion *TConversionConfig `protobuf:"bytes,9,opt,name=conversion,proto3" json:"conversion,omitempty"` } func (x *TServerConfig) Reset() { @@ -140,7 +142,7 @@ func (*TServerConfig) Descriptor() ([]byte, []int) { return file_app_config_server_proto_rawDescGZIP(), []int{0} } -// Deprecated: Marked as deprecated in app/config/server.proto. +// Deprecated: Do not use. func (x *TServerConfig) GetEndpoint() *common.TEndpoint { if x != nil { return x.Endpoint @@ -148,7 +150,7 @@ func (x *TServerConfig) GetEndpoint() *common.TEndpoint { return nil } -// Deprecated: Marked as deprecated in app/config/server.proto. +// Deprecated: Do not use. func (x *TServerConfig) GetTls() *TServerTLSConfig { if x != nil { return x.Tls @@ -198,6 +200,13 @@ func (x *TServerConfig) GetPaging() *TPagingConfig { return nil } +func (x *TServerConfig) GetConversion() *TConversionConfig { + if x != nil { + return x.Conversion + } + return nil +} + // TConnectorServerConfig - configuration of the main GRPC server type TConnectorServerConfig struct { state protoimpl.MessageState @@ -612,6 +621,56 @@ func (x *TPagingConfig) GetPrefetchQueueCapacity() uint32 { return 0 } +// TConversionConfig configures some aspects of the data conversion process +// between the data source native type system, Go type system and Arrow type system +type TConversionConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Enables microoptimized code that can potentially crash the service + UseUnsafeConverters bool `protobuf:"varint,1,opt,name=use_unsafe_converters,json=useUnsafeConverters,proto3" json:"use_unsafe_converters,omitempty"` +} + +func (x *TConversionConfig) Reset() { + *x = TConversionConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_app_config_server_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TConversionConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TConversionConfig) ProtoMessage() {} + +func (x *TConversionConfig) ProtoReflect() protoreflect.Message { + mi := &file_app_config_server_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TConversionConfig.ProtoReflect.Descriptor instead. +func (*TConversionConfig) Descriptor() ([]byte, []int) { + return file_app_config_server_proto_rawDescGZIP(), []int{8} +} + +func (x *TConversionConfig) GetUseUnsafeConverters() bool { + if x != nil { + return x.UseUnsafeConverters + } + return false +} + var File_app_config_server_proto protoreflect.FileDescriptor var file_app_config_server_proto_rawDesc = []byte{ @@ -622,7 +681,7 @@ var file_app_config_server_proto_rawDesc = []byte{ 0x79, 0x2f, 0x79, 0x71, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x65, 0x6e, 0x64, - 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xeb, 0x04, 0x0a, 0x0d, + 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb9, 0x05, 0x0a, 0x0d, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3f, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, @@ -661,42 +720,13 @@ var file_app_config_server_proto_rawDesc = []byte{ 0x67, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x52, 0x06, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x22, 0x94, 0x01, 0x0a, 0x16, 0x54, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x41, 0x70, 0x69, 0x2e, 0x54, 0x45, - 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, - 0x74, 0x12, 0x3d, 0x0a, 0x03, 0x74, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, - 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, - 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, - 0x22, 0x3e, 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x65, 0x72, 0x74, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x65, 0x72, 0x74, 0x4a, 0x04, 0x08, 0x01, 0x10, 0x02, - 0x22, 0x26, 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x4c, - 0x69, 0x6d, 0x69, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x8b, 0x01, 0x0a, 0x0d, 0x54, 0x4c, 0x6f, - 0x67, 0x67, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x41, 0x0a, 0x09, 0x6c, 0x6f, - 0x67, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, + 0x67, 0x52, 0x06, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x4c, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, - 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x45, 0x4c, 0x6f, 0x67, 0x4c, 0x65, - 0x76, 0x65, 0x6c, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x37, 0x0a, - 0x18, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x71, 0x75, 0x65, 0x72, - 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x15, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x71, 0x6c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4c, - 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x54, 0x50, 0x70, 0x72, 0x6f, - 0x66, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, - 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, - 0x72, 0x2e, 0x4e, 0x41, 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, - 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x03, 0x74, 0x6c, - 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x14, 0x54, 0x4d, - 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, + 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x43, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x94, 0x01, 0x0a, 0x16, 0x54, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x41, 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, @@ -704,25 +734,64 @@ var file_app_config_server_proto_rawDesc = []byte{ 0x3d, 0x0a, 0x03, 0x74, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x22, 0x91, - 0x01, 0x0a, 0x0d, 0x54, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x12, 0x22, 0x0a, 0x0d, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x67, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x72, 0x6f, 0x77, 0x73, 0x50, 0x65, 0x72, - 0x50, 0x61, 0x67, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x70, 0x65, - 0x72, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x62, 0x79, - 0x74, 0x65, 0x73, 0x50, 0x65, 0x72, 0x50, 0x61, 0x67, 0x65, 0x12, 0x36, 0x0a, 0x17, 0x70, 0x72, - 0x65, 0x66, 0x65, 0x74, 0x63, 0x68, 0x5f, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x63, 0x61, 0x70, - 0x61, 0x63, 0x69, 0x74, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x70, 0x72, 0x65, - 0x66, 0x65, 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x75, 0x65, 0x43, 0x61, 0x70, 0x61, 0x63, 0x69, - 0x74, 0x79, 0x2a, 0x4b, 0x0a, 0x09, 0x45, 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, - 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43, 0x45, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x45, - 0x42, 0x55, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x4e, 0x46, 0x4f, 0x10, 0x02, 0x12, - 0x08, 0x0a, 0x04, 0x57, 0x41, 0x52, 0x4e, 0x10, 0x03, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, - 0x4f, 0x52, 0x10, 0x04, 0x12, 0x09, 0x0a, 0x05, 0x46, 0x41, 0x54, 0x41, 0x4c, 0x10, 0x05, 0x42, - 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, - 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x66, 0x71, 0x2d, 0x63, 0x6f, - 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x22, 0x3e, + 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x6b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x65, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x63, 0x65, 0x72, 0x74, 0x4a, 0x04, 0x08, 0x01, 0x10, 0x02, 0x22, 0x26, + 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x4c, 0x69, 0x6d, + 0x69, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x8b, 0x01, 0x0a, 0x0d, 0x54, 0x4c, 0x6f, 0x67, 0x67, + 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x41, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, + 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x4e, 0x59, + 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x45, 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, + 0x6c, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x37, 0x0a, 0x18, 0x65, + 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, + 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x65, + 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x71, 0x6c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4c, 0x6f, 0x67, + 0x67, 0x69, 0x6e, 0x67, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x54, 0x50, 0x70, 0x72, 0x6f, 0x66, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, 0x08, 0x65, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, + 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, + 0x4e, 0x41, 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x08, + 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x03, 0x74, 0x6c, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x14, 0x54, 0x4d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x12, 0x3b, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x41, 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, + 0x69, 0x6e, 0x74, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x3d, 0x0a, + 0x03, 0x74, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, + 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, + 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x22, 0x91, 0x01, 0x0a, + 0x0d, 0x54, 0x50, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x22, + 0x0a, 0x0d, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x72, 0x6f, 0x77, 0x73, 0x50, 0x65, 0x72, 0x50, 0x61, + 0x67, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, + 0x70, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x62, 0x79, 0x74, 0x65, + 0x73, 0x50, 0x65, 0x72, 0x50, 0x61, 0x67, 0x65, 0x12, 0x36, 0x0a, 0x17, 0x70, 0x72, 0x65, 0x66, + 0x65, 0x74, 0x63, 0x68, 0x5f, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x63, 0x61, 0x70, 0x61, 0x63, + 0x69, 0x74, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x15, 0x70, 0x72, 0x65, 0x66, 0x65, + 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x75, 0x65, 0x43, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, + 0x22, 0x47, 0x0a, 0x11, 0x54, 0x43, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x32, 0x0a, 0x15, 0x75, 0x73, 0x65, 0x5f, 0x75, 0x6e, 0x73, + 0x61, 0x66, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x74, 0x65, 0x72, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x13, 0x75, 0x73, 0x65, 0x55, 0x6e, 0x73, 0x61, 0x66, 0x65, 0x43, + 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x74, 0x65, 0x72, 0x73, 0x2a, 0x4b, 0x0a, 0x09, 0x45, 0x4c, 0x6f, + 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43, 0x45, 0x10, + 0x00, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x45, 0x42, 0x55, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, + 0x49, 0x4e, 0x46, 0x4f, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x57, 0x41, 0x52, 0x4e, 0x10, 0x03, + 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, 0x12, 0x09, 0x0a, 0x05, 0x46, + 0x41, 0x54, 0x41, 0x4c, 0x10, 0x05, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, + 0x6d, 0x2f, 0x66, 0x71, 0x2d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, + 0x6f, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -738,7 +807,7 @@ func file_app_config_server_proto_rawDescGZIP() []byte { } var file_app_config_server_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_app_config_server_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_app_config_server_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_app_config_server_proto_goTypes = []interface{}{ (ELogLevel)(0), // 0: NYql.Connector.App.Config.ELogLevel (*TServerConfig)(nil), // 1: NYql.Connector.App.Config.TServerConfig @@ -749,10 +818,11 @@ var file_app_config_server_proto_goTypes = []interface{}{ (*TPprofServerConfig)(nil), // 6: NYql.Connector.App.Config.TPprofServerConfig (*TMetricsServerConfig)(nil), // 7: NYql.Connector.App.Config.TMetricsServerConfig (*TPagingConfig)(nil), // 8: NYql.Connector.App.Config.TPagingConfig - (*common.TEndpoint)(nil), // 9: NYql.NConnector.NApi.TEndpoint + (*TConversionConfig)(nil), // 9: NYql.Connector.App.Config.TConversionConfig + (*common.TEndpoint)(nil), // 10: NYql.NConnector.NApi.TEndpoint } var file_app_config_server_proto_depIdxs = []int32{ - 9, // 0: NYql.Connector.App.Config.TServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 10, // 0: NYql.Connector.App.Config.TServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint 3, // 1: NYql.Connector.App.Config.TServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig 2, // 2: NYql.Connector.App.Config.TServerConfig.connector_server:type_name -> NYql.Connector.App.Config.TConnectorServerConfig 4, // 3: NYql.Connector.App.Config.TServerConfig.read_limit:type_name -> NYql.Connector.App.Config.TServerReadLimit @@ -760,18 +830,19 @@ var file_app_config_server_proto_depIdxs = []int32{ 6, // 5: NYql.Connector.App.Config.TServerConfig.pprof_server:type_name -> NYql.Connector.App.Config.TPprofServerConfig 7, // 6: NYql.Connector.App.Config.TServerConfig.metrics_server:type_name -> NYql.Connector.App.Config.TMetricsServerConfig 8, // 7: NYql.Connector.App.Config.TServerConfig.paging:type_name -> NYql.Connector.App.Config.TPagingConfig - 9, // 8: NYql.Connector.App.Config.TConnectorServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 9: NYql.Connector.App.Config.TConnectorServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig - 0, // 10: NYql.Connector.App.Config.TLoggerConfig.log_level:type_name -> NYql.Connector.App.Config.ELogLevel - 9, // 11: NYql.Connector.App.Config.TPprofServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 12: NYql.Connector.App.Config.TPprofServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig - 9, // 13: NYql.Connector.App.Config.TMetricsServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 14: NYql.Connector.App.Config.TMetricsServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig - 15, // [15:15] is the sub-list for method output_type - 15, // [15:15] is the sub-list for method input_type - 15, // [15:15] is the sub-list for extension type_name - 15, // [15:15] is the sub-list for extension extendee - 0, // [0:15] is the sub-list for field type_name + 9, // 8: NYql.Connector.App.Config.TServerConfig.conversion:type_name -> NYql.Connector.App.Config.TConversionConfig + 10, // 9: NYql.Connector.App.Config.TConnectorServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 3, // 10: NYql.Connector.App.Config.TConnectorServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 0, // 11: NYql.Connector.App.Config.TLoggerConfig.log_level:type_name -> NYql.Connector.App.Config.ELogLevel + 10, // 12: NYql.Connector.App.Config.TPprofServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 3, // 13: NYql.Connector.App.Config.TPprofServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 10, // 14: NYql.Connector.App.Config.TMetricsServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 3, // 15: NYql.Connector.App.Config.TMetricsServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 16, // [16:16] is the sub-list for method output_type + 16, // [16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_app_config_server_proto_init() } @@ -876,6 +947,18 @@ func file_app_config_server_proto_init() { return nil } } + file_app_config_server_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TConversionConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -883,7 +966,7 @@ func file_app_config_server_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_app_config_server_proto_rawDesc, NumEnums: 1, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/app/config/server.proto b/app/config/server.proto index 909dd8bb..0a2beeeb 100644 --- a/app/config/server.proto +++ b/app/config/server.proto @@ -23,8 +23,10 @@ message TServerConfig { TPprofServerConfig pprof_server = 6; // Metrics server config TMetricsServerConfig metrics_server = 7; - // Paging settings + // Paging config TPagingConfig paging = 8; + // Data types conversion config + TConversionConfig conversion = 9; } // TConnectorServerConfig - configuration of the main GRPC server @@ -104,3 +106,10 @@ message TPagingConfig { // Tune this carefully cause this may cause service OOMs. uint32 prefetch_queue_capacity = 3; } + +// TConversionConfig configures some aspects of the data conversion process +// between the data source native type system, Go type system and Arrow type system +message TConversionConfig { + // Enables microoptimized code that can potentially crash the service + bool use_unsafe_converters = 1; +} diff --git a/app/server/config.go b/app/server/config.go index 99499212..5f2cac03 100644 --- a/app/server/config.go +++ b/app/server/config.go @@ -25,6 +25,12 @@ func fillServerConfigDefaults(c *config.TServerConfig) { EnableSqlQueryLogging: false, } } + + if c.Conversion == nil { + c.Conversion = &config.TConversionConfig{ + UseUnsafeConverters: false, + } + } } func validateServerConfig(c *config.TServerConfig) error { @@ -44,6 +50,10 @@ func validateServerConfig(c *config.TServerConfig) error { return fmt.Errorf("validate `paging`: %w", err) } + if err := validateConversionConfig(c.Conversion); err != nil { + return fmt.Errorf("validate `conversion`: %w", err) + } + return nil } @@ -146,6 +156,14 @@ func validatePagingConfig(c *config.TPagingConfig) error { return nil } +func validateConversionConfig(c *config.TConversionConfig) error { + if c == nil { + return fmt.Errorf("required section is missing") + } + + return nil +} + func fileMustExist(path string) error { info, err := os.Stat(path) if os.IsNotExist(err) { diff --git a/app/server/conversion/converters_default.go b/app/server/conversion/converters_default.go new file mode 100644 index 00000000..f29c4046 --- /dev/null +++ b/app/server/conversion/converters_default.go @@ -0,0 +1,121 @@ +package conversion + +import ( + "fmt" + "time" + + "github.com/ydb-platform/fq-connector-go/common" +) + +var _ Collection = collectionDefault{} + +type collectionDefault struct{} + +func (collectionDefault) Bool() ValueConverter[bool, uint8] { return boolConverter{} } +func (collectionDefault) Int8() ValueConverter[int8, int8] { return noopConverter[int8]{} } +func (collectionDefault) Int16() ValueConverter[int16, int16] { return noopConverter[int16]{} } +func (collectionDefault) Int32() ValueConverter[int32, int32] { return noopConverter[int32]{} } +func (collectionDefault) Int64() ValueConverter[int64, int64] { return noopConverter[int64]{} } +func (collectionDefault) Uint8() ValueConverter[uint8, uint8] { return noopConverter[uint8]{} } +func (collectionDefault) Uint16() ValueConverter[uint16, uint16] { return noopConverter[uint16]{} } +func (collectionDefault) Uint32() ValueConverter[uint32, uint32] { return noopConverter[uint32]{} } +func (collectionDefault) Uint64() ValueConverter[uint64, uint64] { return noopConverter[uint64]{} } +func (collectionDefault) Float32() ValueConverter[float32, float32] { return noopConverter[float32]{} } +func (collectionDefault) Float64() ValueConverter[float64, float64] { return noopConverter[float64]{} } +func (collectionDefault) String() ValueConverter[string, string] { return noopConverter[string]{} } +func (collectionDefault) StringToBytes() ValueConverter[string, []byte] { + return stringToBytesConverter{} +} +func (collectionDefault) Bytes() ValueConverter[[]byte, []byte] { return noopConverter[[]byte]{} } +func (collectionDefault) Date() ValueConverter[time.Time, uint16] { return dateConverter{} } +func (collectionDefault) DateToString() ValuePtrConverter[time.Time, string] { + return dateToStringConverter{} +} +func (collectionDefault) Datetime() ValueConverter[time.Time, uint32] { return datetimeConverter{} } +func (collectionDefault) DatetimeToString() ValuePtrConverter[time.Time, string] { + return datetimeToStringConverter{} +} +func (collectionDefault) Timestamp() ValueConverter[time.Time, uint64] { return timestampConverter{} } +func (collectionDefault) TimestampToString() ValuePtrConverter[time.Time, string] { + return timestampToStringConverter{} +} + +type noopConverter[T common.ValueType] struct { +} + +func (noopConverter[T]) Convert(in T) (T, error) { return in, nil } + +type boolConverter struct{} + +func (boolConverter) Convert(in bool) (uint8, error) { + // For a some reason, Bool values are converted to Arrow Uint8 rather than to Arrow native Bool. + // See https://st.yandex-team.ru/YQL-15332 for more details. + if in { + return 1, nil + } + + return 0, nil +} + +type stringToBytesConverter struct{} + +func (stringToBytesConverter) Convert(in string) ([]byte, error) { return []byte(in), nil } + +type dateConverter struct{} + +func (dateConverter) Convert(in time.Time) (uint16, error) { + out, err := common.TimeToYDBDate(&in) + + if err != nil { + return 0, fmt.Errorf("convert time to YDB Date: %w", err) + } + + return out, nil +} + +type dateToStringConverter struct{} + +func (dateToStringConverter) Convert(in *time.Time) (string, error) { + return in.Format("2006-01-02"), nil +} + +type datetimeConverter struct{} + +func (datetimeConverter) Convert(in time.Time) (uint32, error) { + out, err := common.TimeToYDBDatetime(&in) + + if err != nil { + return 0, fmt.Errorf("convert time to YDB Datetime: %w", err) + } + + return out, nil +} + +type datetimeToStringConverter struct{} + +func (datetimeToStringConverter) Convert(in *time.Time) (string, error) { + return in.UTC().Format("2006-01-02T15:04:05Z"), nil +} + +type timestampConverter struct{} + +func (timestampConverter) Convert(in time.Time) (uint64, error) { + out, err := common.TimeToYDBTimestamp(&in) + + if err != nil { + return 0, fmt.Errorf("convert time to YDB Timestamp: %w", err) + } + + return out, nil +} + +type timestampToStringConverter struct{} + +func (timestampToStringConverter) Convert(in *time.Time) (string, error) { + // Using accuracy of 9 decimal places is enough for supported data sources + // Max accuracy of date/time formats: + // PostgreSQL - 1 microsecond (10^-6 s) + // ClickHouse - 1 nanosecond (10^-9 s) + // Trailing zeros are omitted + return in.UTC().Format("2006-01-02T15:04:05.999999999Z"), nil +} diff --git a/app/server/utils/converters_test.go b/app/server/conversion/converters_test.go similarity index 80% rename from app/server/utils/converters_test.go rename to app/server/conversion/converters_test.go index 045e09fc..efe0b22f 100644 --- a/app/server/utils/converters_test.go +++ b/app/server/conversion/converters_test.go @@ -1,4 +1,4 @@ -package utils +package conversion import ( "testing" @@ -23,7 +23,7 @@ func TestDateToStringConverter(t *testing.T) { const format = "2006-01-02" - var converter DateToStringConverterV2 + var converter dateToStringConverterUnsafe for _, tc := range testCases { tc := tc @@ -39,8 +39,8 @@ func TestDateToStringConverter(t *testing.T) { func BenchmarkDateToStringConverter(b *testing.B) { t := time.Now() - b.Run("V1", func(b *testing.B) { - var converter DateToStringConverter + b.Run("Default", func(b *testing.B) { + var converter dateToStringConverter for i := 0; i < b.N; i++ { out, err := converter.Convert(&t) @@ -51,8 +51,8 @@ func BenchmarkDateToStringConverter(b *testing.B) { } }) - b.Run("V2", func(b *testing.B) { - var converter DateToStringConverterV2 + b.Run("Unsafe", func(b *testing.B) { + var converter dateToStringConverterUnsafe for i := 0; i < b.N; i++ { out, err := converter.Convert(&t) diff --git a/app/server/conversion/converters_unsafe.go b/app/server/conversion/converters_unsafe.go new file mode 100644 index 00000000..5ad9d014 --- /dev/null +++ b/app/server/conversion/converters_unsafe.go @@ -0,0 +1,79 @@ +package conversion + +import ( + "time" + "unsafe" +) + +var _ Collection = collectionUnsafe{} + +type collectionUnsafe struct { + collectionDefault +} + +func (collectionUnsafe) DateToString() ValuePtrConverter[time.Time, string] { + return dateToStringConverterUnsafe{} +} + +func absInt(x int) int { + if x < 0 { + return -x + } + + return x +} + +//go:linkname decomposeDate time.(*Time).date +func decomposeDate(*time.Time, bool) (year int, month int, day int, dayOfYear int) + +//go:linkname formatBits strconv.formatBits +func formatBits([]byte, uint64, int, bool, bool) (b []byte, s string) + +type dateToStringConverterUnsafe struct{} + +func (dateToStringConverterUnsafe) Convert(in *time.Time) (string, error) { + buf := make([]byte, 0, 11) + + year, month, day, _ := decomposeDate(in, true) + + // year + + if year < 0 { + buf = append(buf, byte('-')) + } + + absYear := absInt(year) + + switch { + case absYear < 10: + buf = append(buf, []byte("000")...) + case absYear < 100: + buf = append(buf, []byte("00")...) + case absYear < 1000: + buf = append(buf, byte('0')) + } + + buf, _ = formatBits(buf, uint64(absYear), 10, false, true) + + // month + + buf = append(buf, byte('-')) + if month < 10 { + buf = append(buf, byte('0')) + } + + buf, _ = formatBits(buf, uint64(month), 10, false, true) + + // day + + buf = append(buf, byte('-')) + if day < 10 { + buf = append(buf, byte('0')) + } + + buf, _ = formatBits(buf, uint64(day), 10, false, true) + + p := unsafe.SliceData(buf) + + return unsafe.String(p, len(buf)), nil +} diff --git a/app/server/conversion/factory.go b/app/server/conversion/factory.go new file mode 100644 index 00000000..c7d3fae3 --- /dev/null +++ b/app/server/conversion/factory.go @@ -0,0 +1,13 @@ +package conversion + +import ( + "github.com/ydb-platform/fq-connector-go/app/config" +) + +func NewCollection(cfg *config.TConversionConfig) Collection { + if cfg.UseUnsafeConverters { + return collectionUnsafe{} + } + + return collectionDefault{} +} diff --git a/app/server/conversion/interface.go b/app/server/conversion/interface.go new file mode 100644 index 00000000..21846b7a --- /dev/null +++ b/app/server/conversion/interface.go @@ -0,0 +1,38 @@ +package conversion + +import ( + "time" + + "github.com/ydb-platform/fq-connector-go/common" +) + +type ValueConverter[IN common.ValueType, OUT common.ValueType] interface { + Convert(in IN) (OUT, error) +} + +type ValuePtrConverter[IN common.ValueType, OUT common.ValueType] interface { + Convert(in *IN) (OUT, error) +} + +type Collection interface { + Bool() ValueConverter[bool, uint8] + Int8() ValueConverter[int8, int8] + Int16() ValueConverter[int16, int16] + Int32() ValueConverter[int32, int32] + Int64() ValueConverter[int64, int64] + Uint8() ValueConverter[uint8, uint8] + Uint16() ValueConverter[uint16, uint16] + Uint32() ValueConverter[uint32, uint32] + Uint64() ValueConverter[uint64, uint64] + Float32() ValueConverter[float32, float32] + Float64() ValueConverter[float64, float64] + String() ValueConverter[string, string] + StringToBytes() ValueConverter[string, []byte] + Bytes() ValueConverter[[]byte, []byte] + Date() ValueConverter[time.Time, uint16] + DateToString() ValuePtrConverter[time.Time, string] + Datetime() ValueConverter[time.Time, uint32] + DatetimeToString() ValuePtrConverter[time.Time, string] + Timestamp() ValueConverter[time.Time, uint64] + TimestampToString() ValuePtrConverter[time.Time, string] +} diff --git a/app/server/data_source_collection.go b/app/server/data_source_collection.go index 8e549723..fb4315bc 100644 --- a/app/server/data_source_collection.go +++ b/app/server/data_source_collection.go @@ -11,6 +11,7 @@ import ( api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms" "github.com/ydb-platform/fq-connector-go/app/server/datasource/s3" @@ -133,10 +134,11 @@ func NewDataSourceCollection( queryLoggerFactory common.QueryLoggerFactory, memoryAllocator memory.Allocator, readLimiterFactory *paging.ReadLimiterFactory, + converterCollection conversion.Collection, cfg *config.TServerConfig, ) *DataSourceCollection { return &DataSourceCollection{ - rdbms: rdbms.NewDataSourceFactory(queryLoggerFactory), + rdbms: rdbms.NewDataSourceFactory(queryLoggerFactory, converterCollection), memoryAllocator: memoryAllocator, readLimiterFactory: readLimiterFactory, cfg: cfg, diff --git a/app/server/datasource/rdbms/clickhouse/connection_manager.go b/app/server/datasource/rdbms/clickhouse/connection_manager.go index e44cfdd3..504293c7 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_manager.go +++ b/app/server/datasource/rdbms/clickhouse/connection_manager.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/common" @@ -24,11 +25,13 @@ type Connection struct { logger common.QueryLogger } +var _ rdbms_utils.Rows = (*rows)(nil) + type rows struct { *sql.Rows } -func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { +func (r rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { columns, err := r.ColumnTypes() if err != nil { return nil, fmt.Errorf("column types: %w", err) @@ -39,7 +42,7 @@ func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], typeNames = append(typeNames, column.DatabaseTypeName()) } - transformer, err := transformerFromSQLTypes(typeNames, ydbTypes) + transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc) if err != nil { return nil, fmt.Errorf("transformer from sql types: %w", err) } diff --git a/app/server/datasource/rdbms/clickhouse/type_mapper.go b/app/server/datasource/rdbms/clickhouse/type_mapper.go index ec97b84d..5599a9e6 100644 --- a/app/server/datasource/rdbms/clickhouse/type_mapper.go +++ b/app/server/datasource/rdbms/clickhouse/type_mapper.go @@ -10,9 +10,9 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" - "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -108,7 +108,7 @@ func (tm typeMapper) SQLTypeToYDBColumn( } //nolint:funlen,gocyclo -func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { +func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(typeNames)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(typeNames)) isNullable := regexp.MustCompile(`Nullable\((?P[\w\(\)]+)\)`) @@ -124,41 +124,41 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R switch { case typeName == "Bool": acceptors = append(acceptors, new(*bool)) - appenders = append(appenders, appendValueToArrowBuilder[bool, uint8, *array.Uint8Builder, utils.BoolConverter]) + appenders = append(appenders, makeAppender[bool, uint8, *array.Uint8Builder](cc.Bool())) case typeName == "Int8": acceptors = append(acceptors, new(*int8)) - appenders = append(appenders, appendValueToArrowBuilder[int8, int8, *array.Int8Builder, utils.Int8Converter]) + appenders = append(appenders, makeAppender[int8, int8, *array.Int8Builder](cc.Int8())) case typeName == "Int16": acceptors = append(acceptors, new(*int16)) - appenders = append(appenders, appendValueToArrowBuilder[int16, int16, *array.Int16Builder, utils.Int16Converter]) + appenders = append(appenders, makeAppender[int16, int16, *array.Int16Builder](cc.Int16())) case typeName == "Int32": acceptors = append(acceptors, new(*int32)) - appenders = append(appenders, appendValueToArrowBuilder[int32, int32, *array.Int32Builder, utils.Int32Converter]) + appenders = append(appenders, makeAppender[int32, int32, *array.Int32Builder](cc.Int32())) case typeName == "Int64": acceptors = append(acceptors, new(*int64)) - appenders = append(appenders, appendValueToArrowBuilder[int64, int64, *array.Int64Builder, utils.Int64Converter]) + appenders = append(appenders, makeAppender[int64, int64, *array.Int64Builder](cc.Int64())) case typeName == "UInt8": acceptors = append(acceptors, new(*uint8)) - appenders = append(appenders, appendValueToArrowBuilder[uint8, uint8, *array.Uint8Builder, utils.Uint8Converter]) + appenders = append(appenders, makeAppender[uint8, uint8, *array.Uint8Builder](cc.Uint8())) case typeName == "UInt16": acceptors = append(acceptors, new(*uint16)) - appenders = append(appenders, appendValueToArrowBuilder[uint16, uint16, *array.Uint16Builder, utils.Uint16Converter]) + appenders = append(appenders, makeAppender[uint16, uint16, *array.Uint16Builder](cc.Uint16())) case typeName == "UInt32": acceptors = append(acceptors, new(*uint32)) - appenders = append(appenders, appendValueToArrowBuilder[uint32, uint32, *array.Uint32Builder, utils.Uint32Converter]) + appenders = append(appenders, makeAppender[uint32, uint32, *array.Uint32Builder](cc.Uint32())) case typeName == "UInt64": acceptors = append(acceptors, new(*uint64)) - appenders = append(appenders, appendValueToArrowBuilder[uint64, uint64, *array.Uint64Builder, utils.Uint64Converter]) + appenders = append(appenders, makeAppender[uint64, uint64, *array.Uint64Builder](cc.Uint64())) case typeName == "Float32": acceptors = append(acceptors, new(*float32)) - appenders = append(appenders, appendValueToArrowBuilder[float32, float32, *array.Float32Builder, utils.Float32Converter]) + appenders = append(appenders, makeAppender[float32, float32, *array.Float32Builder](cc.Float32())) case typeName == "Float64": acceptors = append(acceptors, new(*float64)) - appenders = append(appenders, appendValueToArrowBuilder[float64, float64, *array.Float64Builder, utils.Float64Converter]) + appenders = append(appenders, makeAppender[float64, float64, *array.Float64Builder](cc.Float64())) case typeName == "String", isFixedString.MatchString(typeName): - // Looks like []byte would be a better choice here, but clickhouse driver prefers string + // Looks like []byte would be a better option here, but clickhouse driver prefers string acceptors = append(acceptors, new(*string)) - appenders = append(appenders, appendValueToArrowBuilder[string, []byte, *array.BinaryBuilder, utils.StringToBytesConverter]) + appenders = append(appenders, makeAppender[string, []byte, *array.BinaryBuilder](cc.StringToBytes())) case typeName == "Date": acceptors = append(acceptors, new(*time.Time)) @@ -169,9 +169,10 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R switch ydbTypeID { case Ydb.Type_UTF8: - appenders = append(appenders, appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, dateToStringConverter]) + appenders = append(appenders, + makeAppender[time.Time, string, *array.StringBuilder](dateToStringConverter{conv: cc.DateToString()})) case Ydb.Type_DATE: - appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder, utils.DateConverter]) + appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } @@ -185,9 +186,10 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R switch ydbTypeID { case Ydb.Type_UTF8: - appenders = append(appenders, appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, date32ToStringConverter]) + appenders = append(appenders, + makeAppender[time.Time, string, *array.StringBuilder](date32ToStringConverter{conv: cc.DateToString()})) case Ydb.Type_DATE: - appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder, utils.DateConverter]) + appenders = append(appenders, makeAppender[time.Time, uint16, *array.Uint16Builder](cc.Date())) default: return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } @@ -202,10 +204,9 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R switch ydbTypeID { case Ydb.Type_UTF8: appenders = append(appenders, - appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, dateTime64ToStringConverter]) + makeAppender[time.Time, string, *array.StringBuilder](dateTime64ToStringConverter{conv: cc.TimestampToString()})) case Ydb.Type_TIMESTAMP: - appenders = append(appenders, - appendValueToArrowBuilder[time.Time, uint64, *array.Uint64Builder, utils.TimestampConverter]) + appenders = append(appenders, makeAppender[time.Time, uint64, *array.Uint64Builder](cc.Timestamp())) default: return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } @@ -219,9 +220,10 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R switch ydbTypeID { case Ydb.Type_UTF8: - appenders = append(appenders, appendValueToArrowBuilder[time.Time, string, *array.StringBuilder, dateTimeToStringConverter]) + appenders = append(appenders, + makeAppender[time.Time, string, *array.StringBuilder](dateTimeToStringConverter{conv: cc.DatetimeToString()})) case Ydb.Type_DATETIME: - appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint32, *array.Uint32Builder, utils.DatetimeConverter]) + appenders = append(appenders, makeAppender[time.Time, uint32, *array.Uint32Builder](cc.Datetime())) default: return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } @@ -233,9 +235,20 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R return paging.NewRowTransformer[any](acceptors, appenders, nil), nil } -func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], CONV utils.ValueConverter[IN, OUT]]( +func makeAppender[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], +](conv conversion.ValueConverter[IN, OUT]) func(acceptor any, builder array.Builder) error { + return func(acceptor any, builder array.Builder) error { + return appendValueToArrowBuilder[IN, OUT, AB](acceptor, builder, conv) + } +} + +func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT]]( acceptor any, builder array.Builder, + conv conversion.ValueConverter[IN, OUT], ) error { cast := acceptor.(**IN) @@ -247,9 +260,7 @@ func appendValueToArrowBuilder[IN common.ValueType, OUT common.ValueType, AB com value := **cast - var converter CONV - - out, err := converter.Convert(value) + out, err := conv.Convert(value) if err != nil { if errors.Is(err, common.ErrValueOutOfTypeBounds) { // TODO: write warning to logger @@ -293,28 +304,36 @@ func saturateDateTime(in, min, max time.Time) *time.Time { return &in } -type dateToStringConverter struct{} +type dateToStringConverter struct { + conv conversion.ValuePtrConverter[time.Time, string] +} -func (dateToStringConverter) Convert(in time.Time) (string, error) { - return utils.DateToStringConverterV2{}.Convert(saturateDateTime(in, minClickHouseDate, maxClickHouseDate)) +func (c dateToStringConverter) Convert(in time.Time) (string, error) { + return c.conv.Convert(saturateDateTime(in, minClickHouseDate, maxClickHouseDate)) } -type date32ToStringConverter struct{} +type date32ToStringConverter struct { + conv conversion.ValuePtrConverter[time.Time, string] +} -func (date32ToStringConverter) Convert(in time.Time) (string, error) { - return utils.DateToStringConverterV2{}.Convert(saturateDateTime(in, minClickHouseDate32, maxClickHouseDate32)) +func (c date32ToStringConverter) Convert(in time.Time) (string, error) { + return c.conv.Convert(saturateDateTime(in, minClickHouseDate32, maxClickHouseDate32)) } -type dateTimeToStringConverter struct{} +type dateTimeToStringConverter struct { + conv conversion.ValuePtrConverter[time.Time, string] +} -func (dateTimeToStringConverter) Convert(in time.Time) (string, error) { - return utils.DatetimeToStringConverter{}.Convert(saturateDateTime(in, minClickHouseDatetime, maxClickHouseDatetime)) +func (c dateTimeToStringConverter) Convert(in time.Time) (string, error) { + return c.conv.Convert(saturateDateTime(in, minClickHouseDatetime, maxClickHouseDatetime)) } -type dateTime64ToStringConverter struct{} +type dateTime64ToStringConverter struct { + conv conversion.ValuePtrConverter[time.Time, string] +} -func (dateTime64ToStringConverter) Convert(in time.Time) (string, error) { - return utils.TimestampToStringConverter{}.Convert(saturateDateTime(in, minClickHouseDatetime64, maxClickHouseDatetime64)) +func (c dateTime64ToStringConverter) Convert(in time.Time) (string, error) { + return c.conv.Convert(saturateDateTime(in, minClickHouseDatetime64, maxClickHouseDatetime64)) } func NewTypeMapper() datasource.TypeMapper { diff --git a/app/server/datasource/rdbms/data_source.go b/app/server/datasource/rdbms/data_source.go index e53e8cbd..8d5fad6e 100644 --- a/app/server/datasource/rdbms/data_source.go +++ b/app/server/datasource/rdbms/data_source.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" @@ -23,11 +24,12 @@ type Preset struct { var _ datasource.DataSource[any] = (*dataSourceImpl)(nil) type dataSourceImpl struct { - typeMapper datasource.TypeMapper - sqlFormatter rdbms_utils.SQLFormatter - connectionManager rdbms_utils.ConnectionManager - schemaProvider rdbms_utils.SchemaProvider - logger *zap.Logger + typeMapper datasource.TypeMapper + sqlFormatter rdbms_utils.SQLFormatter + connectionManager rdbms_utils.ConnectionManager + schemaProvider rdbms_utils.SchemaProvider + converterCollection conversion.Collection + logger *zap.Logger } func (ds *dataSourceImpl) DescribeTable( @@ -80,7 +82,7 @@ func (ds *dataSourceImpl) doReadSplit( return fmt.Errorf("convert Select.What to Ydb types: %w", err) } - transformer, err := rows.MakeTransformer(ydbTypes) + transformer, err := rows.MakeTransformer(ydbTypes, ds.converterCollection) if err != nil { return fmt.Errorf("make transformer: %w", err) } @@ -121,12 +123,14 @@ func (ds *dataSourceImpl) ReadSplit( func NewDataSource( logger *zap.Logger, preset *Preset, + converterCollection conversion.Collection, ) datasource.DataSource[any] { return &dataSourceImpl{ - logger: logger, - sqlFormatter: preset.SQLFormatter, - connectionManager: preset.ConnectionManager, - typeMapper: preset.TypeMapper, - schemaProvider: preset.SchemaProvider, + logger: logger, + sqlFormatter: preset.SQLFormatter, + connectionManager: preset.ConnectionManager, + typeMapper: preset.TypeMapper, + schemaProvider: preset.SchemaProvider, + converterCollection: converterCollection, } } diff --git a/app/server/datasource/rdbms/data_source_factory.go b/app/server/datasource/rdbms/data_source_factory.go index 1c7f3d1d..9b4b93ca 100644 --- a/app/server/datasource/rdbms/data_source_factory.go +++ b/app/server/datasource/rdbms/data_source_factory.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/clickhouse" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/postgresql" @@ -17,9 +18,10 @@ import ( var _ datasource.Factory[any] = (*dataSourceFactory)(nil) type dataSourceFactory struct { - clickhouse Preset - postgresql Preset - ydb Preset + clickhouse Preset + postgresql Preset + ydb Preset + converterCollection conversion.Collection } func (dsf *dataSourceFactory) Make( @@ -28,16 +30,19 @@ func (dsf *dataSourceFactory) Make( ) (datasource.DataSource[any], error) { switch dataSourceType { case api_common.EDataSourceKind_CLICKHOUSE: - return NewDataSource(logger, &dsf.clickhouse), nil + return NewDataSource(logger, &dsf.clickhouse, dsf.converterCollection), nil case api_common.EDataSourceKind_POSTGRESQL: - return NewDataSource(logger, &dsf.postgresql), nil + return NewDataSource(logger, &dsf.postgresql, dsf.converterCollection), nil case api_common.EDataSourceKind_YDB: - return NewDataSource(logger, &dsf.ydb), nil + return NewDataSource(logger, &dsf.ydb, dsf.converterCollection), nil default: return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, common.ErrDataSourceNotSupported) } } -func NewDataSourceFactory(qlf common.QueryLoggerFactory) datasource.Factory[any] { +func NewDataSourceFactory( + qlf common.QueryLoggerFactory, + converterCollection conversion.Collection, +) datasource.Factory[any] { connManagerCfg := rdbms_utils.ConnectionManagerBase{ QueryLoggerFactory: qlf, } @@ -65,5 +70,6 @@ func NewDataSourceFactory(qlf common.QueryLoggerFactory) datasource.Factory[any] TypeMapper: ydbTypeMapper, SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper), }, + converterCollection: converterCollection, } } diff --git a/app/server/datasource/rdbms/data_source_test.go b/app/server/datasource/rdbms/data_source_test.go index 8bfaf86a..f72fe0ba 100644 --- a/app/server/datasource/rdbms/data_source_test.go +++ b/app/server/datasource/rdbms/data_source_test.go @@ -11,6 +11,8 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/postgresql" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" @@ -47,6 +49,7 @@ func TestReadSplit(t *testing.T) { }, }, } + converterCollection := conversion.NewCollection(&config.TConversionConfig{UseUnsafeConverters: true}) t.Run("positive", func(t *testing.T) { logger := common.NewTestLogger(t) @@ -90,7 +93,7 @@ func TestReadSplit(t *testing.T) { sink.On("AddRow", transformer).Return(nil).Times(2) sink.On("Finish").Return().Once() - dataSource := NewDataSource(logger, preset) + dataSource := NewDataSource(logger, preset, converterCollection) dataSource.ReadSplit(ctx, logger, split, sink) mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink) @@ -144,7 +147,7 @@ func TestReadSplit(t *testing.T) { })).Return().Once() sink.On("Finish").Return().Once() - datasource := NewDataSource(logger, preset) + datasource := NewDataSource(logger, preset, converterCollection) datasource.ReadSplit(ctx, logger, split, sink) mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink) diff --git a/app/server/datasource/rdbms/postgresql/connection_manager.go b/app/server/datasource/rdbms/postgresql/connection_manager.go index 49c60bf2..236bfa74 100644 --- a/app/server/datasource/rdbms/postgresql/connection_manager.go +++ b/app/server/datasource/rdbms/postgresql/connection_manager.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/common" @@ -28,7 +29,7 @@ func (r rows) Close() error { return nil } -func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { +func (r rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { fields := r.FieldDescriptions() oids := make([]uint32, 0, len(fields)) @@ -36,7 +37,7 @@ func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], oids = append(oids, field.DataTypeOID) } - return transformerFromOIDs(oids, ydbTypes) + return transformerFromOIDs(oids, ydbTypes, cc) } type Connection struct { diff --git a/app/server/datasource/rdbms/postgresql/type_mapper.go b/app/server/datasource/rdbms/postgresql/type_mapper.go index 38f9ad28..ad4826db 100644 --- a/app/server/datasource/rdbms/postgresql/type_mapper.go +++ b/app/server/datasource/rdbms/postgresql/type_mapper.go @@ -10,9 +10,9 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" - "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -71,7 +71,7 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser } //nolint:gocyclo -func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { +func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(oids)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(oids)) @@ -82,52 +82,49 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransfo appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Bool) - return appendValueToArrowBuilder[bool, uint8, *array.Uint8Builder, utils.BoolConverter](cast.Bool, builder, cast.Valid) + return appendValueToArrowBuilder[bool, uint8, *array.Uint8Builder](cast.Bool, builder, cast.Valid, cc.Bool()) }) case pgtype.Int2OID: acceptors = append(acceptors, new(pgtype.Int2)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Int2) - return appendValueToArrowBuilder[int16, int16, *array.Int16Builder, utils.Int16Converter](cast.Int16, builder, cast.Valid) + return appendValueToArrowBuilder[int16, int16, *array.Int16Builder](cast.Int16, builder, cast.Valid, cc.Int16()) }) case pgtype.Int4OID: acceptors = append(acceptors, new(pgtype.Int4)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Int4) - return appendValueToArrowBuilder[int32, int32, *array.Int32Builder, utils.Int32Converter](cast.Int32, builder, cast.Valid) + return appendValueToArrowBuilder[int32, int32, *array.Int32Builder](cast.Int32, builder, cast.Valid, cc.Int32()) }) case pgtype.Int8OID: acceptors = append(acceptors, new(pgtype.Int8)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Int8) - return appendValueToArrowBuilder[int64, int64, *array.Int64Builder, utils.Int64Converter](cast.Int64, builder, cast.Valid) + return appendValueToArrowBuilder[int64, int64, *array.Int64Builder](cast.Int64, builder, cast.Valid, cc.Int64()) }) case pgtype.Float4OID: acceptors = append(acceptors, new(pgtype.Float4)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Float4) - return appendValueToArrowBuilder[float32, float32, *array.Float32Builder, utils.Float32Converter]( - cast.Float32, builder, cast.Valid) + return appendValueToArrowBuilder[float32, float32, *array.Float32Builder](cast.Float32, builder, cast.Valid, cc.Float32()) }) case pgtype.Float8OID: acceptors = append(acceptors, new(pgtype.Float8)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Float8) - return appendValueToArrowBuilder[float64, float64, *array.Float64Builder, utils.Float64Converter]( - cast.Float64, builder, cast.Valid) + return appendValueToArrowBuilder[float64, float64, *array.Float64Builder](cast.Float64, builder, cast.Valid, cc.Float64()) }) case pgtype.TextOID, pgtype.BPCharOID, pgtype.VarcharOID: acceptors = append(acceptors, new(pgtype.Text)) appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Text) - return appendValueToArrowBuilder[string, string, *array.StringBuilder, utils.StringConverter]( - cast.String, builder, cast.Valid) + return appendValueToArrowBuilder[string, string, *array.StringBuilder](cast.String, builder, cast.Valid, cc.String()) }) case pgtype.ByteaOID: acceptors = append(acceptors, new(*[]byte)) @@ -158,16 +155,15 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransfo appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Date) - return appendValuePtrToArrowBuilder[ - time.Time, string, *array.StringBuilder, utils.DateToStringConverterV2]( - &cast.Time, builder, cast.Valid) + return appendValuePtrToArrowBuilder[time.Time, string, *array.StringBuilder]( + &cast.Time, builder, cast.Valid, cc.DateToString()) }) case Ydb.Type_DATE: appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Date) - return appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder, utils.DateConverter]( - cast.Time, builder, cast.Valid) + return appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder]( + cast.Time, builder, cast.Valid, cc.Date()) }) default: return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, common.ErrDataTypeNotSupported) @@ -188,15 +184,14 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (paging.RowTransfo return appendValuePtrToArrowBuilder[ time.Time, string, - *array.StringBuilder, - utils.TimestampToStringConverter](&cast.Time, builder, cast.Valid) + *array.StringBuilder](&cast.Time, builder, cast.Valid, cc.TimestampToString()) }) case Ydb.Type_TIMESTAMP: appenders = append(appenders, func(acceptor any, builder array.Builder) error { cast := acceptor.(*pgtype.Timestamp) - return appendValueToArrowBuilder[time.Time, uint64, *array.Uint64Builder, utils.TimestampConverter]( - cast.Time, builder, cast.Valid) + return appendValueToArrowBuilder[time.Time, uint64, *array.Uint64Builder]( + cast.Time, builder, cast.Valid, cc.Timestamp()) }) default: return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, common.ErrDataTypeNotSupported) @@ -213,11 +208,11 @@ func appendValueToArrowBuilder[ IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], - CONV utils.ValueConverter[IN, OUT], ]( value any, builder array.Builder, valid bool, + conv conversion.ValueConverter[IN, OUT], ) error { if !valid { builder.AppendNull() @@ -227,9 +222,7 @@ func appendValueToArrowBuilder[ cast := value.(IN) - var converter CONV - - out, err := converter.Convert(cast) + out, err := conv.Convert(cast) if err != nil { if errors.Is(err, common.ErrValueOutOfTypeBounds) { // TODO: logger ? @@ -250,11 +243,11 @@ func appendValuePtrToArrowBuilder[ IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], - CONV utils.ValuePtrConverter[IN, OUT], ]( value any, builder array.Builder, valid bool, + conv conversion.ValuePtrConverter[IN, OUT], ) error { if !valid { builder.AppendNull() @@ -264,9 +257,7 @@ func appendValuePtrToArrowBuilder[ cast := value.(*IN) - var converter CONV - - out, err := converter.Convert(cast) + out, err := conv.Convert(cast) if err != nil { if errors.Is(err, common.ErrValueOutOfTypeBounds) { // TODO: logger ? diff --git a/app/server/datasource/rdbms/utils/sql.go b/app/server/datasource/rdbms/utils/sql.go index 29cea8a4..9d355baf 100644 --- a/app/server/datasource/rdbms/utils/sql.go +++ b/app/server/datasource/rdbms/utils/sql.go @@ -8,6 +8,7 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/common" ) @@ -22,7 +23,7 @@ type Rows interface { Err() error Next() bool Scan(dest ...any) error - MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) + MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) } type ConnectionManager interface { diff --git a/app/server/datasource/rdbms/utils/sql_mock.go b/app/server/datasource/rdbms/utils/sql_mock.go index 13d2d6dc..b04678bc 100644 --- a/app/server/datasource/rdbms/utils/sql_mock.go +++ b/app/server/datasource/rdbms/utils/sql_mock.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/paging" ) @@ -89,7 +90,7 @@ func (m *RowsMock) Scan(dest ...any) error { return args.Error(0) } -func (m *RowsMock) MakeTransformer(ydbType []*Ydb.Type) (paging.RowTransformer[any], error) { +func (m *RowsMock) MakeTransformer(ydbType []*Ydb.Type, _ conversion.Collection) (paging.RowTransformer[any], error) { args := m.Called(ydbType) return args.Get(0).(*RowTransformerMock), args.Error(1) diff --git a/app/server/datasource/rdbms/ydb/connection_manager.go b/app/server/datasource/rdbms/ydb/connection_manager.go index 7c576292..aa6d34c5 100644 --- a/app/server/datasource/rdbms/ydb/connection_manager.go +++ b/app/server/datasource/rdbms/ydb/connection_manager.go @@ -13,6 +13,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/common" @@ -29,7 +30,7 @@ type rows struct { *sql.Rows } -func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { +func (r rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { columns, err := r.ColumnTypes() if err != nil { return nil, fmt.Errorf("column types: %w", err) @@ -40,7 +41,7 @@ func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (paging.RowTransformer[any], typeNames = append(typeNames, column.DatabaseTypeName()) } - transformer, err := transformerFromSQLTypes(typeNames, ydbTypes) + transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc) if err != nil { return nil, fmt.Errorf("transformer from sql types: %w", err) } diff --git a/app/server/datasource/rdbms/ydb/type_mapper.go b/app/server/datasource/rdbms/ydb/type_mapper.go index 83d4bd8f..33167ae2 100644 --- a/app/server/datasource/rdbms/ydb/type_mapper.go +++ b/app/server/datasource/rdbms/ydb/type_mapper.go @@ -10,9 +10,10 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" - "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -112,80 +113,76 @@ func appendToBuilderWithValueConverter[ IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], - CONV utils.ValueConverter[IN, OUT], ]( - acceptor any, - builder array.Builder, -) error { - doublePtr := acceptor.(**IN) + conv conversion.ValueConverter[IN, OUT], +) func(acceptor any, builder array.Builder) error { + return func(acceptor any, builder array.Builder) error { + doublePtr := acceptor.(**IN) - if *doublePtr == nil { - builder.AppendNull() + if *doublePtr == nil { + builder.AppendNull() - return nil - } + return nil + } - value := **doublePtr + value := **doublePtr - var converter CONV + out, err := conv.Convert(value) + if err != nil { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { + // TODO: write warning to logger + builder.AppendNull() - out, err := converter.Convert(value) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger - builder.AppendNull() + return nil + } - return nil + return fmt.Errorf("convert value %v: %w", value, err) } - return fmt.Errorf("convert value %v: %w", value, err) - } + //nolint:forcetypeassert + builder.(AB).Append(out) - //nolint:forcetypeassert - builder.(AB).Append(out) - - return nil + return nil + } } func appendToBuilderWithValuePtrConverter[ IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], - CONV utils.ValuePtrConverter[IN, OUT], ]( - acceptor any, - builder array.Builder, -) error { - doublePtr := acceptor.(**IN) + conv conversion.ValuePtrConverter[IN, OUT], +) func(acceptor any, builder array.Builder) error { + return func(acceptor any, builder array.Builder) error { + doublePtr := acceptor.(**IN) - ptr := *doublePtr - if ptr == nil { - builder.AppendNull() - - return nil - } - - var converter CONV - - out, err := converter.Convert(ptr) - if err != nil { - if errors.Is(err, common.ErrValueOutOfTypeBounds) { - // TODO: write warning to logger + ptr := *doublePtr + if ptr == nil { builder.AppendNull() return nil } - return fmt.Errorf("convert value %v: %w", ptr, err) - } + out, err := conv.Convert(ptr) + if err != nil { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { + // TODO: write warning to logger + builder.AppendNull() + + return nil + } + + return fmt.Errorf("convert value %v: %w", ptr, err) + } - //nolint:forcetypeassert - builder.(AB).Append(out) + //nolint:forcetypeassert + builder.(AB).Append(out) - return nil + return nil + } } -func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.RowTransformer[any], error) { +func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(typeNames)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(typeNames)) @@ -199,7 +196,7 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R return nil, fmt.Errorf("ydb type to ydb primitive type id: %w", err) } - acceptor, appender, err := makeAcceptorAndAppenderFromSQLType(typeName, ydbTypeID) + acceptor, appender, err := makeAcceptorAndAppenderFromSQLType(typeName, ydbTypeID, cc) if err != nil { return nil, fmt.Errorf("make transformer: %w", err) } @@ -215,40 +212,41 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (paging.R func makeAcceptorAndAppenderFromSQLType( typeName string, ydbTypeID Ydb.Type_PrimitiveTypeId, + cc conversion.Collection, ) (any, func(acceptor any, builder array.Builder) error, error) { switch typeName { case BoolType: - return new(*bool), appendToBuilderWithValueConverter[bool, uint8, *array.Uint8Builder, utils.BoolConverter], nil + return new(*bool), appendToBuilderWithValueConverter[bool, uint8, *array.Uint8Builder](cc.Bool()), nil case Int8Type: - return new(*int8), appendToBuilderWithValueConverter[int8, int8, *array.Int8Builder, utils.Int8Converter], nil + return new(*int8), appendToBuilderWithValueConverter[int8, int8, *array.Int8Builder](cc.Int8()), nil case Int16Type: - return new(*int16), appendToBuilderWithValueConverter[int16, int16, *array.Int16Builder, utils.Int16Converter], nil + return new(*int16), appendToBuilderWithValueConverter[int16, int16, *array.Int16Builder](cc.Int16()), nil case Int32Type: - return new(*int32), appendToBuilderWithValueConverter[int32, int32, *array.Int32Builder, utils.Int32Converter], nil + return new(*int32), appendToBuilderWithValueConverter[int32, int32, *array.Int32Builder](cc.Int32()), nil case Int64Type: - return new(*int64), appendToBuilderWithValueConverter[int64, int64, *array.Int64Builder, utils.Int64Converter], nil + return new(*int64), appendToBuilderWithValueConverter[int64, int64, *array.Int64Builder](cc.Int64()), nil case Uint8Type: - return new(*uint8), appendToBuilderWithValueConverter[uint8, uint8, *array.Uint8Builder, utils.Uint8Converter], nil + return new(*uint8), appendToBuilderWithValueConverter[uint8, uint8, *array.Uint8Builder](cc.Uint8()), nil case Uint16Type: - return new(*uint16), appendToBuilderWithValueConverter[uint16, uint16, *array.Uint16Builder, utils.Uint16Converter], nil + return new(*uint16), appendToBuilderWithValueConverter[uint16, uint16, *array.Uint16Builder](cc.Uint16()), nil case Uint32Type: - return new(*uint32), appendToBuilderWithValueConverter[uint32, uint32, *array.Uint32Builder, utils.Uint32Converter], nil + return new(*uint32), appendToBuilderWithValueConverter[uint32, uint32, *array.Uint32Builder](cc.Uint32()), nil case Uint64Type: - return new(*uint64), appendToBuilderWithValueConverter[uint64, uint64, *array.Uint64Builder, utils.Uint64Converter], nil + return new(*uint64), appendToBuilderWithValueConverter[uint64, uint64, *array.Uint64Builder](cc.Uint64()), nil case FloatType: - return new(*float32), appendToBuilderWithValueConverter[float32, float32, *array.Float32Builder, utils.Float32Converter], nil + return new(*float32), appendToBuilderWithValueConverter[float32, float32, *array.Float32Builder](cc.Float32()), nil case DoubleType: - return new(*float64), appendToBuilderWithValueConverter[float64, float64, *array.Float64Builder, utils.Float64Converter], nil + return new(*float64), appendToBuilderWithValueConverter[float64, float64, *array.Float64Builder](cc.Float64()), nil case StringType: - return new(*[]byte), appendToBuilderWithValueConverter[[]byte, []byte, *array.BinaryBuilder, utils.BytesConverter], nil + return new(*[]byte), appendToBuilderWithValueConverter[[]byte, []byte, *array.BinaryBuilder](cc.Bytes()), nil case Utf8Type: - return new(*string), appendToBuilderWithValueConverter[string, string, *array.StringBuilder, utils.StringConverter], nil + return new(*string), appendToBuilderWithValueConverter[string, string, *array.StringBuilder](cc.String()), nil case DateType: switch ydbTypeID { case Ydb.Type_DATE: - return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint16, *array.Uint16Builder, utils.DateConverter], nil + return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint16, *array.Uint16Builder](cc.Date()), nil case Ydb.Type_UTF8: - return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder, utils.DateToStringConverterV2], nil + return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder](cc.DateToString()), nil default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -256,11 +254,11 @@ func makeAcceptorAndAppenderFromSQLType( case DatetimeType: switch ydbTypeID { case Ydb.Type_DATETIME: - return new(*time.Time), appendToBuilderWithValueConverter[ - time.Time, uint32, *array.Uint32Builder, utils.DatetimeConverter], nil + return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint32, *array.Uint32Builder](cc.Datetime()), nil case Ydb.Type_UTF8: - return new(*time.Time), appendToBuilderWithValuePtrConverter[ - time.Time, string, *array.StringBuilder, utils.DatetimeToStringConverter], nil + return new(*time.Time), + appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder](cc.DatetimeToString()), + nil default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -268,10 +266,11 @@ func makeAcceptorAndAppenderFromSQLType( case TimestampType: switch ydbTypeID { case Ydb.Type_TIMESTAMP: - return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint64, *array.Uint64Builder, utils.TimestampConverter], nil + return new(*time.Time), appendToBuilderWithValueConverter[time.Time, uint64, *array.Uint64Builder](cc.Timestamp()), nil case Ydb.Type_UTF8: return new(*time.Time), - appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder, utils.TimestampToStringConverter], nil + appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder](cc.TimestampToString()), + nil default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) diff --git a/app/server/embedded.go b/app/server/embedded.go index 885b991a..14caf518 100644 --- a/app/server/embedded.go +++ b/app/server/embedded.go @@ -113,6 +113,18 @@ func WithPprofServerConfig(pprofServerConfig *config.TPprofServerConfig) Embedde return &withPprofServerConfig{pprofServerConfig: pprofServerConfig} } +type withConversionConfig struct { + conversionConfig *config.TConversionConfig +} + +func (o *withConversionConfig) apply(cfg *config.TServerConfig) { + cfg.Conversion = o.conversionConfig +} + +func WithConversionConfig(conversionConfig *config.TConversionConfig) EmbeddedOption { + return &withConversionConfig{conversionConfig: conversionConfig} +} + func NewEmbedded(options ...EmbeddedOption) (*Embedded, error) { cfg := NewDefaultConfig() for _, o := range options { diff --git a/app/server/service_connector.go b/app/server/service_connector.go index 5cb1c034..183144ac 100644 --- a/app/server/service_connector.go +++ b/app/server/service_connector.go @@ -16,6 +16,7 @@ import ( api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/common" "github.com/ydb-platform/fq-connector-go/library/go/core/metrics/solomon" @@ -290,6 +291,7 @@ func newServiceConnector( queryLoggerFactory, memory.DefaultAllocator, paging.NewReadLimiterFactory(cfg.ReadLimit), + conversion.NewCollection(cfg.Conversion), cfg), logger: logger, grpcServer: grpcServer, diff --git a/app/server/streaming/streamer_test.go b/app/server/streaming/streamer_test.go index d20d162c..cb6d6592 100644 --- a/app/server/streaming/streamer_test.go +++ b/app/server/streaming/streamer_test.go @@ -19,6 +19,7 @@ import ( api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/clickhouse" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" @@ -235,7 +236,9 @@ func (tc testCaseStreaming) execute(t *testing.T) { TypeMapper: typeMapper, } - dataSource := rdbms.NewDataSource(logger, dataSourcePreset) + converterCollection := conversion.NewCollection(&config.TConversionConfig{UseUnsafeConverters: true}) + + dataSource := rdbms.NewDataSource(logger, dataSourcePreset, converterCollection) columnarBufferFactory, err := paging.NewColumnarBufferFactory[any]( logger, diff --git a/app/server/utils/converters.go b/app/server/utils/converters.go deleted file mode 100644 index 94b2f4ec..00000000 --- a/app/server/utils/converters.go +++ /dev/null @@ -1,203 +0,0 @@ -package utils - -import ( - "fmt" - "time" - "unsafe" - - "github.com/ydb-platform/fq-connector-go/common" -) - -type ValueConverter[IN common.ValueType, OUT common.ValueType] interface { - Convert(in IN) (OUT, error) -} - -type ValuePtrConverter[IN common.ValueType, OUT common.ValueType] interface { - Convert(in *IN) (OUT, error) -} - -type BoolConverter struct{} - -func (BoolConverter) Convert(in bool) (uint8, error) { - // For a some reason, Bool values are converted to Arrow Uint8 rather than to Arrow native Bool. - // See https://st.yandex-team.ru/YQL-15332 for more details. - if in { - return 1, nil - } - - return 0, nil -} - -type Int8Converter struct{} - -func (Int8Converter) Convert(in int8) (int8, error) { return in, nil } - -type Int16Converter struct{} - -func (Int16Converter) Convert(in int16) (int16, error) { return in, nil } - -type Int32Converter struct{} - -func (Int32Converter) Convert(in int32) (int32, error) { return in, nil } - -type Int64Converter struct{} - -func (Int64Converter) Convert(in int64) (int64, error) { return in, nil } - -type Uint8Converter struct{} - -func (Uint8Converter) Convert(in uint8) (uint8, error) { return in, nil } - -type Uint16Converter struct{} - -func (Uint16Converter) Convert(in uint16) (uint16, error) { return in, nil } - -type Uint32Converter struct{} - -func (Uint32Converter) Convert(in uint32) (uint32, error) { return in, nil } - -type Uint64Converter struct{} - -func (Uint64Converter) Convert(in uint64) (uint64, error) { return in, nil } - -type Float32Converter struct{} - -func (Float32Converter) Convert(in float32) (float32, error) { return in, nil } - -type Float64Converter struct{} - -func (Float64Converter) Convert(in float64) (float64, error) { return in, nil } - -type StringConverter struct{} - -func (StringConverter) Convert(in string) (string, error) { return in, nil } - -type StringToBytesConverter struct{} - -func (StringToBytesConverter) Convert(in string) ([]byte, error) { return []byte(in), nil } - -type BytesConverter struct{} - -func (BytesConverter) Convert(in []byte) ([]byte, error) { return in, nil } - -type DateConverter struct{} - -func (DateConverter) Convert(in time.Time) (uint16, error) { - out, err := common.TimeToYDBDate(&in) - - if err != nil { - return 0, fmt.Errorf("convert time to YDB Date: %w", err) - } - - return out, nil -} - -type DateToStringConverter struct{} - -func (DateToStringConverter) Convert(in *time.Time) (string, error) { - return in.Format("2006-01-02"), nil -} - -func absInt(x int) int { - if x < 0 { - return -x - } - - return x -} - -//go:linkname decomposeDate time.(*Time).date -func decomposeDate(*time.Time, bool) (year int, month int, day int, dayOfYear int) - -//go:linkname formatBits strconv.formatBits -func formatBits([]byte, uint64, int, bool, bool) (b []byte, s string) - -type DateToStringConverterV2 struct{} - -func (DateToStringConverterV2) Convert(in *time.Time) (string, error) { - buf := make([]byte, 0, 11) - - year, month, day, _ := decomposeDate(in, true) - - // year - - if year < 0 { - buf = append(buf, byte('-')) - } - - absYear := absInt(year) - - switch { - case absYear < 10: - buf = append(buf, []byte("000")...) - case absYear < 100: - buf = append(buf, []byte("00")...) - case absYear < 1000: - buf = append(buf, byte('0')) - } - - buf, _ = formatBits(buf, uint64(absYear), 10, false, true) - - // month - - buf = append(buf, byte('-')) - if month < 10 { - buf = append(buf, byte('0')) - } - - buf, _ = formatBits(buf, uint64(month), 10, false, true) - - // day - - buf = append(buf, byte('-')) - if day < 10 { - buf = append(buf, byte('0')) - } - - buf, _ = formatBits(buf, uint64(day), 10, false, true) - - p := unsafe.SliceData(buf) - - return unsafe.String(p, len(buf)), nil -} - -type DatetimeConverter struct{} - -func (DatetimeConverter) Convert(in time.Time) (uint32, error) { - out, err := common.TimeToYDBDatetime(&in) - - if err != nil { - return 0, fmt.Errorf("convert time to YDB Datetime: %w", err) - } - - return out, nil -} - -type DatetimeToStringConverter struct{} - -func (DatetimeToStringConverter) Convert(in *time.Time) (string, error) { - return in.UTC().Format("2006-01-02T15:04:05Z"), nil -} - -type TimestampConverter struct{} - -func (TimestampConverter) Convert(in time.Time) (uint64, error) { - out, err := common.TimeToYDBTimestamp(&in) - - if err != nil { - return 0, fmt.Errorf("convert time to YDB Timestamp: %w", err) - } - - return out, nil -} - -type TimestampToStringConverter struct{} - -func (TimestampToStringConverter) Convert(in *time.Time) (string, error) { - // Using accuracy of 9 decimal places is enough for supported data sources - // Max accuracy of date/time formats: - // PostgreSQL - 1 microsecond (10^-6 s) - // ClickHouse - 1 nanosecond (10^-9 s) - // Trailing zeros are omitted - return in.UTC().Format("2006-01-02T15:04:05.999999999Z"), nil -} diff --git a/example.conf b/example.conf index 5fb764bd..d3b5b044 100644 --- a/example.conf +++ b/example.conf @@ -21,3 +21,7 @@ paging { bytes_per_page: 4194304 prefetch_queue_capacity: 2 } + +conversion { + use_unsafe_converters: false +} diff --git a/scripts/bench/postgresql_datetime.txt b/scripts/bench/postgresql_datetime.txt new file mode 100644 index 00000000..90cc3097 --- /dev/null +++ b/scripts/bench/postgresql_datetime.txt @@ -0,0 +1,43 @@ +server_local: { + endpoint: { + host: "localhost" + port: 50051 + } +} + +data_source_instance { + kind: POSTGRESQL + endpoint { + host: "localhost" + port: 5432 + } + database: "tpch" + credentials { + basic { + username: "admin" + password: "password" + } + } + protocol: NATIVE + pg_options: { + schema: "public" + } +} + +table: "lineitem" + +test_cases: [ + { + server_params: { + paging: { + bytes_per_page: 4194304 + prefetch_queue_capacity: 2 + } + }, + columns: [ + "l_shipdate" + ] + } +] + +result_dir: "/home/vitalyisaev/projects/fq-connector-go/scripts/bench/postgresql/results/datetime" diff --git a/tests/suite/suite.go b/tests/suite/suite.go index 97ffc3a9..8a2ef2bb 100644 --- a/tests/suite/suite.go +++ b/tests/suite/suite.go @@ -44,12 +44,19 @@ func (b *Base) SetupSuite() { // We want to run a distinct instance of Connector for every suite var err error - loggerCfg := &config.TLoggerConfig{ - LogLevel: config.ELogLevel_DEBUG, - EnableSqlQueryLogging: true, - } - - b.Connector, err = server.NewEmbedded(server.WithLoggerConfig(loggerCfg)) + b.Connector, err = server.NewEmbedded( + server.WithLoggerConfig( + &config.TLoggerConfig{ + LogLevel: config.ELogLevel_ERROR, + EnableSqlQueryLogging: true, + }, + ), + server.WithConversionConfig( + &config.TConversionConfig{ + UseUnsafeConverters: true, + }, + ), + ) b.Require().NoError(err) b.Connector.Start() } From 24ea4ecccd578b660e815a6468dcfe6b770a1f12 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 1 Feb 2024 17:55:57 +0300 Subject: [PATCH 3/3] More comprehensive testing for unsafe DateToString converter --- app/server/conversion/converters_test.go | 63 ++++++++++++++++++------ 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/app/server/conversion/converters_test.go b/app/server/conversion/converters_test.go index efe0b22f..ff0bfb59 100644 --- a/app/server/conversion/converters_test.go +++ b/app/server/conversion/converters_test.go @@ -1,6 +1,7 @@ package conversion import ( + "math" "testing" "time" @@ -8,30 +9,36 @@ import ( ) func TestDateToStringConverter(t *testing.T) { - type testCase struct { - in time.Time - } - - testCases := []testCase{ - { - in: time.Date(1950, 5, 27, 0, 0, 0, 0, time.UTC), - }, - { - in: time.Date(-1, 1, 1, 0, 0, 0, 0, time.UTC), - }, + testCases := []time.Time{ + time.Date(math.MaxInt, math.MaxInt, math.MaxInt, math.MaxInt, math.MaxInt, math.MaxInt, math.MaxInt, time.UTC), + time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC), + time.Date(math.MinInt, math.MinInt, math.MinInt, math.MinInt, math.MinInt, math.MinInt, math.MinInt, time.UTC), + time.Date(1950, 5, 27, 0, 0, 0, 0, time.UTC), + time.Date(-1, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(100500, 5, 20, -12, -55, -8, 0, time.UTC), + time.Date(1988, 11, 20, 12, 55, 8, 0, time.UTC), + time.Date(100, 2, 3, 4, 5, 6, 7, time.UTC), + time.Date(10, 2, 3, 4, 5, 6, 7, time.UTC), + time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC), + time.Date(-100500, -10, -35, -8, -2000, -300000, -50404040, time.UTC), } const format = "2006-01-02" - var converter dateToStringConverterUnsafe + var ( + converterUnsafe dateToStringConverterUnsafe + converterDefault dateToStringConverter + ) for _, tc := range testCases { tc := tc - t.Run(tc.in.Format(format), func(t *testing.T) { - actual, err := converter.Convert(&tc.in) + t.Run(tc.Format(format), func(t *testing.T) { + // Check equivalence of results produced by default and unsafe converters + expectedOut, err := converterDefault.Convert(&tc) require.NoError(t, err) - // check equivalence - require.Equal(t, tc.in.Format(format), actual) + actualOut, err := converterUnsafe.Convert(&tc) + require.NoError(t, err) + require.Equal(t, expectedOut, actualOut) }) } } @@ -63,3 +70,27 @@ func BenchmarkDateToStringConverter(b *testing.B) { } }) } + +func FuzzDateToStringConverter(f *testing.F) { + var ( + converterUnsafe dateToStringConverterUnsafe + converterDefault dateToStringConverter + ) + + f.Add(100500, 5, 20, -12, -55, -8, 0) + f.Add(1988, 11, 20, 12, 55, 8, 0) + f.Add(100, 2, 3, 4, 5, 6, 7) + f.Add(10, 2, 3, 4, 5, 6, 7) + f.Add(1, 2, 3, 4, 5, 6, 7) + f.Add(-1, 1, 1, 0, 0, 0, 0) + f.Add(-100500, -10, -35, -8, -2000, -300000, -50404040) + + f.Fuzz(func(t *testing.T, year int, month int, day int, hour int, min int, sec int, nsec int) { + in := time.Date(year, time.Month(month), day, hour, min, sec, nsec, time.UTC) + expectedOut, err := converterDefault.Convert(&in) + require.NoError(t, err) + actualOut, err := converterUnsafe.Convert(&in) + require.NoError(t, err) + require.Equal(t, expectedOut, actualOut) + }) +}