Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serge/repl 11 voyager #2253

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/env/local/multi/on_chain.csv
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"media.streamMembershipLimits.88",0,2
"stream.recencyConstraints.ageSeconds",0,11
"stream.recencyConstraints.generations",0,5
"stream.replicationFactor",0,1
"stream.replicationFactor",0,5
"stream.defaultMinEventsPerSnapshot",0,100
"stream.minEventsPerSnapshot.a1",0,10
"stream.minEventsPerSnapshot.a5",0,10
Expand Down
2 changes: 1 addition & 1 deletion core/env/local/multi_ne/on_chain.csv
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"media.streamMembershipLimits.88",0,2
"stream.recencyConstraints.ageSeconds",0,11
"stream.recencyConstraints.generations",0,5
"stream.replicationFactor",0,1
"stream.replicationFactor",0,5
"stream.defaultMinEventsPerSnapshot",0,100
"stream.minEventsPerSnapshot.a1",0,10
"stream.minEventsPerSnapshot.a5",0,10
Expand Down
5 changes: 1 addition & 4 deletions core/node/base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,7 @@ func IsRiverError(err error) bool {
}

func IsRiverErrorCode(err error, code protocol.Err) bool {
if e, ok := err.(*RiverErrorImpl); ok {
return e.Code == code
}
return false
return AsRiverError(err).Code == code
}

// IsCodeWithBases checks if the error or any of base errors match the given code.
Expand Down
28 changes: 8 additions & 20 deletions core/node/events/miniblock_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
. "github.com/towns-protocol/towns/core/node/base"
"github.com/towns-protocol/towns/core/node/logging"
. "github.com/towns-protocol/towns/core/node/protocol"
"github.com/towns-protocol/towns/core/node/shared"
)

// mbJos tracks single miniblock production attempt for a single stream.
Expand Down Expand Up @@ -167,32 +166,21 @@ func (j *mbJob) processRemoteProposals(ctx context.Context) ([]*mbProposal, *Str
}

