Skip to content

Commit

Permalink
Fix simulated failure issue
Browse files Browse the repository at this point in the history
Before all transactions were processed and when the failure was
simulated a message was printed and all the transactions still
processed. Now the store returns an error when the failure is simulated
which the listener expects so that it can gracefully shutdown the system
and close the context. The context must be closed correctly or the
checkpointer won't save the last processed transactionId to the file
system.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
  • Loading branch information
twoGiants committed Jan 4, 2025
1 parent f465eb8 commit 43f67fb
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 17 deletions.
17 changes: 12 additions & 5 deletions off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,20 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount)
}

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer func() {
stop()
close()
fmt.Println("Context closed.")
}()

network := gateway.GetNetwork(channelName)
blocks, err := network.BlockEvents(
ctx,
// Used only if there is no checkpoint block number.
// Order matters. WithStartBlock must be set before
// WithCheckpoint to work.
client.WithStartBlock(0),
client.WithCheckpoint(checkpointer),
client.WithStartBlock(0), // Used only if there is no checkpoint block number
)
if err != nil {
panic(err)
Expand All @@ -76,11 +79,15 @@ func listen(clientConnection *grpc.ClientConn) {
store.ApplyWritesToOffChainStore,
channelName,
)
blockProcessor.Process()

if err := blockProcessor.Process(); err != nil && err.Error() == "[expected error]: simulated write failure" {
fmt.Println(err.Error())
return
}
}
}
}()

wg.Wait()
fmt.Println("\nReceived 'SIGTERM' signal. Shutting down listener gracefully...")
fmt.Println("\nShutting down listener gracefully...")
}
8 changes: 6 additions & 2 deletions off_chain_data/application-go/processor/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewBlock(
}
}

func (b *block) Process() {
func (b *block) Process() error {
blockNumber := b.parsedBlock.Number()

fmt.Println("\nReceived block", blockNumber)
Expand All @@ -42,13 +42,17 @@ func (b *block) Process() {
b.writeToStore,
b.channelName,
}
aTransaction.process()
if err := aTransaction.process(); err != nil {
return err
}

transactionId := validTransaction.ChannelHeader().GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
}

b.checkpointer.CheckpointBlock(b.parsedBlock.Number())

return nil
}

func (b *block) validTransactions() []*parser.Transaction {
Expand Down
11 changes: 7 additions & 4 deletions off_chain_data/application-go/processor/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@ type transaction struct {
channelName string
}

func (t *transaction) process() {
func (t *transaction) process() error {
transactionId := t.transaction.ChannelHeader().GetTxId()

writes := t.writes()
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
return
return nil
}

fmt.Println("Process transaction", transactionId)

t.writeToStore(store.LedgerUpdate{
if err := t.writeToStore(store.LedgerUpdate{
BlockNumber: t.blockNumber,
TransactionId: transactionId,
Writes: writes,
})
}); err != nil {
return err
}
return nil
}

func (t *transaction) writes() []store.Write {
Expand Down
10 changes: 5 additions & 5 deletions off_chain_data/application-go/store/flatFille.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ var transactionCount uint = 0 // Used only to simulate failures

// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
func ApplyWritesToOffChainStore(data LedgerUpdate) {
func ApplyWritesToOffChainStore(data LedgerUpdate) error {
if err := simulateFailureIfRequired(); err != nil {
fmt.Println("[expected error]: " + err.Error())
return
return err
}

writes := []string{}
for _, write := range data.Writes {
// TODO write also the TxID and block number so that you can compare easier to the output
marshaled, err := json.Marshal(write)
if err != nil {
panic(err)
Expand All @@ -47,12 +45,14 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) {
if err := f.Close(); err != nil {
panic(err)
}

return nil
}

func simulateFailureIfRequired() error {
if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount {
transactionCount = 0
return errors.New("simulated write failure")
return errors.New("[expected error]: simulated write failure")
}

transactionCount += 1
Expand Down
2 changes: 1 addition & 1 deletion off_chain_data/application-go/store/model.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package store

// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type Writer = func(data LedgerUpdate)
type Writer = func(data LedgerUpdate) error

// Ledger update made by a specific transaction.
type LedgerUpdate struct {
Expand Down

0 comments on commit 43f67fb

Please sign in to comment.