Skip to content

Commit

Permalink
Cover ReadSplits method
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 committed Dec 29, 2023
1 parent 31dbd3e commit 3dc52d5
Show file tree
Hide file tree
Showing 14 changed files with 344 additions and 135 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ jobs:
- name: Run integration tests
run: |
go test -c -o fq-connector-go-tests -coverpkg=./... -covermode=atomic ./tests
./fq-connector-go-tests -projectPath=$(pwd) -test.coverprofile=coverage_integratoin_tests.out
./fq-connector-go-tests -projectPath=$(pwd) -test.coverprofile=coverage_integration_tests.out
- name: Union coverage
run: |
cat coverage* | grep -v pb.go | grep -v mock.go > coverage.out
cat coverage_unit_tests.out | grep -v 'pb.go\|mock.go\|library' > coverage.out
cat coverage_integration_tests.out | grep -v 'atomic\|pb.go\|mock.go\|library' >> coverage.out
go tool cover -func=coverage.out
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4-beta
with:
Expand Down
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
PROJECT_PATH = $(shell pwd)

build:
go build -o fq-connector-go ./app

Expand All @@ -10,14 +12,21 @@ lint:
unit_test:
go test -v ./app/...

PROJECT_PATH = $(shell pwd)
integration_test: integration_test_clean
docker-compose -f ./tests/infra/datasource/docker-compose.yaml up -d
integration_test: integration_test_env
go test -c -o fq-connector-go-tests ./tests
./fq-connector-go-tests -projectPath=$(PROJECT_PATH)

integration_test_clean:
integration_test_env:
docker-compose -f ./tests/infra/datasource/docker-compose.yaml rm -f -v
docker-compose -f ./tests/infra/datasource/docker-compose.yaml up -d

test_coverage: integration_test_env
go test -coverpkg=./... -coverprofile=coverage_unit_tests.out -covermode=atomic ./app/...
go test -c -o fq-connector-go-tests -coverpkg=./... -covermode=atomic ./tests
./fq-connector-go-tests -projectPath=$(PROJECT_PATH) -test.coverprofile=coverage_integration_tests.out
cat coverage_unit_tests.out | grep -v 'pb.go\|mock.go\|library' > coverage.out
cat coverage_integration_tests.out | grep -v 'atomic\|pb.go\|mock.go\|library' >> coverage.out
go tool cover -func=coverage.out

