Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended type coverage in integration tests #23

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
# Note: By default, the `.golangci.yml` file should be at the root of the repository.
# The location of the configuration file can be changed by using `--config=`
# args: --timeout=30m --config=/my/path/.golangci.yml --issues-exit-code=0
args: ./app/...
args: ./app/... ./tests/...

# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ integration_test: integration_test_env
./fq-connector-go-tests -projectPath=$(PROJECT_PATH)

integration_test_env:
docker-compose -f ./tests/infra/datasource/docker-compose.yaml stop
docker-compose -f ./tests/infra/datasource/docker-compose.yaml rm -f -v
docker-compose -f ./tests/infra/datasource/docker-compose.yaml up -d

Expand Down
47 changes: 19 additions & 28 deletions app/client/app.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package client

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"

"github.com/apache/arrow/go/v13/arrow/ipc"
"github.com/apache/arrow/go/v13/arrow"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -67,7 +66,11 @@

switch cfg.DataSourceInstance.Kind {
case api_common.EDataSourceKind_CLICKHOUSE, api_common.EDataSourceKind_POSTGRESQL, api_common.EDataSourceKind_YDB:
splits, err = prepareSplits(cl, tableName, cfg.DataSourceInstance)
typeMappingSettings := &api_service_protos.TTypeMappingSettings{
DateTimeFormat: api_service_protos.EDateTimeFormat_YQL_FORMAT,

Check warning on line 70 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

splits, err = prepareSplits(cl, cfg.DataSourceInstance, typeMappingSettings, tableName)

Check warning on line 73 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L73

Added line #L73 was not covered by tests

if err != nil {
return fmt.Errorf("prepare splits: %w", err)
Expand All @@ -86,11 +89,12 @@

func prepareSplits(
cl Client,
tableName string,
dsi *api_common.TDataSourceInstance,
typeMappingSettings *api_service_protos.TTypeMappingSettings,
tableName string,
) ([]*api_service_protos.TSplit, error) {
// DescribeTable
describeTableResponse, err := cl.DescribeTable(context.TODO(), dsi, tableName)
describeTableResponse, err := cl.DescribeTable(context.TODO(), dsi, typeMappingSettings, tableName)

Check warning on line 97 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L97

Added line #L97 was not covered by tests
if err != nil {
return nil, fmt.Errorf("describe table: %w", err)
}
Expand Down Expand Up @@ -126,38 +130,25 @@
return fmt.Errorf("read splits: %w", err)
}

if err := dumpReadResponses(logger, readSplitsResponses); err != nil {
return fmt.Errorf("dump read responses: %w", err)
records, err := common.ReadResponsesToArrowRecords(readSplitsResponses)
if err != nil {
return fmt.Errorf("read responses to arrow records: %w", err)

Check warning on line 135 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L133-L135

Added lines #L133 - L135 were not covered by tests
}

dumpReadResponses(logger, records)

Check warning on line 138 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L138

Added line #L138 was not covered by tests

return nil
}

func dumpReadResponses(
logger *zap.Logger,
responses []*api_service_protos.TReadSplitsResponse,
) error {
for _, resp := range responses {
buf := bytes.NewBuffer(resp.GetArrowIpcStreaming())

reader, err := ipc.NewReader(buf)
if err != nil {
return fmt.Errorf("new reader: %w", err)
}

for reader.Next() {
record := reader.Record()
logger.Debug("schema", zap.String("schema", record.Schema().String()))

for i, column := range record.Columns() {
logger.Debug("column", zap.Int("id", i), zap.String("data", column.String()))
}
records []arrow.Record,
) {
for _, record := range records {
for i, column := range record.Columns() {
logger.Debug("column", zap.Int("id", i), zap.String("data", column.String()))

Check warning on line 149 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L146-L149

Added lines #L146 - L149 were not covered by tests
}

reader.Release()
}

return nil
}

var Cmd = &cobra.Command{
Expand Down
7 changes: 5 additions & 2 deletions app/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Client interface {
DescribeTable(
ctx context.Context,
dsi *api_common.TDataSourceInstance,
typeMappingSettings *api_service_protos.TTypeMappingSettings,
tableName string,
) (*api_service_protos.TDescribeTableResponse, error)

Expand All @@ -49,11 +50,13 @@ type clientImpl struct {
func (c *clientImpl) DescribeTable(
ctx context.Context,
dsi *api_common.TDataSourceInstance,
typeMappingSettings *api_service_protos.TTypeMappingSettings,
tableName string,
) (*api_service_protos.TDescribeTableResponse, error) {
request := &api_service_protos.TDescribeTableRequest{
DataSourceInstance: dsi,
Table: tableName,
DataSourceInstance: dsi,
Table: tableName,
TypeMappingSettings: typeMappingSettings,
}

return c.client.DescribeTable(ctx, request)
Expand Down
30 changes: 30 additions & 0 deletions app/common/api_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package common

import (
"bytes"
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/ipc"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
)

Expand Down Expand Up @@ -42,3 +48,27 @@

return out
}

func ReadResponsesToArrowRecords(responses []*api_service_protos.TReadSplitsResponse) ([]arrow.Record, error) {
var out []arrow.Record

for _, resp := range responses {
buf := bytes.NewBuffer(resp.GetArrowIpcStreaming())

reader, err := ipc.NewReader(buf)
if err != nil {
return nil, fmt.Errorf("new reader: %w", err)

Check warning on line 60 in app/common/api_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/api_helpers.go#L60

Added line #L60 was not covered by tests
}

for reader.Next() {
record := reader.Record()

record.Retain()
out = append(out, record)
}

reader.Release()
}

return out, nil
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
package utils
package common

import (
"fmt"
"time"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/common"
)

type ValueType interface {
bool |
int8 | int16 | int32 | int64 |
uint8 | uint16 | uint32 | uint64 |
float32 | float64 |
string | []byte |
time.Time
}

type ArrowArrayType[VT ValueType] interface {
*array.Boolean |
*array.Int8 | *array.Int16 | *array.Int32 | *array.Int64 |
*array.Uint8 | *array.Uint16 | *array.Uint32 | *array.Uint64 |
*array.Float32 | *array.Float64 |
*array.String | *array.Binary

Len() int
Value(int) VT
IsNull(int) bool
}

type ArrowBuilder[VT ValueType] interface {
AppendNull()
Append(value VT)
Expand Down Expand Up @@ -49,7 +70,7 @@
default:
err := fmt.Errorf(
"only primitive and optional types are supported, got '%T' instead: %w",
t, common.ErrDataTypeNotSupported,
t, ErrDataTypeNotSupported,

Check warning on line 73 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L73

Added line #L73 was not covered by tests
)

return nil, err
Expand Down Expand Up @@ -86,7 +107,7 @@
default:
err := fmt.Errorf(
"only primitive and optional types are supported, got '%T' instead: %w",
t, common.ErrDataTypeNotSupported,
t, ErrDataTypeNotSupported,

Check warning on line 110 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L110

Added line #L110 was not covered by tests
)

return nil, err
Expand Down Expand Up @@ -140,7 +161,7 @@
case Ydb.Type_TIMESTAMP:
builder = array.NewUint64Builder(arrowAllocator)
default:
return nil, fmt.Errorf("register type '%v': %w", typeID, common.ErrDataTypeNotSupported)
return nil, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)

Check warning on line 164 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L164

Added line #L164 was not covered by tests
}

return builder, nil
Expand Down Expand Up @@ -188,7 +209,7 @@
case Ydb.Type_TIMESTAMP:
field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Uint64}
default:
return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, common.ErrDataTypeNotSupported)
return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)

Check warning on line 212 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L212

Added line #L212 was not covered by tests
}

return field, nil
Expand Down
23 changes: 17 additions & 6 deletions app/server/utils/time.go → app/common/time.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package utils
package common

import (
"fmt"
"time"

"github.com/ydb-platform/fq-connector-go/app/common"
)

var (
Expand All @@ -15,7 +13,7 @@

func TimeToYDBDate(t *time.Time) (uint16, error) {
if t.Before(minYDBTime) || t.After(maxYDBTime) {
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds)
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds)
}

days := t.Sub(minYDBTime).Hours() / 24
Expand All @@ -25,7 +23,7 @@

func TimeToYDBDatetime(t *time.Time) (uint32, error) {
if t.Before(minYDBTime) || t.After(maxYDBTime) {
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds)
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds)
}

seconds := t.Unix()
Expand All @@ -35,10 +33,23 @@

func TimeToYDBTimestamp(t *time.Time) (uint64, error) {
if t.Before(minYDBTime) || t.After(maxYDBTime) {
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds)
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds)
}

seconds := t.UnixMicro()

return uint64(seconds), nil
}

type ydbTime interface {
uint16 | uint32 | uint64
}

func MustTimeToYDBType[OUT ydbTime](f func(t *time.Time) (OUT, error), t time.Time) OUT {
res, err := f(&t)
if err != nil {
panic(err)

Check warning on line 51 in app/common/time.go

View check run for this annotation

Codecov / codecov/patch

app/common/time.go#L51

Added line #L51 was not covered by tests
}

return res
}
22 changes: 10 additions & 12 deletions app/server/utils/time_test.go → app/common/time_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package utils
package common

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ydb-platform/fq-connector-go/app/common"
)

