Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2745: Extended integration tests #20

Merged
merged 2 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ jobs:
- name: Run integration tests
run: |
go test -c -o fq-connector-go-tests -coverpkg=./... -covermode=atomic ./tests
./fq-connector-go-tests -projectPath=$(pwd) -test.coverprofile=coverage_integratoin_tests.out
./fq-connector-go-tests -projectPath=$(pwd) -test.coverprofile=coverage_integration_tests.out
- name: Union coverage
run: |
cat coverage* | grep -v pb.go | grep -v mock.go > coverage.out
cat coverage_unit_tests.out | grep -v 'pb.go\|mock.go\|library' > coverage.out
cat coverage_integration_tests.out | grep -v 'atomic\|pb.go\|mock.go\|library' >> coverage.out
go tool cover -func=coverage.out
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4-beta
with:
Expand Down
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
PROJECT_PATH = $(shell pwd)

build:
go build -o fq-connector-go ./app

Expand All @@ -10,14 +12,21 @@ lint:
unit_test:
go test -v ./app/...

PROJECT_PATH = $(shell pwd)
integration_test: integration_test_clean
docker-compose -f ./tests/infra/datasource/docker-compose.yaml up -d
integration_test: integration_test_env
go test -c -o fq-connector-go-tests ./tests
./fq-connector-go-tests -projectPath=$(PROJECT_PATH)

integration_test_clean:
integration_test_env:
docker-compose -f ./tests/infra/datasource/docker-compose.yaml rm -f -v
docker-compose -f ./tests/infra/datasource/docker-compose.yaml up -d

test_coverage: integration_test_env
go test -coverpkg=./... -coverprofile=coverage_unit_tests.out -covermode=atomic ./app/...
go test -c -o fq-connector-go-tests -coverpkg=./... -covermode=atomic ./tests
./fq-connector-go-tests -projectPath=$(PROJECT_PATH) -test.coverprofile=coverage_integration_tests.out
cat coverage_unit_tests.out | grep -v 'pb.go\|mock.go\|library' > coverage.out
cat coverage_integration_tests.out | grep -v 'atomic\|pb.go\|mock.go\|library' >> coverage.out
go tool cover -func=coverage.out

build_image_base:
docker build -t ghcr.io/ydb-platform/fq-connector-go:base -f ./Dockerfile.base .
20 changes: 10 additions & 10 deletions app/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service "github.com/ydb-platform/fq-connector-go/api/service"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/common"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
)

