diff --git a/core/env/local/multi/on_chain.csv b/core/env/local/multi/on_chain.csv index ad0faffa75..e388b6f349 100644 --- a/core/env/local/multi/on_chain.csv +++ b/core/env/local/multi/on_chain.csv @@ -4,7 +4,7 @@ "media.streamMembershipLimits.88",0,2 "stream.recencyConstraints.ageSeconds",0,11 "stream.recencyConstraints.generations",0,5 -"stream.replicationFactor",0,1 +"stream.replicationFactor",0,5 "stream.defaultMinEventsPerSnapshot",0,100 "stream.minEventsPerSnapshot.a1",0,10 "stream.minEventsPerSnapshot.a5",0,10 diff --git a/core/env/local/multi_ne/on_chain.csv b/core/env/local/multi_ne/on_chain.csv index ad0faffa75..e388b6f349 100644 --- a/core/env/local/multi_ne/on_chain.csv +++ b/core/env/local/multi_ne/on_chain.csv @@ -4,7 +4,7 @@ "media.streamMembershipLimits.88",0,2 "stream.recencyConstraints.ageSeconds",0,11 "stream.recencyConstraints.generations",0,5 -"stream.replicationFactor",0,1 +"stream.replicationFactor",0,5 "stream.defaultMinEventsPerSnapshot",0,100 "stream.minEventsPerSnapshot.a1",0,10 "stream.minEventsPerSnapshot.a5",0,10 diff --git a/core/node/base/error.go b/core/node/base/error.go index 378edb5ad1..dafdf68ece 100644 --- a/core/node/base/error.go +++ b/core/node/base/error.go @@ -234,10 +234,7 @@ func IsRiverError(err error) bool { } func IsRiverErrorCode(err error, code protocol.Err) bool { - if e, ok := err.(*RiverErrorImpl); ok { - return e.Code == code - } - return false + return AsRiverError(err).Code == code } // IsCodeWithBases checks if the error or any of base errors match the given code. diff --git a/core/node/events/miniblock_job.go b/core/node/events/miniblock_job.go index 1e2ad45bd9..bf874e7dce 100644 --- a/core/node/events/miniblock_job.go +++ b/core/node/events/miniblock_job.go @@ -10,7 +10,6 @@ import ( . "github.com/towns-protocol/towns/core/node/base" "github.com/towns-protocol/towns/core/node/logging" . "github.com/towns-protocol/towns/core/node/protocol" - "github.com/towns-protocol/towns/core/node/shared" ) // mbJos tracks single miniblock production attempt for a single stream. @@ -167,32 +166,21 @@ func (j *mbJob) processRemoteProposals(ctx context.Context) ([]*mbProposal, *Str } // Check if we have enough remote proposals and return them. - if len(proposals) >= RemoteQuorumNum(len(j.remoteNodes), true) { + if len(converted) >= RemoteQuorumNum(len(j.remoteNodes), true) { return converted, view, nil } // if one of the nodes returned MINIBLOCK_TOO_OLD it indicates that this node has fallen behind, sync to catch up. - if slices.ContainsFunc(errs, func(err error) bool { - return IsRiverErrorCode(err, Err_MINIBLOCK_TOO_OLD) - }) { - contractStream, err := j.cache.params.Registry.StreamRegistry.GetStream(nil, j.stream.streamId) - if err != nil { - return nil, nil, RiverError( - Err_CANNOT_CALL_CONTRACT, "mbJob.processRemoteProposals: cannot get contract stream") - } - - if err := j.cache.syncStreamFromPeers(ctx, j.stream.streamId, &shared.MiniblockRef{ - Hash: contractStream.LastMiniblockHash, - Num: int64(contractStream.LastMiniblockNum), - }); err != nil { - return nil, nil, err - } - - return nil, nil, RiverError(Err_MINIBLOCK_TOO_OLD, "mbJob.processRemoteProposals: node out of sync") + if slices.ContainsFunc(errs, func(err error) bool { return IsRiverErrorCode(err, Err_MINIBLOCK_TOO_OLD) }) { + j.cache.submitSyncStreamTask(ctx, j.stream.streamId) } if len(errs) > 0 { - return nil, nil, errs[0] + return nil, nil, RiverErrorWithBases(Err_QUORUM_FAILED, "mbJob.processRemoteProposals: quorum failed", errs, + "streamId", j.stream.streamId, + "currentLastMb", view.LastBlock().Ref, + "attemptedMbNum", request.NewMiniblockNum, + ) } return nil, nil, RiverError(Err_INTERNAL, "mbJob.processRemoteProposals: no proposals and no errors") diff --git a/core/node/events/stream.go b/core/node/events/stream.go index 7d60bf7f36..4fcb419ff9 100644 --- a/core/node/events/stream.go +++ b/core/node/events/stream.go @@ -386,6 +386,7 @@ func (s *Stream) promoteCandidateLocked(ctx context.Context, mb *MiniblockRef) e } // schedulePromotionLocked should be called with a lock held. +// TODO: REPLICATION: FIX: there should be periodic check to trigger reconciliation if scheduled promotion is not acted upon. func (s *Stream) schedulePromotionLocked(mb *MiniblockRef) error { if len(s.local.pendingCandidates) == 0 { if mb.Num != s.view().LastBlock().Ref.Num+1 { diff --git a/core/node/events/stream_cache.go b/core/node/events/stream_cache.go index 782708fa61..6adfb50be1 100644 --- a/core/node/events/stream_cache.go +++ b/core/node/events/stream_cache.go @@ -128,7 +128,7 @@ func (s *StreamCache) Start(ctx context.Context) error { si.nodesLocked.Reset(stream.Nodes, s.params.Wallet.Address) s.cache.Store(stream.StreamId, si) if s.params.Config.StreamReconciliation.InitialWorkerPoolSize > 0 { - s.submitSyncStreamTask( + s.submitSyncStreamTaskToPool( ctx, initialSyncWorkerPool, stream.StreamId, diff --git a/core/node/events/stream_sync_task.go b/core/node/events/stream_sync_task.go index 1d8bddda71..0eff02e1b2 100644 --- a/core/node/events/stream_sync_task.go +++ b/core/node/events/stream_sync_task.go @@ -3,6 +3,7 @@ package events import ( "context" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/gammazero/workerpool" @@ -13,6 +14,13 @@ import ( ) func (s *StreamCache) submitSyncStreamTask( + ctx context.Context, + streamId StreamId, +) { + s.submitSyncStreamTaskToPool(ctx, s.onlineSyncWorkerPool, streamId, nil) +} + +func (s *StreamCache) submitSyncStreamTaskToPool( ctx context.Context, pool *workerpool.WorkerPool, streamId StreamId, @@ -35,6 +43,19 @@ func (s *StreamCache) syncStreamFromPeers( streamId StreamId, lastMbInContract *MiniblockRef, ) error { + if lastMbInContract == nil { + contractStream, err := s.params.Registry.StreamRegistry.GetStream(&bind.CallOpts{ + Context: ctx, + }, streamId) + if err != nil { + return err + } + lastMbInContract = &MiniblockRef{ + Hash: contractStream.LastMiniblockHash, + Num: int64(contractStream.LastMiniblockNum), + } + } + stream, err := s.getStreamImpl(ctx, streamId, false) if err != nil { return err diff --git a/core/node/rpc/repl_multiclient_test.go b/core/node/rpc/repl_multiclient_test.go index cbeb5a8082..e25797909d 100644 --- a/core/node/rpc/repl_multiclient_test.go +++ b/core/node/rpc/repl_multiclient_test.go @@ -125,14 +125,12 @@ func TestReplMcConversation(t *testing.T) { testReplMcConversation(t, 5, 100, 10, 100) }) t.Run("10x1000", func(t *testing.T) { - t.Skip("TODO: REPLICATION: FIX: flacky") if testing.Short() { t.Skip("skipping 10x1000 in short mode") } testReplMcConversation(t, 10, 1000, 20, 1000) }) t.Run("30x1000", func(t *testing.T) { - t.Skip("TODO: REPLICATION: FIX: flacky") if testing.Short() { t.Skip("skipping 30x1000 in short mode") } diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 804969ed8e..08aae51901 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -138,6 +138,7 @@ import { make_UserPayload_BlockchainTransaction, ContractReceipt, make_MemberPayload_EncryptionAlgorithm, + MiniblockRef, } from './types' import debug from 'debug' @@ -816,7 +817,7 @@ export class Client userId: string | undefined, chunkCount: number, streamSettings?: PlainMessage, - ): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> { + ): Promise<{ streamId: string; prevMiniblock: MiniblockRef }> { assert(this.userStreamId !== undefined, 'userStreamId must be set') if (!channelId && !spaceId && !userId) { throw Error('channelId, spaceId or userId must be set') @@ -870,9 +871,9 @@ export class Client undefined, ) - check(isDefined(streamView.prevMiniblockHash), 'prevMiniblockHash must be defined') + check(isDefined(streamView.prevMiniblock), 'prevMiniblock must be defined') - return { streamId: streamId, prevMiniblockHash: streamView.prevMiniblockHash } + return { streamId: streamId, prevMiniblock: streamView.prevMiniblock } } async updateChannel( @@ -1745,13 +1746,13 @@ export class Client streamId: string, data: Uint8Array, chunkIndex: number, - prevMiniblockHash: Uint8Array, - ): Promise<{ prevMiniblockHash: Uint8Array; eventId: string }> { + prevMiniblock: MiniblockRef, + ): Promise<{ prevMiniblock: MiniblockRef; eventId: string }> { const payload = make_MediaPayload_Chunk({ data: data, chunkIndex: chunkIndex, }) - return this.makeEventWithHashAndAddToStream(streamId, payload, prevMiniblockHash) + return this.makeEventWithHashAndAddToStream(streamId, payload, prevMiniblock) } async getMediaPayload( @@ -2241,11 +2242,10 @@ export class Client streamView = await this.getStream(streamId) } check(isDefined(streamView), `stream not found: ${streamId}`) - check(isDefined(streamView.miniblockInfo), `stream not initialized: ${streamId}`) - check(isDefined(streamView.prevMiniblockHash), `prevMiniblockHash not found: ${streamId}`) + check(isDefined(streamView.prevMiniblock), `prevMiniblockHash not found: ${streamId}`) return { - miniblockNum: streamView.miniblockInfo.max, - miniblockHash: streamView.prevMiniblockHash, + miniblockNum: streamView.prevMiniblock.num, + miniblockHash: streamView.prevMiniblock.hash, } } @@ -2340,15 +2340,12 @@ export class Client const stream = this.streams.get(streamId) assert(stream !== undefined, 'unknown stream ' + streamIdAsString(streamId)) - const prevHash = stream.view.prevMiniblockHash - assert( - isDefined(prevHash), - 'no prev miniblock hash for stream ' + streamIdAsString(streamId), - ) + const prevMb = stream.view.prevMiniblock + assert(isDefined(prevMb), 'no prev miniblock hash for stream ' + streamIdAsString(streamId)) const { eventId, error } = await this.makeEventWithHashAndAddToStream( streamId, payload, - prevHash, + prevMb, options.optional, options.localId, options.cleartext, @@ -2360,16 +2357,16 @@ export class Client async makeEventWithHashAndAddToStream( streamId: string | Uint8Array, payload: PlainMessage['payload'], - prevMiniblockHash: Uint8Array, + prevMiniblock: MiniblockRef, optional?: boolean, localId?: string, cleartext?: Uint8Array, tags?: PlainMessage, retryCount?: number, - ): Promise<{ prevMiniblockHash: Uint8Array; eventId: string; error?: AddEventResponse_Error }> { + ): Promise<{ prevMiniblock: MiniblockRef; eventId: string; error?: AddEventResponse_Error }> { const streamIdStr = streamIdAsString(streamId) check(isDefined(streamIdStr) && streamIdStr !== '', 'streamId must be defined') - const event = await makeEvent(this.signerContext, payload, prevMiniblockHash, tags) + const event = await makeEvent(this.signerContext, payload, prevMiniblock, tags) // TODO: REPLICATION: update to mb ref const eventId = bin_toHexString(event.hash) if (localId) { // when we have a localId, we need to update the local event with the eventId @@ -2393,8 +2390,9 @@ export class Client const stream = this.streams.get(streamId) stream?.updateLocalEvent(localId, eventId, 'sent') } - return { prevMiniblockHash, eventId, error } + return { prevMiniblock, eventId, error } } catch (err) { + // TODO: REPLICATION: this retry is very sketchy, also error codes could be TOO_OLD, TOO_NEW as well // custom retry logic for addEvent // if we send up a stale prevMiniblockHash, the server will return a BAD_PREV_MINIBLOCK_HASH // error and include the expected hash in the error message @@ -2405,14 +2403,14 @@ export class Client this.logInfo('RETRYING event after BAD_PREV_MINIBLOCK_HASH response', { syncStats: this.streams.stats(), retryCount, - prevMiniblockHash, + prevMiniblock, expectedHash, }) check(isDefined(expectedHash), 'expected hash not found in error') return await this.makeEventWithHashAndAddToStream( streamId, payload, - bin_fromHexString(expectedHash), + { hash: bin_fromHexString(expectedHash), num: -1n }, optional, isDefined(localId) ? eventId : undefined, cleartext, @@ -2429,9 +2427,9 @@ export class Client } } - async getStreamLastMiniblockHash(streamId: string | Uint8Array): Promise { + async getStreamLastMiniblockRef(streamId: string | Uint8Array): Promise { const r = await this.rpcClient.getLastMiniblockHash({ streamId: streamIdAsBytes(streamId) }) - return r.hash + return { hash: r.hash, num: r.miniblockNum } } private async initCrypto(opts?: EncryptionDeviceInitOpts): Promise { @@ -2610,7 +2608,7 @@ export class Client return } const toStreamId: string = makeUserInboxStreamId(userId) - const miniblockHash = await this.getStreamLastMiniblockHash(toStreamId) + const miniblockRef = await this.getStreamLastMiniblockRef(toStreamId) this.logCall("encryptAndShareGroupSessions: sent to user's devices", { toStreamId, deviceKeys: deviceKeys.map((d) => d.deviceKey).join(','), @@ -2624,7 +2622,7 @@ export class Client ciphertexts: ciphertext, algorithm: algorithm, }), - miniblockHash, + miniblockRef, ) } catch (error) { this.logError('encryptAndShareGroupSessions: ERROR', error) diff --git a/packages/sdk/src/sign.ts b/packages/sdk/src/sign.ts index e85a6f5d1c..d14f706d5d 100644 --- a/packages/sdk/src/sign.ts +++ b/packages/sdk/src/sign.ts @@ -20,7 +20,13 @@ import { import { assertBytes } from 'ethereum-cryptography/utils' import { recoverPublicKey, signSync, verify } from 'ethereum-cryptography/secp256k1' import { genIdBlob, streamIdAsBytes, streamIdAsString, userIdFromAddress } from './id' -import { ParsedEvent, ParsedMiniblock, ParsedStreamAndCookie, ParsedStreamResponse } from './types' +import { + MiniblockRef, + ParsedEvent, + ParsedMiniblock, + ParsedStreamAndCookie, + ParsedStreamResponse, +} from './types' import { SignerContext, checkDelegateSig } from './signerContext' import { keccak256 } from 'ethereum-cryptography/keccak' import { createHash } from 'crypto' @@ -41,13 +47,23 @@ export interface UnpackEnvelopeOpts { export const _impl_makeEvent_impl_ = async ( context: SignerContext, payload: PlainMessage['payload'], - prevMiniblockHash?: Uint8Array, + prevMiniblock?: MiniblockRef, tags?: PlainMessage, ): Promise => { + let prevMiniblockHash: Uint8Array | undefined + let prevMiniblockNum: bigint = 0n + if (isDefined(prevMiniblock)) { + prevMiniblockHash = prevMiniblock.hash + if (prevMiniblock.num >= 0n) { + prevMiniblockNum = prevMiniblock.num + } + } + const streamEvent = new StreamEvent({ creatorAddress: context.creatorAddress, salt: genIdBlob(), prevMiniblockHash, + prevMiniblockNum, payload, createdAtEpochMs: BigInt(Date.now()), tags, @@ -64,10 +80,11 @@ export const _impl_makeEvent_impl_ = async ( return new Envelope({ hash, signature, event }) } +// TODO: REPLICATION: modify to require both hash and miniblock number instead of prevMiniblockHash export const makeEvent = async ( context: SignerContext, payload: PlainMessage['payload'], - prevMiniblockHash?: Uint8Array, + prevMiniblock?: MiniblockRef, tags?: PlainMessage, ): Promise => { // const pl: Payload = payload instanceof Payload ? payload : new Payload(payload) @@ -78,25 +95,25 @@ export const makeEvent = async ( check(isDefined(pl.value.content), "Payload content can't be empty", Err.BAD_PAYLOAD) check(isDefined(pl.value.content.case), "Payload content case can't be empty", Err.BAD_PAYLOAD) - if (prevMiniblockHash) { + if (isDefined(prevMiniblock)) { check( - prevMiniblockHash.length === 32, - `prevMiniblockHash should be 32 bytes, got ${prevMiniblockHash.length}`, + prevMiniblock.hash.length === 32, + `prevMiniblockHash should be 32 bytes, got ${prevMiniblock.hash.length}`, Err.BAD_HASH_FORMAT, ) } - return _impl_makeEvent_impl_(context, pl, prevMiniblockHash, tags) + return _impl_makeEvent_impl_(context, pl, prevMiniblock, tags) } export const makeEvents = async ( context: SignerContext, payloads: PlainMessage['payload'][], - prevMiniblockHash?: Uint8Array, + prevMiniblock?: MiniblockRef, ): Promise => { const events: Envelope[] = [] for (const payload of payloads) { - const event = await makeEvent(context, payload, prevMiniblockHash) + const event = await makeEvent(context, payload, prevMiniblock) events.push(event) } return events @@ -169,7 +186,7 @@ export const unpackStreamAndCookie = async ( // returns all events + the header event and pointer to header content export const unpackMiniblock = async ( miniblock: Miniblock, - opts: UnpackEnvelopeOpts | undefined, + opts?: UnpackEnvelopeOpts, ): Promise => { check(isDefined(miniblock.header), 'Miniblock header is not set') const header = await unpackEnvelope(miniblock.header, opts) @@ -179,7 +196,7 @@ export const unpackMiniblock = async ( ) const events = await unpackEnvelopes(miniblock.events, opts) return { - hash: miniblock.header.hash, + ref: { hash: miniblock.header.hash, num: header.event.payload.value.miniblockNum }, header: header.event.payload.value, events: [...events, header], } @@ -187,7 +204,7 @@ export const unpackMiniblock = async ( export const unpackEnvelope = async ( envelope: Envelope, - opts: UnpackEnvelopeOpts | undefined, + opts?: UnpackEnvelopeOpts, ): Promise => { check(hasElements(envelope.event), 'Event base is not set', Err.BAD_EVENT) check(hasElements(envelope.hash), 'Event hash is not set', Err.BAD_EVENT) diff --git a/packages/sdk/src/streamStateView.ts b/packages/sdk/src/streamStateView.ts index 3cba60aa81..8b2392ce19 100644 --- a/packages/sdk/src/streamStateView.ts +++ b/packages/sdk/src/streamStateView.ts @@ -14,6 +14,7 @@ import { EventSignatureBundle, LocalEventStatus, LocalTimelineEvent, + MiniblockRef, ParsedEvent, ParsedMiniblock, RemoteTimelineEvent, @@ -67,7 +68,7 @@ export interface IStreamStateView { readonly timeline: StreamTimelineEvent[] readonly events: Map isInitialized: boolean - prevMiniblockHash?: Uint8Array + prevMiniblock?: MiniblockRef lastEventNum: bigint prevSnapshotMiniblockNum: bigint miniblockInfo?: { max: bigint; min: bigint; terminusReached: boolean } @@ -101,7 +102,7 @@ export class StreamStateView implements IStreamStateView { readonly events = new Map() readonly signatures = new Map() isInitialized = false - prevMiniblockHash?: Uint8Array + prevMiniblock?: MiniblockRef lastEventNum = 0n prevSnapshotMiniblockNum: bigint miniblockInfo?: { max: bigint; min: bigint; terminusReached: boolean } @@ -388,7 +389,7 @@ export class StreamStateView implements IStreamStateView { if (this.saveSnapshots && payload.value.snapshot) { this._snapshot = payload.value.snapshot } - this.prevMiniblockHash = event.hash + this.prevMiniblock = { hash: event.hash, num: payload.value.miniblockNum } this.updateMiniblockInfo(payload.value, { max: payload.value.miniblockNum }) timelineEvent.confirmedEventNum = payload.value.eventNumOffset + BigInt(payload.value.eventHashes.length) @@ -596,7 +597,7 @@ export class StreamStateView implements IStreamStateView { check(miniblocks.length > 0, `Stream has no miniblocks ${this.streamId}`, Err.STREAM_EMPTY) // parse the blocks // initialize from snapshot data, this gets all memberships and channel data, etc - this.applySnapshot(bin_toHexString(miniblocks[0].hash), snapshot, cleartexts, emitter) + this.applySnapshot(bin_toHexString(miniblocks[0].ref.hash), snapshot, cleartexts, emitter) // initialize from miniblocks, the first minblock is the snapshot block, it's events are accounted for const block0Events = miniblocks[0].events.map((parsedEvent, i) => { const eventNum = miniblocks[0].header.eventNumOffset + BigInt(i) @@ -640,7 +641,7 @@ export class StreamStateView implements IStreamStateView { const lastBlock = miniblocks[miniblocks.length - 1] this.lastEventNum = lastBlock.header.eventNumOffset + BigInt(lastBlock.events.length) // and the prev miniblock has (if there were more than 1 miniblocks, this should already be set) - this.prevMiniblockHash = lastBlock.hash + this.prevMiniblock = lastBlock.ref // append the minipool events this.appendStreamAndCookie(nextSyncCookie, minipoolEvents, cleartexts, emitter, undefined) this.prevSnapshotMiniblockNum = prevSnapshotMiniblockNum diff --git a/packages/sdk/src/streamUtils.ts b/packages/sdk/src/streamUtils.ts index 8855bc224e..8e69654cb4 100644 --- a/packages/sdk/src/streamUtils.ts +++ b/packages/sdk/src/streamUtils.ts @@ -84,7 +84,7 @@ export function persistedMiniblockToParsedMiniblock( return undefined } return { - hash: miniblock.hash, + ref: { hash: miniblock.hash, num: miniblock.header.miniblockNum }, header: miniblock.header, events: miniblock.events.map(persistedEventToParsedEvent).filter(isDefined), } @@ -95,7 +95,7 @@ export function parsedMiniblockToPersistedMiniblock( direction: 'forward' | 'backward', ) { return new PersistedMiniblock({ - hash: miniblock.hash, + hash: miniblock.ref.hash, header: miniblock.header, events: miniblock.events .filter((event) => isPersistedEvent(event, direction)) diff --git a/packages/sdk/src/syncedStream.ts b/packages/sdk/src/syncedStream.ts index bc8dc207d2..0462d7503d 100644 --- a/packages/sdk/src/syncedStream.ts +++ b/packages/sdk/src/syncedStream.ts @@ -134,7 +134,7 @@ export class SyncedStream extends Stream implements ISyncedStream { } const miniblock: ParsedMiniblock = { - hash: hash, + ref: { hash: hash, num: miniblockHeader.miniblockNum }, header: miniblockHeader, events: [...events, miniblockEvent], } diff --git a/packages/sdk/src/tests/bob_testUtils.ts b/packages/sdk/src/tests/bob_testUtils.ts index 2f36128bde..c421a7c7c9 100644 --- a/packages/sdk/src/tests/bob_testUtils.ts +++ b/packages/sdk/src/tests/bob_testUtils.ts @@ -3,6 +3,7 @@ import { MembershipOp, SyncStreamsResponse, Envelope, SyncOp } from '@river-buil import { DLogger } from '@river-build/dlog' import { lastEventFiltered, + lastMiniblockRef, makeEvent_test, makeTestRpcClient, makeUniqueSpaceStreamId, @@ -137,7 +138,7 @@ export const bobTalksToHimself = async ( ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'presync', }), - channel.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(channel), ) await bob.addEvent({ streamId: channelId, @@ -199,7 +200,7 @@ export const bobTalksToHimself = async ( ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - hashResponse.hash, + { hash: hashResponse.hash, num: hashResponse.miniblockNum }, ) await bob.addEvent({ streamId: channelId, @@ -225,7 +226,7 @@ export const bobTalksToHimself = async ( ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - Uint8Array.from([1, 2, 3]), + { hash: Uint8Array.from([1, 2, 3]), num: -1n }, ) await expect( bob.addEvent({ diff --git a/packages/sdk/src/tests/multi_ne/client.test.ts b/packages/sdk/src/tests/multi_ne/client.test.ts index 868e307e39..0c9827e3d2 100644 --- a/packages/sdk/src/tests/multi_ne/client.test.ts +++ b/packages/sdk/src/tests/multi_ne/client.test.ts @@ -195,7 +195,7 @@ describe('clientTest', () => { bobsClient.makeEventWithHashAndAddToStream( channelId, message, - Uint8Array.from(Array(32).fill(0)), // just going to throw any old thing in there... the retry should pick it up + { hash: Uint8Array.from(Array(32).fill(0)), num: 0n }, // just going to throw any old thing in there... the retry should pick it up ), ).resolves.not.toThrow() }) diff --git a/packages/sdk/src/tests/multi_ne/media.test.ts b/packages/sdk/src/tests/multi_ne/media.test.ts index 314102db80..36ad507bba 100644 --- a/packages/sdk/src/tests/multi_ne/media.test.ts +++ b/packages/sdk/src/tests/multi_ne/media.test.ts @@ -7,6 +7,7 @@ import { Client } from '../../client' import { makeUniqueChannelStreamId, makeDMStreamId } from '../../id' import { InfoRequest } from '@river-build/proto' import { deriveKeyAndIV, encryptAESGCM } from '../../crypto_utils' +import { MiniblockRef } from '../../types' describe('mediaTests', () => { let bobsClient: Client @@ -23,7 +24,7 @@ describe('mediaTests', () => { async function bobCreateMediaStream( chunkCount: number, - ): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> { + ): Promise<{ streamId: string; prevMiniblock: MiniblockRef }> { const spaceId = makeUniqueSpaceStreamId() await expect(bobsClient.createSpace(spaceId)).resolves.not.toThrow() @@ -38,17 +39,16 @@ describe('mediaTests', () => { async function bobSendMediaPayloads( streamId: string, chunks: number, - prevMiniblockHash: Uint8Array, - ): Promise { - let prevHash = prevMiniblockHash + prevMiniblock: MiniblockRef, + ): Promise { for (let i = 0; i < chunks; i++) { const chunk = new Uint8Array(100) // Create novel chunk content for testing purposes chunk.fill(i, 0, 100) - const result = await bobsClient.sendMediaPayload(streamId, chunk, i, prevHash) - prevHash = result.prevMiniblockHash + const result = await bobsClient.sendMediaPayload(streamId, chunk, i, prevMiniblock) + prevMiniblock = result.prevMiniblock } - return prevHash + return prevMiniblock } async function bobSendEncryptedMediaPayload( @@ -56,13 +56,11 @@ describe('mediaTests', () => { data: Uint8Array, key: Uint8Array, iv: Uint8Array, - prevMiniblockHash: Uint8Array, - ): Promise { - let prevHash = prevMiniblockHash + prevMiniblock: MiniblockRef, + ): Promise { const { ciphertext } = await encryptAESGCM(data, key, iv) - const result = await bobsClient.sendMediaPayload(streamId, ciphertext, 0, prevHash) - prevHash = result.prevMiniblockHash - return prevHash + const result = await bobsClient.sendMediaPayload(streamId, ciphertext, 0, prevMiniblock) + return result.prevMiniblock } function createTestMediaChunks(chunks: number): Uint8Array { @@ -78,15 +76,9 @@ describe('mediaTests', () => { async function bobCreateSpaceMediaStream( spaceId: string, chunkCount: number, - ): Promise<{ streamId: string; prevMiniblockHash: Uint8Array }> { + ): Promise<{ streamId: string; prevMiniblock: MiniblockRef }> { await expect(bobsClient.createSpace(spaceId)).resolves.not.toThrow() - const mediaInfo = await bobsClient.createMediaStream( - undefined, - spaceId, - undefined, - chunkCount, - ) - return mediaInfo + return await bobsClient.createMediaStream(undefined, spaceId, undefined, chunkCount) } test('clientCanCreateMediaStream', async () => { @@ -100,14 +92,14 @@ describe('mediaTests', () => { test('clientCanSendMediaPayload', async () => { const mediaStreamInfo = await bobCreateMediaStream(10) - await bobSendMediaPayloads(mediaStreamInfo.streamId, 10, mediaStreamInfo.prevMiniblockHash) + await bobSendMediaPayloads(mediaStreamInfo.streamId, 10, mediaStreamInfo.prevMiniblock) }) test('clientCanSendSpaceMediaPayload', async () => { const spaceId = makeUniqueSpaceStreamId() const mediaStreamInfo = await bobCreateSpaceMediaStream(spaceId, 10) await expect( - bobSendMediaPayloads(mediaStreamInfo.streamId, 10, mediaStreamInfo.prevMiniblockHash), + bobSendMediaPayloads(mediaStreamInfo.streamId, 10, mediaStreamInfo.prevMiniblock), ).resolves.not.toThrow() }) @@ -122,7 +114,7 @@ describe('mediaTests', () => { data, key, iv, - mediaStreamInfo.prevMiniblockHash, + mediaStreamInfo.prevMiniblock, ), ).resolves.not.toThrow() }) @@ -137,7 +129,7 @@ describe('mediaTests', () => { data, key, iv, - mediaStreamInfo.prevMiniblockHash, + mediaStreamInfo.prevMiniblock, ) const decryptedChunks = await bobsClient.getMediaPayload(mediaStreamInfo.streamId, key, iv) expect(decryptedChunks).toEqual(data) @@ -147,10 +139,10 @@ describe('mediaTests', () => { const result = await bobCreateMediaStream(10) const chunk = new Uint8Array(100) await expect( - bobsClient.sendMediaPayload(result.streamId, chunk, -1, result.prevMiniblockHash), + bobsClient.sendMediaPayload(result.streamId, chunk, -1, result.prevMiniblock), ).rejects.toThrow() await expect( - bobsClient.sendMediaPayload(result.streamId, chunk, 11, result.prevMiniblockHash), + bobsClient.sendMediaPayload(result.streamId, chunk, 11, result.prevMiniblock), ).rejects.toThrow() }) @@ -158,7 +150,7 @@ describe('mediaTests', () => { const result = await bobCreateMediaStream(10) const chunk = new Uint8Array(500000) await expect( - bobsClient.sendMediaPayload(result.streamId, chunk, 0, result.prevMiniblockHash), + bobsClient.sendMediaPayload(result.streamId, chunk, 0, result.prevMiniblock), ).resolves.not.toThrow() }) @@ -166,7 +158,7 @@ describe('mediaTests', () => { const result = await bobCreateMediaStream(10) const chunk = new Uint8Array(500001) await expect( - bobsClient.sendMediaPayload(result.streamId, chunk, 0, result.prevMiniblockHash), + bobsClient.sendMediaPayload(result.streamId, chunk, 0, result.prevMiniblock), ).rejects.toThrow() }) @@ -183,7 +175,7 @@ describe('mediaTests', () => { alicesClient.startSync() await expect( - alicesClient.sendMediaPayload(result.streamId, chunk, 5, result.prevMiniblockHash), + alicesClient.sendMediaPayload(result.streamId, chunk, 5, result.prevMiniblock), ).rejects.toThrow() await alicesClient.stop() }) @@ -198,7 +190,7 @@ describe('mediaTests', () => { alicesClient.startSync() await expect( - alicesClient.sendMediaPayload(result.streamId, chunk, 5, result.prevMiniblockHash), + alicesClient.sendMediaPayload(result.streamId, chunk, 5, result.prevMiniblock), ).rejects.toThrow() await alicesClient.stop() }) @@ -279,9 +271,9 @@ describe('mediaTests', () => { // This test is flaky because there is a bug in GetStreamEx where sometimes the miniblock is not // finalized before the client tries to fetch it. This is a known issue, see HNT-5291. test.skip('mediaStreamGetStreamEx', async () => { - const { streamId, prevMiniblockHash } = await bobCreateMediaStream(10) + const { streamId, prevMiniblock } = await bobCreateMediaStream(10) // Send a series of media chunks - await bobSendMediaPayloads(streamId, 10, prevMiniblockHash) + await bobSendMediaPayloads(streamId, 10, prevMiniblock) // Force server to flush minipool events into a block await bobsClient.rpcClient.info( new InfoRequest({ diff --git a/packages/sdk/src/tests/multi_ne/restart.test.ts b/packages/sdk/src/tests/multi_ne/restart.test.ts index 9c729482e0..fa3326099a 100644 --- a/packages/sdk/src/tests/multi_ne/restart.test.ts +++ b/packages/sdk/src/tests/multi_ne/restart.test.ts @@ -21,10 +21,12 @@ import { make_MemberPayload_Membership2, make_SpacePayload_Inception, make_UserPayload_Inception, + miniblockRefFromResponse, } from '../../types' import { TEST_ENCRYPTED_MESSAGE_PROPS, lastEventFiltered, + lastMiniblockRef, makeRandomUserContext, makeTestRpcClient, makeUniqueSpaceStreamId, @@ -167,8 +169,8 @@ const createNewChannelAndPostHello = async ( expect(channel).toBeDefined() expect(channel.stream).toBeDefined() expect(channel.stream?.nextSyncCookie?.streamId).toEqual(channelId) - let nextHash = channel.stream?.miniblocks.at(-1)?.header?.hash - expect(nextHash).toBeDefined() + let nextMiniblock = await lastMiniblockRef(channel) + expect(nextMiniblock).toBeDefined() // Now there must be "channel created" event in the space stream. const spaceResponse = await bob.getStream({ streamId: spacedStreamId }) @@ -187,7 +189,7 @@ const createNewChannelAndPostHello = async ( ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: `hello ${i}`, }), - nextHash, + nextMiniblock, ) await expect( bob.addEvent({ @@ -195,7 +197,9 @@ const createNewChannelAndPostHello = async ( event: e, }), ).resolves.not.toThrow() - nextHash = (await bob.getLastMiniblockHash({ streamId: channelId })).hash + nextMiniblock = miniblockRefFromResponse( + await bob.getLastMiniblockHash({ streamId: channelId }), + ) } // Post just hello to the channel @@ -205,7 +209,7 @@ const createNewChannelAndPostHello = async ( ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - nextHash, + nextMiniblock, ) const lastHash = (await bob.getLastMiniblockHash({ streamId: channelId })).hash await expect( diff --git a/packages/sdk/src/tests/multi_ne/sign.test.ts b/packages/sdk/src/tests/multi_ne/sign.test.ts index 8f7e828219..24e37cc0d9 100644 --- a/packages/sdk/src/tests/multi_ne/sign.test.ts +++ b/packages/sdk/src/tests/multi_ne/sign.test.ts @@ -35,9 +35,12 @@ describe('sign', () => { address: publicKeyToAddress(pub), } }) - const hash = bin_fromHexString( - '0x8dc27dbd6fc775e3a05c509c6eb1c63c4ab5bc6e7010bf9a9a80a42ae1ea56b0', - ) + const miniblockRef = { + hash: bin_fromHexString( + '0x8dc27dbd6fc775e3a05c509c6eb1c63c4ab5bc6e7010bf9a9a80a42ae1ea56b0', + ), + num: 1n, + } const badHashes = [ bin_fromHexString('0x8dc27dbd6fc775e3a05c509c6eb1c63c4ab5bc6e7010bf9a9a80a42ae1ea56b000'), bin_fromHexString('0x8dc27dbd6fc775e3a05c509c6eb1c63c4ab5bc6e7010bf9a9a80a42ae1ea56'), @@ -230,11 +233,11 @@ describe('sign', () => { }, }, } - const event = await makeEvent(context, payload, hash) + const event = await makeEvent(context, payload, miniblockRef) expect(await unpackEnvelope(event, undefined)).toBeDefined() // Event with same payload from different wallet doesn't match - const event2 = await makeEvent(context2, payload, hash) + const event2 = await makeEvent(context2, payload, miniblockRef) expect(await unpackEnvelope(event2, undefined)).toBeDefined() expect(event2).not.toEqual(event) @@ -257,7 +260,7 @@ describe('sign', () => { }).rejects.toThrow() // Event with same payload from the same wallet doesn't match - const event3 = await makeEvent(context, payload, hash) + const event3 = await makeEvent(context, payload, miniblockRef) expect(await unpackEnvelope(event3, undefined)).toBeDefined() expect(event3.hash).not.toEqual(event.hash) expect(event3).not.toEqual(event) @@ -287,10 +290,10 @@ describe('sign', () => { }, }, } - expect(await makeEvent(context, payload, hash)).toBeDefined() + expect(await makeEvent(context, payload, miniblockRef)).toBeDefined() for (const h of badHashes) { - await expect(makeEvent(context, payload, h)).rejects.toThrow() + await expect(makeEvent(context, payload, { hash: h, num: 1n })).rejects.toThrow() } }, ) diff --git a/packages/sdk/src/tests/multi_ne/streamRpcClient.test.ts b/packages/sdk/src/tests/multi_ne/streamRpcClient.test.ts index 622e43cbe7..0c24831cd1 100644 --- a/packages/sdk/src/tests/multi_ne/streamRpcClient.test.ts +++ b/packages/sdk/src/tests/multi_ne/streamRpcClient.test.ts @@ -14,6 +14,7 @@ import { iterableWrapper, TEST_ENCRYPTED_MESSAGE_PROPS, waitForSyncStreams, + lastMiniblockRef, } from '../testUtils' import { addressFromUserId, @@ -31,6 +32,7 @@ import { make_UserPayload_Inception, make_UserPayload_UserMembership, make_UserPayload_UserMembershipAction, + miniblockRefFromResponse, ParsedEvent, } from '../../types' import { bobTalksToHimself } from '../bob_testUtils' @@ -250,7 +252,7 @@ describe('streamRpcClient using v2 sync', () => { streamId: channelId, streamParentId: spaceId, }), - bobsUserStream.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(bobsUserStream), ) await bob.addEvent({ streamId: bobsUserStreamId, @@ -269,7 +271,7 @@ describe('streamRpcClient using v2 sync', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - alicesChannel.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(alicesChannel), ) await alice.addEvent({ streamId: channelId, @@ -406,7 +408,7 @@ describe('streamRpcClient', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - userStream.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(userStream), ) const promise = bob.addEvent({ streamId: bobsUserStreamId, @@ -519,7 +521,7 @@ describe('streamRpcClient', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - createChannelResponse.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(createChannelResponse), ) await bob.addEvent({ streamId: channelId, @@ -537,7 +539,7 @@ describe('streamRpcClient', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'hello', }), - createChannelResponse.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(createChannelResponse), ), }), ).rejects.toThrow( @@ -579,7 +581,7 @@ describe('streamRpcClient', () => { streamId: channelId, streamParentId: spaceId, }), - bobsStream.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(bobsStream), ) await bob.addEvent({ streamId: bobsUserStreamId, @@ -634,7 +636,7 @@ describe('streamRpcClient', () => { streamId: channelId, streamParentId: spaceId, }), - alicesStream.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(alicesStream), ) await alice.addEvent({ streamId: alicesUserStreamId, @@ -679,7 +681,7 @@ describe('streamRpcClient', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'Hello, Alice!', }), - channel.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(channel), ) await bob.addEvent({ streamId: channelId, @@ -765,16 +767,16 @@ describe('streamRpcClient', () => { // try to leave, first with expired context, then with good context const addEventWith = async (context: SignerContext) => { - const lastMiniblockHash = ( - await jimmy.getLastMiniblockHash({ streamId: jimmysUserStreamId }) - ).hash + const lastMiniblock = miniblockRefFromResponse( + await jimmy.getLastMiniblockHash({ streamId: jimmysUserStreamId }), + ) const messageEvent = await makeEvent( context, make_UserPayload_UserMembership({ streamId: spacedStreamId, op: MembershipOp.SO_LEAVE, }), - lastMiniblockHash, + lastMiniblock, ) return jimmy.addEvent({ streamId: jimmysUserStreamId, @@ -870,7 +872,7 @@ describe('streamRpcClient', () => { op: MembershipOp.SO_JOIN, initiatorId: bobsUserId, }), - Uint8Array.from(Array(32).fill('1')), + { hash: Uint8Array.from(Array(32).fill('1')), num: 1n }, ) // TODO: fix up error codes Err.BAD_PREV_EVENTS await expect( @@ -885,14 +887,16 @@ describe('streamRpcClient', () => { ) log('Bob adds event with correct hash') - const lastMiniblockHash = (await bob.getLastMiniblockHash({ streamId: channelId })).hash + const lastMiniblock = miniblockRefFromResponse( + await bob.getLastMiniblockHash({ streamId: channelId }), + ) const messageEvent = await makeEvent( bobsContext, make_ChannelPayload_Message({ ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'Hello, World!', }), - lastMiniblockHash, + lastMiniblock, ) await expect( bob.addEvent({ @@ -981,14 +985,16 @@ describe('streamRpcClient', () => { log('Bob created channel') log('Bob adds event with correct signature') - const lastMiniblockHash = (await bob.getLastMiniblockHash({ streamId: channelId })).hash + const lastMiniblock = miniblockRefFromResponse( + await bob.getLastMiniblockHash({ streamId: channelId }), + ) const messageEvent = await makeEvent( bobsContext, make_ChannelPayload_Message({ ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'Hello, World!', }), - lastMiniblockHash, + lastMiniblock, ) channelEvents.push(messageEvent) await expect( @@ -1017,7 +1023,7 @@ describe('streamRpcClient', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'Nah, not really', }), - lastMiniblockHash, + lastMiniblock, ) badEvent.signature = messageEvent.signature await expect( @@ -1034,14 +1040,14 @@ describe('streamRpcClient', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: 'Nah, not really', }), - Uint8Array.from(Array(32).fill('1')), + { hash: Uint8Array.from(Array(32).fill('1')), num: 1n }, ) await expect( bob.addEvent({ streamId: channelId, event: expiredEvent, }), - ).rejects.toThrow('24:BAD_PREV_MINIBLOCK_HASH') + ).rejects.toThrow('63:MINIBLOCK_TOO_NEW') }) }) diff --git a/packages/sdk/src/tests/multi_ne/syncWithBlocks.test.ts b/packages/sdk/src/tests/multi_ne/syncWithBlocks.test.ts index c09a07ecf9..4b9a16513d 100644 --- a/packages/sdk/src/tests/multi_ne/syncWithBlocks.test.ts +++ b/packages/sdk/src/tests/multi_ne/syncWithBlocks.test.ts @@ -18,6 +18,7 @@ import { make_MemberPayload_Membership2, make_SpacePayload_Inception, make_UserPayload_Inception, + miniblockRefFromHeader, } from '../../types' import { TEST_ENCRYPTED_MESSAGE_PROPS, @@ -25,6 +26,7 @@ import { makeTestRpcClient, makeUniqueSpaceStreamId, iterableWrapper, + lastMiniblockRef, } from '../testUtils' import { SignerContext } from '../../signerContext' @@ -134,7 +136,7 @@ describe('syncWithBlocks', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: text, }), - channel.stream?.miniblocks.at(-1)?.header?.hash, + await lastMiniblockRef(channel), ) nextHash = messageEvent.hash const resp = await bob.addEvent({ @@ -204,7 +206,7 @@ describe('syncWithBlocks', () => { ...TEST_ENCRYPTED_MESSAGE_PROPS, ciphertext: text, }), - p.hash, + miniblockRefFromHeader(p), ) nextHash = messageEvent.hash const response = await bob.addEvent({ diff --git a/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts b/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts index 922b919624..a8388e4be8 100644 --- a/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts +++ b/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts @@ -110,7 +110,7 @@ describe('syncStreams', () => { event: await makeEvent( alicesContext, payload, - userInboxStreamResponse.streamAndCookie.miniblocks[0].hash, + userInboxStreamResponse.streamAndCookie.miniblocks.at(-1)!.ref, ), }) } diff --git a/packages/sdk/src/tests/testDriver_testUtils.ts b/packages/sdk/src/tests/testDriver_testUtils.ts index 35db02955f..da77c6c4c0 100644 --- a/packages/sdk/src/tests/testDriver_testUtils.ts +++ b/packages/sdk/src/tests/testDriver_testUtils.ts @@ -2,7 +2,7 @@ import { Client } from '../client' import { DLogger, check, dlog } from '@river-build/dlog' import { makeTestClient, makeUniqueSpaceStreamId } from './testUtils' import { makeUniqueChannelStreamId } from '../id' -import { MembershipOp, SnapshotCaseType } from '@river-build/proto' +import { SnapshotCaseType } from '@river-build/proto' import { DecryptedTimelineEvent } from '../types' class TestDriver { @@ -24,31 +24,20 @@ class TestDriver { } async start(): Promise { - this.log(`driver starting client`) + this.log('driver starting client') await this.client.initializeUser() - this.client.on('userInvitedToStream', (s) => void this.userInvitedToStream.bind(this)(s)) - this.client.on('userJoinedStream', (s) => void this.userJoinedStream.bind(this)(s)) this.client.on('eventDecrypted', (e, f, g) => void this.eventDecrypted.bind(this)(e, f, g)) this.client.startSync() - this.log(`driver started client`) + this.log('driver started client') } async stop(): Promise { - this.log(`driver stopping client`) + this.log('driver stopping client') await this.client.stop() - this.log(`driver stopped client`) - } - - async userInvitedToStream(streamId: string): Promise { - this.log(`userInvitedToStream streamId=${streamId}`) - await this.client.joinStream(streamId) - } - - userJoinedStream(streamId: string): void { - this.log(`userJoinedStream streamId=${streamId}`) + this.log('driver stopped client') } eventDecrypted( @@ -63,34 +52,38 @@ class TestDriver { payload.content?.payload?.case !== 'post' || payload.content?.payload?.value.content.case !== 'text' ) { - throw new Error(`eventDecrypted is not a post`) + throw new Error('eventDecrypted is not a post') } content = payload.content?.payload?.value.content.value.body this.log( - `eventDecrypted channelId=${streamId} message=${content}`, + 'eventDecrypted channelId=', + streamId, + 'message=', + content, this.expected ? [...this.expected] : undefined, ) if (this.expected?.delete(content)) { - this.log(`eventDecrypted expected message Received, text=${content}`) + this.log('eventDecrypted expected message Received, text=', content) if (this.expected.size === 0) { this.expected = undefined if (this.allExpectedReceived === undefined) { throw new Error('allExpectedReceived is undefined') } - this.log(`eventDecrypted all expected messages Received, text=${content}`) + this.log('eventDecrypted all expected messages Received, text=', content) this.allExpectedReceived() } else { - this.log(`eventDecrypted still expecting messages`, this.expected) + this.log('eventDecrypted still expecting messages', this.expected) } } else { if (this.badMessageReceived === undefined) { throw new Error('badMessageReceived is undefined') } this.log( - `channelNewMessage badMessageReceived text=${content}}, expected=${Array.from( - this.expected?.values() ?? [], - ).join(', ')}`, + 'channelNewMessage badMessageReceived text=', + content, + 'expected=', + Array.from(this.expected?.values() ?? []).join(', '), ) this.badMessageReceived( `badMessageReceived text=${content}, expected=${Array.from( @@ -109,7 +102,7 @@ class TestDriver { this.stepNum = stepNum this.log = dlog(`test:${this.testName} client:${this.num}:step:${this.stepNum}`) - this.log(`step start`, message) + this.log('step start', message) this.expected = new Set(expected) const ret = new Promise((resolve, reject) => { @@ -118,7 +111,7 @@ class TestDriver { }) if (message !== '') { - this.log(`step sending channelId=${channelId} message=${message}`) + this.log('step sending channelId=', channelId, 'message=', message) await this.client.sendMessage(channelId, message) } if (expected.size > 0) { @@ -127,7 +120,7 @@ class TestDriver { this.allExpectedReceived = undefined this.badMessageReceived = undefined - this.log(`step end`, message) + this.log('step end', message) this.stepNum = undefined this.log = dlog(`test:client:${this.num}:step:${this.stepNum}`) } @@ -145,119 +138,92 @@ export const converse = async (conversation: string[][], testName: string): Prom const numDrivers = conversation[0].length const numConversationSteps = conversation.length - log(`START, numDrivers=${numDrivers}, steps=${numConversationSteps}`) + log('START, numDrivers=', numDrivers, 'steps=', numConversationSteps) const drivers = await Promise.all( - Array.from({ length: numDrivers }) - .fill('') - .map(async (_, i) => await makeTestDriver(i, testName)), + Array.from({ length: numDrivers }, async (_, i) => makeTestDriver(i, testName)), ) - log(`starting all drivers`) + log('starting all drivers') await Promise.all( drivers.map(async (d) => { - log(`starting driver`, { + log('starting driver', { num: d.num, userId: d.client.userId, }) await d.start() - log(`started driver`, { num: d.num }) + log('started driver', { num: d.num }) }), ) - log(`started all drivers`) + log('started all drivers') const alice = drivers[0] const others = drivers.slice(1) const spaceId = makeUniqueSpaceStreamId() - log(`creating space ${spaceId}`) + log('creating space', spaceId) await alice.client.createSpace(spaceId) await alice.client.waitForStream(spaceId) - // Invite and join space. - log(`inviting others to space`) - const allJoinedSpace = Promise.all( - others.map(async (d) => { - log(`awaiting userJoinedStream for`, d.client.userId) - const stream = await d.client.waitForStream(spaceId) - await stream.waitForMembership(MembershipOp.SO_JOIN) - log(`received userJoinedStream for`, d.client.userId) - }), - ) + // Join others to space. + log('joining others to space') await Promise.all( others.map(async (d) => { - log(`${alice.client.userId} inviting other to space`, d.client.userId) - await alice.client.inviteUser(spaceId, d.client.userId) - log(`invited other to space`, d.client.userId) + await d.client.joinStream(spaceId) + log( + 'joined space', + d.num, + 'last know miniblock', + d.client.stream(spaceId)?.view.prevMiniblock, + ) }), ) - log(`and wait for all to join space...`) - await allJoinedSpace - log(`all joined space`) - log( - `${testName} inviting others to space after`, - others.map((d) => ({ num: d.num, userStreamId: d.client.userStreamId })), - ) + log('all joined space') - log(`creating channel`) + log('creating channel') const channelId = makeUniqueChannelStreamId(spaceId) - const channelName = 'Alica channel' - const channelTopic = 'Alica channel topic' + const channelName = 'Alice channel' + const channelTopic = 'Alice channel topic' await alice.client.createChannel(spaceId, channelName, channelTopic, channelId) await alice.client.waitForStream(channelId) - // Invite and join channel. - log( - `${testName} inviting others to channel`, - others.map((d) => ({ num: d.num, userStreamId: d.client.userStreamId })), - ) - const allJoined = Promise.all( - others.map(async (d) => { - log(`awaiting userJoinedStream channel for`, d.client.userId, channelId) - const stream = await d.client.waitForStream(channelId) - await stream.waitForMembership(MembershipOp.SO_JOIN) - log(`received userJoinedStream channel for`, d.client.userId, channelId) - }), - ) + // Join others to channel. + log('joining others to channel') await Promise.all( others.map(async (d) => { - log(`inviting user to channel`, d.client.userId, channelId) - await alice.client.inviteUser(channelId, d.client.userId) - log(`invited user to channel`, d.client.userId, channelId) + await d.client.joinStream(channelId) + log( + 'joined channel', + d.num, + 'last know miniblock', + d.client.stream(channelId)?.view.prevMiniblock, + ) }), ) - log(`and wait for all to join...`) - await allJoined - log(`all joined`) + log('all joined channel') for (const [conv_idx, conv] of conversation.entries()) { - log(`conversation stepping start ${conv_idx}`, conv) + log('conversation step START =====', conv_idx, conv) await Promise.all( conv.map(async (msg, msg_idx) => { - log(`conversation step before send conv: ${conv_idx} msg: ${msg_idx}`, msg) // expect to recieve everyone elses messages (don't worry about your own, they render locally) const expected = new Set( [...conv.slice(0, msg_idx), ...conv.slice(msg_idx + 1)].filter( (s) => s !== '', ), ) - log(`conversation step execute ${msg_idx}`, msg, [...expected]) await drivers[msg_idx].step(channelId, conv_idx, expected, msg) - log( - `${testName} conversation step after send conv: ${conv_idx} msg: ${msg_idx}`, - msg, - ) }), ) - log(`conversation stepping end ${conv_idx}`, conv) + log('conversation step END =====', conv_idx) } - log(`conversation complete, now stopping drivers`) + log('conversation complete, now stopping drivers') await Promise.all(drivers.map((d) => d.stop())) - log(`drivers stopped`) + log('drivers stopped') return 'success' } catch (e) { - log(`converse ERROR`, e) + log('converse ERROR', e) throw e } } diff --git a/packages/sdk/src/tests/testUtils.ts b/packages/sdk/src/tests/testUtils.ts index ae53a9c1d6..e5898510d5 100644 --- a/packages/sdk/src/tests/testUtils.ts +++ b/packages/sdk/src/tests/testUtils.ts @@ -1,7 +1,12 @@ /* eslint-disable @typescript-eslint/no-redundant-type-constituents */ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-argument */ -import { _impl_makeEvent_impl_, publicKeyToAddress, unpackStreamEnvelopes } from '../sign' +import { + _impl_makeEvent_impl_, + publicKeyToAddress, + unpackEnvelope, + unpackStreamEnvelopes, +} from '../sign' import { EncryptedData, @@ -13,6 +18,8 @@ import { SyncStreamsResponse, SyncOp, EncryptedDataVersion, + CreateStreamResponse, + GetStreamResponse, } from '@river-build/proto' import { Entitlements } from '../sync-agent/entitlements/entitlements' import { PlainMessage } from '@bufbuild/protobuf' @@ -32,7 +39,7 @@ import { makeUserStreamId, userIdFromAddress, } from '../id' -import { ParsedEvent, DecryptedTimelineEvent } from '../types' +import { ParsedEvent, DecryptedTimelineEvent, MiniblockRef, miniblockRefFromHeader } from '../types' import { getPublicKey, utils } from 'ethereum-cryptography/secp256k1' import { EntitlementsDelegate } from '@river-build/encryption' import { bin_fromHexString, check, dlog } from '@river-build/dlog' @@ -146,9 +153,9 @@ export const makeTestRpcClient = async (opts?: RpcOptions) => { export const makeEvent_test = async ( context: SignerContext, payload: PlainMessage['payload'], - prevMiniblockHash?: Uint8Array, + prevMiniblock?: MiniblockRef, ): Promise => { - return _impl_makeEvent_impl_(context, payload, prevMiniblockHash) + return _impl_makeEvent_impl_(context, payload, prevMiniblock) } export const TEST_ENCRYPTED_MESSAGE_PROPS: PlainMessage = { @@ -1469,3 +1476,11 @@ export const findMessageByText = ( event.content.body === text, ) } + +export async function lastMiniblockRef( + response: CreateStreamResponse | GetStreamResponse, +): Promise { + const header = response.stream!.miniblocks.at(-1)!.header! + const parsedHeader = await unpackEnvelope(header) + return miniblockRefFromHeader(parsedHeader) +} diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index d536396177..0af1f58d23 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -1,44 +1,45 @@ import { PlainMessage } from '@bufbuild/protobuf' import { - StreamEvent, - ChannelMessage, + BlockchainTransaction, ChannelMessage_Post_Content_Text, - UserMetadataPayload_Inception, - UserPayload_Inception, - SpacePayload_Inception, - ChannelPayload_Inception, - UserSettingsPayload_Inception, - SpacePayload_ChannelUpdate, - SpacePayload_UpdateChannelAutojoin, - SpacePayload_UpdateChannelHideUserJoinLeaveEvents, - EncryptedData, - UserPayload_UserMembership, - UserSettingsPayload_UserBlock, - UserSettingsPayload_FullyReadMarkers, - MiniblockHeader, ChannelMessage_Post_Mention, ChannelMessage_Post, - MediaPayload_Inception, - MediaPayload_Chunk, + ChannelMessage, + ChannelPayload_Inception, DmChannelPayload_Inception, + EncryptedData, GdmChannelPayload_Inception, + GetLastMiniblockHashResponse, + MediaPayload_Chunk, + MediaPayload_Inception, + MemberPayload_KeyFulfillment, + MemberPayload_KeySolicitation, + MemberPayload_Membership, + MemberPayload_Nft, + MemberPayload, + MembershipOp, + MiniblockHeader, + Snapshot, + SpacePayload_ChannelUpdate, + SpacePayload_Inception, + SpacePayload_UpdateChannelAutojoin, + SpacePayload_UpdateChannelHideUserJoinLeaveEvents, + StreamEvent, + SyncCookie, UserInboxPayload_Ack, + UserInboxPayload_GroupEncryptionSessions, UserInboxPayload_Inception, UserMetadataPayload_EncryptionDevice, - UserInboxPayload_GroupEncryptionSessions, - SyncCookie, - Snapshot, + UserMetadataPayload_Inception, + UserPayload_Inception, + UserPayload_UserMembership, UserPayload_UserMembershipAction, - MemberPayload_Membership, - MembershipOp, - MemberPayload_KeyFulfillment, - MemberPayload_KeySolicitation, - MemberPayload, - MemberPayload_Nft, - BlockchainTransaction, + UserSettingsPayload_FullyReadMarkers, + UserSettingsPayload_Inception, + UserSettingsPayload_UserBlock, } from '@river-build/proto' import { keccak256 } from 'ethereum-cryptography/keccak' -import { bin_toHexString } from '@river-build/dlog' +import { bin_toHexString, check } from '@river-build/dlog' import { isDefined } from './check' import { DecryptedContent } from './encryptedContentTypes' import { addressFromUserId, streamIdAsBytes } from './id' @@ -159,8 +160,28 @@ export function makeRemoteTimelineEvent(params: { } } -export interface ParsedMiniblock { +export interface MiniblockRef { hash: Uint8Array + num: bigint +} + +export function miniblockRefFromHeader(header: ParsedEvent): MiniblockRef { + check(header.event.payload?.case === 'miniblockHeader', 'Expected miniblock header event') + return { + hash: header.hash, + num: header.event.payload.value.miniblockNum, + } +} + +export function miniblockRefFromResponse(r: GetLastMiniblockHashResponse): MiniblockRef { + return { + hash: r.hash, + num: r.miniblockNum, + } +} + +export interface ParsedMiniblock { + ref: MiniblockRef header: MiniblockHeader events: ParsedEvent[] }