From 82f3aaf0dedb90548ff4164d3105f0abead53e38 Mon Sep 17 00:00:00 2001 From: Slava Markeyev Date: Tue, 20 Aug 2024 16:21:12 -0700 Subject: [PATCH] fix edge case with empty batch and msg too big (#134) * fix edge case with empty batch and msg too big * go imports * make linter happy * have batcher operate quicker * Add retry if ledger has not been cleanedup yet --- itests/common.bash | 35 ++++- itests/containers/defaults.env | 3 +- .../tests/kafka/test_big_record/envfile.env | 7 + .../tests/kafka/test_big_record/golden/test.0 | 5 + .../tests/kafka/test_big_record/input/001.sql | 18 +++ itests/tests/kafka/test_big_record/test.bats | 5 + transport/batcher/batcher.go | 22 ++-- transport/batcher/batcher_test.go | 120 ++++++++++++++---- transport/factory/factory.go | 1 + 9 files changed, 181 insertions(+), 35 deletions(-) create mode 100644 itests/tests/kafka/test_big_record/envfile.env create mode 100644 itests/tests/kafka/test_big_record/golden/test.0 create mode 100644 itests/tests/kafka/test_big_record/input/001.sql create mode 100644 itests/tests/kafka/test_big_record/test.bats diff --git a/itests/common.bash b/itests/common.bash index db848bac..e085f7d6 100644 --- a/itests/common.bash +++ b/itests/common.bash @@ -166,6 +166,35 @@ _check_lsn() { return 0 } +_wait_for_ledger_signal() { + bifrost_logs=$(docker logs bifrost 2>&1) + + if ! echo "${bifrost_logs}" | grep -q 'Got a signal 29'; then + return 1 + fi + + return 0 +} + +_check_ledger() { + docker kill -s SIGIO bifrost # dump ledger to stdout + _retry _wait_for_ledger_signal + bifrost_logs=$(docker logs bifrost 2>&1) + + if ! echo "${bifrost_logs}" | grep -q 'No ledger entries to dump'; then + if ! echo "${bifrost_logs}" | grep -q 'entry:'; then + log "ledger did not dump as expected" + return 1 + else + log "ledger had entries in it" + return 2 + fi + fi + + log "ledger was empty" + return 0 +} + _gather_test_output() { log "Exporting messages test output from containers" # Copy output directories from the containers to the host @@ -216,6 +245,11 @@ _verify() { FAILED=1 _retry _check_lsn FAILED=0 + + log "Verifying ledger is empty" + FAILED=1 + _retry _check_ledger + FAILED=0 } _profile() { @@ -269,7 +303,6 @@ teardown() { _end_timer # Print current state of the ledger for debugging - TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose kill -s IO bifrost # dump ledger to stdout TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose logs bifrost TEST_NAME=$BATS_TEST_DESCRIPTION docker-compose logs data-poller diff --git a/itests/containers/defaults.env b/itests/containers/defaults.env index 3de75c5f..d77a4a87 100644 --- a/itests/containers/defaults.env +++ b/itests/containers/defaults.env @@ -10,4 +10,5 @@ CREATE_SLOT=true AWS_ACCESS_KEY_ID=DUMMYACCESSKEYID AWS_SECRET_ACCESS_KEY=DUMMYSECRETACCESSKEY AWS_REGION=us-east-1 -NO_MARSHAL_OLD_VALUE=true \ No newline at end of file +NO_MARSHAL_OLD_VALUE=true +BATCHER_TICK_RATE=100 \ No newline at end of file diff --git a/itests/tests/kafka/test_big_record/envfile.env b/itests/tests/kafka/test_big_record/envfile.env new file mode 100644 index 00000000..ca6b2807 --- /dev/null +++ b/itests/tests/kafka/test_big_record/envfile.env @@ -0,0 +1,7 @@ +# Bifrost +WORKERS=1 +KAFKA_PARTITION_METHOD=transaction-constant + +# Verifier +KAFKA_PARTITION_COUNT=1 +EXPECTED_COUNT=5 diff --git a/itests/tests/kafka/test_big_record/golden/test.0 b/itests/tests/kafka/test_big_record/golden/test.0 new file mode 100644 index 00000000..70075628 --- /dev/null +++ b/itests/tests/kafka/test_big_record/golden/test.0 @@ -0,0 +1,5 @@ +{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"1111"}},"id":{"new":{"q":"false","t":"integer","v":"1"}},"last_name":{"new":{"q":"true","t":"text","v":"1111"}}}} +{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"3333"}},"id":{"new":{"q":"false","t":"integer","v":"3"}},"last_name":{"new":{"q":"true","t":"text","v":"3333"}}}} +{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"4444"}},"id":{"new":{"q":"false","t":"integer","v":"4"}},"last_name":{"new":{"q":"true","t":"text","v":"4444"}}}} +{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"6666"}},"id":{"new":{"q":"false","t":"integer","v":"6"}},"last_name":{"new":{"q":"true","t":"text","v":"6666"}}}} +{"table":"public.customers","operation":"INSERT","columns":{"first_name":{"new":{"q":"true","t":"text","v":"8888"}},"id":{"new":{"q":"false","t":"integer","v":"8"}},"last_name":{"new":{"q":"true","t":"text","v":"8888"}}}} diff --git a/itests/tests/kafka/test_big_record/input/001.sql b/itests/tests/kafka/test_big_record/input/001.sql new file mode 100644 index 00000000..63f60118 --- /dev/null +++ b/itests/tests/kafka/test_big_record/input/001.sql @@ -0,0 +1,18 @@ +CREATE TABLE customers (id serial primary key, first_name text, last_name text); +ALTER TABLE customers REPLICA IDENTITY FULL; + +INSERT INTO customers (first_name, last_name) VALUES ('1111', '1111'); +INSERT INTO customers (first_name, last_name) VALUES ('2222', repeat('2', 1048576)); +INSERT INTO customers (first_name, last_name) VALUES ('3333', '3333'); + +BEGIN; +INSERT INTO customers (first_name, last_name) VALUES ('4444', '4444'); +INSERT INTO customers (first_name, last_name) VALUES ('5555', repeat('5', 1048576)); +INSERT INTO customers (first_name, last_name) VALUES ('6666', '6666'); +COMMIT; + +BEGIN; +INSERT INTO customers (first_name, last_name) VALUES ('7777', repeat('7', 1048576)); +COMMIT; + +INSERT INTO customers (first_name, last_name) VALUES ('8888', '8888'); diff --git a/itests/tests/kafka/test_big_record/test.bats b/itests/tests/kafka/test_big_record/test.bats new file mode 100644 index 00000000..3d74290c --- /dev/null +++ b/itests/tests/kafka/test_big_record/test.bats @@ -0,0 +1,5 @@ +load ../../../common + +@test "kafka/test_big_record" { + do_test "kafka_topic_wait" +} diff --git a/transport/batcher/batcher.go b/transport/batcher/batcher.go index 940fb348..66ea9ce3 100644 --- a/transport/batcher/batcher.go +++ b/transport/batcher/batcher.go @@ -5,6 +5,8 @@ import ( "os" "time" + "github.com/cevaris/ordered_map" + "github.com/Nextdoor/pg-bifrost.git/marshaller" "github.com/Nextdoor/pg-bifrost.git/shutdown" "github.com/Nextdoor/pg-bifrost.git/stats" @@ -63,6 +65,7 @@ type Batcher struct { inputChan <-chan *marshaller.MarshalledMessage // receive single MarshalledMessages outputChans []chan transport.Batch // one output channel per Transporter worker txnsSeenChan chan<- []*progress.Seen // channel to report transactions seen to ProgressTracker + txnsWritten chan<- *ordered_map.OrderedMap // channel to report empty batches with commit messages statsChan chan stats.Stat tickRate time.Duration // controls frequency that batcher looks for input. This should be non-zero to avoid CPU spin. @@ -84,6 +87,7 @@ type Batcher struct { func NewBatcher(shutdownHandler shutdown.ShutdownHandler, inputChan <-chan *marshaller.MarshalledMessage, txnsSeenChan chan<- []*progress.Seen, + txnsWritten chan<- *ordered_map.OrderedMap, statsChan chan stats.Stat, tickRate int, // number of milliseconds that batcher will wait to check for input. @@ -110,6 +114,7 @@ func NewBatcher(shutdownHandler shutdown.ShutdownHandler, inputChan, outputChans, txnsSeenChan, + txnsWritten, statsChan, time.Duration(tickRate) * time.Millisecond, batchFactory, @@ -345,14 +350,9 @@ func (b *Batcher) handleTicker() bool { log.Debugf("flushing %s", key) curBatch := b.batches[key] - // If the batch is empty then don't send it. This could be the case - // when a new batch was created but nothing added to it. - if !curBatch.IsEmpty() { - ok := b.sendBatch(curBatch) - - if !ok { - return false - } + ok := b.sendBatch(curBatch) + if !ok { + return false } b.statsChan <- stats.NewStatCount("batcher", "batch_closed_early", 1, time.Now().UnixNano()) @@ -376,6 +376,12 @@ func (b *Batcher) sendBatch(batch transport.Batch) bool { } } + // On empty batches report their transactions as written because they may contain COMMITTs + if batch.IsEmpty() { + b.txnsWritten <- batch.GetTransactions() + return true + } + ok, err := batch.Close() if !ok { log.Error(err) diff --git a/transport/batcher/batcher_test.go b/transport/batcher/batcher_test.go index 2fe49b39..6e62b137 100644 --- a/transport/batcher/batcher_test.go +++ b/transport/batcher/batcher_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/cevaris/ordered_map" + "github.com/Nextdoor/pg-bifrost.git/transport" "github.com/Nextdoor/pg-bifrost.git/transport/mocks" "github.com/golang/mock/gomock" @@ -28,12 +30,13 @@ func TestBatchSizeOneOneTxnOneData(t *testing.T) { in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(batchSize) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) outs := b.GetOutputChans() go b.StartBatching() @@ -119,12 +122,13 @@ func TestBatchSizeOneOneTxnTwoData(t *testing.T) { in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(batchSize) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) outs := b.GetOutputChans() go b.StartBatching() @@ -223,11 +227,13 @@ func TestBatchSizeThreeTwoTxnTwoData(t *testing.T) { in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(batchSize) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) outs := b.GetOutputChans() go b.StartBatching() @@ -400,11 +406,13 @@ func TestBatchSizeOneTwoTxnTwoWorkers(t *testing.T) { in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(1) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, batchFactory, 2, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, batchFactory, 2, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) outs := b.GetOutputChans() go b.StartBatching() @@ -527,11 +535,13 @@ func TestInputChannelClose(t *testing.T) { // Setup in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) batchFactory := batch.NewGenericBatchFactory(1) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, batchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) outs := b.GetOutputChans() go b.StartBatching() @@ -570,43 +580,82 @@ func TestErrMsgTooBig(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + omap1 := ordered_map.NewOrderedMap() + omap1.Set("1-1", &progress.Written{Transaction: "1", TimeBasedKey: "1-1", Count: 1}) + + flushMaxAge := 100 + + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushMaxAge, flushMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + + begin := &marshaller.MarshalledMessage{ + Operation: "BEGIN", + TimeBasedKey: "1-1", + Transaction: "1", + WalStart: 900, + } message := &marshaller.MarshalledMessage{ Operation: "INSERT", Json: []byte("{MSG1}"), - TimeBasedKey: "1", + TimeBasedKey: "1-1", + Transaction: "1", WalStart: 901, - PartitionKey: "foo", + } + + commit := &marshaller.MarshalledMessage{ + Operation: "COMMIT", + TimeBasedKey: "1-1", + Transaction: "1", + WalStart: 902, } // Expects - mockBatchFactory.EXPECT().NewBatch("foo").Return(mockBatch) + mockBatchFactory.EXPECT().NewBatch("").Return(mockBatch) + mockBatch.EXPECT().IsFull().Return(false) + in <- begin - // Loop iteration 1 - add a message that is too big - in <- message + // add a message that is too big mockBatch.EXPECT().IsFull().Return(false) mockBatch.EXPECT().Add(message).Return(false, errors.New(transport.ERR_MSG_TOOBIG)) - - // Loop iteration 2 - add a normal message in <- message + mockBatch.EXPECT().IsFull().Return(false) - mockBatch.EXPECT().Add(message).Return(true, nil) + in <- commit + + // On batcher tick + mockBatch.EXPECT().IsEmpty().Return(true) + mockBatch.EXPECT().ModifyTime().Return(int64(0)) + mockBatch.EXPECT().CreateTime().Return(int64(0)) + mockBatch.EXPECT().IsEmpty().Return(true) + mockBatch.EXPECT().IsFull().Return(false) + mockBatch.EXPECT().GetTransactions().Return(omap1) go b.StartBatching() // Let test run time.Sleep(time.Millisecond * 5) + // Expect to see batch produced + select { + case w := <-txnsWritten: + assert.Equal(t, omap1, w) + case <-time.After(DEFAULT_TICK_RATE*time.Millisecond + time.Millisecond*time.Duration(flushMaxAge)): + assert.Fail(t, "expected an update to txnsWritten") + } + // Shutdown test close(in) time.Sleep(time.Millisecond * 5) // Verify stats - expected := []stats.Stat{stats.NewStatCount("batcher", "dropped_too_big", 1, time.Now().UnixNano())} + expected := []stats.Stat{ + stats.NewStatCount("batcher", "dropped_too_big", 1, time.Now().UnixNano()), + stats.NewStatCount("batcher", "batch_closed_early", 1, time.Now().UnixNano()), + } stats.VerifyStats(t, statsChan, expected) } @@ -621,10 +670,12 @@ func TestErrCantFit(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) message := &marshaller.MarshalledMessage{ Operation: "INSERT", @@ -636,6 +687,7 @@ func TestErrCantFit(t *testing.T) { // Expects mockBatchFactory.EXPECT().NewBatch("foo").Return(mockBatch) + mockBatch.EXPECT().IsEmpty().Return(false) // Loop iteration 1 - add a message that can't fit into existing batch in <- message @@ -679,10 +731,11 @@ func TestErrMsgInvalid(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) message := &marshaller.MarshalledMessage{ Operation: "INSERT", @@ -730,10 +783,12 @@ func TestErrUnknown(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, 500, 1000, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) outs := b.GetOutputChans() @@ -785,13 +840,15 @@ func TestFlushBatchTimeoutUpdate(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 500 flushBatchMaxAge := 1000000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) insert := &marshaller.MarshalledMessage{ Operation: "INSERT", @@ -837,13 +894,15 @@ func TestFlushBatchTimeoutMaxAge(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 9000 flushBatchMaxAge := 1000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) insert := &marshaller.MarshalledMessage{ Operation: "INSERT", @@ -889,13 +948,15 @@ func TestFlushFullBatch(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 500 flushBatchMaxAge := 1000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) commit := &marshaller.MarshalledMessage{ Operation: "INSERT", @@ -955,12 +1016,14 @@ func TestFlushEmptyBatchTimeout(t *testing.T) { in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) statsChan := make(chan stats.Stat, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 500 flushBatchMaxAge := 1000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) go b.StartBatching() @@ -995,12 +1058,14 @@ func TestTxnsSeenTimeout(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen) // unbuffered channel which will block (this is the test) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 500 flushBatchMaxAge := 1000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) b.txnsSeenTimeout = time.Millisecond * 50 commitA := &marshaller.MarshalledMessage{ @@ -1048,12 +1113,14 @@ func getBasicSetup(t *testing.T) (*gomock.Controller, chan *marshaller.Marshalle // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 500 flushBatchMaxAge := 1000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 1, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_ROUND_ROBIN) return mockCtrl, in, txnSeen, b, mockBatchFactory, mockBatch } @@ -1106,6 +1173,7 @@ func TestAddToBatchSendFatal(t *testing.T) { txnSeen := make(chan []*progress.Seen) b.txnsSeenChan = txnSeen + mockBatch.EXPECT().IsEmpty().Return(false) mockBatch.EXPECT().IsFull().Return(false) mockBatch.EXPECT().Add(commit).Return(false, errors.New(transport.ERR_CANT_FIT)) mockBatch.EXPECT().Close().Return(false, errors.New("expected error")) @@ -1376,12 +1444,14 @@ func TestPartitionRouting(t *testing.T) { // Setup IO in := make(chan *marshaller.MarshalledMessage, 1000) txnSeen := make(chan []*progress.Seen, 1000) + txnsWritten := make(chan *ordered_map.OrderedMap, 1000) + statsChan := make(chan stats.Stat, 1000) sh := shutdown.NewShutdownHandler() flushBatchUpdateAge := 500 flushBatchMaxAge := 1000 - b := NewBatcher(sh, in, txnSeen, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 2, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_PARTITION) + b := NewBatcher(sh, in, txnSeen, txnsWritten, statsChan, DEFAULT_TICK_RATE, mockBatchFactory, 2, flushBatchUpdateAge, flushBatchMaxAge, 1, DEFAULT_MAX_MEMORY_BYTES, BATCH_ROUTING_PARTITION) defer mockCtrl.Finish() diff --git a/transport/factory/factory.go b/transport/factory/factory.go index 1aa3c14e..a7de822d 100644 --- a/transport/factory/factory.go +++ b/transport/factory/factory.go @@ -71,6 +71,7 @@ func NewTransport(shutdownHandler shutdown.ShutdownHandler, shutdownHandler, inputChan, txnsSeen, + txnsWritten, statsChan, batcherTickRate, batchFactory,