diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 89b1da09c..134ac8f53 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -81,9 +81,9 @@ type RelayServer interface { // TODO_TECHDEBT: add architecture diagrams covering observable flows throughout // the relayer package. type RelayerSessionsManager interface { - // IncludeRelays receives an observable of relays that should be included + // InsertRelays receives an observable of relays that should be included // in their respective session's SMST (tree). - IncludeRelays(relayObs observable.Observable[*MinedRelay]) + InsertRelays(minedRelaysObs observable.Observable[*MinedRelay]) // Start iterates over the session trees at the end of each, respective, session. // The session trees are piped through a series of map operations which progress @@ -91,7 +91,7 @@ type RelayerSessionsManager interface { // network as necessary. Start(ctx context.Context) - // Stop unsubscribes all observables from the RelaysToInclude observable which + // Stop unsubscribes all observables from the InsertRelays observable which // will close downstream observables as they drain. // // TODO_TECHDEBT: Either add a mechanism to wait for draining to complete diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index 737ad3d9c..712e7a9e5 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -13,39 +13,38 @@ import ( "github.com/pokt-network/poktroll/pkg/relayer/protocol" ) -// createClaims maps over the sessionsToClaim observable. For each claim, it: +// createClaims maps over the sessionsToClaimObs observable. For each claim, it: // 1. Calculates the earliest block height at which it is safe to CreateClaim // 2. Waits for said block and creates the claim on-chain // 3. Maps errors to a new observable and logs them // 4. Returns an observable of the successfully claimed sessions // It DOES NOT BLOCK as map operations run in their own goroutines. func (rs *relayerSessionsManager) createClaims(ctx context.Context) observable.Observable[relayer.SessionTree] { - // Map SessionsToClaim observable to a new observable of the same type which - // is notified when the session is eligible to be claimed. - // relayer.SessionTree ==> relayer.SessionTree + // Map sessionsToClaimObs to a new observable of the same type which is notified + // when the session is eligible to be claimed. sessionsWithOpenClaimWindowObs := channel.Map( - ctx, rs.sessionsToClaim, + ctx, rs.sessionsToClaimObs, rs.mapWaitForEarliestCreateClaimHeight, ) failedCreateClaimSessionsObs, failedCreateClaimSessionsPublishCh := channel.NewObservable[relayer.SessionTree]() - // Map sessionsWithOpenClaimWindow to a new observable of an either type, + // Map sessionsWithOpenClaimWindowObs to a new observable of an either type, // populated with the session or an error, which is notified after the session // claim has been created or an error has been encountered, respectively. eitherClaimedSessionsObs := channel.Map( - ctx, sessionsWithOpenClaimWindow, + ctx, sessionsWithOpenClaimWindowObs, rs.newMapClaimSessionFn(failedCreateClaimSessionsPublishCh), ) // TODO_TECHDEBT: pass failed create claim sessions to some retry mechanism. - _ = failedCreateClaimSessions - logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessions)) + _ = failedCreateClaimSessionsObs + logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessionsObs)) // Map eitherClaimedSessions to a new observable of relayer.SessionTree which // is notified when the corresponding claim creation succeeded. - return filter.EitherSuccess(ctx, eitherClaimedSessions) + return filter.EitherSuccess(ctx, eitherClaimedSessionsObs) } // mapWaitForEarliestCreateClaimHeight is intended to be used as a MapFn. It diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go index 644073e7e..4a7c415aa 100644 --- a/pkg/relayer/session/proof.go +++ b/pkg/relayer/session/proof.go @@ -23,27 +23,27 @@ func (rs *relayerSessionsManager) submitProofs( ctx context.Context, claimedSessionsObs observable.Observable[relayer.SessionTree], ) { - // Map claimedSessions to a new observable of the same type which is notified + // Map claimedSessionsObs to a new observable of the same type which is notified // when the session is eligible to be proven. sessionsWithOpenProofWindowObs := channel.Map( - ctx, claimedSessions, + ctx, claimedSessionsObs, rs.mapWaitForEarliestSubmitProofHeight, ) - failedSubmitProofSessionsObs, failedSubmitProveSessionsPublishCh := + failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := channel.NewObservable[relayer.SessionTree]() // Map sessionsWithOpenProofWindow to a new observable of an either type, // populated with the session or an error, which is notified after the session // proof has been submitted or an error has been encountered, respectively. eitherProvenSessionsObs := channel.Map( - ctx, sessionsWithOpenProofWindow, - rs.newMapProveSessionFn(failedSubmitProveSessionsPublishCh), + ctx, sessionsWithOpenProofWindowObs, + rs.newMapProveSessionFn(failedSubmitProofSessionsPublishCh), ) // TODO_TECHDEBT: pass failed submit proof sessions to some retry mechanism. - _ = failedSubmitProofSessions - logging.LogErrors(ctx, filter.EitherError(ctx, eitherProvenSessions)) + _ = failedSubmitProofSessionsObs + logging.LogErrors(ctx, filter.EitherError(ctx, eitherProvenSessionsObs)) } // mapWaitForEarliestSubmitProofHeight is intended to be used as a MapFn. It @@ -91,7 +91,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn( ctx context.Context, session relayer.SessionTree, ) (_ either.SessionTree, skip bool) { - // TODO_BLOCKER: The block that'll be used as a source of entropy for which + // TODO_BLOCKER: The block that'll be used as a source of entropy for which // branch(es) to prove should be deterministic and use on-chain governance params // rather than latest. latestBlock := rs.blockClient.LatestBlock(ctx) @@ -107,7 +107,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn( *session.GetSessionHeader(), proof, ); err != nil { - failedSubmitProofSessions <- session + failedSubmitProofSessionsCh <- session return either.Error[relayer.SessionTree](err), false } diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 9e1d27aac..4cf8b0d2c 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -24,8 +24,8 @@ type sessionsTreesMap = map[int64]map[string]relayer.SessionTree type relayerSessionsManager struct { relayObs observable.Observable[*relayer.MinedRelay] - // sessionsToClaim notifies about sessions that are ready to be claimed. - sessionsToClaim observable.Observable[relayer.SessionTree] + // sessionsToClaimObs notifies about sessions that are ready to be claimed. + sessionsToClaimObs observable.Observable[relayer.SessionTree] // sessionTrees is a map of block heights pointing to a map of SessionTrees // indexed by their sessionId. @@ -70,7 +70,7 @@ func NewRelayerSessions( return nil, err } - rs.sessionsToClaim = channel.MapExpand[client.Block, relayer.SessionTree]( + rs.sessionsToClaimObs = channel.MapExpand[client.Block, relayer.SessionTree]( ctx, rs.blockClient.CommittedBlocksSequence(ctx), rs.mapBlockToSessionsToClaim, @@ -88,14 +88,14 @@ func (rs *relayerSessionsManager) Start(ctx context.Context) { // notified if an error was encountered while attampting to add the relay to // the session tree. miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree) - logging.LogErrors(ctx, miningErrors) + logging.LogErrors(ctx, miningErrorsObs) // Start claim/proof pipeline. claimedSessionsObs := rs.createClaims(ctx) - rs.submitProofs(ctx, claimedSessionsObservable) + rs.submitProofs(ctx, claimedSessionsObs) } -// Stop unsubscribes all observables from the RelaysToInclude observable which +// Stop unsubscribes all observables from the InsertRelays observable which // will close downstream observables as they drain. // // TODO_TECHDEBT: Either add a mechanism to wait for draining to complete @@ -106,7 +106,7 @@ func (rs *relayerSessionsManager) Stop() { } // SessionsToClaim returns an observable that notifies when sessions are ready to be claimed. -func (rs *relayerSessionsManager) IncludeRelays(relays observable.Observable[*relayer.MinedRelay]) { +func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*relayer.MinedRelay]) { rs.relayObs = relays }