From 43b4f44c7dcaf592aa956eec7a24f26b8a472eab Mon Sep 17 00:00:00 2001 From: andrew-coleman Date: Fri, 20 Jan 2023 16:06:36 +0000 Subject: [PATCH] Gateway should apply OrdererEndpointOverrides The peer local config allows the endpoint addresses and TLS root certs or ordering nodes to be overridden. The gateway should apply these overrides when connecting to orderer nodes for transaction submit. Signed-off-by: andrew-coleman (cherry picked from commit aea8ea8dad0935cfce872422a888094a50e846cb) --- internal/pkg/gateway/api.go | 6 +- internal/pkg/gateway/api_test.go | 134 ++++++++++++++++++++++++------- internal/pkg/gateway/apiutils.go | 2 +- internal/pkg/gateway/endpoint.go | 29 ++++--- internal/pkg/gateway/gateway.go | 19 ++++- internal/pkg/gateway/registry.go | 8 +- 6 files changed, 146 insertions(+), 52 deletions(-) diff --git a/internal/pkg/gateway/api.go b/internal/pkg/gateway/api.go index 7dc5b49a139..1c563ffe104 100644 --- a/internal/pkg/gateway/api.go +++ b/internal/pkg/gateway/api.go @@ -431,11 +431,11 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su var errDetails []proto.Message for _, index := range rand.Perm(len(orderers)) { orderer := orderers[index] - logger.Infow("Sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address) + logger.Infow("Sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress) response, err := gs.broadcast(ctx, orderer, txn) if err != nil { errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error())) - logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address, "err", err) + logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress, "err", err) continue } @@ -444,7 +444,7 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su return &gp.SubmitResponse{}, nil } - logger.Warnw("Unsuccessful response sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address, "status", status, "info", response.GetInfo()) + logger.Warnw("Unsuccessful response sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress, "status", status, "info", response.GetInfo()) if status >= 400 && status < 500 { // client error - don't retry diff --git a/internal/pkg/gateway/api_test.go b/internal/pkg/gateway/api_test.go index 2e765e9867f..e65e25f891e 100644 --- a/internal/pkg/gateway/api_test.go +++ b/internal/pkg/gateway/api_test.go @@ -36,6 +36,7 @@ import ( ledgermocks "github.com/hyperledger/fabric/internal/pkg/gateway/ledger/mocks" "github.com/hyperledger/fabric/internal/pkg/gateway/mocks" idmocks "github.com/hyperledger/fabric/internal/pkg/identity/mocks" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" "github.com/spf13/viper" @@ -123,32 +124,33 @@ const ( ) type testDef struct { - name string - plan endorsementPlan - layouts []endorsementLayout - members []networkMember - config *dp.ConfigResult - identity []byte - localResponse string - errString string - errCode codes.Code - errDetails []*pb.ErrorDetail - endpointDefinition *endpointDef - endorsingOrgs []string - postSetup func(t *testing.T, def *preparedTest) - postTest func(t *testing.T, def *preparedTest) - expectedEndorsers []string - finderStatus *commit.Status - finderErr error - eventErr error - policyErr error - expectedResponse proto.Message - expectedResponses []proto.Message - transientData map[string][]byte - interest *peer.ChaincodeInterest - blocks []*cp.Block - startPosition *ab.SeekPosition - afterTxID string + name string + plan endorsementPlan + layouts []endorsementLayout + members []networkMember + config *dp.ConfigResult + identity []byte + localResponse string + errString string + errCode codes.Code + errDetails []*pb.ErrorDetail + endpointDefinition *endpointDef + endorsingOrgs []string + postSetup func(t *testing.T, def *preparedTest) + postTest func(t *testing.T, def *preparedTest) + expectedEndorsers []string + finderStatus *commit.Status + finderErr error + eventErr error + policyErr error + expectedResponse proto.Message + expectedResponses []proto.Message + transientData map[string][]byte + interest *peer.ChaincodeInterest + blocks []*cp.Block + startPosition *ab.SeekPosition + afterTxID string + ordererEndpointOverrides map[string]*orderers.Endpoint } type preparedTest struct { @@ -1349,6 +1351,67 @@ func TestSubmit(t *testing.T) { }, }, }, + { + name: "orderer endpoint overrides", + plan: endorsementPlan{ + "g1": {{endorser: localhostMock}}, + }, + ordererEndpointOverrides: map[string]*orderers.Endpoint{ + "orderer1:7050": {Address: "override1:1234"}, + "orderer3:7050": {Address: "override3:4321"}, + }, + config: &dp.ConfigResult{ + Orderers: map[string]*dp.Endpoints{ + "msp1": { + Endpoint: []*dp.Endpoint{ + {Host: "orderer1", Port: 7050}, + {Host: "orderer2", Port: 7050}, + {Host: "orderer3", Port: 7050}, + }, + }, + }, + Msps: map[string]*msp.FabricMSPConfig{ + "msp1": { + TlsRootCerts: [][]byte{}, + }, + }, + }, + endpointDefinition: &endpointDef{ + proposalResponseStatus: 200, + ordererBroadcastError: status.Error(codes.Unavailable, "Orderer not listening!"), + }, + errCode: codes.Unavailable, + errString: "no orderers could successfully process transaction", + errDetails: []*pb.ErrorDetail{ + { + Address: "override1:1234 (mapped from orderer1:7050)", + MspId: "msp1", + Message: "rpc error: code = Unavailable desc = Orderer not listening!", + }, + { + Address: "orderer2:7050", + MspId: "msp1", + Message: "rpc error: code = Unavailable desc = Orderer not listening!", + }, + { + Address: "override3:4321 (mapped from orderer3:7050)", + MspId: "msp1", + Message: "rpc error: code = Unavailable desc = Orderer not listening!", + }, + }, + postTest: func(t *testing.T, def *preparedTest) { + var addresses []string + for i := 0; i < def.dialer.CallCount(); i++ { + _, address, _ := def.dialer.ArgsForCall(i) + addresses = append(addresses, address) + } + require.Contains(t, addresses, "override1:1234") + require.NotContains(t, addresses, "orderer1:7050") + require.Contains(t, addresses, "orderer2:7050") + require.Contains(t, addresses, "override3:4321") + require.NotContains(t, addresses, "orderer3:7050") + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1368,11 +1431,18 @@ func TestSubmit(t *testing.T) { if checkError(t, &tt, err) { require.Nil(t, submitResponse, "response on error") + if tt.postTest != nil { + tt.postTest(t, test) + } return } require.NoError(t, err) require.True(t, proto.Equal(&pb.SubmitResponse{}, submitResponse), "Incorrect response") + + if tt.postTest != nil { + tt.postTest(t, test) + } }) } } @@ -2001,6 +2071,7 @@ func TestNilArgs(t *testing.T) { &comm.SecureOptions{}, config.GetOptions(viper.New()), nil, + nil, ) ctx := context.Background() @@ -2220,11 +2291,11 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { Endpoint: "localhost:7051", } - server := newServer(localEndorser, disc, mockFinder, mockPolicy, mockLedgerProvider, member, "msp1", &comm.SecureOptions{}, options, nil) + server := newServer(localEndorser, disc, mockFinder, mockPolicy, mockLedgerProvider, member, "msp1", &comm.SecureOptions{}, options, nil, tt.ordererEndpointOverrides) dialer := &mocks.Dialer{} dialer.Returns(nil, nil) - server.registry.endpointFactory = createEndpointFactory(t, epDef, dialer.Spy) + server.registry.endpointFactory = createEndpointFactory(t, epDef, dialer.Spy, tt.ordererEndpointOverrides) ctx := context.WithValue(context.Background(), contextKey("orange"), "apples") @@ -2413,7 +2484,7 @@ func createMockPeer(t *testing.T, endorser *endorserState) *dp.Peer { } } -func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer) *endpointFactory { +func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer, ordererEndpointOverrides map[string]*orderers.Endpoint) *endpointFactory { var endpoint string ca, err := tlsgen.NewCA() require.NoError(t, err, "failed to create CA") @@ -2446,8 +2517,9 @@ func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer) endpoint = target return dialer(ctx, target, opts...) }, - clientKey: pair.Key, - clientCert: pair.Cert, + clientKey: pair.Key, + clientCert: pair.Cert, + ordererEndpointOverrides: ordererEndpointOverrides, } } diff --git a/internal/pkg/gateway/apiutils.go b/internal/pkg/gateway/apiutils.go index ca60e6d8283..411fcd71b80 100644 --- a/internal/pkg/gateway/apiutils.go +++ b/internal/pkg/gateway/apiutils.go @@ -95,7 +95,7 @@ func toRpcStatus(err error) *status.Status { } func errorDetail(e *endpointConfig, msg string) *gp.ErrorDetail { - return &gp.ErrorDetail{Address: e.address, MspId: e.mspid, Message: msg} + return &gp.ErrorDetail{Address: e.logAddress, MspId: e.mspid, Message: msg} } func getResultFromProposalResponse(proposalResponse *peer.ProposalResponse) ([]byte, error) { diff --git a/internal/pkg/gateway/endpoint.go b/internal/pkg/gateway/endpoint.go index 6ccf0df2c30..c63537d8f01 100644 --- a/internal/pkg/gateway/endpoint.go +++ b/internal/pkg/gateway/endpoint.go @@ -15,6 +15,7 @@ import ( "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/internal/pkg/comm" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" ) @@ -34,6 +35,7 @@ type orderer struct { type endpointConfig struct { pkiid common.PKIidType address string + logAddress string mspid string tlsRootCerts [][]byte } @@ -47,12 +49,13 @@ type ( type dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) type endpointFactory struct { - timeout time.Duration - connectEndorser endorserConnector - connectOrderer ordererConnector - dialer dialer - clientCert []byte - clientKey []byte + timeout time.Duration + connectEndorser endorserConnector + connectOrderer ordererConnector + dialer dialer + clientCert []byte + clientKey []byte + ordererEndpointOverrides map[string]*orderers.Endpoint } func (ef *endpointFactory) newEndorser(pkiid common.PKIidType, address, mspid string, tlsRootCerts [][]byte) (*endorser, error) { @@ -74,12 +77,20 @@ func (ef *endpointFactory) newEndorser(pkiid common.PKIidType, address, mspid st return &endorser{ client: connectEndorser(conn), closeConnection: close, - endpointConfig: &endpointConfig{pkiid: pkiid, address: address, mspid: mspid, tlsRootCerts: tlsRootCerts}, + endpointConfig: &endpointConfig{pkiid: pkiid, address: address, logAddress: address, mspid: mspid, tlsRootCerts: tlsRootCerts}, }, nil } func (ef *endpointFactory) newOrderer(address, mspid string, tlsRootCerts [][]byte) (*orderer, error) { - conn, err := ef.newConnection(address, tlsRootCerts) + connAddress := address + logAddess := address + connCerts := tlsRootCerts + if override, ok := ef.ordererEndpointOverrides[address]; ok { + connAddress = override.Address + connCerts = override.RootCerts + logAddess = fmt.Sprintf("%s (mapped from %s)", connAddress, address) + } + conn, err := ef.newConnection(connAddress, connCerts) if err != nil { return nil, err } @@ -90,7 +101,7 @@ func (ef *endpointFactory) newOrderer(address, mspid string, tlsRootCerts [][]by return &orderer{ client: connectOrderer(conn), closeConnection: conn.Close, - endpointConfig: &endpointConfig{address: address, mspid: mspid, tlsRootCerts: tlsRootCerts}, + endpointConfig: &endpointConfig{address: address, logAddress: logAddess, mspid: mspid, tlsRootCerts: tlsRootCerts}, }, nil } diff --git a/internal/pkg/gateway/gateway.go b/internal/pkg/gateway/gateway.go index 9fadd7546b8..30973e831cb 100644 --- a/internal/pkg/gateway/gateway.go +++ b/internal/pkg/gateway/gateway.go @@ -17,6 +17,7 @@ import ( "github.com/hyperledger/fabric/internal/pkg/gateway/commit" "github.com/hyperledger/fabric/internal/pkg/gateway/config" "github.com/hyperledger/fabric/internal/pkg/gateway/ledger" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" "google.golang.org/grpc" ) @@ -77,6 +78,7 @@ func CreateServer( secureOptions, options, systemChaincodes, + peerInstance.OrdererEndpointOverrides, ) peerInstance.AddConfigCallbacks(server.registry.configUpdate) @@ -94,13 +96,22 @@ func newServer(localEndorser peerproto.EndorserClient, secureOptions *comm.SecureOptions, options config.Options, systemChaincodes scc.BuiltinSCCs, + ordererEndpointOverrides map[string]*orderers.Endpoint, ) *Server { return &Server{ registry: ®istry{ - localEndorser: &endorser{client: localEndorser, endpointConfig: &endpointConfig{pkiid: localInfo.PKIid, address: localInfo.Endpoint, mspid: localMSPID}}, - discovery: discovery, - logger: logger, - endpointFactory: &endpointFactory{timeout: options.DialTimeout, clientCert: secureOptions.Certificate, clientKey: secureOptions.Key}, + localEndorser: &endorser{ + client: localEndorser, + endpointConfig: &endpointConfig{pkiid: localInfo.PKIid, address: localInfo.Endpoint, logAddress: localInfo.Endpoint, mspid: localMSPID}, + }, + discovery: discovery, + logger: logger, + endpointFactory: &endpointFactory{ + timeout: options.DialTimeout, + clientCert: secureOptions.Certificate, + clientKey: secureOptions.Key, + ordererEndpointOverrides: ordererEndpointOverrides, + }, remoteEndorsers: map[string]*endorser{}, channelInitialized: map[string]bool{}, systemChaincodes: systemChaincodes, diff --git a/internal/pkg/gateway/registry.go b/internal/pkg/gateway/registry.go index 0fcaf6f1a6b..82ed4b5a1bc 100644 --- a/internal/pkg/gateway/registry.go +++ b/internal/pkg/gateway/registry.go @@ -275,7 +275,7 @@ func (reg *registry) orderers(channel string) ([]*orderer, error) { client, err := reg.endpointFactory.newOrderer(ep.address, ep.mspid, ep.tlsRootCerts) if err != nil { // Failed to connect to this orderer for some reason. Log the problem and skip to the next one. - reg.logger.Warnw("Failed to connect to orderer", "address", ep.address, "err", err) + reg.logger.Warnw("Failed to connect to orderer", "address", ep.logAddress, "err", err) continue } var loaded bool @@ -285,10 +285,10 @@ func (reg *registry) orderers(channel string) ([]*orderer, error) { err = client.closeConnection() if err != nil { // Failed to close this new connection. Log the problem. - reg.logger.Warnw("Failed to close connection to orderer", "address", ep.address, "err", err) + reg.logger.Warnw("Failed to close connection to orderer", "address", ep.logAddress, "err", err) } } else { - reg.logger.Infow("Added orderer to registry", "address", ep.address) + reg.logger.Infow("Added orderer to registry", "address", ep.logAddress) } } orderers = append(orderers, entry.(*orderer)) @@ -460,7 +460,7 @@ func (reg *registry) closeStaleOrdererConnections(channel string, channelOrderer if found { err := client.(*orderer).closeConnection() if err != nil { - reg.logger.Errorw("Failed to close connection to orderer", "address", ep.address, "mspid", ep.mspid, "err", err) + reg.logger.Errorw("Failed to close connection to orderer", "address", ep.logAddress, "mspid", ep.mspid, "err", err) } } }