Skip to content

Commit

Permalink
fix: incomplete refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Nov 10, 2023
1 parent 916c0dd commit 55c8118
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 29 deletions.
6 changes: 3 additions & 3 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ type RelayServer interface {
// TODO_TECHDEBT: add architecture diagrams covering observable flows throughout
// the relayer package.
type RelayerSessionsManager interface {
// IncludeRelays receives an observable of relays that should be included
// InsertRelays receives an observable of relays that should be included
// in their respective session's SMST (tree).
IncludeRelays(relayObs observable.Observable[*MinedRelay])
InsertRelays(minedRelaysObs observable.Observable[*MinedRelay])

// Start iterates over the session trees at the end of each, respective, session.
// The session trees are piped through a series of map operations which progress
// them through the claim/proof lifecycle, broadcasting transactions to the
// network as necessary.
Start(ctx context.Context)

// Stop unsubscribes all observables from the RelaysToInclude observable which
// Stop unsubscribes all observables from the InsertRelays observable which
// will close downstream observables as they drain.
//
// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete
Expand Down
19 changes: 9 additions & 10 deletions pkg/relayer/session/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,38 @@ import (
"github.com/pokt-network/poktroll/pkg/relayer/protocol"
)

// createClaims maps over the sessionsToClaim observable. For each claim, it:
// createClaims maps over the sessionsToClaimObs observable. For each claim, it:
// 1. Calculates the earliest block height at which it is safe to CreateClaim
// 2. Waits for said block and creates the claim on-chain
// 3. Maps errors to a new observable and logs them
// 4. Returns an observable of the successfully claimed sessions
// It DOES NOT BLOCK as map operations run in their own goroutines.
func (rs *relayerSessionsManager) createClaims(ctx context.Context) observable.Observable[relayer.SessionTree] {
// Map SessionsToClaim observable to a new observable of the same type which
// is notified when the session is eligible to be claimed.
// relayer.SessionTree ==> relayer.SessionTree
// Map sessionsToClaimObs to a new observable of the same type which is notified
// when the session is eligible to be claimed.
sessionsWithOpenClaimWindowObs := channel.Map(
ctx, rs.sessionsToClaim,
ctx, rs.sessionsToClaimObs,
rs.mapWaitForEarliestCreateClaimHeight,
)

failedCreateClaimSessionsObs, failedCreateClaimSessionsPublishCh :=
channel.NewObservable[relayer.SessionTree]()

// Map sessionsWithOpenClaimWindow to a new observable of an either type,
// Map sessionsWithOpenClaimWindowObs to a new observable of an either type,
// populated with the session or an error, which is notified after the session
// claim has been created or an error has been encountered, respectively.
eitherClaimedSessionsObs := channel.Map(
ctx, sessionsWithOpenClaimWindow,
ctx, sessionsWithOpenClaimWindowObs,
rs.newMapClaimSessionFn(failedCreateClaimSessionsPublishCh),
)

// TODO_TECHDEBT: pass failed create claim sessions to some retry mechanism.
_ = failedCreateClaimSessions
logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessions))
_ = failedCreateClaimSessionsObs
logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessionsObs))

// Map eitherClaimedSessions to a new observable of relayer.SessionTree which
// is notified when the corresponding claim creation succeeded.
return filter.EitherSuccess(ctx, eitherClaimedSessions)
return filter.EitherSuccess(ctx, eitherClaimedSessionsObs)
}

