diff --git a/asset-transfer-private-data/application-gateway-go/connect.go b/asset-transfer-private-data/application-gateway-go/connect.go index 3bd768634c..334ffd9fb4 100644 --- a/asset-transfer-private-data/application-gateway-go/connect.go +++ b/asset-transfer-private-data/application-gateway-go/connect.go @@ -1,5 +1,5 @@ /* -Copyright 2022 IBM All Rights Reserved. +Copyright 2024 IBM All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ @@ -77,8 +77,8 @@ func newIdentity(certDirectoryPath, mspId string) *identity.X509Identity { } // newSign creates a function that generates a digital signature from a message digest using a private key. -func newSign(keyDirectoryPash string) identity.Sign { - privateKeyPEM, err := readFirstFile(keyDirectoryPash) +func newSign(keyDirectoryPath string) identity.Sign { + privateKeyPEM, err := readFirstFile(keyDirectoryPath) if err != nil { panic(fmt.Errorf("failed to read private key file: %w", err)) } diff --git a/ci/scripts/run-test-network-off-chain.sh b/ci/scripts/run-test-network-off-chain.sh index 872cda415a..dc7bfc1e18 100755 --- a/ci/scripts/run-test-network-off-chain.sh +++ b/ci/scripts/run-test-network-off-chain.sh @@ -41,11 +41,22 @@ print "Initializing Typescript off-chain data application" pushd ../off_chain_data/application-typescript rm -f checkpoint.json store.log npm install -print "Running the output app" +print "Running the Typescript app" SIMULATED_FAILURE_COUNT=1 npm start getAllAssets transact getAllAssets listen SIMULATED_FAILURE_COUNT=1 npm start listen popd +# Run off-chain data Go application +export CHAINCODE_NAME=go_off_chain_data +deployChaincode +print "Initializing Go off-chain data application" +pushd ../off_chain_data/application-go +rm -f checkpoint.json store.log +print "Running the Go app" +SIMULATED_FAILURE_COUNT=1 go run . getAllAssets transact getAllAssets listen +SIMULATED_FAILURE_COUNT=1 go run . listen +popd + # Run off-chain data Java application #createNetwork export CHAINCODE_NAME=off_chain_data diff --git a/off_chain_data/README.md b/off_chain_data/README.md index 3b931492ac..22e1948f80 100644 --- a/off_chain_data/README.md +++ b/off_chain_data/README.md @@ -28,7 +28,7 @@ The client application provides several "commands" that can be invoked using the To keep the sample code concise, the **listen** command writes ledger updates to an output file named `store.log` in the current working directory (which for the Java sample is the `application-java/app` directory). A real implementation could write ledger updates directly to an off-chain data store of choice. You can inspect the information captured in this file as you run the sample. -Note that the **listen** command is is restartable and will resume event listening after the last successfully processed block / transaction. This is achieved using a checkpointer to persist the current listening position. Checkpoint state is persisted to a file named `checkpoint.json` in the current working directory. If no checkpoint state is present, event listening begins from the start of the ledger (block number zero). +Note that the **listen** command is restartable and will resume event listening after the last successfully processed block / transaction. This is achieved using a checkpointer to persist the current listening position. Checkpoint state is persisted to a file named `checkpoint.json` in the current working directory. If no checkpoint state is present, event listening begins from the start of the ledger (block number zero). ### Smart Contract @@ -65,6 +65,10 @@ The Fabric test network is used to deploy and run this sample. Follow these step npm install npm start transact listen + # To run the Go sample application + cd application-go + go run . transact listen + # To run the Java sample application cd application-java ./gradlew run --quiet --args='transact listen' @@ -79,6 +83,10 @@ The Fabric test network is used to deploy and run this sample. Follow these step cd application-typescript npm --silent start getAllAssets + # To run the Go sample application + cd application-go + go run . getAllAssets + # To run the Java sample application cd application-java ./gradlew run --quiet --args=getAllAssets @@ -93,6 +101,12 @@ The Fabric test network is used to deploy and run this sample. Follow these step SIMULATED_FAILURE_COUNT=5 npm start listen npm start listen + # To run the Go sample application + cd application-go + go run . transact + SIMULATED_FAILURE_COUNT=5 go run . listen + go run . listen + # To run the Java sample application cd application-java ./gradlew run --quiet --args=transact @@ -112,4 +126,4 @@ When you are finished, you can bring down the test network (from the `test-netwo ``` ./network.sh down -``` \ No newline at end of file +``` diff --git a/off_chain_data/application-go/app.go b/off_chain_data/application-go/app.go new file mode 100644 index 0000000000..07d46dd8cc --- /dev/null +++ b/off_chain_data/application-go/app.go @@ -0,0 +1,62 @@ +/* + * Copyright 2024 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "errors" + "fmt" + "os" + "strings" + + "google.golang.org/grpc" +) + +var allCommands = map[string]func(clientConnection *grpc.ClientConn){ + "getAllAssets": getAllAssets, + "transact": transact, + "listen": listen, +} + +func main() { + commands := os.Args[1:] + if len(commands) == 0 { + printUsage() + panic(errors.New("missing command")) + } + + for _, name := range commands { + if _, exists := allCommands[name]; !exists { + printUsage() + panic(fmt.Errorf("unknown command: %s", name)) + } + fmt.Println("command:", name) + } + + client := newGrpcConnection() + defer client.Close() + + for _, name := range commands { + command := allCommands[name] + command(client) + } +} + +func printUsage() { + fmt.Println("Arguments: [ ...]") + fmt.Println("Available commands:", availableCommands()) +} + +func availableCommands() string { + result := make([]string, len(allCommands)) + i := 0 + for command := range allCommands { + result[i] = command + i++ + } + + return strings.Join(result, ", ") +} diff --git a/off_chain_data/application-go/connect.go b/off_chain_data/application-go/connect.go new file mode 100644 index 0000000000..728e9888dd --- /dev/null +++ b/off_chain_data/application-go/connect.go @@ -0,0 +1,135 @@ +/* + * Copyright 2024 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "crypto/x509" + "fmt" + "offChainData/utils" + "os" + "path" + "time" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "github.com/hyperledger/fabric-gateway/pkg/hash" + "github.com/hyperledger/fabric-gateway/pkg/identity" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const peerName = "peer0.org1.example.com" + +var ( + channelName = utils.EnvOrDefault("CHANNEL_NAME", "mychannel") + chaincodeName = utils.EnvOrDefault("CHAINCODE_NAME", "basic") + mspID = utils.EnvOrDefault("MSP_ID", "Org1MSP") + + // Path to crypto materials. + cryptoPath = utils.EnvOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com") + + // Path to user private key directory. + keyDirectoryPath = utils.EnvOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore") + + // Path to user certificate. + certPath = utils.EnvOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem") + + // Path to peer tls certificate. + tlsCertPath = utils.EnvOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt") + + // Gateway peer endpoint. + peerEndpoint = utils.EnvOrDefault("PEER_ENDPOINT", "dns:///localhost:7051") + + // Gateway peer SSL host name override. + peerHostAlias = utils.EnvOrDefault("PEER_HOST_ALIAS", peerName) +) + +func newGrpcConnection() *grpc.ClientConn { + certificatePEM, err := os.ReadFile(tlsCertPath) + if err != nil { + panic(fmt.Errorf("failed to read TLS certificate file: %w", err)) + } + + certificate, err := identity.CertificateFromPEM(certificatePEM) + if err != nil { + panic(err) + } + + certPool := x509.NewCertPool() + certPool.AddCert(certificate) + transportCredentials := credentials.NewClientTLSFromCert(certPool, peerHostAlias) + + connection, err := grpc.NewClient(peerEndpoint, grpc.WithTransportCredentials(transportCredentials)) + if err != nil { + panic(fmt.Errorf("failed to create gRPC connection: %w", err)) + } + + return connection +} + +func newConnectOptions(clientConnection *grpc.ClientConn) (identity.Identity, []client.ConnectOption) { + return newIdentity(), []client.ConnectOption{ + client.WithSign(newSign()), + client.WithHash(hash.SHA256), + client.WithClientConnection(clientConnection), + client.WithEvaluateTimeout(5 * time.Second), + client.WithEndorseTimeout(15 * time.Second), + client.WithSubmitTimeout(5 * time.Second), + client.WithCommitStatusTimeout(1 * time.Minute), + } +} + +func newIdentity() *identity.X509Identity { + certificatePEM, err := os.ReadFile(certPath) + if err != nil { + panic(fmt.Errorf("failed to read certificate file: %w", err)) + } + + certificate, err := identity.CertificateFromPEM(certificatePEM) + if err != nil { + panic(err) + } + + id, err := identity.NewX509Identity(mspID, certificate) + if err != nil { + panic(err) + } + + return id +} + +func newSign() identity.Sign { + privateKeyPEM, err := readFirstFile(keyDirectoryPath) + if err != nil { + panic(fmt.Errorf("failed to read private key file: %w", err)) + } + + privateKey, err := identity.PrivateKeyFromPEM(privateKeyPEM) + if err != nil { + panic(err) + } + + sign, err := identity.NewPrivateKeySign(privateKey) + if err != nil { + panic(err) + } + + return sign +} + +func readFirstFile(dirPath string) ([]byte, error) { + dir, err := os.Open(dirPath) + if err != nil { + return nil, err + } + + fileNames, err := dir.Readdirnames(1) + if err != nil { + return nil, err + } + + return os.ReadFile(path.Join(dirPath, fileNames[0])) +} diff --git a/off_chain_data/application-go/contract/contract.go b/off_chain_data/application-go/contract/contract.go new file mode 100644 index 0000000000..59e27135c3 --- /dev/null +++ b/off_chain_data/application-go/contract/contract.go @@ -0,0 +1,71 @@ +/* + * Copyright 2024 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package contract + +import ( + "fmt" + "strconv" + + "github.com/hyperledger/fabric-gateway/pkg/client" +) + +type AssetTransferBasic struct { + contract *client.Contract +} + +func NewAssetTransferBasic(contract *client.Contract) *AssetTransferBasic { + return &AssetTransferBasic{contract} +} + +func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) error { + if _, err := atb.contract.Submit( + "CreateAsset", + client.WithArguments( + anAsset.ID, + anAsset.Color, + strconv.FormatUint(anAsset.Size, 10), + anAsset.Owner, + strconv.FormatUint(anAsset.AppraisedValue, 10), + )); err != nil { + return fmt.Errorf("in CreateAsset: %w", err) + } + return nil +} + +func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) (string, error) { + result, err := atb.contract.Submit( + "TransferAsset", + client.WithArguments( + id, + newOwner, + ), + ) + if err != nil { + return "", fmt.Errorf("in TransferAsset: %w", err) + } + + return string(result), nil +} + +func (atb *AssetTransferBasic) DeleteAsset(id string) error { + if _, err := atb.contract.Submit( + "DeleteAsset", + client.WithArguments( + id, + ), + ); err != nil { + return fmt.Errorf("in DeleteAsset: %w", err) + } + return nil +} + +func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) { + result, err := atb.contract.Evaluate("GetAllAssets") + if err != nil { + return []byte{}, fmt.Errorf("in GetAllAssets: %w", err) + } + return result, nil +} diff --git a/off_chain_data/application-go/contract/model.go b/off_chain_data/application-go/contract/model.go new file mode 100644 index 0000000000..70c4e688d2 --- /dev/null +++ b/off_chain_data/application-go/contract/model.go @@ -0,0 +1,45 @@ +/* + * Copyright 2024 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package contract + +import ( + "offChainData/utils" + + "github.com/google/uuid" +) + +var ( + colors = []string{"red", "green", "blue"} + Owners = []string{"alice", "bob", "charlie"} +) + +const ( + maxInitialValue = 1000 + maxInitialSize = 10 +) + +type Asset struct { + ID string `json:"ID"` + Color string `json:"Color"` + Size uint64 `json:"Size"` + Owner string `json:"Owner"` + AppraisedValue uint64 `json:"AppraisedValue"` +} + +func NewAsset() Asset { + id, err := uuid.NewRandom() + if err != nil { + panic(err) + } + + return Asset{ + ID: id.String(), + Color: utils.RandomElement(colors), + Size: uint64(utils.RandomInt(maxInitialSize) + 1), + Owner: utils.RandomElement(Owners), + AppraisedValue: uint64(utils.RandomInt(maxInitialValue) + 1), + } +} diff --git a/off_chain_data/application-go/getAllAssets.go b/off_chain_data/application-go/getAllAssets.go new file mode 100644 index 0000000000..c2ccf2c70f --- /dev/null +++ b/off_chain_data/application-go/getAllAssets.go @@ -0,0 +1,48 @@ +/* + * Copyright 2024 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + atb "offChainData/contract" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "google.golang.org/grpc" +) + +func getAllAssets(clientConnection *grpc.ClientConn) { + id, options := newConnectOptions(clientConnection) + gateway, err := client.Connect(id, options...) + if err != nil { + panic(err) + } + defer gateway.Close() + + contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) + smartContract := atb.NewAssetTransferBasic(contract) + assets, err := smartContract.GetAllAssets() + if err != nil { + panic(err) + } + + if len(assets) == 0 { + fmt.Println("no assets") + return + } + + fmt.Println(formatJSON(assets)) +} + +func formatJSON(data []byte) string { + var result bytes.Buffer + if err := json.Indent(&result, data, "", " "); err != nil { + panic(fmt.Errorf("failed to parse JSON: %w", err)) + } + return result.String() +} diff --git a/off_chain_data/application-go/go.mod b/off_chain_data/application-go/go.mod new file mode 100644 index 0000000000..83adb07dc7 --- /dev/null +++ b/off_chain_data/application-go/go.mod @@ -0,0 +1,20 @@ +module offChainData + +go 1.22.0 + +require ( + github.com/google/uuid v1.6.0 + github.com/hyperledger/fabric-gateway v1.7.0 + github.com/hyperledger/fabric-protos-go-apiv2 v0.3.4 + google.golang.org/grpc v1.68.0-dev + google.golang.org/protobuf v1.35.2 +) + +require ( + github.com/miekg/pkcs11 v1.1.1 // indirect + golang.org/x/crypto v0.29.0 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect +) diff --git a/off_chain_data/application-go/go.sum b/off_chain_data/application-go/go.sum new file mode 100644 index 0000000000..e3d2874e07 --- /dev/null +++ b/off_chain_data/application-go/go.sum @@ -0,0 +1,34 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hyperledger/fabric-gateway v1.7.0 h1:bd1quU8qYPYqYO69m1tPIDSjB+D+u/rBJfE1eWFcpjY= +github.com/hyperledger/fabric-gateway v1.7.0/go.mod h1:TItDGnq71eJcgz5TW+m5Sq3kWGp0AEI1HPCNxj0Eu7k= +github.com/hyperledger/fabric-protos-go-apiv2 v0.3.4 h1:YJrd+gMaeY0/vsN0aS0QkEKTivGoUnSRIXxGJ7KI+Pc= +github.com/hyperledger/fabric-protos-go-apiv2 v0.3.4/go.mod h1:bau/6AJhvEcu9GKKYHlDXAxXKzYNfhP6xu2GXuxEcFk= +github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU= +github.com/miekg/pkcs11 v1.1.1/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/grpc v1.68.0-dev h1:Qao/m2HpklhJt2QbpdRutxyNfRuwM8nGPpmi2UkuEHw= +google.golang.org/grpc v1.68.0-dev/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go new file mode 100644 index 0000000000..af3116261d --- /dev/null +++ b/off_chain_data/application-go/listen.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "fmt" + "offChainData/parser" + "offChainData/processor" + "offChainData/store" + "offChainData/utils" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "google.golang.org/grpc" +) + +func listen(clientConnection *grpc.ClientConn) { + id, options := newConnectOptions(clientConnection) + gateway, err := client.Connect(id, options...) + if err != nil { + panic(err) + } + defer func() { + gateway.Close() + fmt.Println("Gateway closed.") + }() + + checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json") + checkpointer, err := client.NewFileCheckpointer(checkpointFile) + if err != nil { + panic(err) + } + defer func() { + checkpointer.Close() + fmt.Println("Checkpointer closed.") + }() + + fmt.Println("Start event listening from block", checkpointer.BlockNumber()) + fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) + if store.SimulatedFailureCount > 0 { + fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) + } + + ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer func() { + close() + fmt.Println("Context closed.") + }() + + network := gateway.GetNetwork(channelName) + blocks, err := network.BlockEvents( + ctx, + // Used only if there is no checkpoint block number. + // Order matters. WithStartBlock must be set before + // WithCheckpoint to work. + client.WithStartBlock(0), + client.WithCheckpoint(checkpointer), + ) + if err != nil { + panic(err) + } + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + for blockProto := range blocks { + select { + case <-ctx.Done(): + return + default: + blockProcessor := processor.NewBlock( + parser.ParseBlock(blockProto), + checkpointer, + store.ApplyWritesToOffChainStore, + channelName, + ) + + if err := blockProcessor.Process(); err != nil { + fmt.Println("\033[31m[ERROR]\033[0m", err) + return + } + } + } + }() + + wg.Wait() + fmt.Println("\nShutting down listener gracefully...") +} diff --git a/off_chain_data/application-go/parser/block.go b/off_chain_data/application-go/parser/block.go new file mode 100644 index 0000000000..ec105eea0e --- /dev/null +++ b/off_chain_data/application-go/parser/block.go @@ -0,0 +1,126 @@ +package parser + +import ( + "fmt" + "offChainData/utils" + + "github.com/hyperledger/fabric-protos-go-apiv2/common" + "google.golang.org/protobuf/proto" +) + +type Block struct { + block *common.Block + transactions []*Transaction +} + +func ParseBlock(block *common.Block) *Block { + return &Block{block, []*Transaction{}} +} + +func (b *Block) Number() (uint64, error) { + header, err := utils.AssertDefined(b.block.GetHeader(), "missing block header") + if err != nil { + return 0, fmt.Errorf("in Number: %w", err) + } + return header.GetNumber(), nil +} + +func (b *Block) Transactions() ([]*Transaction, error) { + return utils.Cache(func() ([]*Transaction, error) { + funcName := "Transactions" + envelopes, err := b.unmarshalEnvelopesFromBlockData() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + commonPayloads, err := b.unmarshalPayloadsFrom(envelopes) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + payloads, err := b.parse(commonPayloads) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + return b.createTransactionsFrom(payloads), nil + })() +} + +func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) { + result := []*common.Envelope{} + for _, blockData := range b.block.GetData().GetData() { + envelope := &common.Envelope{} + if err := proto.Unmarshal(blockData, envelope); err != nil { + return nil, fmt.Errorf("in unmarshalEnvelopesFromBlockData: %w", err) + } + result = append(result, envelope) + } + return result, nil +} + +func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Payload, error) { + result := []*common.Payload{} + for _, envelope := range envelopes { + commonPayload := &common.Payload{} + if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { + return nil, fmt.Errorf("in unmarshalPayloadsFrom: %w", err) + } + result = append(result, commonPayload) + } + return result, nil +} + +func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) { + funcName := "parse" + + validationCodes, err := b.extractTransactionValidationCodes() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + result := []*payload{} + for i, commonPayload := range commonPayloads { + statusCode, err := utils.AssertDefined( + validationCodes[i], + fmt.Sprint("missing validation code index", i), + ) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + payload := parsePayload(commonPayload, int32(statusCode)) + is, err := payload.isEndorserTransaction() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + if is { + result = append(result, payload) + } + } + + return result, nil +} + +func (b *Block) extractTransactionValidationCodes() ([]byte, error) { + metadata, err := utils.AssertDefined( + b.block.GetMetadata(), + "missing block metadata", + ) + if err != nil { + return nil, fmt.Errorf("in extractTransactionValidationCodes: %w", err) + } + + return utils.AssertDefined( + metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER], + "missing transaction validation code", + ) +} + +func (*Block) createTransactionsFrom(payloads []*payload) []*Transaction { + result := []*Transaction{} + for _, payload := range payloads { + result = append(result, newTransaction(payload)) + } + return result +} diff --git a/off_chain_data/application-go/parser/block_test.go b/off_chain_data/application-go/parser/block_test.go new file mode 100644 index 0000000000..b8546adf0a --- /dev/null +++ b/off_chain_data/application-go/parser/block_test.go @@ -0,0 +1,139 @@ +package parser + +import ( + "encoding/json" + "testing" + + atb "offChainData/contract" + + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" + "github.com/hyperledger/fabric-protos-go-apiv2/peer" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" +) + +func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) { + nsReadWriteSetFake, expectedNamespace, expectedAsset := nsReadWriteSetFake() + + transaction := &peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoMarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoMarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoMarshalOrPanic(&peer.ChaincodeAction{ + Results: protoMarshalOrPanic(&rwset.TxReadWriteSet{ + NsRwset: []*rwset.NsReadWriteSet{nsReadWriteSetFake}, + }), + }), + }), + }, + }), + }, + }, + } + + parsedEndorserTransaction := parseEndorserTransaction(transaction) + readWriteSets, err := parsedEndorserTransaction.readWriteSets() + if err != nil { + t.Fatal("unexpected error:", err) + } + + if len(readWriteSets) != 1 { + t.Fatal("expected 1 ReadWriteSet, got", len(readWriteSets)) + } + + assertReadWriteSet( + readWriteSets[0].namespaceReadWriteSets()[0], + expectedNamespace, + expectedAsset, + t, + ) +} + +func assertReadWriteSet( + parsedNsRwSet *NamespaceReadWriteSet, + expectedNamespace string, + expectedAsset atb.Asset, + t *testing.T, +) { + if parsedNsRwSet.Namespace() != expectedNamespace { + t.Errorf("expected namespace %s, got %s", expectedNamespace, parsedNsRwSet.Namespace()) + } + + actualKVRWSet, err := parsedNsRwSet.ReadWriteSet() + if err != nil { + t.Fatal("unexpected error:", err) + } + if len(actualKVRWSet.Writes) != 1 { + t.Fatal("expected 1 write, got", len(actualKVRWSet.Writes)) + } + + actualWrite := actualKVRWSet.Writes[0] + if actualWrite.GetKey() != expectedAsset.ID { + t.Errorf("expected key %s, got %s", expectedAsset.ID, actualWrite.GetKey()) + } + + if string(actualWrite.GetValue()) != string(jsonMarshalOrPanic(expectedAsset)) { + t.Errorf("expected value %s, got %s", jsonMarshalOrPanic(expectedAsset), actualWrite.GetValue()) + } +} + +func Test_ReadWriteSetWrapping(t *testing.T) { + nsReadWriteSetFake, _, _ := nsReadWriteSetFake() + + txReadWriteSetFake := &rwset.TxReadWriteSet{ + NsRwset: []*rwset.NsReadWriteSet{nsReadWriteSetFake}, + } + + parsedRwSet := parseReadWriteSet(txReadWriteSetFake) + if len(parsedRwSet.namespaceReadWriteSets()) != 1 { + t.Fatalf("Expected 1 NamespaceReadWriteSet, got %d", len(parsedRwSet.namespaceReadWriteSets())) + } +} + +func Test_NamespaceReadWriteSetParsing(t *testing.T) { + nsReadWriteSetFake, expectedNamespace, expectedAsset := nsReadWriteSetFake() + + parsedNsRwSet := parseNamespaceReadWriteSet(nsReadWriteSetFake) + assertReadWriteSet( + parsedNsRwSet, + expectedNamespace, + expectedAsset, + t, + ) +} + +func nsReadWriteSetFake() (*rwset.NsReadWriteSet, string, atb.Asset) { + expectedNamespace := "basic" + expectedAsset := atb.NewAsset() + + result := &rwset.NsReadWriteSet{ + Namespace: expectedNamespace, + Rwset: protoMarshalOrPanic(&kvrwset.KVRWSet{ + Writes: []*kvrwset.KVWrite{{ + Key: expectedAsset.ID, + Value: []byte(jsonMarshalOrPanic(expectedAsset)), + }}, + }), + } + + return result, expectedNamespace, expectedAsset +} + +func protoMarshalOrPanic(v protoreflect.ProtoMessage) []byte { + result, err := proto.Marshal(v) + if err != nil { + panic(err) + } + return result +} + +func jsonMarshalOrPanic(v any) []byte { + result, err := json.Marshal(v) + if err != nil { + panic(err) + } + return result +} diff --git a/off_chain_data/application-go/parser/endorserTransaction.go b/off_chain_data/application-go/parser/endorserTransaction.go new file mode 100644 index 0000000000..161775d695 --- /dev/null +++ b/off_chain_data/application-go/parser/endorserTransaction.go @@ -0,0 +1,127 @@ +package parser + +import ( + "fmt" + "offChainData/utils" + + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" + "github.com/hyperledger/fabric-protos-go-apiv2/peer" + "google.golang.org/protobuf/proto" +) + +type endorserTransaction struct { + transaction *peer.Transaction +} + +func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransaction { + return &endorserTransaction{transaction} +} + +func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) { + return utils.Cache(func() ([]*readWriteSet, error) { + funcName := "readWriteSets" + chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + return p.parseReadWriteSets(txReadWriteSets), nil + })() +} + +func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) { + result := []*peer.ChaincodeActionPayload{} + for _, transactionAction := range p.transaction.GetActions() { + chaincodeActionPayload := &peer.ChaincodeActionPayload{} + if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil { + return nil, fmt.Errorf("in unmarshalChaincodeActionPayloads: %w", err) + } + + result = append(result, chaincodeActionPayload) + } + return result, nil +} + +func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) { + result := []*peer.ChaincodeEndorsedAction{} + for _, payload := range chaincodeActionPayloads { + chaincodeEndorsedAction, err := utils.AssertDefined( + payload.GetAction(), + "missing chaincode endorsed action", + ) + if err != nil { + return nil, fmt.Errorf("in extractChaincodeEndorsedActionsFrom: %w", err) + } + + result = append( + result, + chaincodeEndorsedAction, + ) + } + return result, nil +} + +func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) ([]*peer.ProposalResponsePayload, error) { + result := []*peer.ProposalResponsePayload{} + for _, endorsedAction := range chaincodeEndorsedActions { + proposalResponsePayload := &peer.ProposalResponsePayload{} + if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil { + return nil, fmt.Errorf("in unmarshalProposalResponsePayloadsFrom: %w", err) + } + result = append(result, proposalResponsePayload) + } + return result, nil +} + +func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) ([]*peer.ChaincodeAction, error) { + result := []*peer.ChaincodeAction{} + for _, proposalResponsePayload := range proposalResponsePayloads { + chaincodeAction := &peer.ChaincodeAction{} + if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil { + return nil, fmt.Errorf("in unmarshalChaincodeActionsFrom: %w", err) + } + result = append(result, chaincodeAction) + } + return result, nil +} + +func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) ([]*rwset.TxReadWriteSet, error) { + result := []*rwset.TxReadWriteSet{} + for _, chaincodeAction := range chaincodeActions { + txReadWriteSet := &rwset.TxReadWriteSet{} + if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil { + return nil, fmt.Errorf("in unmarshalTxReadWriteSetsFrom: %w", err) + } + result = append(result, txReadWriteSet) + } + return result, nil +} + +func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet { + result := []*readWriteSet{} + for _, txReadWriteSet := range txReadWriteSets { + parsedReadWriteSet := parseReadWriteSet(txReadWriteSet) + result = append(result, parsedReadWriteSet) + } + return result +} diff --git a/off_chain_data/application-go/parser/namespaceReadWriteSet.go b/off_chain_data/application-go/parser/namespaceReadWriteSet.go new file mode 100644 index 0000000000..4fdd8a91a2 --- /dev/null +++ b/off_chain_data/application-go/parser/namespaceReadWriteSet.go @@ -0,0 +1,33 @@ +package parser + +import ( + "fmt" + "offChainData/utils" + + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" + "google.golang.org/protobuf/proto" +) + +type NamespaceReadWriteSet struct { + nsReadWriteSet *rwset.NsReadWriteSet +} + +func parseNamespaceReadWriteSet(nsRwSet *rwset.NsReadWriteSet) *NamespaceReadWriteSet { + return &NamespaceReadWriteSet{nsRwSet} +} + +func (p *NamespaceReadWriteSet) Namespace() string { + return p.nsReadWriteSet.GetNamespace() +} + +func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) { + return utils.Cache(func() (*kvrwset.KVRWSet, error) { + result := kvrwset.KVRWSet{} + if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil { + return nil, fmt.Errorf("in ReadWriteSet: %w", err) + } + + return &result, nil + })() +} diff --git a/off_chain_data/application-go/parser/payload.go b/off_chain_data/application-go/parser/payload.go new file mode 100644 index 0000000000..f8ba31056f --- /dev/null +++ b/off_chain_data/application-go/parser/payload.go @@ -0,0 +1,73 @@ +package parser + +import ( + "fmt" + "offChainData/utils" + + "github.com/hyperledger/fabric-protos-go-apiv2/common" + "github.com/hyperledger/fabric-protos-go-apiv2/peer" + "google.golang.org/protobuf/proto" +) + +type payload struct { + commonPayload *common.Payload + statusCode int32 +} + +func parsePayload(commonPayload *common.Payload, statusCode int32) *payload { + return &payload{commonPayload, statusCode} +} + +func (p *payload) channelHeader() (*common.ChannelHeader, error) { + return utils.Cache(func() (*common.ChannelHeader, error) { + funcName := "channelHeader" + + header, err := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header") + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + result := &common.ChannelHeader{} + if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + return result, nil + })() +} + +func (p *payload) endorserTransaction() (*endorserTransaction, error) { + funcName := "endorserTransaction" + + is, err := p.isEndorserTransaction() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + if !is { + channelHeader, err := p.channelHeader() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType()) + } + + result := &peer.Transaction{} + if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + return parseEndorserTransaction(result), nil +} + +func (p *payload) isEndorserTransaction() (bool, error) { + channelHeader, err := p.channelHeader() + if err != nil { + return false, fmt.Errorf("in isEndorserTransaction: %w", err) + } + + return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil +} + +func (p *payload) isValid() bool { + return p.statusCode == int32(peer.TxValidationCode_VALID) +} diff --git a/off_chain_data/application-go/parser/readWriteSet.go b/off_chain_data/application-go/parser/readWriteSet.go new file mode 100644 index 0000000000..d795c0b8de --- /dev/null +++ b/off_chain_data/application-go/parser/readWriteSet.go @@ -0,0 +1,22 @@ +package parser + +import ( + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" +) + +type readWriteSet struct { + readWriteSet *rwset.TxReadWriteSet +} + +func parseReadWriteSet(rwSet *rwset.TxReadWriteSet) *readWriteSet { + return &readWriteSet{rwSet} +} + +func (p *readWriteSet) namespaceReadWriteSets() []*NamespaceReadWriteSet { + result := []*NamespaceReadWriteSet{} + for _, nsReadWriteSet := range p.readWriteSet.GetNsRwset() { + parsedNamespaceReadWriteSet := parseNamespaceReadWriteSet(nsReadWriteSet) + result = append(result, parsedNamespaceReadWriteSet) + } + return result +} diff --git a/off_chain_data/application-go/parser/transaction.go b/off_chain_data/application-go/parser/transaction.go new file mode 100644 index 0000000000..7bead91413 --- /dev/null +++ b/off_chain_data/application-go/parser/transaction.go @@ -0,0 +1,44 @@ +package parser + +import ( + "fmt" + + "github.com/hyperledger/fabric-protos-go-apiv2/common" +) + +type Transaction struct { + payload *payload +} + +func newTransaction(payload *payload) *Transaction { + return &Transaction{payload} +} + +func (t *Transaction) ChannelHeader() (*common.ChannelHeader, error) { + return t.payload.channelHeader() +} + +func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) { + funcName := "NamespaceReadWriteSets" + + endorserTransaction, err := t.payload.endorserTransaction() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + txReadWriteSets, err := endorserTransaction.readWriteSets() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + result := []*NamespaceReadWriteSet{} + for _, readWriteSet := range txReadWriteSets { + result = append(result, readWriteSet.namespaceReadWriteSets()...) + } + + return result, nil +} + +func (t *Transaction) IsValid() bool { + return t.payload.isValid() +} diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go new file mode 100644 index 0000000000..530bf23f57 --- /dev/null +++ b/off_chain_data/application-go/processor/block.go @@ -0,0 +1,147 @@ +package processor + +import ( + "fmt" + "offChainData/parser" + "offChainData/store" + + "github.com/hyperledger/fabric-gateway/pkg/client" +) + +type block struct { + parsedBlock *parser.Block + checkpointer *client.FileCheckpointer + writeToStore store.Writer + channelName string +} + +func NewBlock( + parsedBlock *parser.Block, + checkpointer *client.FileCheckpointer, + writeToStore store.Writer, + channelName string, +) *block { + return &block{ + parsedBlock, + checkpointer, + writeToStore, + channelName, + } +} + +func (b *block) Process() error { + funcName := "Process" + + blockNumber, err := b.parsedBlock.Number() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + fmt.Println("\nReceived block", blockNumber) + + validTransactions, err := b.validTransactions() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + for _, validTransaction := range validTransactions { + aTransaction := transaction{ + blockNumber, + validTransaction, + // TODO use pointer to parent and get blockNumber, store and channelName from parent + b.writeToStore, + b.channelName, + } + if err := aTransaction.process(); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + channelHeader, err := validTransaction.ChannelHeader() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + transactionId := channelHeader.GetTxId() + b.checkpointer.CheckpointTransaction(blockNumber, transactionId) + } + + b.checkpointer.CheckpointBlock(blockNumber) + + return nil +} + +func (b *block) validTransactions() ([]*parser.Transaction, error) { + result := []*parser.Transaction{} + newTransactions, err := b.getNewTransactions() + if err != nil { + return nil, fmt.Errorf("in validTransactions: %w", err) + } + + for _, transaction := range newTransactions { + if transaction.IsValid() { + result = append(result, transaction) + } + } + return result, nil +} + +func (b *block) getNewTransactions() ([]*parser.Transaction, error) { + funcName := "getNewTransactions" + + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + lastTransactionId := b.checkpointer.TransactionID() + if lastTransactionId == "" { + // No previously processed transactions within this block so all are new + return transactions, nil + } + + // Ignore transactions up to the last processed transaction ID + lastProcessedIndex, err := b.findLastProcessedIndex() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + return transactions[lastProcessedIndex+1:], nil +} + +func (b *block) findLastProcessedIndex() (int, error) { + funcName := "findLastProcessedIndex" + + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + + blockTransactionIds := []string{} + for _, transaction := range transactions { + channelHeader, err := transaction.ChannelHeader() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + blockTransactionIds = append(blockTransactionIds, channelHeader.GetTxId()) + } + + lastTransactionId := b.checkpointer.TransactionID() + lastProcessedIndex := -1 + for index, id := range blockTransactionIds { + if id == lastTransactionId { + lastProcessedIndex = index + } + } + + if lastProcessedIndex < 0 { + blockNumber, err := b.parsedBlock.Number() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + return lastProcessedIndex, newTxIdNotFoundError( + lastTransactionId, + blockNumber, + blockTransactionIds, + ) + } + + return lastProcessedIndex, nil +} diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go new file mode 100644 index 0000000000..ec9c960887 --- /dev/null +++ b/off_chain_data/application-go/processor/transaction.go @@ -0,0 +1,104 @@ +package processor + +import ( + "fmt" + "offChainData/parser" + "offChainData/store" + "slices" +) + +type transaction struct { + blockNumber uint64 + transaction *parser.Transaction + writeToStore store.Writer + channelName string +} + +func (t *transaction) process() error { + funcName := "process" + + channelHeader, err := t.transaction.ChannelHeader() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + transactionId := channelHeader.GetTxId() + + writes, err := t.writes() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + if len(writes) == 0 { + fmt.Println("Skipping read-only or system transaction", transactionId) + return nil + } + + fmt.Println("Process transaction", transactionId) + + if err := t.writeToStore(store.LedgerUpdate{ + BlockNumber: t.blockNumber, + TransactionId: transactionId, + Writes: writes, + }); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + return nil +} + +func (t *transaction) writes() ([]store.Write, error) { + funcName := "writes" + // TODO this entire code should live in the parser and just return the kvWrite which + // we then map to store.Write and return + channelHeader, err := t.transaction.ChannelHeader() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + t.channelName = channelHeader.GetChannelId() + + nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{} + for _, nsReadWriteSet := range nsReadWriteSets { + if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { + nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) + } + } + + writes := []store.Write{} + for _, readWriteSet := range nonSystemCCReadWriteSets { + namespace := readWriteSet.Namespace() + + kvReadWriteSet, err := readWriteSet.ReadWriteSet() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + for _, kvWrite := range kvReadWriteSet.GetWrites() { + writes = append(writes, store.Write{ + ChannelName: t.channelName, + Namespace: namespace, + Key: kvWrite.GetKey(), + IsDelete: kvWrite.GetIsDelete(), + Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output + }) + } + } + + return writes, nil +} + +func (t *transaction) isSystemChaincode(chaincodeName string) bool { + systemChaincodeNames := []string{ + "_lifecycle", + "cscc", + "escc", + "lscc", + "qscc", + "vscc", + } + return slices.Contains(systemChaincodeNames, chaincodeName) +} diff --git a/off_chain_data/application-go/processor/txIdNotFoundError.go b/off_chain_data/application-go/processor/txIdNotFoundError.go new file mode 100644 index 0000000000..7a8f230a71 --- /dev/null +++ b/off_chain_data/application-go/processor/txIdNotFoundError.go @@ -0,0 +1,32 @@ +package processor + +import "fmt" + +type txIdNotFoundError struct { + txId string + blockNumber uint64 + blockTxIds []string +} + +func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError { + return &txIdNotFoundError{ + txId, blockNumber, blockTxIds, + } +} + +func (t *txIdNotFoundError) Error() string { + format := "checkpoint transaction ID %s not found in block %d containing transactions: %s" + return fmt.Sprintf(format, t.txId, t.blockNumber, t.blockTxIdsJoinedByComma()) +} + +func (t *txIdNotFoundError) blockTxIdsJoinedByComma() string { + result := "" + for index, item := range t.blockTxIds { + if len(t.blockTxIds)-1 == index { + result += item + } else { + result += item + ", " + } + } + return result +} diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go new file mode 100644 index 0000000000..81794b6cce --- /dev/null +++ b/off_chain_data/application-go/store/flatFille.go @@ -0,0 +1,78 @@ +package store + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "offChainData/utils" + "os" + "strconv" + "strings" +) + +var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log") +var SimulatedFailureCount = getSimulatedFailureCount() +var transactionCount uint = 0 // Used only to simulate failures + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +// This implementation just writes to a file. +func ApplyWritesToOffChainStore(data LedgerUpdate) error { + funcName := "ApplyWritesToOffChainStore" + + if err := simulateFailureIfRequired(); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + writes := []string{} + for _, write := range data.Writes { + marshaled, err := json.Marshal(write) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + writes = append(writes, string(marshaled)) + } + + f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { + f.Close() + return fmt.Errorf("in %s: %w", funcName, err) + } + + if err := f.Close(); err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + return nil +} + +func simulateFailureIfRequired() error { + if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { + transactionCount = 0 + return errors.New("expected error: simulated write failure") + } + + transactionCount += 1 + + return nil +} + +func getSimulatedFailureCount() uint { + valueAsString := utils.EnvOrDefault("SIMULATED_FAILURE_COUNT", "0") + valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) + if err != nil { + panic(err) + } + + result := math.Floor(valueAsFloat) + if valueAsFloat < 0 { + panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) + } + + return uint(result) +} diff --git a/off_chain_data/application-go/store/model.go b/off_chain_data/application-go/store/model.go new file mode 100644 index 0000000000..a68efbea8c --- /dev/null +++ b/off_chain_data/application-go/store/model.go @@ -0,0 +1,25 @@ +package store + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type Writer = func(data LedgerUpdate) error + +// Ledger update made by a specific transaction. +type LedgerUpdate struct { + BlockNumber uint64 + TransactionId string + Writes []Write +} + +// Description of a ledger Write that can be applied to an off-chain data store. +type Write struct { + // Channel whose ledger is being updated. + ChannelName string `json:"channelName"` + // Namespace within the ledger. + Namespace string `json:"namespace"` + // Key name within the ledger namespace. + Key string `json:"key"` + // Whether the key and associated value are being deleted. + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` +} diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go new file mode 100644 index 0000000000..532aa09871 --- /dev/null +++ b/off_chain_data/application-go/transact.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "sync" + + atb "offChainData/contract" + "offChainData/utils" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "google.golang.org/grpc" +) + +func transact(clientConnection *grpc.ClientConn) { + id, options := newConnectOptions(clientConnection) + gateway, err := client.Connect(id, options...) + if err != nil { + panic((err)) + } + defer func() { + gateway.Close() + fmt.Println("Gateway closed.") + }() + + contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) + + smartContract := atb.NewAssetTransferBasic(contract) + app := newTransactApp(smartContract) + app.run() +} + +type transactApp struct { + smartContract *atb.AssetTransferBasic + batchSize int +} + +func newTransactApp(smartContract *atb.AssetTransferBasic) *transactApp { + return &transactApp{smartContract, 10} +} + +func (t *transactApp) run() { + var wg sync.WaitGroup + + for i := 0; i < t.batchSize; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + if err := t.transact(); err != nil { + fmt.Println("\033[31m[ERROR]\033[0m", err) + return + } + }() + } + + wg.Wait() +} + +func (t *transactApp) transact() error { + funcName := "transact" + + anAsset := atb.NewAsset() + + err := t.smartContract.CreateAsset(anAsset) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + fmt.Println("Created asset", anAsset.ID) + + // Transfer randomly 1 in 2 assets to a new owner. + if utils.RandomInt(2) == 0 { + newOwner := utils.DifferentElement(atb.Owners, anAsset.Owner) + oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner) + } + + // Delete randomly 1 in 4 created assets. + if utils.RandomInt(4) == 0 { + err := t.smartContract.DeleteAsset(anAsset.ID) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + fmt.Println("Deleted asset", anAsset.ID) + } + return nil +} diff --git a/off_chain_data/application-go/utils/utils.go b/off_chain_data/application-go/utils/utils.go new file mode 100644 index 0000000000..c3714931ec --- /dev/null +++ b/off_chain_data/application-go/utils/utils.go @@ -0,0 +1,74 @@ +package utils + +import ( + "crypto/rand" + "errors" + "fmt" + "math/big" + "os" +) + +// Pick a random element from an array. +func RandomElement(values []string) string { + result := values[RandomInt(len(values))] + return result +} + +// Generate a random integer in the range 0 to max - 1. +func RandomInt(max int) int { + result, err := rand.Int(rand.Reader, big.NewInt(int64(max))) + if err != nil { + panic(err) + } + + return int(result.Int64()) +} + +// Pick a random element from an array, excluding the current value. +func DifferentElement(values []string, currentValue string) string { + candidateValues := []string{} + for _, v := range values { + if v != currentValue { + candidateValues = append(candidateValues, v) + } + } + return RandomElement(candidateValues) +} + +// Return the value if it is defined; otherwise panics with an error message. +func AssertDefined[T any](value T, message string) (T, error) { + if any(value) == any(nil) { + var zeroValue T + return zeroValue, errors.New(message) + } + + return value, nil +} + +// Wrap a function call with a cache. On first call the wrapped function is invoked to +// obtain a result. Subsequent calls return the cached result. +func Cache[T any](f func() (T, error)) func() (T, error) { + var value T + var err error + var cached bool + + return func() (T, error) { + if !cached { + value, err = f() + if err != nil { + var zeroValue T + return zeroValue, fmt.Errorf("in Cache: %w", err) + } + cached = true + } + return value, nil + } +} + +func EnvOrDefault(key, defaultValue string) string { + result := os.Getenv(key) + if result == "" { + return defaultValue + } + return result +} diff --git a/off_chain_data/application-go/utils/utils_test.go b/off_chain_data/application-go/utils/utils_test.go new file mode 100644 index 0000000000..d806c6e40c --- /dev/null +++ b/off_chain_data/application-go/utils/utils_test.go @@ -0,0 +1,100 @@ +package utils_test + +import ( + "errors" + "offChainData/utils" + "testing" +) + +func Test_cachePrimitiveFunctionResult(t *testing.T) { + counter := 0 + f := func() (int, error) { + counter++ + return 5, nil + } + + cachedFunc := utils.Cache(f) + result1, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + result2, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + + if counter != 1 { + t.Error("expected counter to be 1, but got", counter) + } + + if result1 != 5 || result2 != 5 { + t.Fatal("expected results to be 5, but got", result1, result2) + } +} + +func Test_whenCachedFunctionsErrors_returnError(t *testing.T) { + errorMsg := "error" + f := func() (int, error) { + return 0, errors.New(errorMsg) + } + + cachedFunc := utils.Cache(f) + _, err := cachedFunc() + if err == nil { + t.Fatal("expected error, but got", err) + } + + if err.Error() != errorMsg { + t.Fatal("expected error message to be 'error', but got", err) + } +} + +func Test_cacheWrappedPrimitiveFunctionResult(t *testing.T) { + controlValue := 5 + multiplyControlValueBy := func(n int) (int, error) { controlValue *= n; return controlValue, nil } + + cachedFunc := utils.Cache(func() (int, error) { return multiplyControlValueBy(5) }) + result1, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + result2, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + + if controlValue != 25 { + t.Error("expected control value to be 25, but got", controlValue) + } + + if result1 != 25 || result2 != 25 { + t.Fatal("expected cached results to be 25, but got", result1, result2) + } +} + +func Test_cacheWrappedDataStructureResult(t *testing.T) { + type GreetMe struct { + helloTo string + } + + controlStruct := &GreetMe{helloTo: "Hello "} + greet := func(name string) (*GreetMe, error) { controlStruct.helloTo += name; return controlStruct, nil } + + cachedFunc := utils.Cache(func() (*GreetMe, error) { return greet("John Doe") }) + result1, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + result2, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + + if controlStruct.helloTo != "Hello John Doe" { + t.Error("expected control value to be 'Hello John Doe', but got", controlStruct) + } + + if result1.helloTo != "Hello John Doe" || result2.helloTo != "Hello John Doe" { + t.Fatal("expected cached results to be 'Hello John Doe', but got", result1.helloTo, result2.helloTo) + } +} diff --git a/off_chain_data/application-typescript/src/utils.ts b/off_chain_data/application-typescript/src/utils.ts index e798a20123..756b2e3a40 100644 --- a/off_chain_data/application-typescript/src/utils.ts +++ b/off_chain_data/application-typescript/src/utils.ts @@ -67,7 +67,7 @@ export function assertDefined(value: T | null | undefined, message: string): */ export function cache(f: () => T): () => T { let value: T | undefined; - return () => { + return (): T => { if (value === undefined) { value = f(); }