diff --git a/README.md b/README.md index e69f5871..d7e5c765 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,10 @@ # Hyperledger FireFly Transaction Manager -Plugable microservice component of Hyperledger FireFly, responsible for: +The core component of the FireFly Connector Framework for Blockchains, responsible for: - Submission of transactions to blockchains of all types + - Nonce management - idempotent submission of transactions, and assignment of nonces - Protocol connectivity decoupled with additional lightweight API connector - Easy to add additional protocols that conform to normal patterns of TX submission / events @@ -16,12 +17,96 @@ Plugable microservice component of Hyperledger FireFly, responsible for: - Extensible policy engine - Gas station API integration -- Event streaming [* work in progress to extract from previous location in ethconnect] +- Event streaming - Protocol agnostic event polling/streaming support - Reliable checkpoint restart - At least once delivery API -![Hyperledger FireFly Transaction Manager](./images/firefly_transaction_manager.jpg) +## Architecture + +The core architecture of the FireFly Connector Framework is as follows: + +[![Hyperledger FireFly Transaction Manager](./images/firefly_connector_framework_architecture.jpg)](./images/firefly_connector_framework_architecture.jpg) + +This re-usable codebase contains as much as possible of the re-usable heavy lifting code needed across any blockchain. + +The framework is currently constrained to blockchains that adhere to certain basic principals: + +1. Has transactions + - That are signed + - That can optionally have gas semantics (limits and prices, expressed in a blockchain specific way) +2. Has events (or "logs") + - That are emitted as a deterministic outcome of transactions +3. Has blocks + - Containing zero or more transactions, with their associated events + - With a sequential numeric order + - With a hash + - With a parent hash +4. Has finality for transactions & events that can be expressed as a level of confidence over time + - Confirmations: A number of sequential blocks in the canonical chain that contain the transaction + +## Nonce management + +The nonces for transactions is assigned as early as possible in the flow: +- Before the REST API for submission of the transaction occurs +- After the FFCAPI blockchain connector verifies the transaction can be encoded successfully to the chain +- With protection against multiple parallel API requests for the same signing address +- With stateful persistence meaning the connector knows about all nonces it previously allocated, to avoids duplicates + +This "at source" allocation of nonces provides the strictest assurance of order of transactions possible, +because the order is locked in with the coordination of the business logic of the application submitting the transaction. + +As well as protecting against loss of transactions, this protects against duplication of transactions - even in crash +recovery scenarios with a sufficiently reliable persistence layer. + +### Avoid multiple nonce management systems against the same signing key + +FFTM is optimized for cases where all transactions for a given signing address flow through the +same FireFly connector. If you have signing and nonce allocation happening elsewhere, not going through the +FireFly blockchain connector, then it is possible that the same nonce will be allocated in two places. + +> Be careful that the signing keys for transactions you stream through the Nonce Management of the FireFly +> blockchain connector are not used elsewhere. + +If you must have multiple systems performing nonce management against the same keys you use with FireFly nonce management, +you can set the `transactions.nonceStateTimeout` to `0` (or a low threshold like `100ms`) to cause the nonce management +to query the pending transaction pool of the node every time a nonce is allocated. + +This reduces the window for concurrent nonce allocation to be small (basically the same as if you had +multiple simple web/mobile wallets used against the same key), but it does not eliminate it completely it. + +### Why "at source" nonce management was chosen vs. "at target" + +The "at source" approach to ordering used in FFTM could be compared with the "at target" allocation of nonces used in +[EthConnect](https://github.com/hyperledger/firefly-ethconnect)). + +The "at target" approach optimizes for throughput and ability to send new transactions to the chain, +with an at-least-once delivery assurance to the applications. + +An "at target" algorithm as used in EthConnect could resume transaction delivery automatically without operator intervention +from almost all scenarios, including where nonces have been double allocated. + +However, "at target" comes with two compromises that mean FFTM chose the "at source" approach was chosen for FFTM: + +- Individual transactions might fail in certain scenarios, and subsequent transactions will still be streamed to the chain. + While desirable for automation and throughput, this reduces the ordering guarantee for high value transactions. + +- In crash recovery scenarios the assurance is at-least-once delivery for "at target" ordering (rather than "exactly once"), + although the window can be made very small through various optimizations included in the EthConnect codebase. + +## Policy Manager + +> TODO: Add more detail to describe the pluggability of the Policy Manager component, to perform transaction gas price +> estimation, advanced monitoring of transactions, submission and re-submission of the transactions with updated +> parameters (such as gas price) etc. + +## Event streaming + +One of the most sophisticated parts of the FireFly Connector Framework is the handling of event streams. + +> TODO: More detail to back up this diagram. + +[![Event Streams](./images/fftm_event_streams_architecture.jpg)](./images/fftm_event_streams_architecture.jpg) # Configuration diff --git a/images/fftm_event_streams_architecture.jpg b/images/fftm_event_streams_architecture.jpg new file mode 100644 index 00000000..f9280d87 Binary files /dev/null and b/images/fftm_event_streams_architecture.jpg differ diff --git a/images/firefly_connector_framework_architecture.jpg b/images/firefly_connector_framework_architecture.jpg new file mode 100644 index 00000000..a6584927 Binary files /dev/null and b/images/firefly_connector_framework_architecture.jpg differ diff --git a/images/firefly_transaction_manager.jpg b/images/firefly_transaction_manager.jpg deleted file mode 100644 index d7b9c7f5..00000000 Binary files a/images/firefly_transaction_manager.jpg and /dev/null differ diff --git a/internal/tmmsgs/en_api_descriptions.go b/internal/tmmsgs/en_api_descriptions.go index 1070cac8..bbcb2dab 100644 --- a/internal/tmmsgs/en_api_descriptions.go +++ b/internal/tmmsgs/en_api_descriptions.go @@ -36,6 +36,7 @@ var ( APIEndpointGetEventStreams = ffm("api.endpoints.get.eventstreams", "List event streams") APIEndpointGetEventStream = ffm("api.endpoints.get.eventstream", "Get an event stream with status") APIEndpointDeleteEventStream = ffm("api.endpoints.delete.eventstream", "Delete an event stream") + APIEndpointDeleteTransaction = ffm("api.endpoints.delete.transaction", "Request transaction deletion by the policy engine. Result could be immediate (200), asynchronous (202), or rejected with an error") APIEndpointGetSubscriptions = ffm("api.endpoints.get.subscriptions", "Get listeners - route deprecated in favor of /eventstreams/{streamId}/listeners") APIEndpointGetSubscription = ffm("api.endpoints.get.subscription", "Get listener - route deprecated in favor of /eventstreams/{streamId}/listeners/{listenerId}") APIEndpointPostSubscriptions = ffm("api.endpoints.post.subscriptions", "Create new listener - route deprecated in favor of /eventstreams/{streamId}/listeners") diff --git a/internal/tmmsgs/en_error_messges.go b/internal/tmmsgs/en_error_messges.go index 96e45597..ae024ec9 100644 --- a/internal/tmmsgs/en_error_messges.go +++ b/internal/tmmsgs/en_error_messges.go @@ -82,4 +82,7 @@ var ( MsgInvalidSortDirection = ffe("FF21064", "Sort direction must be 'asc'/'ascending' or 'desc'/'descending': '%s'", http.StatusBadRequest) MsgDuplicateID = ffe("FF21065", "ID '%s' is not unique", http.StatusConflict) MsgTransactionFailed = ffe("FF21066", "Transaction execution failed") + MsgTransactionNotFound = ffe("FF21067", "Transaction '%s' not found", http.StatusNotFound) + MsgPolicyEngineRequestTimeout = ffe("FF21068", "The policy engine did not acknowledge the request after %.2fs", 408) + MsgPolicyEngineRequestInvalid = ffe("FF21069", "Invalid policy engine request type '%d'") ) diff --git a/mocks/policyenginemocks/policy_engine.go b/mocks/policyenginemocks/policy_engine.go index 063bb917..693704df 100644 --- a/mocks/policyenginemocks/policy_engine.go +++ b/mocks/policyenginemocks/policy_engine.go @@ -10,6 +10,8 @@ import ( ffcapi "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" mock "github.com/stretchr/testify/mock" + + policyengine "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" ) // PolicyEngine is an autogenerated mock type for the PolicyEngine type @@ -18,14 +20,14 @@ type PolicyEngine struct { } // Execute provides a mock function with given fields: ctx, cAPI, mtx -func (_m *PolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (bool, ffcapi.ErrorReason, error) { +func (_m *PolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (policyengine.UpdateType, ffcapi.ErrorReason, error) { ret := _m.Called(ctx, cAPI, mtx) - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, ffcapi.API, *apitypes.ManagedTX) bool); ok { + var r0 policyengine.UpdateType + if rf, ok := ret.Get(0).(func(context.Context, ffcapi.API, *apitypes.ManagedTX) policyengine.UpdateType); ok { r0 = rf(ctx, cAPI, mtx) } else { - r0 = ret.Get(0).(bool) + r0 = ret.Get(0).(policyengine.UpdateType) } var r1 ffcapi.ErrorReason diff --git a/pkg/apitypes/managed_tx.go b/pkg/apitypes/managed_tx.go index 88d69f04..355d6ed3 100644 --- a/pkg/apitypes/managed_tx.go +++ b/pkg/apitypes/managed_tx.go @@ -61,6 +61,7 @@ type ManagedTX struct { Created *fftypes.FFTime `json:"created"` Updated *fftypes.FFTime `json:"updated"` Status TxStatus `json:"status"` + DeleteRequested *fftypes.FFTime `json:"deleteRequested,omitempty"` SequenceID *fftypes.UUID `json:"sequenceId"` Nonce *fftypes.FFBigInt `json:"nonce"` Gas *fftypes.FFBigInt `json:"gas"` diff --git a/pkg/fftm/manager.go b/pkg/fftm/manager.go index 35a006e4..c06438de 100644 --- a/pkg/fftm/manager.go +++ b/pkg/fftm/manager.go @@ -44,6 +44,26 @@ type Manager interface { Close() } +type policyEngineAPIRequestType int + +const ( + policyEngineAPIRequestTypeDelete policyEngineAPIRequestType = iota +) + +// policyEngineAPIRequest requests are queued to the policy engine thread for processing against a given Transaction +type policyEngineAPIRequest struct { + requestType policyEngineAPIRequestType + txID string + startTime time.Time + response chan policyEngineAPIResponse +} + +type policyEngineAPIResponse struct { + tx *apitypes.ManagedTX + err error + status int // http status code (200 Ok vs. 202 Accepted) - only set for success cases +} + type manager struct { ctx context.Context cancelCtx func() @@ -58,14 +78,15 @@ type manager struct { inflightUpdate chan bool inflight []*pendingState - mux sync.Mutex - lockedNonces map[string]*lockedNonce - eventStreams map[fftypes.UUID]events.Stream - streamsByName map[string]*fftypes.UUID - policyLoopDone chan struct{} - blockListenerDone chan struct{} - started bool - apiServerDone chan error + mux sync.Mutex + policyEngineAPIRequests []*policyEngineAPIRequest + lockedNonces map[string]*lockedNonce + eventStreams map[fftypes.UUID]events.Stream + streamsByName map[string]*fftypes.UUID + policyLoopDone chan struct{} + blockListenerDone chan struct{} + started bool + apiServerDone chan error policyLoopInterval time.Duration nonceStateTimeout time.Duration diff --git a/pkg/fftm/policyloop.go b/pkg/fftm/policyloop.go index 411b4450..06c09f37 100644 --- a/pkg/fftm/policyloop.go +++ b/pkg/fftm/policyloop.go @@ -18,6 +18,7 @@ package fftm import ( "context" + "net/http" "time" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -28,6 +29,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" ) func (m *manager) policyLoop() { @@ -107,6 +109,9 @@ func (m *manager) updateInflightSet(ctx context.Context) bool { func (m *manager) policyLoopCycle(ctx context.Context, inflightStale bool) { + // Process any synchronous commands first - these might not be in our inflight set + m.processPolicyAPIRequests(ctx) + if inflightStale { if !m.updateInflightSet(ctx) { return @@ -115,7 +120,7 @@ func (m *manager) policyLoopCycle(ctx context.Context, inflightStale bool) { // Go through executing the policy engine against them for _, pending := range m.inflight { - err := m.execPolicy(ctx, pending) + err := m.execPolicy(ctx, pending, false) if err != nil { log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err) } @@ -123,6 +128,56 @@ func (m *manager) policyLoopCycle(ctx context.Context, inflightStale bool) { } +// processPolicyAPIRequests executes any API calls requested that require policy engine involvement - such as transaction deletions +func (m *manager) processPolicyAPIRequests(ctx context.Context) { + + m.mux.Lock() + requests := m.policyEngineAPIRequests + if len(requests) > 0 { + m.policyEngineAPIRequests = []*policyEngineAPIRequest{} + } + m.mux.Unlock() + + for _, request := range requests { + var pending *pendingState + // If this transaction is in-flight, we use that record + for _, inflight := range m.inflight { + if inflight.mtx.ID == request.txID { + pending = inflight + break + } + } + if pending == nil { + mtx, err := m.getTransactionByID(ctx, request.txID) + if err != nil { + request.response <- policyEngineAPIResponse{err: err} + continue + } + // This transaction was valid, but outside of our in-flight set - we still evaluate the policy engine in-line for it. + // This does NOT cause it to be added to the in-flight set + pending = &pendingState{mtx: mtx} + } + + switch request.requestType { + case policyEngineAPIRequestTypeDelete: + if err := m.execPolicy(ctx, pending, true); err != nil { + request.response <- policyEngineAPIResponse{err: err} + } else { + res := policyEngineAPIResponse{tx: pending.mtx, status: http.StatusAccepted} + if pending.remove { + res.status = http.StatusOK // synchronously completed + } + request.response <- res + } + default: + request.response <- policyEngineAPIResponse{ + err: i18n.NewError(ctx, tmmsgs.MsgPolicyEngineRequestInvalid, request.requestType), + } + } + } + +} + func (m *manager) addError(mtx *apitypes.ManagedTX, reason ffcapi.ErrorReason, err error) { newLen := len(mtx.ErrorHistory) + 1 if newLen > m.errorHistoryCount { @@ -142,20 +197,23 @@ func (m *manager) addError(mtx *apitypes.ManagedTX, reason ffcapi.ErrorReason, e } } -func (m *manager) execPolicy(ctx context.Context, pending *pendingState) (err error) { +func (m *manager) execPolicy(ctx context.Context, pending *pendingState, syncDeleteRequest bool) (err error) { - var updated bool + update := policyengine.UpdateNo completed := false // Check whether this has been confirmed by the confirmation manager m.mux.Lock() mtx := pending.mtx confirmed := pending.confirmed + if syncDeleteRequest && mtx.DeleteRequested == nil { + mtx.DeleteRequested = fftypes.Now() + } m.mux.Unlock() switch { - case confirmed: - updated = true + case confirmed && !syncDeleteRequest: + update = policyengine.UpdateYes completed = true if mtx.Receipt.Success { mtx.Status = apitypes.TxStatusSucceeded @@ -169,11 +227,12 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState) (err er // We get woken for lots of reasons to go through the policy loop, but we only want // to drive the policy engine at regular intervals. // So we track the last time we ran the policy engine against each pending item. - if time.Since(pending.lastPolicyCycle) > m.policyLoopInterval { + // We always call the policy engine on every loop, when deletion has been requested. + if syncDeleteRequest || time.Since(pending.lastPolicyCycle) > m.policyLoopInterval { // Pass the state to the pluggable policy engine to potentially perform more actions against it, // such as submitting for the first time, or raising the gas etc. var reason ffcapi.ErrorReason - updated, reason, err = m.policyEngine.Execute(ctx, m.connector, pending.mtx) + update, reason, err = m.policyEngine.Execute(ctx, m.connector, pending.mtx) if err != nil { log.L(ctx).Errorf("Policy engine returned error for transaction %s reason=%s: %s", mtx.ID, reason, err) m.addError(mtx, reason, err) @@ -187,14 +246,25 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState) (err er } } - if updated || err != nil { - mtx.Updated = fftypes.Now() - err := m.persistence.WriteTransaction(ctx, mtx, false) - if err != nil { - log.L(ctx).Errorf("Failed to update transaction %s (status=%s): %s", mtx.ID, mtx.Status, err) - return err - } - if completed { + if err == nil { + switch update { + case policyengine.UpdateYes: + mtx.Updated = fftypes.Now() + err := m.persistence.WriteTransaction(ctx, mtx, false) + if err != nil { + log.L(ctx).Errorf("Failed to update transaction %s (status=%s): %s", mtx.ID, mtx.Status, err) + return err + } + if completed { + pending.remove = true // for the next time round the loop + m.markInflightStale() + } + case policyengine.UpdateDelete: + err := m.persistence.DeleteTransaction(ctx, mtx.ID) + if err != nil { + log.L(ctx).Errorf("Failed to delete transaction %s (status=%s): %s", mtx.ID, mtx.Status, err) + return err + } pending.remove = true // for the next time round the loop m.markInflightStale() } @@ -270,3 +340,20 @@ func (m *manager) trackSubmittedTransaction(ctx context.Context, pending *pendin pending.trackingTransactionHash = pending.mtx.TransactionHash } } + +func (m *manager) policyEngineAPIRequest(ctx context.Context, req *policyEngineAPIRequest) policyEngineAPIResponse { + m.mux.Lock() + m.policyEngineAPIRequests = append(m.policyEngineAPIRequests, req) + m.mux.Unlock() + m.markInflightUpdate() + req.response = make(chan policyEngineAPIResponse, 1) + req.startTime = time.Now() + select { + case res := <-req.response: + return res + case <-ctx.Done(): + return policyEngineAPIResponse{ + err: i18n.NewError(ctx, tmmsgs.MsgPolicyEngineRequestTimeout, time.Since(req.startTime).Seconds()), + } + } +} diff --git a/pkg/fftm/policyloop_test.go b/pkg/fftm/policyloop_test.go index 16e2882a..3b5055b4 100644 --- a/pkg/fftm/policyloop_test.go +++ b/pkg/fftm/policyloop_test.go @@ -19,6 +19,7 @@ package fftm import ( "context" "fmt" + "net/http" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/mocks/policyenginemocks" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -318,7 +320,7 @@ func TestPolicyEngineFailStaleThenUpdated(t *testing.T) { m.policyEngine = mpe done1 := make(chan struct{}) mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything). - Return(false, ffcapi.ErrorReason(""), fmt.Errorf("pop")). + Return(policyengine.UpdateNo, ffcapi.ErrorReason(""), fmt.Errorf("pop")). Once(). Run(func(args mock.Arguments) { close(done1) @@ -327,7 +329,7 @@ func TestPolicyEngineFailStaleThenUpdated(t *testing.T) { done2 := make(chan struct{}) mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything). - Return(false, ffcapi.ErrorReason(""), fmt.Errorf("pop")). + Return(policyengine.UpdateNo, ffcapi.ErrorReason(""), fmt.Errorf("pop")). Once(). Run(func(args mock.Arguments) { close(done2) @@ -364,3 +366,129 @@ func TestMarkInflightUpdateDoesNotBlock(t *testing.T) { m.markInflightUpdate() } + +func TestExecPolicyDeleteFail(t *testing.T) { + + _, m, cancel := newTestManagerMockPersistence(t) + defer cancel() + + mpe := &policyenginemocks.PolicyEngine{} + m.policyEngine = mpe + mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(policyengine.UpdateDelete, ffcapi.ErrorReason(""), nil).Maybe() + + tx := genTestTxn("0xabcd1234", 12345, apitypes.TxStatusPending) + + mp := m.persistence.(*persistencemocks.Persistence) + mp.On("GetTransactionByID", m.ctx, tx.ID).Return(tx, nil) + mp.On("DeleteTransaction", m.ctx, tx.ID).Return(fmt.Errorf("pop")) + + req := &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestTypeDelete, + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + m.policyEngineAPIRequests = append(m.policyEngineAPIRequests, req) + + m.processPolicyAPIRequests(m.ctx) + + res := <-req.response + assert.Regexp(t, "pop", res.err) + + mp.AssertExpectations(t) + +} + +func TestExecPolicyDeleteInflightSync(t *testing.T) { + + _, m, cancel := newTestManagerMockPersistence(t) + defer cancel() + + mpe := &policyenginemocks.PolicyEngine{} + m.policyEngine = mpe + mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(policyengine.UpdateDelete, ffcapi.ErrorReason(""), nil).Maybe() + + tx := genTestTxn("0xabcd1234", 12345, apitypes.TxStatusPending) + m.inflight = []*pendingState{{mtx: tx}} + + mp := m.persistence.(*persistencemocks.Persistence) + mp.On("DeleteTransaction", m.ctx, tx.ID).Return(nil) + + req := &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestTypeDelete, + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + m.policyEngineAPIRequests = append(m.policyEngineAPIRequests, req) + + m.processPolicyAPIRequests(m.ctx) + + res := <-req.response + assert.NoError(t, res.err) + assert.Equal(t, http.StatusOK, res.status) + assert.True(t, m.inflight[0].remove) + + mp.AssertExpectations(t) + +} + +func TestExecPolicyDeleteNotFound(t *testing.T) { + + _, m, cancel := newTestManagerMockPersistence(t) + defer cancel() + + mp := m.persistence.(*persistencemocks.Persistence) + mp.On("GetTransactionByID", m.ctx, "bad-id").Return(nil, nil) + + req := &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestTypeDelete, + txID: "bad-id", + response: make(chan policyEngineAPIResponse, 1), + } + m.policyEngineAPIRequests = append(m.policyEngineAPIRequests, req) + + m.processPolicyAPIRequests(m.ctx) + + res := <-req.response + assert.Regexp(t, "FF21067", res.err) + + mp.AssertExpectations(t) + +} + +func TestBadPolicyAPIRequest(t *testing.T) { + + _, m, cancel := newTestManagerMockPersistence(t) + defer cancel() + + tx := genTestTxn("0xabcd1234", 12345, apitypes.TxStatusPending) + mp := m.persistence.(*persistencemocks.Persistence) + mp.On("GetTransactionByID", m.ctx, tx.ID).Return(tx, nil) + + req := &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestType(999), + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + m.policyEngineAPIRequests = append(m.policyEngineAPIRequests, req) + + m.processPolicyAPIRequests(m.ctx) + + res := <-req.response + assert.Regexp(t, "FF21069", res.err) + + mp.AssertExpectations(t) + +} + +func TestBadPolicyAPITimeout(t *testing.T) { + + _, m, cancel := newTestManagerMockPersistence(t) + defer cancel() + + ctx, cancelCtx := context.WithCancel(context.Background()) + cancelCtx() + + res := m.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{}) + assert.Regexp(t, "FF21068", res.err) + +} diff --git a/pkg/fftm/route_delete_transaction.go b/pkg/fftm/route_delete_transaction.go new file mode 100644 index 00000000..1f5b8e5a --- /dev/null +++ b/pkg/fftm/route_delete_transaction.go @@ -0,0 +1,45 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" +) + +var deleteTransaction = func(m *manager) *ffapi.Route { + return &ffapi.Route{ + Name: "deleteTransaction", + Path: "/transactions/{transactionId}", + Method: http.MethodDelete, + PathParams: []*ffapi.PathParam{ + {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, + }, + QueryParams: nil, + Description: tmmsgs.APIEndpointDeleteTransaction, + JSONInputValue: nil, + JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} }, + JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted}, + JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { + r.SuccessStatus, output, err = m.requestTransactionDeletion(r.Req.Context(), r.PP["transactionId"]) + return output, err + }, + } +} diff --git a/pkg/fftm/route_delete_transaction_test.go b/pkg/fftm/route_delete_transaction_test.go new file mode 100644 index 00000000..044d0f1e --- /dev/null +++ b/pkg/fftm/route_delete_transaction_test.go @@ -0,0 +1,55 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "fmt" + "testing" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-transaction-manager/mocks/policyenginemocks" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestDeleteTransaction(t *testing.T) { + + url, m, done := newTestManager(t) + defer done() + + mpe := &policyenginemocks.PolicyEngine{} + m.policyEngine = mpe + mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(policyengine.UpdateDelete, ffcapi.ErrorReason(""), nil).Maybe() + + err := m.Start() + assert.NoError(t, err) + + txIn := newTestTxn(t, m, "0xaaaaa", 10001, apitypes.TxStatusSucceeded) + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + Delete(fmt.Sprintf("%s/transactions/%s", url, txIn.ID)) + assert.NoError(t, err) + assert.Equal(t, 200, res.StatusCode()) + assert.Equal(t, txIn.ID, txOut.ID) + assert.NotNil(t, txOut.DeleteRequested) + +} diff --git a/pkg/fftm/route_get_transactions_test.go b/pkg/fftm/route_get_transactions_test.go index b2fd9924..c23ab383 100644 --- a/pkg/fftm/route_get_transactions_test.go +++ b/pkg/fftm/route_get_transactions_test.go @@ -26,12 +26,13 @@ import ( "github.com/hyperledger/firefly-transaction-manager/mocks/policyenginemocks" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) -func newTestTxn(t *testing.T, m *manager, signer string, nonce int64, status apitypes.TxStatus) *apitypes.ManagedTX { - tx := &apitypes.ManagedTX{ +func genTestTxn(signer string, nonce int64, status apitypes.TxStatus) *apitypes.ManagedTX { + return &apitypes.ManagedTX{ ID: fmt.Sprintf("ns1:%s", fftypes.NewUUID()), Created: fftypes.Now(), SequenceID: apitypes.NewULID(), @@ -41,6 +42,10 @@ func newTestTxn(t *testing.T, m *manager, signer string, nonce int64, status api From: signer, }, } +} + +func newTestTxn(t *testing.T, m *manager, signer string, nonce int64, status apitypes.TxStatus) *apitypes.ManagedTX { + tx := genTestTxn(signer, nonce, status) err := m.persistence.WriteTransaction(context.Background(), tx, true) assert.NoError(t, err) return tx @@ -49,7 +54,7 @@ func newTestTxn(t *testing.T, m *manager, signer string, nonce int64, status api func noopPolicyEngine(m *manager) { mpe := &policyenginemocks.PolicyEngine{} m.policyEngine = mpe - mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(false, ffcapi.ErrorReason(""), nil).Maybe() + mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(policyengine.UpdateNo, ffcapi.ErrorReason(""), nil).Maybe() } func TestGetTransactions(t *testing.T) { diff --git a/pkg/fftm/routes.go b/pkg/fftm/routes.go index 04f57582..efe4c8f4 100644 --- a/pkg/fftm/routes.go +++ b/pkg/fftm/routes.go @@ -23,6 +23,7 @@ func (m *manager) routes() []*ffapi.Route { deleteEventStream(m), deleteEventStreamListener(m), deleteSubscription(m), + deleteTransaction(m), getEventStream(m), getEventStreamListener(m), getEventStreamListeners(m), diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index fa897318..b562193b 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -28,7 +28,14 @@ import ( ) func (m *manager) getTransactionByID(ctx context.Context, txID string) (transaction *apitypes.ManagedTX, err error) { - return m.persistence.GetTransactionByID(ctx, txID) + tx, err := m.persistence.GetTransactionByID(ctx, txID) + if err != nil { + return nil, err + } + if tx == nil { + return nil, i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID) + } + return tx, nil } func (m *manager) getTransactions(ctx context.Context, afterStr, limitStr, signer string, pending bool, dirString string) (transactions []*apitypes.ManagedTX, err error) { @@ -76,3 +83,11 @@ func (m *manager) getTransactions(ctx context.Context, afterStr, limitStr, signe } } + +func (m *manager) requestTransactionDeletion(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { + res := m.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestTypeDelete, + txID: txID, + }) + return res.status, res.tx, res.err +} diff --git a/pkg/fftm/transaction_management_test.go b/pkg/fftm/transaction_management_test.go index f3b7e952..c12e7d79 100644 --- a/pkg/fftm/transaction_management_test.go +++ b/pkg/fftm/transaction_management_test.go @@ -25,6 +25,26 @@ import ( "github.com/stretchr/testify/mock" ) +func TestGetTransactionErrors(t *testing.T) { + + _, m, close := newTestManagerMockPersistence(t) + defer close() + + mp := m.persistence.(*persistencemocks.Persistence) + mp.On("GetTransactionByID", m.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")).Once() + mp.On("GetTransactionByID", m.ctx, mock.Anything).Return(nil, nil).Once() + mp.On("Close", mock.Anything).Return(nil).Maybe() + + _, err := m.getTransactionByID(m.ctx, "id") + assert.Regexp(t, "pop", err) + + _, err = m.getTransactionByID(m.ctx, "id") + assert.Regexp(t, "FF21067", err) + + mp.AssertExpectations(t) + +} + func TestGetTransactionsErrors(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) @@ -50,4 +70,6 @@ func TestGetTransactionsErrors(t *testing.T) { _, err = m.getTransactions(m.ctx, "after-not-found", "", "", false, "") assert.Regexp(t, "FF21062", err) + mp.AssertExpectations(t) + } diff --git a/pkg/policyengine/policyengine.go b/pkg/policyengine/policyengine.go index 96478358..5d9aceb0 100644 --- a/pkg/policyengine/policyengine.go +++ b/pkg/policyengine/policyengine.go @@ -23,6 +23,15 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) +// UpdateType informs FFTM whether the transaction needs an update to be persisted after this execution of the policy engine +type UpdateType int + +const ( + UpdateNo UpdateType = iota // Instructs that no update is necessary + UpdateYes // Instructs that the transaction should be updated in persistence + UpdateDelete // Instructs that the transaction should be removed completely from persistence - generally only returned when TX status is TxStatusDeleteRequested +) + type PolicyEngine interface { - Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (updated bool, reason ffcapi.ErrorReason, err error) + Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (updateType UpdateType, reason ffcapi.ErrorReason, err error) } diff --git a/pkg/policyengines/simple/simple_policy_engine.go b/pkg/policyengines/simple/simple_policy_engine.go index e721f287..2ef641b6 100644 --- a/pkg/policyengines/simple/simple_policy_engine.go +++ b/pkg/policyengines/simple/simple_policy_engine.go @@ -94,7 +94,7 @@ type simplePolicyInfo struct { } // withPolicyInfo is a convenience helper to run some logic that accesses/updates our policy section -func (p *simplePolicyEngine) withPolicyInfo(ctx context.Context, mtx *apitypes.ManagedTX, fn func(info *simplePolicyInfo) (updated bool, reason ffcapi.ErrorReason, err error)) (updated bool, reason ffcapi.ErrorReason, err error) { +func (p *simplePolicyEngine) withPolicyInfo(ctx context.Context, mtx *apitypes.ManagedTX, fn func(info *simplePolicyInfo) (update policyengine.UpdateType, reason ffcapi.ErrorReason, err error)) (update policyengine.UpdateType, reason ffcapi.ErrorReason, err error) { var info simplePolicyInfo infoBytes := []byte(mtx.PolicyInfo.String()) if len(infoBytes) > 0 { @@ -103,12 +103,12 @@ func (p *simplePolicyEngine) withPolicyInfo(ctx context.Context, mtx *apitypes.M log.L(ctx).Warnf("Failed to parse existing info `%s`: %s", infoBytes, err) } } - updated, reason, err = fn(&info) - if updated { + update, reason, err = fn(&info) + if update != policyengine.UpdateNo { infoBytes, _ = json.Marshal(&info) mtx.PolicyInfo = fftypes.JSONAnyPtrBytes(infoBytes) } - return updated, reason, err + return update, reason, err } func (p *simplePolicyEngine) submitTX(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (reason ffcapi.ErrorReason, err error) { @@ -146,27 +146,33 @@ func (p *simplePolicyEngine) submitTX(ctx context.Context, cAPI ffcapi.API, mtx return "", nil } -func (p *simplePolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (updated bool, reason ffcapi.ErrorReason, err error) { +func (p *simplePolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (update policyengine.UpdateType, reason ffcapi.ErrorReason, err error) { + + // Simply policy engine allows deletion of the transaction without additional checks ( ensuring the TX has not been submitted / gap filling the nonce etc. ) + if mtx.DeleteRequested != nil { + return policyengine.UpdateDelete, "", nil + } + // Simple policy engine only submits once. if mtx.FirstSubmit == nil { // Only calculate gas price here in the simple policy engine mtx.GasPrice, err = p.getGasPrice(ctx, cAPI) if err != nil { - return false, "", err + return policyengine.UpdateNo, "", err } // Submit the first time if reason, err := p.submitTX(ctx, cAPI, mtx); err != nil { - return true, reason, err + return policyengine.UpdateYes, reason, err } mtx.FirstSubmit = mtx.LastSubmit - return true, "", nil + return policyengine.UpdateYes, "", nil } else if mtx.Receipt == nil { // A more sophisticated policy engine would look at the reason for the lack of a receipt, and consider taking progressive // action such as increasing the gas cost slowly over time. This simple example shows how the policy engine // can use the FireFly core operation as a store for its historical state/decisions (in this case the last time we warned). - return p.withPolicyInfo(ctx, mtx, func(info *simplePolicyInfo) (updated bool, reason ffcapi.ErrorReason, err error) { + return p.withPolicyInfo(ctx, mtx, func(info *simplePolicyInfo) (update policyengine.UpdateType, reason ffcapi.ErrorReason, err error) { lastWarnTime := info.LastWarnTime if lastWarnTime == nil { lastWarnTime = mtx.FirstSubmit @@ -179,17 +185,17 @@ func (p *simplePolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx * // We do a resubmit at this point - as it might no longer be in the TX pool if reason, err := p.submitTX(ctx, cAPI, mtx); err != nil { if reason != ffcapi.ErrorKnownTransaction { - return true, reason, err + return policyengine.UpdateYes, reason, err } } - return true, "", nil + return policyengine.UpdateYes, "", nil } - return false, "", nil + return policyengine.UpdateNo, "", nil }) } // No action in the case we have a receipt - return false, "", nil + return policyengine.UpdateNo, "", nil } // getGasPrice either uses a fixed gas price, or invokes a gas station API diff --git a/pkg/policyengines/simple/simple_policy_engine_test.go b/pkg/policyengines/simple/simple_policy_engine_test.go index 5d2ab4e8..d9f5a406 100644 --- a/pkg/policyengines/simple/simple_policy_engine_test.go +++ b/pkg/policyengines/simple/simple_policy_engine_test.go @@ -31,6 +31,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -82,7 +83,7 @@ func TestFixedGasPriceOK(t *testing.T) { ctx := context.Background() updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) assert.NoError(t, err) - assert.True(t, updated) + assert.Equal(t, policyengine.UpdateYes, updated) assert.Empty(t, reason) assert.NotNil(t, mtx.FirstSubmit) assert.NotNil(t, mtx.LastSubmit) @@ -147,7 +148,7 @@ func TestGasOracleSendOK(t *testing.T) { updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) assert.NoError(t, err) assert.Empty(t, reason) - assert.True(t, updated) + assert.Equal(t, policyengine.UpdateYes, updated) assert.NotNil(t, mtx.FirstSubmit) assert.NotNil(t, mtx.LastSubmit) assert.Equal(t, `{"unit":"gwei","value":32.146027800733336}`, mtx.GasPrice.String()) @@ -191,7 +192,7 @@ func TestConnectorGasOracleSendOK(t *testing.T) { updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) assert.NoError(t, err) assert.Empty(t, reason) - assert.True(t, updated) + assert.Equal(t, policyengine.UpdateYes, updated) assert.NotNil(t, mtx.FirstSubmit) assert.NotNil(t, mtx.LastSubmit) assert.Equal(t, `"12345"`, mtx.GasPrice.String()) @@ -397,7 +398,7 @@ func TestWarnStaleWarningCannotParse(t *testing.T) { ctx := context.Background() updated, _, err := p.Execute(ctx, mockFFCAPI, mtx) assert.NoError(t, err) - assert.True(t, updated) + assert.Equal(t, policyengine.UpdateYes, updated) assert.NotEmpty(t, mtx.PolicyInfo.JSONObject().GetString("lastWarnTime")) mockFFCAPI.AssertExpectations(t) @@ -426,7 +427,7 @@ func TestKnownTransactionHashKnown(t *testing.T) { ctx := context.Background() updated, _, err := p.Execute(ctx, mockFFCAPI, mtx) assert.NoError(t, err) - assert.True(t, updated) + assert.Equal(t, policyengine.UpdateYes, updated) mockFFCAPI.AssertExpectations(t) } @@ -456,7 +457,7 @@ func TestWarnStaleAdditionalWarningResubmitFail(t *testing.T) { updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) assert.Regexp(t, "pop", err) assert.Empty(t, reason) - assert.True(t, updated) + assert.Equal(t, policyengine.UpdateYes, updated) assert.NotEmpty(t, mtx.PolicyInfo.JSONObject().GetString("lastWarnTime")) mockFFCAPI.AssertExpectations(t) @@ -486,7 +487,7 @@ func TestWarnStaleNoWarning(t *testing.T) { updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) assert.Empty(t, reason) assert.NoError(t, err) - assert.False(t, updated) + assert.Equal(t, policyengine.UpdateNo, updated) mockFFCAPI.AssertExpectations(t) } @@ -516,7 +517,29 @@ func TestNoOpWithReceipt(t *testing.T) { updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) assert.Empty(t, reason) assert.NoError(t, err) - assert.False(t, updated) + assert.Equal(t, policyengine.UpdateNo, updated) + + mockFFCAPI.AssertExpectations(t) +} + +func TestAllowsDeleteRequest(t *testing.T) { + f, conf := newTestPolicyEngineFactory(t) + conf.Set(FixedGasPrice, `12345`) + conf.Set(ResubmitInterval, "100s") + p, err := f.NewPolicyEngine(context.Background(), conf) + assert.NoError(t, err) + + mtx := &apitypes.ManagedTX{ + DeleteRequested: fftypes.Now(), + } + + mockFFCAPI := &ffcapimocks.API{} + + ctx := context.Background() + updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + assert.Empty(t, reason) + assert.NoError(t, err) + assert.Equal(t, policyengine.UpdateDelete, updated) mockFFCAPI.AssertExpectations(t) }