const (
Expand Down Expand Up @@ -53,7 +53,7 @@ func runClient(_ *cobra.Command, args []string) error {
return fmt.Errorf("unknown instance: %w", err)
}

logger := utils.NewDefaultLogger()
logger := common.NewDefaultLogger()

if err := callServer(logger, cfg); err != nil {
return fmt.Errorf("call server: %w", err)
Expand Down Expand Up @@ -89,7 +89,7 @@ func makeConnection(logger *zap.Logger, cfg *config.ClientConfig) (*grpc.ClientC
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.Dial(utils.EndpointToString(cfg.Endpoint), opts...)
conn, err := grpc.Dial(common.EndpointToString(cfg.Endpoint), opts...)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
}
Expand All @@ -103,7 +103,7 @@ func callServer(logger *zap.Logger, cfg *config.ClientConfig) error {
return fmt.Errorf("grpc dial: %w", err)
}

defer utils.LogCloserError(logger, conn, "connection close")
defer common.LogCloserError(logger, conn, "connection close")

connectorClient := api_service.NewConnectorClient(conn)

Expand Down Expand Up @@ -150,15 +150,15 @@ func describeTable(
return nil, fmt.Errorf("describe table: %w", err)
}

if utils.IsSuccess(resp.Error) {
if common.IsSuccess(resp.Error) {
logger.Debug("DescribeTable", zap.String("response", resp.String()))

return resp.Schema, nil
}

logger.Error("DescribeTable", zap.String("response", resp.String()))

return nil, utils.NewSTDErrorFromAPIError(resp.Error)
return nil, common.NewSTDErrorFromAPIError(resp.Error)
}

func listSplits(
Expand Down Expand Up @@ -203,10 +203,10 @@ func listSplits(
return nil, fmt.Errorf("stream list splits: %w", err)
}

if !utils.IsSuccess(resp.Error) {
if !common.IsSuccess(resp.Error) {
logger.Error("ListSplits", zap.String("response", resp.String()))

return splits, utils.NewSTDErrorFromAPIError(resp.Error)
return splits, common.NewSTDErrorFromAPIError(resp.Error)
}

logger.Debug("ListSplits", zap.String("response", resp.String()))
Expand Down Expand Up @@ -247,8 +247,8 @@ func readSplits(
return fmt.Errorf("stream list splits: %w", err)
}

if !utils.IsSuccess(resp.Error) {
return utils.NewSTDErrorFromAPIError(resp.Error)
if !common.IsSuccess(resp.Error) {
return common.NewSTDErrorFromAPIError(resp.Error)
}

responses = append(responses, resp)
Expand Down
2 changes: 1 addition & 1 deletion app/server/utils/endpoint.go → app/common/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion app/server/utils/errors.go → app/common/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"errors"
Expand Down
55 changes: 21 additions & 34 deletions app/server/utils/logger.go → app/common/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
Expand All @@ -16,22 +16,27 @@ import (

// TODO: it's better to do this in GRPC middleware

func AnnotateLogger(l *zap.Logger, method string, dsi *api_common.TDataSourceInstance) *zap.Logger {
logger := l.With(zap.String("method", method))

if dsi != nil {
logger = logger.With(
zap.String("data_source_kind", api_common.EDataSourceKind_name[int32(dsi.GetKind())]),
zap.String("host", dsi.GetEndpoint().GetHost()),
zap.Uint32("port", dsi.GetEndpoint().GetPort()),
zap.String("database", dsi.GetDatabase()),
zap.Bool("use_tls", dsi.GetUseTls()),
zap.String("protocol", dsi.GetProtocol().String()),
// TODO: can we print just a login without a password?
)
}
func AnnotateLoggerWithMethod(l *zap.Logger, method string) *zap.Logger {
return l.With(zap.String("method", method))
}

func AnnotateLoggerWithDataSourceInstance(l *zap.Logger, dsi *api_common.TDataSourceInstance) *zap.Logger {
return l.With(
zap.String("data_source_kind", api_common.EDataSourceKind_name[int32(dsi.GetKind())]),
zap.String("host", dsi.GetEndpoint().GetHost()),
zap.Uint32("port", dsi.GetEndpoint().GetPort()),
zap.String("database", dsi.GetDatabase()),
zap.Bool("use_tls", dsi.GetUseTls()),
zap.String("protocol", dsi.GetProtocol().String()),
// TODO: can we print just a login without a password?
)
}

func AnnotateLoggerForUnaryCall(l *zap.Logger, method string, dsi *api_common.TDataSourceInstance) *zap.Logger {
l = AnnotateLoggerWithMethod(l, method)
l = AnnotateLoggerWithDataSourceInstance(l, dsi)

return logger
return l
}

func LogCloserError(logger *zap.Logger, closer io.Closer, msg string) {
Expand Down Expand Up @@ -82,24 +87,6 @@ func newDefaultLoggerConfig() zap.Config {

func NewTestLogger(t *testing.T) *zap.Logger { return zaptest.NewLogger(t) }

func DumpReadSplitsResponse(logger *zap.Logger, resp *api_service_protos.TReadSplitsResponse) {
switch t := resp.GetPayload().(type) {
case *api_service_protos.TReadSplitsResponse_ArrowIpcStreaming:
if dump := resp.GetArrowIpcStreaming(); dump != nil {
logger.Debug("response", zap.Int("arrow_blob_length", len(dump)))
}
case *api_service_protos.TReadSplitsResponse_ColumnSet:
for i := range t.ColumnSet.Data {
data := t.ColumnSet.Data[i]
meta := t.ColumnSet.Meta[i]

logger.Debug("response", zap.Int("column_id", i), zap.String("meta", meta.String()), zap.String("data", data.String()))
}
default:
panic(fmt.Sprintf("unexpected message type %v", t))
}
}

func SelectToFields(slct *api_service_protos.TSelect) []zap.Field {
result := []zap.Field{
zap.Any("from", slct.From),
Expand Down
4 changes: 2 additions & 2 deletions app/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

"github.com/spf13/cobra"

"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/app/common"
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -42,7 +42,7 @@
return fmt.Errorf("new config: %w", err)
}

logger, err := utils.NewLoggerFromConfig(cfg.Logger)
logger, err := common.NewLoggerFromConfig(cfg.Logger)

Check warning on line 45 in app/server/cmd.go

View check run for this annotation

Codecov / codecov/patch

app/server/cmd.go#L45

Added line #L45 was not covered by tests
if err != nil {
return fmt.Errorf("new logger from config: %w", err)
}
Expand Down
20 changes: 10 additions & 10 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service "github.com/ydb-platform/fq-connector-go/api/service"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/common"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server/datasource"
"github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms"
Expand Down Expand Up @@ -44,7 +45,7 @@

return ds.DescribeTable(ctx, logger, request)
default:
return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported)
return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported)

Check warning on line 48 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L48

Added line #L48 was not covered by tests
}
}

Expand All @@ -54,39 +55,39 @@
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
) error {
switch kind := request.GetDataSourceInstance().GetKind(); kind {
switch kind := split.GetSelect().GetDataSourceInstance().GetKind(); kind {
case api_common.EDataSourceKind_CLICKHOUSE, api_common.EDataSourceKind_POSTGRESQL, api_common.EDataSourceKind_YDB:
ds, err := dsc.rdbms.Make(logger, kind)
if err != nil {
return err
}

return readSplit[any](logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
return readSplit[any](logger, stream, request.GetFormat(), split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
case api_common.EDataSourceKind_S3:
ds := s3.NewDataSource()

return readSplit[string](logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
return readSplit[string](logger, stream, request.GetFormat(), split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)

Check warning on line 69 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L69

Added line #L69 was not covered by tests
default:
return fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported)
return fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported)

Check warning on line 71 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L71

Added line #L71 was not covered by tests
}
}

func readSplit[T utils.Acceptor](
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
format api_service_protos.TReadSplitsRequest_EFormat,
split *api_service_protos.TSplit,
dataSource datasource.DataSource[T],
memoryAllocator memory.Allocator,
readLimiterFactory *paging.ReadLimiterFactory,
cfg *config.TServerConfig,
) error {
logger.Debug("split reading started", utils.SelectToFields(split.Select)...)
logger.Debug("split reading started", common.SelectToFields(split.Select)...)

columnarBufferFactory, err := paging.NewColumnarBufferFactory[T](
logger,
memoryAllocator,
request.Format,
format,
split.Select.What)
if err != nil {
return fmt.Errorf("new columnar buffer factory: %w", err)
Expand All @@ -109,7 +110,6 @@
streamer := streaming.NewStreamer(
logger,
stream,
request,
split,
sink,
dataSource,
Expand All @@ -131,7 +131,7 @@
}

func NewDataSourceCollection(
queryLoggerFactory utils.QueryLoggerFactory,
queryLoggerFactory common.QueryLoggerFactory,
memoryAllocator memory.Allocator,
readLimiterFactory *paging.ReadLimiterFactory,
cfg *config.TServerConfig,
Expand Down
6 changes: 6 additions & 0 deletions app/server/datasource/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"go.uber.org/zap"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
Expand Down Expand Up @@ -38,3 +40,7 @@ type DataSource[T utils.Acceptor] interface {
sink paging.Sink[T],
)
}

type TypeMapper interface {
SQLTypeToYDBColumn(columnName, typeName string, rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error)
}
4 changes: 0 additions & 4 deletions app/server/datasource/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,3 @@ func (m *DataSourceMock[T]) ReadSplit(
) {
m.Called(split, pagingWriter)
}

func (*DataSourceMock[T]) TypeMapper() utils.TypeMapper {
panic("not implemented") // TODO: Implement
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/zap"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
"github.com/ydb-platform/fq-connector-go/app/common"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
)
Expand All @@ -20,7 +21,7 @@ var _ rdbms_utils.Connection = (*Connection)(nil)

type Connection struct {
*sql.DB
logger utils.QueryLogger
logger common.QueryLogger
}

type rows struct {
Expand Down Expand Up @@ -95,7 +96,7 @@ func (c *connectionManager) Make(
}

opts := &clickhouse.Options{
Addr: []string{utils.EndpointToString(dsi.GetEndpoint())},
Addr: []string{common.EndpointToString(dsi.GetEndpoint())},
Auth: clickhouse.Auth{
Database: dsi.Database,
Username: dsi.Credentials.GetBasic().Username,
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *connectionManager) Make(
}

func (*connectionManager) Release(logger *zap.Logger, conn rdbms_utils.Connection) {
utils.LogCloserError(logger, conn, "close clickhouse connection")
common.LogCloserError(logger, conn, "close clickhouse connection")
}

func NewConnectionManager(cfg rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager {
Expand Down
Loading