diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index d6200dbae..60d499fc2 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -43,17 +43,20 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) } - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer func() { - stop() + close() fmt.Println("Context closed.") }() network := gateway.GetNetwork(channelName) blocks, err := network.BlockEvents( ctx, + // Used only if there is no checkpoint block number. + // Order matters. WithStartBlock must be set before + // WithCheckpoint to work. + client.WithStartBlock(0), client.WithCheckpoint(checkpointer), - client.WithStartBlock(0), // Used only if there is no checkpoint block number ) if err != nil { panic(err) @@ -76,11 +79,15 @@ func listen(clientConnection *grpc.ClientConn) { store.ApplyWritesToOffChainStore, channelName, ) - blockProcessor.Process() + + if err := blockProcessor.Process(); err != nil && err.Error() == "[expected error]: simulated write failure" { + fmt.Println(err.Error()) + return + } } } }() wg.Wait() - fmt.Println("\nReceived 'SIGTERM' signal. Shutting down listener gracefully...") + fmt.Println("\nShutting down listener gracefully...") } diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go index 3734b2cd6..fcf44c9f8 100644 --- a/off_chain_data/application-go/processor/block.go +++ b/off_chain_data/application-go/processor/block.go @@ -29,7 +29,7 @@ func NewBlock( } } -func (b *block) Process() { +func (b *block) Process() error { blockNumber := b.parsedBlock.Number() fmt.Println("\nReceived block", blockNumber) @@ -42,13 +42,17 @@ func (b *block) Process() { b.writeToStore, b.channelName, } - aTransaction.process() + if err := aTransaction.process(); err != nil { + return err + } transactionId := validTransaction.ChannelHeader().GetTxId() b.checkpointer.CheckpointTransaction(blockNumber, transactionId) } b.checkpointer.CheckpointBlock(b.parsedBlock.Number()) + + return nil } func (b *block) validTransactions() []*parser.Transaction { diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go index 3ff78eddc..5d705279f 100644 --- a/off_chain_data/application-go/processor/transaction.go +++ b/off_chain_data/application-go/processor/transaction.go @@ -14,22 +14,25 @@ type transaction struct { channelName string } -func (t *transaction) process() { +func (t *transaction) process() error { transactionId := t.transaction.ChannelHeader().GetTxId() writes := t.writes() if len(writes) == 0 { fmt.Println("Skipping read-only or system transaction", transactionId) - return + return nil } fmt.Println("Process transaction", transactionId) - t.writeToStore(store.LedgerUpdate{ + if err := t.writeToStore(store.LedgerUpdate{ BlockNumber: t.blockNumber, TransactionId: transactionId, Writes: writes, - }) + }); err != nil { + return err + } + return nil } func (t *transaction) writes() []store.Write { diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go index b05379ac2..0d356bdc2 100644 --- a/off_chain_data/application-go/store/flatFille.go +++ b/off_chain_data/application-go/store/flatFille.go @@ -17,15 +17,13 @@ var transactionCount uint = 0 // Used only to simulate failures // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // This implementation just writes to a file. -func ApplyWritesToOffChainStore(data LedgerUpdate) { +func ApplyWritesToOffChainStore(data LedgerUpdate) error { if err := simulateFailureIfRequired(); err != nil { - fmt.Println("[expected error]: " + err.Error()) - return + return err } writes := []string{} for _, write := range data.Writes { - // TODO write also the TxID and block number so that you can compare easier to the output marshaled, err := json.Marshal(write) if err != nil { panic(err) @@ -47,12 +45,14 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) { if err := f.Close(); err != nil { panic(err) } + + return nil } func simulateFailureIfRequired() error { if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { transactionCount = 0 - return errors.New("simulated write failure") + return errors.New("[expected error]: simulated write failure") } transactionCount += 1 diff --git a/off_chain_data/application-go/store/model.go b/off_chain_data/application-go/store/model.go index 1a6e8f33f..a68efbea8 100644 --- a/off_chain_data/application-go/store/model.go +++ b/off_chain_data/application-go/store/model.go @@ -1,7 +1,7 @@ package store // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -type Writer = func(data LedgerUpdate) +type Writer = func(data LedgerUpdate) error // Ledger update made by a specific transaction. type LedgerUpdate struct {