// Check if we have enough remote proposals and return them.
if len(proposals) >= RemoteQuorumNum(len(j.remoteNodes), true) {
if len(converted) >= RemoteQuorumNum(len(j.remoteNodes), true) {
return converted, view, nil
}

// if one of the nodes returned MINIBLOCK_TOO_OLD it indicates that this node has fallen behind, sync to catch up.
if slices.ContainsFunc(errs, func(err error) bool {
return IsRiverErrorCode(err, Err_MINIBLOCK_TOO_OLD)
}) {
contractStream, err := j.cache.params.Registry.StreamRegistry.GetStream(nil, j.stream.streamId)
if err != nil {
return nil, nil, RiverError(
Err_CANNOT_CALL_CONTRACT, "mbJob.processRemoteProposals: cannot get contract stream")
}

if err := j.cache.syncStreamFromPeers(ctx, j.stream.streamId, &shared.MiniblockRef{
Hash: contractStream.LastMiniblockHash,
Num: int64(contractStream.LastMiniblockNum),
}); err != nil {
return nil, nil, err
}

return nil, nil, RiverError(Err_MINIBLOCK_TOO_OLD, "mbJob.processRemoteProposals: node out of sync")
if slices.ContainsFunc(errs, func(err error) bool { return IsRiverErrorCode(err, Err_MINIBLOCK_TOO_OLD) }) {
j.cache.submitSyncStreamTask(ctx, j.stream.streamId)
}

if len(errs) > 0 {
return nil, nil, errs[0]
return nil, nil, RiverErrorWithBases(Err_QUORUM_FAILED, "mbJob.processRemoteProposals: quorum failed", errs,
"streamId", j.stream.streamId,
"currentLastMb", view.LastBlock().Ref,
"attemptedMbNum", request.NewMiniblockNum,
)
}

return nil, nil, RiverError(Err_INTERNAL, "mbJob.processRemoteProposals: no proposals and no errors")
Expand Down
1 change: 1 addition & 0 deletions core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func (s *Stream) promoteCandidateLocked(ctx context.Context, mb *MiniblockRef) e
}

// schedulePromotionLocked should be called with a lock held.
// TODO: REPLICATION: FIX: there should be periodic check to trigger reconciliation if scheduled promotion is not acted upon.
func (s *Stream) schedulePromotionLocked(mb *MiniblockRef) error {
if len(s.local.pendingCandidates) == 0 {
if mb.Num != s.view().LastBlock().Ref.Num+1 {
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *StreamCache) Start(ctx context.Context) error {
si.nodesLocked.Reset(stream.Nodes, s.params.Wallet.Address)
s.cache.Store(stream.StreamId, si)
if s.params.Config.StreamReconciliation.InitialWorkerPoolSize > 0 {
s.submitSyncStreamTask(
s.submitSyncStreamTaskToPool(
ctx,
initialSyncWorkerPool,
stream.StreamId,
Expand Down
21 changes: 21 additions & 0 deletions core/node/events/stream_sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package events
import (
"context"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/gammazero/workerpool"

Expand All @@ -13,6 +14,13 @@ import (
)

func (s *StreamCache) submitSyncStreamTask(
ctx context.Context,
streamId StreamId,
) {
s.submitSyncStreamTaskToPool(ctx, s.onlineSyncWorkerPool, streamId, nil)
}

func (s *StreamCache) submitSyncStreamTaskToPool(
ctx context.Context,
pool *workerpool.WorkerPool,
streamId StreamId,
Expand All @@ -35,6 +43,19 @@ func (s *StreamCache) syncStreamFromPeers(
streamId StreamId,
lastMbInContract *MiniblockRef,
) error {
if lastMbInContract == nil {
contractStream, err := s.params.Registry.StreamRegistry.GetStream(&bind.CallOpts{
Context: ctx,
}, streamId)
if err != nil {
return err
}
lastMbInContract = &MiniblockRef{
Hash: contractStream.LastMiniblockHash,
Num: int64(contractStream.LastMiniblockNum),
}
}

stream, err := s.getStreamImpl(ctx, streamId, false)
if err != nil {
return err
Expand Down
2 changes: 0 additions & 2 deletions core/node/rpc/repl_multiclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,12 @@ func TestReplMcConversation(t *testing.T) {
testReplMcConversation(t, 5, 100, 10, 100)
})
t.Run("10x1000", func(t *testing.T) {
t.Skip("TODO: REPLICATION: FIX: flacky")
if testing.Short() {
t.Skip("skipping 10x1000 in short mode")
}
testReplMcConversation(t, 10, 1000, 20, 1000)
})
t.Run("30x1000", func(t *testing.T) {
t.Skip("TODO: REPLICATION: FIX: flacky")
if testing.Short() {
t.Skip("skipping 30x1000 in short mode")
}
Expand Down
50 changes: 24 additions & 26 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ import {
make_UserPayload_BlockchainTransaction,
ContractReceipt,
make_MemberPayload_EncryptionAlgorithm,
MiniblockRef,
} from './types'

import debug from 'debug'
Expand Down Expand Up @@ -816,7 +817,7 @@ export class Client
userId: string | undefined,
chunkCount: number,
streamSettings?: PlainMessage<StreamSettings>,
): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> {
): Promise<{ streamId: string; prevMiniblock: MiniblockRef }> {
assert(this.userStreamId !== undefined, 'userStreamId must be set')
if (!channelId && !spaceId && !userId) {
throw Error('channelId, spaceId or userId must be set')
Expand Down Expand Up @@ -870,9 +871,9 @@ export class Client
undefined,
)

check(isDefined(streamView.prevMiniblockHash), 'prevMiniblockHash must be defined')
check(isDefined(streamView.prevMiniblock), 'prevMiniblock must be defined')

return { streamId: streamId, prevMiniblockHash: streamView.prevMiniblockHash }
return { streamId: streamId, prevMiniblock: streamView.prevMiniblock }
}

async updateChannel(
Expand Down Expand Up @@ -1745,13 +1746,13 @@ export class Client
streamId: string,
data: Uint8Array,
chunkIndex: number,
prevMiniblockHash: Uint8Array,
): Promise<{ prevMiniblockHash: Uint8Array; eventId: string }> {
prevMiniblock: MiniblockRef,
): Promise<{ prevMiniblock: MiniblockRef; eventId: string }> {
const payload = make_MediaPayload_Chunk({
data: data,
chunkIndex: chunkIndex,
})
return this.makeEventWithHashAndAddToStream(streamId, payload, prevMiniblockHash)
return this.makeEventWithHashAndAddToStream(streamId, payload, prevMiniblock)
}

async getMediaPayload(
Expand Down Expand Up @@ -2241,11 +2242,10 @@ export class Client
streamView = await this.getStream(streamId)
}
check(isDefined(streamView), `stream not found: ${streamId}`)
check(isDefined(streamView.miniblockInfo), `stream not initialized: ${streamId}`)
check(isDefined(streamView.prevMiniblockHash), `prevMiniblockHash not found: ${streamId}`)
check(isDefined(streamView.prevMiniblock), `prevMiniblockHash not found: ${streamId}`)
return {
miniblockNum: streamView.miniblockInfo.max,
miniblockHash: streamView.prevMiniblockHash,
miniblockNum: streamView.prevMiniblock.num,
miniblockHash: streamView.prevMiniblock.hash,
}
}

Expand Down Expand Up @@ -2340,15 +2340,12 @@ export class Client
const stream = this.streams.get(streamId)
assert(stream !== undefined, 'unknown stream ' + streamIdAsString(streamId))

const prevHash = stream.view.prevMiniblockHash
assert(
isDefined(prevHash),
'no prev miniblock hash for stream ' + streamIdAsString(streamId),
)
const prevMb = stream.view.prevMiniblock
assert(isDefined(prevMb), 'no prev miniblock hash for stream ' + streamIdAsString(streamId))
const { eventId, error } = await this.makeEventWithHashAndAddToStream(
streamId,
payload,
prevHash,
prevMb,
options.optional,
options.localId,
options.cleartext,
Expand All @@ -2360,16 +2357,16 @@ export class Client
async makeEventWithHashAndAddToStream(
streamId: string | Uint8Array,
payload: PlainMessage<StreamEvent>['payload'],
prevMiniblockHash: Uint8Array,
prevMiniblock: MiniblockRef,
optional?: boolean,
localId?: string,
cleartext?: Uint8Array,
tags?: PlainMessage<Tags>,
retryCount?: number,
): Promise<{ prevMiniblockHash: Uint8Array; eventId: string; error?: AddEventResponse_Error }> {
): Promise<{ prevMiniblock: MiniblockRef; eventId: string; error?: AddEventResponse_Error }> {
const streamIdStr = streamIdAsString(streamId)
check(isDefined(streamIdStr) && streamIdStr !== '', 'streamId must be defined')
const event = await makeEvent(this.signerContext, payload, prevMiniblockHash, tags)
const event = await makeEvent(this.signerContext, payload, prevMiniblock, tags) // TODO: REPLICATION: update to mb ref
const eventId = bin_toHexString(event.hash)
if (localId) {
// when we have a localId, we need to update the local event with the eventId
Expand All @@ -2393,8 +2390,9 @@ export class Client
const stream = this.streams.get(streamId)
stream?.updateLocalEvent(localId, eventId, 'sent')
}
return { prevMiniblockHash, eventId, error }
return { prevMiniblock, eventId, error }
} catch (err) {
// TODO: REPLICATION: this retry is very sketchy, also error codes could be TOO_OLD, TOO_NEW as well
// custom retry logic for addEvent
// if we send up a stale prevMiniblockHash, the server will return a BAD_PREV_MINIBLOCK_HASH
// error and include the expected hash in the error message
Expand All @@ -2405,14 +2403,14 @@ export class Client
this.logInfo('RETRYING event after BAD_PREV_MINIBLOCK_HASH response', {
syncStats: this.streams.stats(),
retryCount,
prevMiniblockHash,
prevMiniblock,
expectedHash,
})
check(isDefined(expectedHash), 'expected hash not found in error')
return await this.makeEventWithHashAndAddToStream(
streamId,
payload,
bin_fromHexString(expectedHash),
{ hash: bin_fromHexString(expectedHash), num: -1n },
optional,
isDefined(localId) ? eventId : undefined,
cleartext,
Expand All @@ -2429,9 +2427,9 @@ export class Client
}
}

async getStreamLastMiniblockHash(streamId: string | Uint8Array): Promise<Uint8Array> {
async getStreamLastMiniblockRef(streamId: string | Uint8Array): Promise<MiniblockRef> {
const r = await this.rpcClient.getLastMiniblockHash({ streamId: streamIdAsBytes(streamId) })
return r.hash
return { hash: r.hash, num: r.miniblockNum }
}

private async initCrypto(opts?: EncryptionDeviceInitOpts): Promise<void> {
Expand Down Expand Up @@ -2610,7 +2608,7 @@ export class Client
return
}
const toStreamId: string = makeUserInboxStreamId(userId)
const miniblockHash = await this.getStreamLastMiniblockHash(toStreamId)
const miniblockRef = await this.getStreamLastMiniblockRef(toStreamId)
this.logCall("encryptAndShareGroupSessions: sent to user's devices", {
toStreamId,
deviceKeys: deviceKeys.map((d) => d.deviceKey).join(','),
Expand All @@ -2624,7 +2622,7 @@ export class Client
ciphertexts: ciphertext,
algorithm: algorithm,
}),
miniblockHash,
miniblockRef,
)
} catch (error) {
this.logError('encryptAndShareGroupSessions: ERROR', error)
Expand Down
Loading
Loading