Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miner] feat: add Miner component #168

Merged
merged 35 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
29b9bdf
refactor: `MapFn`s receive context arg
bryanchriswhite Nov 9, 2023
b6c9c71
chore: add `ForEach` map shorthand operator
bryanchriswhite Nov 9, 2023
f257b46
chore: add `/pkg/observable/filter`
bryanchriswhite Nov 9, 2023
194cee5
chore: add `/pkg/observable/logging`
bryanchriswhite Nov 9, 2023
cca55c2
chore: add `/pkg/relayer/protocol`
bryanchriswhite Nov 9, 2023
c666542
chore: add `Miner` interface
bryanchriswhite Nov 9, 2023
ea8d848
feat: add `Miner` implementation
bryanchriswhite Nov 9, 2023
38329bb
test: `Miner` implementation
bryanchriswhite Nov 9, 2023
d2f9cb4
chore: fix comment
bryanchriswhite Nov 9, 2023
d502b75
chore: add godoc comments
bryanchriswhite Nov 9, 2023
f85a381
Merge remote-tracking branch 'pokt/main' into issues/13/feat/miner
bryanchriswhite Nov 9, 2023
f9e1cbc
[Test] First step for automated E2E Relay test (#167)
Olshansk Nov 9, 2023
0e72490
[Relayer] refactor: simplify `RelayerSessionsManager` (#169)
bryanchriswhite Nov 9, 2023
107f6dd
chore: review feedback improvements
bryanchriswhite Nov 9, 2023
ccad087
chore: review feedback improvements
bryanchriswhite Nov 9, 2023
8737024
Merge remote-tracking branch 'pokt/main' into issues/13/feat/miner
bryanchriswhite Nov 9, 2023
85a49b7
fix: import cycle & goimports
bryanchriswhite Nov 9, 2023
0788e1d
chore: review feedback improvements
bryanchriswhite Nov 9, 2023
e6a558b
chore: cleanup TODO_THIS_COMMIT comments
bryanchriswhite Nov 9, 2023
d04ea2a
Merge branch 'main' into issues/13/feat/miner
bryanchriswhite Nov 9, 2023
9c75b2c
chore: improve var & func names for clarity and consistency
bryanchriswhite Nov 10, 2023
286e1b2
Merge remote-tracking branch 'pokt/main' into issues/13/feat/miner
bryanchriswhite Nov 10, 2023
a0ffe9d
refactor: move claim/proof lifecycle concerns to `relayerSessionsMana…
bryanchriswhite Nov 10, 2023
2019637
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
709f661
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
394575b
refactor: `miner#hash()` method
bryanchriswhite Nov 10, 2023
1eae9d2
chore: tidy up
bryanchriswhite Nov 10, 2023
efb8a4e
chore: simplify
bryanchriswhite Nov 10, 2023
3b2022a
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
967dddc
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
f9a6fb2
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
7d23bd4
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
916c0dd
chore: review feedback improvements
bryanchriswhite Nov 10, 2023
55c8118
fix: incomplete refactor
bryanchriswhite Nov 10, 2023
c1784f5
chore: simplify
bryanchriswhite Nov 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/either/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package either

import "github.com/pokt-network/poktroll/pkg/relayer"

type (
// AsyncError represents a value which could either be a synchronous error or
// an asynchronous error (sent through a channel). It wraps the more generic
// `Either` type specific for error channels.
AsyncError Either[chan error]
Bytes = Either[[]byte]
AsyncError Either[chan error]
Bytes = Either[[]byte]
SessionTree = Either[relayer.SessionTree]
)
5 changes: 5 additions & 0 deletions pkg/observable/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package observable

type (
Error = Observable[error]
)
50 changes: 38 additions & 12 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,23 @@ import (

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// Miner is responsible for observing servedRelayObs, hashing and checking the
// difficulty of each, finally publishing those with sufficient difficulty to
// minedRelayObs as they are applicable for relay volume.
type Miner interface {
MinedRelays(
ctx context.Context,
servedRelayObs observable.Observable[*servicetypes.Relay],
) (minedRelaysObs observable.Observable[*MinedRelay])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UNRELATED OPTION NIT: Thoughts on adding a TODO_TECHDEBT: Add the Obs suffix to all Observables in the codebase.

Might be worthing putting it in observable.go if you also agree that it's the right pattern to follow.

}

type MinerOption func(Miner)

// RelayerProxy is the interface for the proxy that serves relays to the application.
// It is responsible for starting and stopping all supported RelayServers.
// While handling requests and responding in a closed loop, it also notifies
Expand Down Expand Up @@ -59,19 +72,32 @@ type RelayServer interface {
Service() *sharedtypes.Service
}

// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse
// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are
// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session.
// It also handles the creation and retrieval of SMSTs for a given session.
// RelayerSessionsManager is responsible for managing the relayer's session lifecycles.
// It handles the creation and retrieval of SMSTs (trees) for a given session, as
// well as the respective and subsequent claim creation and proof submission.
// This is largely accomplished by pipelining observables of relays and sessions
// through a series of map operations.
//
// TODO_TECHDEBT: add architecture diagrams covering observable flows throughout
// the relayer package.
type RelayerSessionsManager interface {
// SessionsToClaim returns an observable that notifies of sessions ready to be claimed.
SessionsToClaim() observable.Observable[SessionTree]

// EnsureSessionTree returns the SMST (Sparse Merkle State Tree) for a given session header.
// It is used to retrieve the SMST and update it when a Relay has been successfully served.
// If the session is seen for the first time, it creates a new SMST for it before returning it.
// An error is returned if the corresponding KVStore for SMST fails to be created.
EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (SessionTree, error)
// InsertRelays receives an observable of relays that should be included
// in their respective session's SMST (tree).
InsertRelays(minedRelaysObs observable.Observable[*MinedRelay])

// Start iterates over the session trees at the end of each, respective, session.
// The session trees are piped through a series of map operations which progress
// them through the claim/proof lifecycle, broadcasting transactions to the
// network as necessary.
Start(ctx context.Context)

// Stop unsubscribes all observables from the InsertRelays observable which
// will close downstream observables as they drain.
//
// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete
// and/or ensure that the state at each pipeline stage is persisted to disk
// and exit as early as possible.
Stop()
}

type RelayerSessionsManagerOption func(RelayerSessionsManager)
Expand Down
122 changes: 122 additions & 0 deletions pkg/relayer/miner/miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package miner

import (
"context"
"crypto/sha256"
"hash"

"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/observable/filter"
"github.com/pokt-network/poktroll/pkg/observable/logging"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/protocol"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
)

var (
_ relayer.Miner = (*miner)(nil)
defaultRelayHasher = sha256.New
// TODO_BLOCKER: query on-chain governance params once available.
// Setting this to 0 to effectively disables mining for now.
// I.e., all relays are added to the tree.
defaultRelayDifficulty = 0
)

// Miner is responsible for observing servedRelayObs, hashing and checking the
// difficulty of each, finally publishing those with sufficient difficulty to
// minedRelayObs as they are applicable for relay volume.
//
// TODO_BLOCKER: The relay hashing and relay difficulty mechanisms & values must come
type miner struct {
// relayHasher is a function which returns a hash.Hash interfact type. It is
// used to hash serialized relays to measure their mining difficulty.
relayHasher func() hash.Hash
// relayDifficulty is the minimum difficulty that a relay must have to be
// volume / reward applicable.
relayDifficulty int
}
Olshansk marked this conversation as resolved.
Show resolved Hide resolved

// NewMiner creates a new miner from the given dependencies and options. It
// returns an error if it has not been sufficiently configured or supplied.
func NewMiner(
opts ...relayer.MinerOption,
) (*miner, error) {
mnr := &miner{}

for _, opt := range opts {
opt(mnr)
}

mnr.setDefaults()

return mnr, nil
}

// MinedRelays maps servedRelaysObs through a pipeline which:
// 1. Hashes the relay
// 2. Checks if it's above the mining difficulty
// 3. Adds it to the session tree if so
// It DOES NOT BLOCK as map operations run in their own goroutines.
func (mnr *miner) MinedRelays(
ctx context.Context,
servedRelaysObs observable.Observable[*servicetypes.Relay],
) observable.Observable[*relayer.MinedRelay] {
// Map servedRelaysObs to a new observable of an either type, populated with
// the minedRelay or an error. It is notified after the relay has been mined
// or an error has been encountered, respectively.
eitherMinedRelaysObs := channel.Map(ctx, servedRelaysObs, mnr.mapMineRelay)
logging.LogErrors(ctx, filter.EitherError(ctx, eitherMinedRelaysObs))

return filter.EitherSuccess(ctx, eitherMinedRelaysObs)
}

// setDefaults ensures that the miner has been configured with a hasherConstructor and uses
// the default hasherConstructor if not.
func (mnr *miner) setDefaults() {
if mnr.relayHasher == nil {
mnr.relayHasher = defaultRelayHasher
}
}

// mapMineRelay is intended to be used as a MapFn.
// 1. It hashes the relay and compares its difficult to the minimum threshold.
// 2. If the relay difficulty is sufficient -> return an Either[MineRelay Value]
// 3. If an error is encountered -> return an Either[error]
// 4. Otherwise, skip the relay.
func (mnr *miner) mapMineRelay(
_ context.Context,
relay *servicetypes.Relay,
) (_ either.Either[*relayer.MinedRelay], skip bool) {
relayBz, err := relay.Marshal()
if err != nil {
return either.Error[*relayer.MinedRelay](err), false
}

// TODO_BLOCKER: Centralize the logic of hashing a relay. It should live
// alongside signing & verification.
//
// TODO_IMPROVE: We need to hash the key; it would be nice if smst.Update() could do it
// since smst has a reference to the hasherConstructor
relayHash := mnr.hash(relayBz)

// The relay IS NOT volume / reward applicable
if !protocol.BytesDifficultyGreaterThan(relayHash, defaultRelayDifficulty) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
return either.Success[*relayer.MinedRelay](nil), true
}

// The relay IS volume / reward applicable
return either.Success(&relayer.MinedRelay{
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
Relay: *relay,
Bytes: relayBz,
Hash: relayHash,
}), false
}

// hash constructs a new hasher and hashes the given input bytes.
func (mnr *miner) hash(inputBz []byte) []byte {
hasher := mnr.relayHasher()
hasher.Write(inputBz)
return hasher.Sum(nil)
}
10 changes: 10 additions & 0 deletions pkg/relayer/miner/miner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package miner_test

import (
"testing"
)

// TODO_TECHDEBT(@bryanchriswhite): add all the test coverage...
func TestNewMiner(t *testing.T) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
t.Skip("TODO_TECHDEBT(@bryanchriswhite): add all the test coverage...")
}
45 changes: 45 additions & 0 deletions pkg/relayer/protocol/block_heights.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package protocol

import (
"encoding/binary"
"log"
"math/rand"

"github.com/pokt-network/poktroll/pkg/client"
)

// GetEarliestCreateClaimHeight returns the earliest block height at which a claim
// for a session with the given createClaimWindowStartHeight can be created.
//
// TODO_TEST(@bryanchriswhite): Add test coverage and more logs
func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int64 {
createClaimWindowStartBlockHash := createClaimWindowStartBlock.Hash()
log.Printf("using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash)
rngSeed, _ := binary.Varint(createClaimWindowStartBlockHash)
randomNumber := rand.NewSource(rngSeed).Int63()

// TODO_TECHDEBT: query the on-chain governance parameter once available.
// randCreateClaimHeightOffset := randomNumber % (claimproofparams.GovCreateClaimIntervalBlocks - claimproofparams.GovCreateClaimWindowBlocks - 1)
_ = randomNumber
randCreateClaimHeightOffset := int64(0)

return createClaimWindowStartBlock.Height() + randCreateClaimHeightOffset
}

// GetEarliestSubmitProofHeight returns the earliest block height at which a proof
// for a session with the given submitProofWindowStartHeight can be submitted.
//
// TODO_TEST(@bryanchriswhite): Add test coverage and more logs
func GetEarliestSubmitProofHeight(submitProofWindowStartBlock client.Block) int64 {
earliestSubmitProofBlockHash := submitProofWindowStartBlock.Hash()
log.Printf("using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash)
rngSeed, _ := binary.Varint(earliestSubmitProofBlockHash)
randomNumber := rand.NewSource(rngSeed).Int63()

// TODO_TECHDEBT: query the on-chain governance parameter once available.
// randSubmitProofHeightOffset := randomNumber % (claimproofparams.GovSubmitProofIntervalBlocks - claimproofparams.GovSubmitProofWindowBlocks - 1)
_ = randomNumber
randSubmitProofHeightOffset := int64(0)

return submitProofWindowStartBlock.Height() + randSubmitProofHeightOffset
}
17 changes: 17 additions & 0 deletions pkg/relayer/protocol/difficulty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package protocol

import (
"encoding/hex"
"strings"
)

// TODO_BLOCKER: Revisit this part of the algorithm after initial TestNet Launch.
// TODO_TEST: Add extensive tests for the core relay mining business logic.
// BytesDifficultyGreaterThan determines if the bytes exceed a certain difficulty, and it
// is used to determine if a relay is volume applicable. See the spec for more details: https://github.com/pokt-network/pocket-network-protocol
func BytesDifficultyGreaterThan(bz []byte, compDifficultyBytes int) bool {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
hexZerosPrefix := strings.Repeat("0", compDifficultyBytes*2) // 2 hex chars per byte.
hexBz := hex.EncodeToString(bz)

return strings.HasPrefix(hexBz, hexZerosPrefix)
}
Loading