Skip to content

Commit

Permalink
PostgreSQL, MySQL, YDB: pushdown YQL Timestamp (#231)
Browse files Browse the repository at this point in the history
* Configure pushdown options for different data sources

* PostgreSQL: integrational tests for timestamp pushdown

* ClickHouse: troubles with DateTime64 pushdown

* Greenplum: integrational tests for timestamp pushdown

* YDB: integrational tests for timestamp pushdown

* MySQL: integrational tests for timestamp pushdown

* ClickHouse: rollback app reproducing SDK issue

* MS SQL Server: troubles with timestamp pushdown

* Review fixes
  • Loading branch information
vitalyisaev2 authored Jan 30, 2025
1 parent 3938bf0 commit 237a5fd
Show file tree
Hide file tree
Showing 41 changed files with 1,222 additions and 381 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ integration_test_build:
go test -c -o fq-connector-go-tests ./tests

integration_test_env_clean:
docker compose -f ./tests/infra/datasource/docker-compose.yaml stop
docker compose -f ./tests/infra/datasource/docker-compose.yaml rm -f -v
docker compose -f ./tests/infra/datasource/docker-compose.yaml down -v

integration_test_env_run: integration_test_env_clean
docker compose -f ./tests/infra/datasource/docker-compose.yaml up -d --build --pull=always
Expand Down
811 changes: 486 additions & 325 deletions app/config/server.pb.go

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion app/config/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ message TExponentialBackoffConfig {
string max_elapsed_time = 5;
}

// TPushdownConfig contains various settings for predicate pushdown
message TPushdownConfig {
// Enables filter pushdown for columns of YQL Timestamp type
bool enable_timestamp_pushdown = 1;
}


// TClickHouseConfig contains settings specific for ClickHouse data source
message TClickHouseConfig {
Expand All @@ -147,6 +153,7 @@ message TClickHouseConfig {
string ping_connection_timeout = 2;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}

// TGreenplumConfig contains settings specific for Greenplum data source
Expand All @@ -156,6 +163,7 @@ message TGreenplumConfig {
string open_connection_timeout = 1;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}

// TMsSQLServerConfig contains settings specific for MsSQLServer data source
Expand All @@ -168,6 +176,7 @@ message TMsSQLServerConfig {
string ping_connection_timeout = 1;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}


Expand All @@ -179,6 +188,7 @@ message TMySQLConfig {
string open_connection_timeout = 2;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}

// TOracleConfig contains settings specific for Oracle data source
Expand All @@ -191,6 +201,7 @@ message TOracleConfig {
string ping_connection_timeout = 2;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}

// TPostgreSQLConfig contains settings specific for PostgreSQL data source
Expand All @@ -200,6 +211,7 @@ message TPostgreSQLConfig {
string open_connection_timeout = 1;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}

// TYdbConfig contains settings specific for YDB data source
Expand Down Expand Up @@ -238,9 +250,10 @@ message TYdbConfig {
NYql.TGenericEndpoint iam_endpoint = 6;

TExponentialBackoffConfig exponential_backoff = 10;
TPushdownConfig pushdown = 11;
}

// TLoggingConfig containes settings specific for Logging external datasource
// TLoggingConfig contains settings specific for Logging external datasource
message TLoggingConfig {
// Under the hood the Logging connector goes to YDB,
// so all YDB settings can be applied to this connector as well.
Expand Down
14 changes: 14 additions & 0 deletions app/server/config/config.debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,38 @@ data_source_default: &data_source_default_var
datasources:
clickhouse:
<<: *data_source_default_var
pushdown:
enable_timestamp_pushdown: false # YQ-4063

greenplum:
<<: *data_source_default_var
pushdown:
enable_timestamp_pushdown: true

ms_sql_server:
<<: *data_source_default_var
pushdown:
enable_timestamp_pushdown: false # YQ-4062

mysql:
<<: *data_source_default_var
result_chan_capacity: 1024
pushdown:
enable_timestamp_pushdown: true

postgresql:
<<: *data_source_default_var
pushdown:
enable_timestamp_pushdown: true

oracle:
<<: *data_source_default_var
pushdown:
enable_timestamp_pushdown: false # YQ-3527

ydb:
<<: *data_source_default_var
use_underlay_network_for_dedicated_databases: false
mode: MODE_QUERY_SERVICE_NATIVE
pushdown:
enable_timestamp_pushdown: true
92 changes: 88 additions & 4 deletions app/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ func makeDefaultExponentialBackoffConfig() *config.TExponentialBackoffConfig {
}
}

func makeDefaultPushdownConfig() *config.TPushdownConfig {
return &config.TPushdownConfig{
EnableTimestampPushdown: false,
}
}

// TODO: use reflection to generalize datasource setting code
//
//nolint:gocyclo
Expand Down Expand Up @@ -70,6 +76,10 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.Clickhouse.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Datasources.Clickhouse.Pushdown == nil {
c.Datasources.Clickhouse.Pushdown = makeDefaultPushdownConfig()
}

// Greenplum

if c.Datasources.Greenplum == nil {
Expand All @@ -82,6 +92,10 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.Greenplum.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Datasources.Greenplum.Pushdown == nil {
c.Datasources.Greenplum.Pushdown = makeDefaultPushdownConfig()
}

// MS SQL Server

if c.Datasources.MsSqlServer == nil {
Expand All @@ -95,6 +109,10 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.MsSqlServer.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Datasources.MsSqlServer.Pushdown == nil {
c.Datasources.MsSqlServer.Pushdown = makeDefaultPushdownConfig()
}

// MySQL

if c.Datasources.Mysql == nil {
Expand All @@ -108,6 +126,10 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.Mysql.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Datasources.Mysql.Pushdown == nil {
c.Datasources.Mysql.Pushdown = makeDefaultPushdownConfig()
}

// Oracle

if c.Datasources.Oracle == nil {
Expand All @@ -121,6 +143,10 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.Oracle.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Datasources.Oracle.Pushdown == nil {
c.Datasources.Oracle.Pushdown = makeDefaultPushdownConfig()
}

// PostgreSQL

if c.Datasources.Postgresql == nil {
Expand All @@ -133,6 +159,10 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.Postgresql.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Datasources.Postgresql.Pushdown == nil {
c.Datasources.Postgresql.Pushdown = makeDefaultPushdownConfig()
}

// YDB

if c.Datasources.Ydb == nil {
Expand Down Expand Up @@ -172,6 +202,10 @@ func fillYdbConfigDefaults(c *config.TYdbConfig) {
c.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

if c.Pushdown == nil {
c.Pushdown = makeDefaultPushdownConfig()
}

if c.ServiceAccountKeyFileCredentials != "" {
if c.IamEndpoint == nil {
c.IamEndpoint = &api_common.TGenericEndpoint{
Expand Down Expand Up @@ -322,20 +356,70 @@ func validateDatasourcesConfig(c *config.TDatasourcesConfig) error {
return fmt.Errorf("required section is missing")
}

if err := validateYdbConfig(c.Ydb); err != nil {
return fmt.Errorf("validate `ydb`: %w", err)
if err := validateRelationalDatasourceConfig(c.Clickhouse); err != nil {
return fmt.Errorf("validate `clickhouse`: %w", err)
}

if err := validateRelationalDatasourceConfig(c.Greenplum); err != nil {
return fmt.Errorf("validate `greenplum`: %w", err)
}

if err := validateLoggingConfig(c.Logging); err != nil {
return fmt.Errorf("validate `logging`: %w", err)
}

if err := validateRelationalDatasourceConfig(c.MsSqlServer); err != nil {
return fmt.Errorf("validate `ms_sql_server`: %w", err)
}

if err := validateRelationalDatasourceConfig(c.Mysql); err != nil {
return fmt.Errorf("validate `mysql`: %w", err)
}

if err := validateRelationalDatasourceConfig(c.Oracle); err != nil {
return fmt.Errorf("validate `oracle`: %w", err)
}

if err := validateRelationalDatasourceConfig(c.Postgresql); err != nil {
return fmt.Errorf("validate `postgresql`: %w", err)
}

if err := validateYdbConfig(c.Ydb); err != nil {
return fmt.Errorf("validate `ydb`: %w", err)
}

return nil
}

type relationalDatasourceConfig interface {
GetOpenConnectionTimeout() string
GetExponentialBackoff() *config.TExponentialBackoffConfig
GetPushdown() *config.TPushdownConfig
}

func validateRelationalDatasourceConfig(c relationalDatasourceConfig) error {
if c == nil {
return nil
}

if _, err := common.DurationFromString(c.GetOpenConnectionTimeout()); err != nil {
return fmt.Errorf("validate `open_connection_timeout`: %v", err)
}

if c.GetExponentialBackoff() == nil {
return errors.New("missing `exponential_backoff`")
}

if c.GetPushdown() == nil {
return errors.New("missing `pushdown`")
}

return nil
}

func validateYdbConfig(c *config.TYdbConfig) error {
if c == nil {
return fmt.Errorf("required section is missing")
return nil
}

if _, err := common.DurationFromString(c.OpenConnectionTimeout); err != nil {
Expand Down Expand Up @@ -377,7 +461,7 @@ func validateYdbConfig(c *config.TYdbConfig) error {

func validateLoggingConfig(c *config.TLoggingConfig) error {
if c == nil {
return fmt.Errorf("required section is missing")
return nil
}

if err := validateYdbConfig(c.Ydb); err != nil {
Expand Down
10 changes: 7 additions & 3 deletions app/server/datasource/rdbms/clickhouse/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/config"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
)

var _ rdbms_utils.SQLFormatter = (*sqlFormatter)(nil)

type sqlFormatter struct {
cfg *config.TPushdownConfig
}

func (sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool {
func (f *sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool {
switch typeID {
case Ydb.Type_BOOL:
return true
Expand All @@ -40,6 +42,8 @@ func (sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool {
return true
case Ydb.Type_JSON:
return false
case Ydb.Type_TIMESTAMP:
return f.cfg.EnableTimestampPushdown
default:
return false
}
Expand Down Expand Up @@ -87,6 +91,6 @@ func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

func NewSQLFormatter() rdbms_utils.SQLFormatter {
return sqlFormatter{}
func NewSQLFormatter(cfg *config.TPushdownConfig) rdbms_utils.SQLFormatter {
return sqlFormatter{cfg: cfg}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestMakeSQLFormatterQuery(t *testing.T) {
}

logger := common.NewTestLogger(t)
formatter := NewSQLFormatter()
formatter := NewSQLFormatter(nil)

tcs := []testCase{
{
Expand Down
Loading

0 comments on commit 237a5fd

Please sign in to comment.