func TestTimeToYDBDate(t *testing.T) {
Expand All @@ -31,12 +29,12 @@ func TestTimeToYDBDate(t *testing.T) {
{
input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
{
input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
}

Expand All @@ -48,7 +46,7 @@ func TestTimeToYDBDate(t *testing.T) {
require.Equal(t, tc.output, output)

if tc.err != nil {
require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds))
require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds))
} else {
require.NoError(t, err)
}
Expand Down Expand Up @@ -77,12 +75,12 @@ func TestTimeToYDBDatetime(t *testing.T) {
{
input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
{
input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
}

Expand All @@ -94,7 +92,7 @@ func TestTimeToYDBDatetime(t *testing.T) {
require.Equal(t, tc.output, output)

if tc.err != nil {
require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds))
require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds))
} else {
require.NoError(t, err)
}
Expand Down Expand Up @@ -123,12 +121,12 @@ func TestTimeToYDBTimestamp(t *testing.T) {
{
input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
{
input: time.Date(29427, 01, 01, 00, 00, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
}

Expand All @@ -140,7 +138,7 @@ func TestTimeToYDBTimestamp(t *testing.T) {
require.Equal(t, tc.output, output)

if tc.err != nil {
require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds))
require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds))
} else {
require.NoError(t, err)
}
Expand Down
Loading