Skip to content

Commit

Permalink
Replace panic with error handling
Browse files Browse the repository at this point in the history
Starting from the processor.Block.Process all methods now return errors
if something goes wrong with unpacking of the blocks and reading the
transactions. The error is logged to the terminal and the go routine
shuts down gracefully which in tern shuts down gracefully the client,
i.e. listen.go and executes all deferred functions and closes the
context, the checkpointer and the gateway.

Before panics were used everywhere which was an issue because the
unpacking of the blocks happened in a go routine. When a panic happens
in a go routine only the deferred functions of the go routine are called
but not those of the client which lead to unexpected behavior.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
  • Loading branch information
twoGiants committed Jan 7, 2025
1 parent 4bb1670 commit 433bd6a
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 35 deletions.
4 changes: 2 additions & 2 deletions off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func listen(clientConnection *grpc.ClientConn) {
channelName,
)

if err := blockProcessor.Process(); err == store.ErrExpected {
fmt.Println(err)
if err := blockProcessor.ProcessV2(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
Expand Down
99 changes: 99 additions & 0 deletions off_chain_data/application-go/parser/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type Block struct {
}

func ParseBlock(block *common.Block) *Block {
// TODO add error handling because of nil init
// use null object pattern as in validTransaction.ChannelHeader().GetTxId()
return &Block{block, nil}
}

Expand All @@ -22,6 +24,14 @@ func (b *Block) Number() uint64 {
return header.GetNumber()
}

func (b *Block) NumberV2() (uint64, error) {
header, err := utils.AssertDefinedV2(b.block.GetHeader(), "missing block header")
if err != nil {
return 0, err
}
return header.GetNumber(), err
}

func (b *Block) Transactions() []*Transaction {
return utils.Cache(func() []*Transaction {
envelopes := b.unmarshalEnvelopesFromBlockData()
Expand All @@ -34,6 +44,27 @@ func (b *Block) Transactions() []*Transaction {
})()
}

func (b *Block) TransactionsV2() ([]*Transaction, error) {
return utils.CacheV2(func() ([]*Transaction, error) {
envelopes, err := b.unmarshalEnvelopesFromBlockDataV2()
if err != nil {
return nil, err
}

commonPayloads, err := b.unmarshalPayloadsFromV2(envelopes)
if err != nil {
return nil, err
}

payloads, err := b.parseV2(commonPayloads)
if err != nil {
return nil, err
}

return b.createTransactionsFrom(payloads), nil
})()
}

func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope {
result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
Expand All @@ -46,6 +77,18 @@ func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope {
return result
}

func (b *Block) unmarshalEnvelopesFromBlockDataV2() ([]*common.Envelope, error) {
result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil {
return nil, err
}
result = append(result, envelope)
}
return result, nil
}

func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload {
result := []*common.Payload{}
for _, envelope := range envelopes {
Expand All @@ -58,6 +101,18 @@ func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payl
return result
}

func (*Block) unmarshalPayloadsFromV2(envelopes []*common.Envelope) ([]*common.Payload, error) {
result := []*common.Payload{}
for _, envelope := range envelopes {
commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
return nil, err
}
result = append(result, commonPayload)
}
return result, nil
}

func (b *Block) parse(commonPayloads []*common.Payload) []*payload {
validationCodes := b.extractTransactionValidationCodes()
result := []*payload{}
Expand All @@ -77,6 +132,35 @@ func (b *Block) parse(commonPayloads []*common.Payload) []*payload {
return result
}

func (b *Block) parseV2(commonPayloads []*common.Payload) ([]*payload, error) {
validationCodes, err := b.extractTransactionValidationCodesV2()
if err != nil {
return nil, err
}

result := []*payload{}
for i, commonPayload := range commonPayloads {
statusCode, err := utils.AssertDefinedV2(
validationCodes[i],
fmt.Sprint("missing validation code index", i),
)
if err != nil {
return nil, err
}

payload := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransactionV2()
if err != nil {
return nil, err
}
if is {
result = append(result, payload)
}
}

return result, nil
}

func (b *Block) extractTransactionValidationCodes() []byte {
metadata := utils.AssertDefined(
b.block.GetMetadata(),
Expand All @@ -89,6 +173,21 @@ func (b *Block) extractTransactionValidationCodes() []byte {
)
}

func (b *Block) extractTransactionValidationCodesV2() ([]byte, error) {
metadata, err := utils.AssertDefinedV2(
b.block.GetMetadata(),
"missing block metadata",
)
if err != nil {
return nil, err
}

return utils.AssertDefinedV2(
metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER],
"missing transaction validation code",
)
}

func (*Block) createTransactionsFrom(payloads []*payload) []*Transaction {
result := []*Transaction{}
for _, payload := range payloads {
Expand Down
99 changes: 99 additions & 0 deletions off_chain_data/application-go/parser/endorserTransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,37 @@ func (p *endorserTransaction) readWriteSets() []*readWriteSet {
})()
}

func (p *endorserTransaction) readWriteSetsV2() ([]*readWriteSet, error) {
return utils.CacheV2(func() ([]*readWriteSet, error) {
chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloadsV2()
if err != nil {
return nil, err
}

chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFromV2(chaincodeActionPayloads)
if err != nil {
return nil, err
}

proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFromV2(chaincodeEndorsedActions)
if err != nil {
return nil, err
}

chaincodeActions, err := p.unmarshalChaincodeActionsFromV2(proposalResponsePayloads)
if err != nil {
return nil, err
}

txReadWriteSets, err := p.unmarshalTxReadWriteSetsFromV2(chaincodeActions)
if err != nil {
return nil, err
}

return p.parseReadWriteSets(txReadWriteSets), nil
})()
}

func (p *endorserTransaction) unmarshalChaincodeActionPayloads() []*peer.ChaincodeActionPayload {
result := []*peer.ChaincodeActionPayload{}
for _, transactionAction := range p.transaction.GetActions() {
Expand All @@ -45,6 +76,19 @@ func (p *endorserTransaction) unmarshalChaincodeActionPayloads() []*peer.Chainco
return result
}

func (p *endorserTransaction) unmarshalChaincodeActionPayloadsV2() ([]*peer.ChaincodeActionPayload, error) {
result := []*peer.ChaincodeActionPayload{}
for _, transactionAction := range p.transaction.GetActions() {
chaincodeActionPayload := &peer.ChaincodeActionPayload{}
if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil {
return nil, err
}

result = append(result, chaincodeActionPayload)
}
return result, nil
}

func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) []*peer.ChaincodeEndorsedAction {
result := []*peer.ChaincodeEndorsedAction{}
for _, payload := range chaincodeActionPayloads {
Expand All @@ -59,6 +103,25 @@ func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionP
return result
}

func (*endorserTransaction) extractChaincodeEndorsedActionsFromV2(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) {
result := []*peer.ChaincodeEndorsedAction{}
for _, payload := range chaincodeActionPayloads {
chaincodeEndorsedAction, err := utils.AssertDefinedV2(
payload.GetAction(),
"missing chaincode endorsed action",
)
if err != nil {
return nil, err
}

result = append(
result,
chaincodeEndorsedAction,
)
}
return result, nil
}

func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) []*peer.ProposalResponsePayload {
result := []*peer.ProposalResponsePayload{}
for _, endorsedAction := range chaincodeEndorsedActions {
Expand All @@ -71,6 +134,18 @@ func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndor
return result
}

func (*endorserTransaction) unmarshalProposalResponsePayloadsFromV2(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) ([]*peer.ProposalResponsePayload, error) {
result := []*peer.ProposalResponsePayload{}
for _, endorsedAction := range chaincodeEndorsedActions {
proposalResponsePayload := &peer.ProposalResponsePayload{}
if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil {
return nil, err
}
result = append(result, proposalResponsePayload)
}
return result, nil
}

func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) []*peer.ChaincodeAction {
result := []*peer.ChaincodeAction{}
for _, proposalResponsePayload := range proposalResponsePayloads {
Expand All @@ -83,6 +158,18 @@ func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloa
return result
}

func (*endorserTransaction) unmarshalChaincodeActionsFromV2(proposalResponsePayloads []*peer.ProposalResponsePayload) ([]*peer.ChaincodeAction, error) {
result := []*peer.ChaincodeAction{}
for _, proposalResponsePayload := range proposalResponsePayloads {
chaincodeAction := &peer.ChaincodeAction{}
if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil {
return nil, err
}
result = append(result, chaincodeAction)
}
return result, nil
}

func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) []*rwset.TxReadWriteSet {
result := []*rwset.TxReadWriteSet{}
for _, chaincodeAction := range chaincodeActions {
Expand All @@ -95,6 +182,18 @@ func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*pee
return result
}

func (*endorserTransaction) unmarshalTxReadWriteSetsFromV2(chaincodeActions []*peer.ChaincodeAction) ([]*rwset.TxReadWriteSet, error) {
result := []*rwset.TxReadWriteSet{}
for _, chaincodeAction := range chaincodeActions {
txReadWriteSet := &rwset.TxReadWriteSet{}
if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil {
return nil, err
}
result = append(result, txReadWriteSet)
}
return result, nil
}

func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet {
result := []*readWriteSet{}
for _, txReadWriteSet := range txReadWriteSets {
Expand Down
11 changes: 11 additions & 0 deletions off_chain_data/application-go/parser/namespaceReadWriteSet.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ func (p *NamespaceReadWriteSet) ReadWriteSet() *kvrwset.KVRWSet {
return &result
})()
}

func (p *NamespaceReadWriteSet) ReadWriteSetV2() (*kvrwset.KVRWSet, error) {
return utils.CacheV2(func() (*kvrwset.KVRWSet, error) {
result := kvrwset.KVRWSet{}
if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil {
return nil, err
}

return &result, nil
})()
}
46 changes: 46 additions & 0 deletions off_chain_data/application-go/parser/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ func (p *payload) channelHeader() *common.ChannelHeader {
})()
}

func (p *payload) channelHeaderV2() (*common.ChannelHeader, error) {
return utils.CacheV2(func() (*common.ChannelHeader, error) {
header, err := utils.AssertDefinedV2(p.commonPayload.GetHeader(), "missing payload header")
if err != nil {
return nil, err
}

result := &common.ChannelHeader{}
if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil {
return nil, err
}

return result, nil
})()
}

func (p *payload) endorserTransaction() *endorserTransaction {
if !p.isEndorserTransaction() {
panic(fmt.Errorf("unexpected payload type: %d", p.channelHeader().GetType()))
Expand All @@ -44,10 +60,40 @@ func (p *payload) endorserTransaction() *endorserTransaction {
return parseEndorserTransaction(result)
}

func (p *payload) endorserTransactionV2() (*endorserTransaction, error) {
is, err := p.isEndorserTransactionV2()
if err != nil {
return nil, err
}
if !is {
channelHeader, err := p.channelHeaderV2()
if err != nil {
return nil, err
}
return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType())
}

result := &peer.Transaction{}
if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil {
return nil, err
}

return parseEndorserTransaction(result), nil
}

func (p *payload) isEndorserTransaction() bool {
return p.channelHeader().GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION)
}

func (p *payload) isEndorserTransactionV2() (bool, error) {
channelHeader, err := p.channelHeaderV2()
if err != nil {
return false, err
}

return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil
}

func (p *payload) isValid() bool {
return p.statusCode == int32(peer.TxValidationCode_VALID)
}
Loading

0 comments on commit 433bd6a

Please sign in to comment.