build_image_base:
docker build -t ghcr.io/ydb-platform/fq-connector-go:base -f ./Dockerfile.base .
11 changes: 5 additions & 6 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ func (dsc *DataSourceCollection) DoReadSplit(
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
) error {
switch kind := request.GetDataSourceInstance().GetKind(); kind {
switch kind := split.GetSelect().GetDataSourceInstance().GetKind(); kind {
case api_common.EDataSourceKind_CLICKHOUSE, api_common.EDataSourceKind_POSTGRESQL, api_common.EDataSourceKind_YDB:
ds, err := dsc.rdbms.Make(logger, kind)
if err != nil {
return err
}

return readSplit[any](logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
return readSplit[any](logger, stream, request.GetFormat(), split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
case api_common.EDataSourceKind_S3:
ds := s3.NewDataSource()

return readSplit[string](logger, stream, request, split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)
return readSplit[string](logger, stream, request.GetFormat(), split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)

Check warning on line 68 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L68

Added line #L68 was not covered by tests
default:
return fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported)
}
Expand All @@ -74,7 +74,7 @@ func (dsc *DataSourceCollection) DoReadSplit(
func readSplit[T utils.Acceptor](
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
format api_service_protos.TReadSplitsRequest_EFormat,
split *api_service_protos.TSplit,
dataSource datasource.DataSource[T],
memoryAllocator memory.Allocator,
Expand All @@ -86,7 +86,7 @@ func readSplit[T utils.Acceptor](
columnarBufferFactory, err := paging.NewColumnarBufferFactory[T](
logger,
memoryAllocator,
request.Format,
format,
split.Select.What)
if err != nil {
return fmt.Errorf("new columnar buffer factory: %w", err)
Expand All @@ -109,7 +109,6 @@ func readSplit[T utils.Acceptor](
streamer := streaming.NewStreamer(
logger,
stream,
request,
split,
sink,
dataSource,
Expand Down
18 changes: 11 additions & 7 deletions app/server/service_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *serviceConnector) DescribeTable(
ctx context.Context,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
logger := utils.AnnotateLogger(s.logger, "DescribeTable", request.DataSourceInstance)
logger := utils.AnnotateLoggerForUnaryCall(s.logger, "DescribeTable", request.DataSourceInstance)
logger.Info("request handling started", zap.String("table", request.GetTable()))

if err := ValidateDescribeTableRequest(logger, request); err != nil {
Expand All @@ -65,7 +65,7 @@ func (s *serviceConnector) DescribeTable(
}

func (s *serviceConnector) ListSplits(request *api_service_protos.TListSplitsRequest, stream api_service.Connector_ListSplitsServer) error {
logger := utils.AnnotateLogger(s.logger, "ListSplits", nil)
logger := utils.AnnotateLoggerWithMethod(s.logger, "ListSplits")
logger.Info("request handling started", zap.Int("total selects", len(request.Selects)))

if err := ValidateListSplitsRequest(logger, request); err != nil {
Expand All @@ -77,7 +77,7 @@ func (s *serviceConnector) ListSplits(request *api_service_protos.TListSplitsReq
totalSplits := 0

for _, slct := range request.Selects {
if err := s.doListSplitsHandleSelect(stream, slct, &totalSplits); err != nil {
if err := s.doListSplitsHandleSelect(logger, stream, slct, &totalSplits); err != nil {
logger.Error("request handling failed", zap.Error(err))

return err
Expand All @@ -90,11 +90,12 @@ func (s *serviceConnector) ListSplits(request *api_service_protos.TListSplitsReq
}

func (s *serviceConnector) doListSplitsHandleSelect(
logger *zap.Logger,
stream api_service.Connector_ListSplitsServer,
slct *api_service_protos.TSelect,
totalSplits *int,
) error {
logger := utils.AnnotateLogger(s.logger, "ListSplits", slct.DataSourceInstance)
logger = utils.AnnotateLoggerWithDataSourceInstance(logger, slct.DataSourceInstance)

args := []zap.Field{
zap.Int("split_id", *totalSplits),
Expand Down Expand Up @@ -146,8 +147,9 @@ func (*serviceConnector) doListSplitsResponse(

func (s *serviceConnector) ReadSplits(
request *api_service_protos.TReadSplitsRequest,
stream api_service.Connector_ReadSplitsServer) error {
logger := utils.AnnotateLogger(s.logger, "ReadSplits", request.DataSourceInstance)
stream api_service.Connector_ReadSplitsServer,
) error {
logger := utils.AnnotateLoggerWithMethod(s.logger, "ReadSplits")
logger.Info("request handling started", zap.Int("total_splits", len(request.Splits)))

err := s.doReadSplits(logger, request, stream)
Expand Down Expand Up @@ -176,7 +178,9 @@ func (s *serviceConnector) doReadSplits(
}

for i, split := range request.Splits {
splitLogger := logger.With(zap.Int("split_id", i))
splitLogger := utils.
AnnotateLoggerWithDataSourceInstance(logger, split.Select.DataSourceInstance).
With(zap.Int("split_id", i))

err := s.dataSourceCollection.DoReadSplit(
splitLogger,
Expand Down
3 changes: 0 additions & 3 deletions app/server/streaming/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

type Streamer[T utils.Acceptor] struct {
stream api_service.Connector_ReadSplitsServer
request *api_service_protos.TReadSplitsRequest
dataSource datasource.DataSource[T]
split *api_service_protos.TSplit
sink paging.Sink[T]
Expand Down Expand Up @@ -98,7 +97,6 @@ func (s *Streamer[T]) Run() error {
func NewStreamer[T utils.Acceptor](
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sink paging.Sink[T],
dataSource datasource.DataSource[T],
Expand All @@ -109,7 +107,6 @@ func NewStreamer[T utils.Acceptor](
logger: logger,
stream: stream,
split: split,
request: request,
dataSource: dataSource,
sink: sink,
ctx: ctx,
Expand Down
3 changes: 1 addition & 2 deletions app/server/streaming/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (tc testCaseStreaming) messageParams() (sentMessages, rowsInLastMessage int

func (tc testCaseStreaming) execute(t *testing.T) {
logger := utils.NewTestLogger(t)
request := &api_service_protos.TReadSplitsRequest{}
split := utils.MakeTestSplit()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -258,7 +257,7 @@ func (tc testCaseStreaming) execute(t *testing.T) {
)
require.NoError(t, err)

streamer := NewStreamer(logger, stream, request, split, sink, dataSource)
streamer := NewStreamer(logger, stream, split, sink, dataSource)

err = streamer.Run()

Expand Down
35 changes: 20 additions & 15 deletions app/server/utils/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@ import (

// TODO: it's better to do this in GRPC middleware

func AnnotateLogger(l *zap.Logger, method string, dsi *api_common.TDataSourceInstance) *zap.Logger {
logger := l.With(zap.String("method", method))

if dsi != nil {
logger = logger.With(
zap.String("data_source_kind", api_common.EDataSourceKind_name[int32(dsi.GetKind())]),
zap.String("host", dsi.GetEndpoint().GetHost()),
zap.Uint32("port", dsi.GetEndpoint().GetPort()),
zap.String("database", dsi.GetDatabase()),
zap.Bool("use_tls", dsi.GetUseTls()),
zap.String("protocol", dsi.GetProtocol().String()),
// TODO: can we print just a login without a password?
)
}
func AnnotateLoggerWithMethod(l *zap.Logger, method string) *zap.Logger {
return l.With(zap.String("method", method))
}

func AnnotateLoggerWithDataSourceInstance(l *zap.Logger, dsi *api_common.TDataSourceInstance) *zap.Logger {
return l.With(
zap.String("data_source_kind", api_common.EDataSourceKind_name[int32(dsi.GetKind())]),
zap.String("host", dsi.GetEndpoint().GetHost()),
zap.Uint32("port", dsi.GetEndpoint().GetPort()),
zap.String("database", dsi.GetDatabase()),
zap.Bool("use_tls", dsi.GetUseTls()),
zap.String("protocol", dsi.GetProtocol().String()),
// TODO: can we print just a login without a password?
)
}

func AnnotateLoggerForUnaryCall(l *zap.Logger, method string, dsi *api_common.TDataSourceInstance) *zap.Logger {
l = AnnotateLoggerWithMethod(l, method)
l = AnnotateLoggerWithDataSourceInstance(l, dsi)

return logger
return l
}

func LogCloserError(logger *zap.Logger, closer io.Closer, msg string) {
Expand Down
22 changes: 18 additions & 4 deletions app/server/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,24 @@ func ValidateListSplitsRequest(logger *zap.Logger, request *api_service_protos.T
}

func ValidateReadSplitsRequest(logger *zap.Logger, request *api_service_protos.TReadSplitsRequest) error {
if err := validateDataSourceInstance(logger, request.GetDataSourceInstance()); err != nil {
return fmt.Errorf("validate data source instance: %w", err)
}

if len(request.Splits) == 0 {
return fmt.Errorf("splits are empty: %w", utils.ErrInvalidRequest)
}

for i, split := range request.Splits {
if err := validateSplit(logger, split); err != nil {
return fmt.Errorf("validate split #%d: %w", i, err)

Check warning on line 46 in app/server/validate.go

View check run for this annotation

Codecov / codecov/patch

app/server/validate.go#L46

Added line #L46 was not covered by tests
}
}

return nil
}

func validateSplit(logger *zap.Logger, split *api_service_protos.TSplit) error {
if err := validateSelect(logger, split.Select); err != nil {
return fmt.Errorf("validate select: %w", err)

Check warning on line 55 in app/server/validate.go

View check run for this annotation

Codecov / codecov/patch

app/server/validate.go#L55

Added line #L55 was not covered by tests
}

return nil
}

Expand All @@ -61,6 +71,10 @@ func validateSelect(logger *zap.Logger, slct *api_service_protos.TSelect) error
}

func validateDataSourceInstance(logger *zap.Logger, dsi *api_common.TDataSourceInstance) error {
if dsi == nil {
return fmt.Errorf("empty data source instance: %w", utils.ErrInvalidRequest)

Check warning on line 75 in app/server/validate.go

View check run for this annotation

Codecov / codecov/patch

app/server/validate.go#L75

Added line #L75 was not covered by tests
}

if dsi.GetKind() == api_common.EDataSourceKind_DATA_SOURCE_KIND_UNSPECIFIED {
return fmt.Errorf("empty kind: %w", utils.ErrInvalidRequest)
}
Expand Down
83 changes: 0 additions & 83 deletions tests/clickhouse.go

This file was deleted.

Loading

0 comments on commit 3dc52d5

Please sign in to comment.