// mapWaitForEarliestCreateClaimHeight is intended to be used as a MapFn. It
Expand Down
18 changes: 9 additions & 9 deletions pkg/relayer/session/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,27 @@ func (rs *relayerSessionsManager) submitProofs(
ctx context.Context,
claimedSessionsObs observable.Observable[relayer.SessionTree],
) {
// Map claimedSessions to a new observable of the same type which is notified
// Map claimedSessionsObs to a new observable of the same type which is notified
// when the session is eligible to be proven.
sessionsWithOpenProofWindowObs := channel.Map(
ctx, claimedSessions,
ctx, claimedSessionsObs,
rs.mapWaitForEarliestSubmitProofHeight,
)

failedSubmitProofSessionsObs, failedSubmitProveSessionsPublishCh :=
failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh :=
channel.NewObservable[relayer.SessionTree]()

// Map sessionsWithOpenProofWindow to a new observable of an either type,
// populated with the session or an error, which is notified after the session
// proof has been submitted or an error has been encountered, respectively.
eitherProvenSessionsObs := channel.Map(
ctx, sessionsWithOpenProofWindow,
rs.newMapProveSessionFn(failedSubmitProveSessionsPublishCh),
ctx, sessionsWithOpenProofWindowObs,
rs.newMapProveSessionFn(failedSubmitProofSessionsPublishCh),
)

// TODO_TECHDEBT: pass failed submit proof sessions to some retry mechanism.
_ = failedSubmitProofSessions
logging.LogErrors(ctx, filter.EitherError(ctx, eitherProvenSessions))
_ = failedSubmitProofSessionsObs
logging.LogErrors(ctx, filter.EitherError(ctx, eitherProvenSessionsObs))
}

// mapWaitForEarliestSubmitProofHeight is intended to be used as a MapFn. It
Expand Down Expand Up @@ -91,7 +91,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn(
ctx context.Context,
session relayer.SessionTree,
) (_ either.SessionTree, skip bool) {
// TODO_BLOCKER: The block that'll be used as a source of entropy for which
// TODO_BLOCKER: The block that'll be used as a source of entropy for which
// branch(es) to prove should be deterministic and use on-chain governance params
// rather than latest.
latestBlock := rs.blockClient.LatestBlock(ctx)
Expand All @@ -107,7 +107,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn(
*session.GetSessionHeader(),
proof,
); err != nil {
failedSubmitProofSessions <- session
failedSubmitProofSessionsCh <- session
return either.Error[relayer.SessionTree](err), false
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type sessionsTreesMap = map[int64]map[string]relayer.SessionTree
type relayerSessionsManager struct {
relayObs observable.Observable[*relayer.MinedRelay]

// sessionsToClaim notifies about sessions that are ready to be claimed.
sessionsToClaim observable.Observable[relayer.SessionTree]
// sessionsToClaimObs notifies about sessions that are ready to be claimed.
sessionsToClaimObs observable.Observable[relayer.SessionTree]

// sessionTrees is a map of block heights pointing to a map of SessionTrees
// indexed by their sessionId.
Expand Down Expand Up @@ -70,7 +70,7 @@ func NewRelayerSessions(
return nil, err
}

rs.sessionsToClaim = channel.MapExpand[client.Block, relayer.SessionTree](
rs.sessionsToClaimObs = channel.MapExpand[client.Block, relayer.SessionTree](
ctx,
rs.blockClient.CommittedBlocksSequence(ctx),
rs.mapBlockToSessionsToClaim,
Expand All @@ -88,14 +88,14 @@ func (rs *relayerSessionsManager) Start(ctx context.Context) {
// notified if an error was encountered while attampting to add the relay to
// the session tree.
miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree)
logging.LogErrors(ctx, miningErrors)
logging.LogErrors(ctx, miningErrorsObs)

// Start claim/proof pipeline.
claimedSessionsObs := rs.createClaims(ctx)
rs.submitProofs(ctx, claimedSessionsObservable)
rs.submitProofs(ctx, claimedSessionsObs)
}

// Stop unsubscribes all observables from the RelaysToInclude observable which
// Stop unsubscribes all observables from the InsertRelays observable which
// will close downstream observables as they drain.
//
// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete
Expand All @@ -106,7 +106,7 @@ func (rs *relayerSessionsManager) Stop() {
}

// SessionsToClaim returns an observable that notifies when sessions are ready to be claimed.
func (rs *relayerSessionsManager) IncludeRelays(relays observable.Observable[*relayer.MinedRelay]) {
func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*relayer.MinedRelay]) {
rs.relayObs = relays
}

Expand Down

0 comments on commit 55c8118

Please sign in to comment.