Commit 9792fe6
bulker: added temporaryBatchSize option to enable filling tmp table in batches

sync-sidecar: use temporaryBatchSize=100000 for full-refresh streams.
bulker: redshift: use json_parse_truncate_strings=ON to load table
absorbb committed Sep 19, 2024
1 parent b3aa309 commit 9792fe6
Showing 30 changed files with 129 additions and 57 deletions.
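For orientation, a minimal sketch of how the new option is meant to be used from client code. It is assembled from pieces visible in this commit (WithTemporaryBatchSize from options.go, bulker.CreateBulker and stream.Complete from the tests); the config value, destination id, and the exact CreateStream signature are assumptions, not verified API:

    package main

    import (
        "context"
        "log"

        bulker "github.com/jitsucom/bulker/bulkerlib"
        sql "github.com/jitsucom/bulker/bulkerlib/implementations/sql"
    )

    func main() {
        ctx := context.Background()
        var cfg bulker.Config // placeholder: destination type + credentials go here
        blk, err := bulker.CreateBulker(cfg)
        if err != nil {
            log.Fatal(err)
        }
        // Batch-mode stream that flushes accumulated rows into the tmp table
        // every 100000 events (the value sync-sidecar uses for full-refresh
        // streams) instead of building one huge local batch file first.
        stream, err := blk.CreateStream("destination-id", "events_table", bulker.Batch,
            sql.WithTemporaryBatchSize(100000))
        if err != nil {
            log.Fatal(err)
        }
        // stream.Consume(ctx, event) for each event, then:
        state, err := stream.Complete(ctx)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("loaded: %s", state.String())
    }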
2 changes: 1 addition & 1 deletion .github/workflows/test-build-image.yml
@@ -30,7 +30,7 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v3
         with:
-          go-version: 1.22.5
+          go-version: 1.23.1
           cache: false
 
       - name: Bulker Test
4 changes: 2 additions & 2 deletions admin/go.mod
@@ -1,8 +1,8 @@
 module github.com/jitsucom/bulker/admin
 
-go 1.22.0
+go 1.23
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 require (
     github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
2 changes: 1 addition & 1 deletion all.Dockerfile
@@ -7,7 +7,7 @@ ENV TZ=UTC
 
 WORKDIR /app
 
-FROM golang:1.22.5-bookworm as builder
+FROM golang:1.23.1-bookworm as builder
 
 ARG VERSION
 ENV VERSION $VERSION
2 changes: 1 addition & 1 deletion bulker.Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get install -y ca-certificates curl
 
 ENV TZ=UTC
 
-FROM golang:1.22.2-bullseye as build
+FROM golang:1.23.1-bullseye as build
 
 ARG VERSION
 ENV VERSION $VERSION
4 changes: 2 additions & 2 deletions bulkerapp/go.mod
@@ -1,8 +1,8 @@
 module github.com/jitsucom/bulker/bulkerapp
 
-go 1.22.0
+go 1.23
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 require (
     github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
4 changes: 2 additions & 2 deletions bulkerlib/go.mod
@@ -1,8 +1,8 @@
 module github.com/jitsucom/bulker/bulkerlib
 
-go 1.22.0
+go 1.23
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 require (
     cloud.google.com/go v0.115.0
2 changes: 1 addition & 1 deletion bulkerlib/implementations/file_storage/abstract.go
@@ -228,7 +228,7 @@ func (ps *AbstractFileStorageStream) flushBatchFile(ctx context.Context) (err er
         _ = file.Close()
     }()
     scanner := bufio.NewScanner(file)
-    scanner.Buffer(make([]byte, 1024*10), 1024*1024)
+    scanner.Buffer(make([]byte, 1024*10), 1024*1024*10)
     i := 0
     for scanner.Scan() {
         if !ps.batchFileSkipLines.Contains(i) {
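The scanner.Buffer bump matters because bufio.Scanner fails with bufio.ErrTooLong once a single token exceeds its max size (bufio.MaxScanTokenSize, 64 KiB, by default); batch files hold one JSON object per line, so a wide event can easily pass the previous 1 MiB cap. A standalone sketch of the pattern:

    package main

    import (
        "bufio"
        "fmt"
        "strings"
    )

    func main() {
        // One NDJSON record per line; a single record can be megabytes long.
        r := strings.NewReader(`{"id":1}` + "\n" +
            `{"id":2,"payload":"` + strings.Repeat("x", 2_000_000) + `"}`)
        scanner := bufio.NewScanner(r)
        // Start with a 10 KiB buffer but allow tokens (lines) up to 10 MiB,
        // mirroring the new limit in flushBatchFile.
        scanner.Buffer(make([]byte, 1024*10), 1024*1024*10)
        for scanner.Scan() {
            fmt.Println(len(scanner.Bytes()))
        }
        if err := scanner.Err(); err != nil {
            // With the old 1 MiB cap this would print bufio.ErrTooLong.
            fmt.Println("scan failed:", err)
        }
    }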
56 changes: 40 additions & 16 deletions bulkerlib/implementations/sql/abstract_transactional.go
@@ -28,6 +28,8 @@ type AbstractTransactionalSQLStream struct {
     //function that generate tmp table schema based on target table schema
     tmpTableFunc func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table)
     dstTable     *Table
+    temporaryBatchSize int
+    localBatchFileName string
     batchFile        *os.File
     marshaller       types.Marshaller
     targetMarshaller types.Marshaller
@@ -62,9 +64,28 @@ func newAbstractTransactionalStream(id string, p SQLAdapter, tableName string, m
             ps.useDiscriminator = true
         }
     }
+    ps.localBatchFileName = localBatchFileOption.Get(&ps.options)
+    ps.temporaryBatchSize = TemporaryBatchSizeOption.Get(&ps.options)
     return &ps, nil
 }
 
+func (ps *AbstractTransactionalSQLStream) initTmpFile(ctx context.Context) (err error) {
+    if ps.batchFile == nil {
+        if !ps.merge && ps.sqlAdapter.GetBatchFileFormat() == types.FileFormatNDJSON {
+            //without merge we can write file with compression - no need to convert
+            ps.marshaller, _ = types.NewMarshaller(ps.sqlAdapter.GetBatchFileFormat(), ps.sqlAdapter.GetBatchFileCompression())
+        } else {
+            ps.marshaller, _ = types.NewMarshaller(types.FileFormatNDJSON, types.FileCompressionNONE)
+        }
+        ps.targetMarshaller, err = types.NewMarshaller(ps.sqlAdapter.GetBatchFileFormat(), ps.sqlAdapter.GetBatchFileCompression())
+        if err != nil {
+            return err
+        }
+        ps.batchFile, err = os.CreateTemp("", ps.localBatchFileName+"_*"+ps.marshaller.FileExtension())
+    }
+    return
+}
 
 func (ps *AbstractTransactionalSQLStream) init(ctx context.Context) (err error) {
     if ps.inited {
         return nil
@@ -77,20 +98,10 @@ func (ps *AbstractTransactionalSQLStream) init(ctx context.Context) (err error)
             return fmt.Errorf("failed to setup s3 client: %v", err)
         }
     }
-    localBatchFile := localBatchFileOption.Get(&ps.options)
-    if localBatchFile != "" && ps.batchFile == nil {
-        ps.marshaller, _ = types.NewMarshaller(types.FileFormatNDJSON, types.FileCompressionNONE)
-        ps.targetMarshaller, err = types.NewMarshaller(ps.sqlAdapter.GetBatchFileFormat(), ps.sqlAdapter.GetBatchFileCompression())
-        if err != nil {
-            return err
-        }
-        if !ps.merge && ps.sqlAdapter.GetBatchFileFormat() == types.FileFormatNDJSON {
-            //without merge we can write file with compression - no need to convert
-            ps.marshaller, _ = types.NewMarshaller(ps.sqlAdapter.GetBatchFileFormat(), ps.sqlAdapter.GetBatchFileCompression())
-        }
-        ps.batchFile, err = os.CreateTemp("", localBatchFile+"_*"+ps.marshaller.FileExtension())
+    if ps.localBatchFileName != "" && ps.batchFile == nil {
+        err = ps.initTmpFile(ctx)
         if err != nil {
-            return err
+            return
         }
     }
     err = ps.AbstractSQLStream.init(ctx)
@@ -146,9 +157,11 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
         }
         _ = ps.batchFile.Close()
         _ = os.Remove(ps.batchFile.Name())
+        ps.batchFile = nil
+        ps.eventsInBatch = 0
     }()
-    if ps.eventsInBatch > 0 {
-        err = ps.tx.CreateTable(ctx, table)
+    if ps.batchFile != nil && ps.eventsInBatch > 0 {
+        _, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.tx, ps.id, table)
         if err != nil {
             return state, errorj.Decorate(err, "failed to create table")
         }
@@ -345,6 +358,17 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
 //}
 
 func (ps *AbstractTransactionalSQLStream) writeToBatchFile(ctx context.Context, targetTable *Table, processedObject types.Object) error {
+    if ps.temporaryBatchSize > 0 && ps.eventsInBatch >= ps.temporaryBatchSize {
+        ws, err := ps.flushBatchFile(ctx)
+        ps.state.AddWarehouseState(ws)
+        if err != nil {
+            return err
+        }
+        err = ps.initTmpFile(ctx)
+        if err != nil {
+            return err
+        }
+    }
     ps.adjustTables(ctx, targetTable, processedObject)
     ps.updateRepresentationTable(ps.tmpTable)
     err := ps.marshaller.InitSchema(ps.batchFile, nil, nil)
@@ -435,7 +459,7 @@ func (ps *AbstractTransactionalSQLStream) Consume(ctx context.Context, object ty
     if err != nil {
         return
     }
-    batchFile := ps.batchFile != nil
+    batchFile := ps.localBatchFileName != ""
     if batchFile {
         err = ps.writeToBatchFile(ctx, tableForObject, processedObject)
     } else {
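The net effect of this file's changes, as a runnable toy model: instead of accumulating the whole stream in one local file and loading it at Complete(), writeToBatchFile now drains the file into the tmp table every temporaryBatchSize events and reopens a fresh temp file (eventsInBatch is reset by flushBatchFile's deferred cleanup). The counters below mirror that control flow; the real flush performs the warehouse load:

    package main

    import "fmt"

    func main() {
        const eventsCount = 1_000_000
        const temporaryBatchSize = 100_000 // value sync-sidecar uses for full-refresh streams

        eventsInBatch := 0
        flushes := 0
        for i := 0; i < eventsCount; i++ {
            if temporaryBatchSize > 0 && eventsInBatch >= temporaryBatchSize {
                flushes++         // flushBatchFile: load local file into the tmp table
                eventsInBatch = 0 // reset by the deferred cleanup in flushBatchFile
            }
            eventsInBatch++ // writeToBatchFile appends one NDJSON line
        }
        // Complete() flushes whatever is left before swapping tmp -> target table.
        if eventsInBatch > 0 {
            flushes++
        }
        fmt.Printf("%d events loaded with %d tmp-table flushes\n", eventsCount, flushes)
        // Output: 1000000 events loaded with 10 tmp-table flushes
    }

This is also why the Complete() implementations further down switch their guard from ps.batchFile != nil to ps.localBatchFileName != "": intermediate flushes now nil out ps.batchFile, so the file handle no longer indicates whether batch-file mode is enabled.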
42 changes: 36 additions & 6 deletions bulkerlib/implementations/sql/bigdata_test.go
@@ -31,11 +31,9 @@ func TestMillionRows(t *testing.T) {
     }
     tests := []bulkerTestConfig{
         {
-            name:  "one_million_rows",
-            modes: []bulker.BulkMode{bulker.Batch},
-            expectedTable: ExpectedTable{
-                Columns: justColumns("_timestamp", "id", "name"),
-            },
+            name:              "one_million_rows",
+            modes:             []bulker.BulkMode{bulker.Batch},
+            batchSize:         eventsCount + 1,
             expectedRowsCount: eventsCount,
             configIds:         configIds,
         },
@@ -80,6 +78,38 @@ func TestMillionRowsBatched(t *testing.T) {
         })
     }
 }
+
+func TestMillionRowsTmpBatches(t *testing.T) {
+    configsEnabled := os.Getenv("BULKER_TEST_MILLION_ROWS_TMP_BATCHES")
+    if configsEnabled == "" {
+        t.Skip("This test is disabled by default. To enable it set BULKER_TEST_MILLION_ROWS_TMP_BATCHES env variable with comma separated list of bulker config ids")
+        return
+    }
+    configIds := strings.Split(configsEnabled, ",")
+    configIds = utils.ArrayIntersection(allBulkerConfigs, configIds)
+    if len(configIds) == 0 {
+        t.Skipf("Test was skipped. IDs: %v is not among configured configs: %v", configIds, allBulkerConfigs)
+        return
+    }
+    tests := []bulkerTestConfig{
+        {
+            name:              "one_million_rows_tmp_batches",
+            modes:             []bulker.BulkMode{bulker.Batch},
+            batchSize:         eventsCount + 1,
+            expectedRowsCount: eventsCount,
+            configIds:         configIds,
+            streamOptions:     []bulker.StreamOption{WithTemporaryBatchSize(34000)},
+        },
+    }
+    for _, tt := range tests {
+        tt := tt
+        t.Run(tt.name, func(t *testing.T) {
+            t.Parallel()
+            runTestConfig(t, tt, testLotOfEvents)
+        })
+    }
+}
 
 func testLotOfEvents(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode) {
     reqr := require.New(t)
     blk, err := bulker.CreateBulker(*testConfig.config)
@@ -149,7 +179,7 @@ func testLotOfEvents(t *testing.T, testConfig bulkerTestConfig, mode bulker.Bulk
     state, err := stream.Complete(ctx)
     sqlAdapter = blk.(SQLAdapter)
     PostStep("stream_complete", testConfig, mode, reqr, err)
-    logging.Infof("%d. batch is completed in %s", i, time.Since(startTime))
+    logging.Infof("%d. batch is completed in %s: state: %s", i, time.Since(startTime), state.String())
 
     if testConfig.expectedState != nil {
         reqr.Equal(*testConfig.expectedState, state)
1 change: 1 addition & 0 deletions bulkerlib/implementations/sql/bulker_test.go
@@ -531,6 +531,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
     scanner := bufio.NewScanner(file)
     i := 0
     streamNum := 0
+    scanner.Buffer(make([]byte, 1024*10), 1024*1024*10)
     for scanner.Scan() {
         if i > 0 && testConfig.batchSize > 0 && i%testConfig.batchSize == 0 {
             _, err := stream.Complete(ctx)
14 changes: 13 additions & 1 deletion bulkerlib/implementations/sql/options.go
@@ -60,6 +60,12 @@ var (
         ParseFunc: utils.ParseInt,
     }
 
+    TemporaryBatchSizeOption = bulker.ImplementationOption[int]{
+        Key:          "temporaryBatchSize",
+        DefaultValue: 0,
+        ParseFunc:    utils.ParseInt,
+    }
+
     localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"}
 
     s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"}
@@ -71,7 +77,7 @@ func init() {
     bulker.RegisterOption(&OmitNilsOption)
     bulker.RegisterOption(&SchemaFreezeOption)
     bulker.RegisterOption(&MaxColumnsCount)
-
+    bulker.RegisterOption(&TemporaryBatchSizeOption)
 }
 
 type S3OptionConfig struct {
@@ -142,3 +148,9 @@ func withS3BatchFile(s3OptionConfig *S3OptionConfig) bulker.StreamOption {
         s3BatchFileOption.Set(options, s3OptionConfig)
     }
 }
+
+func WithTemporaryBatchSize(temporaryBatchSize int) bulker.StreamOption {
+    return func(options *bulker.StreamOptions) {
+        TemporaryBatchSizeOption.Set(options, temporaryBatchSize)
+    }
+}
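A rough sketch of how these generic options plumb values through, assuming the Get/Set semantics implied by the calls above (a typed value stored in StreamOptions under Key, with DefaultValue returned when unset). This is an illustrative re-implementation, not the real bulker API, which also handles registration and ParseFunc for string-typed configs:

    package main

    import "fmt"

    type StreamOptions struct{ values map[string]any }

    type ImplementationOption[V any] struct {
        Key          string
        DefaultValue V
    }

    func (o *ImplementationOption[V]) Set(opts *StreamOptions, value V) {
        if opts.values == nil {
            opts.values = map[string]any{}
        }
        opts.values[o.Key] = value
    }

    func (o *ImplementationOption[V]) Get(opts *StreamOptions) V {
        if v, ok := opts.values[o.Key].(V); ok {
            return v
        }
        return o.DefaultValue // 0 for temporaryBatchSize => feature disabled
    }

    var TemporaryBatchSizeOption = ImplementationOption[int]{Key: "temporaryBatchSize"}

    // StreamOption mirrors bulker.StreamOption: a closure applied to StreamOptions.
    type StreamOption func(*StreamOptions)

    func WithTemporaryBatchSize(n int) StreamOption {
        return func(opts *StreamOptions) { TemporaryBatchSizeOption.Set(opts, n) }
    }

    func main() {
        var opts StreamOptions
        WithTemporaryBatchSize(100000)(&opts)
        fmt.Println(TemporaryBatchSizeOption.Get(&opts)) // 100000
    }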
1 change: 1 addition & 0 deletions bulkerlib/implementations/sql/redshift.go
@@ -197,6 +197,7 @@ func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource
     if s3Config.Folder != "" {
         fileKey = s3Config.Folder + "/" + fileKey
     }
+    _, _ = p.txOrDb(ctx).ExecContext(ctx, "SET json_parse_truncate_strings=ON")
     statement := fmt.Sprintf(redshiftCopyTemplate, namespace, quotedTableName, strings.Join(columnNames, ","), s3Config.Bucket, fileKey, s3Config.AccessKeyID, s3Config.SecretKey, s3Config.Region)
     if _, err := p.txOrDb(ctx).ExecContext(ctx, statement); err != nil {
         return state, errorj.CopyError.Wrap(err, "failed to copy data from s3").
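For background: json_parse_truncate_strings is a session-level Redshift setting that, per AWS docs, makes JSON parsing truncate over-long strings instead of raising an error. Being session-scoped, it must run on the same connection (or transaction) as the subsequent COPY, which is what p.txOrDb(ctx) guarantees here; the error is deliberately discarded (_, _ =), presumably so the load still proceeds where the setting is unavailable. A hedged sketch of the same pattern over a plain database/sql connection (DSN is a placeholder):

    package main

    import (
        "context"
        "database/sql"
        "log"

        _ "github.com/lib/pq" // Redshift speaks the Postgres wire protocol
    )

    func main() {
        ctx := context.Background()
        db, err := sql.Open("postgres", "postgres://user:pass@redshift-host:5439/db") // placeholder DSN
        if err != nil {
            log.Fatal(err)
        }
        // Pin a single connection so the session setting applies to the COPY.
        conn, err := db.Conn(ctx)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        if _, err := conn.ExecContext(ctx, "SET json_parse_truncate_strings=ON"); err != nil {
            log.Fatal(err)
        }
        // ... run the COPY ... FORMAT JSON statement on the same conn here.
    }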
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacepartition_stream.go
@@ -87,7 +87,7 @@ func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.St
     }
     err = ps.clearPartition(ctx, ps.tx)
     if err == nil && ps.state.SuccessfulRows > 0 {
-        if ps.batchFile != nil {
+        if ps.localBatchFileName != "" {
             ws, err := ps.flushBatchFile(ctx)
             ps.state.AddWarehouseState(ws)
             if err != nil {
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacetable_stream.go
@@ -54,7 +54,7 @@ func (ps *ReplaceTableStream) Complete(ctx context.Context) (state bulker.State,
     if ps.state.LastError == nil {
         //if at least one object was inserted
         if ps.state.SuccessfulRows > 0 {
-            if ps.batchFile != nil {
+            if ps.localBatchFileName != "" {
                 ws, err := ps.flushBatchFile(ctx)
                 ps.state.AddWarehouseState(ws)
                 if err != nil {
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/transactional_stream.go
@@ -62,7 +62,7 @@ func (ps *TransactionalStream) Complete(ctx context.Context) (state bulker.State
         Name:            "consume",
         TimeProcessedMs: time.Since(ps.startTime).Milliseconds(),
     })
-    if ps.batchFile != nil {
+    if ps.localBatchFileName != "" {
         ws, err := ps.flushBatchFile(ctx)
         ps.state.AddWarehouseState(ws)
         if err != nil {
2 changes: 1 addition & 1 deletion connectors/airbytecdk/go.mod
@@ -1,3 +1,3 @@
 module github.com/jitsucom/bulker/airbytecdk
 
-go 1.22
+go 1.23
2 changes: 1 addition & 1 deletion connectors/firebase.Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get install -y ca-certificates curl
 
 ENV TZ=UTC
 
-FROM golang:1.22.2-bullseye as build
+FROM golang:1.23.1-bullseye as build
 
 RUN apt-get install gcc libc6-dev
 
2 changes: 1 addition & 1 deletion connectors/firebase/go.mod
@@ -1,6 +1,6 @@
 module github.com/jitsucom/bulker/connectors/firebase
 
-go 1.22
+go 1.23
 
 require (
     cloud.google.com/go/firestore v1.15.0
2 changes: 1 addition & 1 deletion eventslog/go.mod
@@ -1,6 +1,6 @@
 module github.com/jitsucom/bulker/eventslog
 
-go 1.22
+go 1.23
 
 require (
     github.com/ClickHouse/clickhouse-go/v2 v2.26.0
4 changes: 2 additions & 2 deletions go.work
@@ -1,6 +1,6 @@
-go 1.22.0
+go 1.23
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 use (
     ./bulkerapp
2 changes: 1 addition & 1 deletion ingest.Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get install -y ca-certificates curl
 
 ENV TZ=UTC
 
-FROM golang:1.22.2-bullseye as build
+FROM golang:1.23.1-bullseye as build
 
 ARG VERSION
 ENV VERSION $VERSION
6 changes: 3 additions & 3 deletions ingest/go.mod
@@ -1,8 +1,8 @@
 module github.com/jitsucom/bulker/ingest
 
-go 1.22.0
+go 1.23
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 require (
     github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
@@ -11,6 +11,7 @@ require (
     github.com/penglongli/gin-metrics v0.1.10
     github.com/prometheus/client_golang v1.19.1
     github.com/spf13/viper v1.19.0
+    golang.org/x/net v0.27.0
     gopkg.in/natefinch/lumberjack.v2 v2.2.1
 )
 
@@ -65,7 +66,6 @@ require (
     golang.org/x/arch v0.8.0 // indirect
     golang.org/x/crypto v0.25.0 // indirect
     golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
-    golang.org/x/net v0.27.0 // indirect
     golang.org/x/oauth2 v0.21.0 // indirect
     golang.org/x/sys v0.22.0 // indirect
     golang.org/x/text v0.16.0 // indirect
4 changes: 2 additions & 2 deletions ingress-manager/go.mod
@@ -1,8 +1,8 @@
 module github.com/jitsucom/bulker/ingress-manager
 
-go 1.22.0
+go 1.23
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 require (
     cloud.google.com/go/certificatemanager v1.8.3