From 30ed4aad608b84be9f650d7d6e1d0225588cd322 Mon Sep 17 00:00:00 2001 From: texuf Date: Tue, 4 Feb 2025 13:39:05 -0800 Subject: [PATCH] =?UTF-8?q?Update=20synced=20streams=20loop=20to=20?= =?UTF-8?q?=E2=80=9Cmodify=20stream=E2=80=9D=20and=20add=20a=20few=20strea?= =?UTF-8?q?ms=20at=20a=20time=20(#2205)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit keeps us from getting the “buffer full” messages --- .../encryption/src/decryptionExtensions.ts | 8 +- packages/sdk/src/client.ts | 46 +- .../sdk/src/clientDecryptionExtensions.ts | 2 +- packages/sdk/src/persistenceStore.ts | 217 ++++++++- packages/sdk/src/streamUtils.ts | 21 +- packages/sdk/src/syncedStream.ts | 154 +------ packages/sdk/src/syncedStreams.ts | 11 +- packages/sdk/src/syncedStreamsExtension.ts | 148 ++++-- packages/sdk/src/syncedStreamsLoop.ts | 436 ++++++++++-------- .../src/tests/multi_ne/syncedStreams.test.ts | 2 +- 10 files changed, 630 insertions(+), 415 deletions(-) diff --git a/packages/encryption/src/decryptionExtensions.ts b/packages/encryption/src/decryptionExtensions.ts index 08ee1e575a..9960cdd3c6 100644 --- a/packages/encryption/src/decryptionExtensions.ts +++ b/packages/encryption/src/decryptionExtensions.ts @@ -293,7 +293,7 @@ export abstract class BaseDecryptionExtensions { keySolicitation: KeySolicitationContent, ): void { if (keySolicitation.deviceKey === this.userDevice.deviceKey) { - this.log.debug('ignoring key solicitation for our own device') + //this.log.debug('ignoring key solicitation for our own device') return } const selectedQueue = @@ -309,7 +309,7 @@ export abstract class BaseDecryptionExtensions { selectedQueue.splice(index, 1) } if (keySolicitation.sessionIds.length > 0 || keySolicitation.isNewDevice) { - this.log.debug('new key solicitation', { fromUserId, streamId, keySolicitation }) + //this.log.debug('new key solicitation', { fromUserId, streamId, keySolicitation }) this.keySolicitationsNeedsSort = true selectedQueue.push({ streamId, @@ -322,12 +322,12 @@ export abstract class BaseDecryptionExtensions { } satisfies KeySolicitationItem) this.checkStartTicking() } else if (index > -1) { - this.log.debug('cleared key solicitation', keySolicitation) + //this.log.debug('cleared key solicitation', keySolicitation) } } public setStreamUpToDate(streamId: string): void { - this.log.debug('streamUpToDate', streamId) + //this.log.debug('streamUpToDate', streamId) this.upToDateStreams.add(streamId) this.checkStartTicking() } diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index ad07c1e40e..af571c3493 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -147,7 +147,12 @@ import { Stream } from './stream' import { getTime, usernameChecksum } from './utils' import { isEncryptedContentKind, toDecryptedContent } from './encryptedContentTypes' import { ClientDecryptionExtensions } from './clientDecryptionExtensions' -import { PersistenceStore, IPersistenceStore, StubPersistenceStore } from './persistenceStore' +import { + PersistenceStore, + IPersistenceStore, + StubPersistenceStore, + LoadedStream, +} from './persistenceStore' import { SyncedStreams } from './syncedStreams' import { SyncState } from './syncedStreamsLoop' import { SyncedStream } from './syncedStream' @@ -265,17 +270,21 @@ export class Client } this.streams = new SyncedStreams(this.userId, this.rpcClient, this, this.unpackEnvelopeOpts) - this.syncedStreamsExtensions = new SyncedStreamsExtension({ - startSyncStreams: async () => { - await 
this.streams.startSyncStreams() - this.decryptionExtensions?.start() - this.mlsAdapter?.start() + this.syncedStreamsExtensions = new SyncedStreamsExtension( + highPriorityStreamIds, + { + startSyncStreams: async () => { + this.mlsAdapter?.start() + this.streams.startSyncStreams() + this.decryptionExtensions?.start() + }, + initStream: (streamId, allowGetStream, persistedData) => + this.initStream(streamId, allowGetStream, persistedData), + emitClientInitStatus: (status) => this.emit('clientInitStatusUpdated', status), }, - initStream: (streamId, allowGetStream) => this.initStream(streamId, allowGetStream), - emitClientInitStatus: (status) => this.emit('clientInitStatusUpdated', status), - }) + this.persistenceStore, + ) - this.syncedStreamsExtensions.setHighPriority(highPriorityStreamIds ?? []) this.logCall('new Client') } @@ -391,7 +400,7 @@ export class Client encryptionDeviceInit?: EncryptionDeviceInitOpts }): Promise<{ initCryptoTime: number - initMlsTime: number + //initMlsTime: number initUserStreamTime: number initUserInboxStreamTime: number initUserMetadataStreamTime: number @@ -407,11 +416,11 @@ export class Client this.logCall('initializeUser', this.userId) assert(this.userStreamId === undefined, 'already initialized') const initCrypto = await getTime(() => this.initCrypto(opts?.encryptionDeviceInit)) - const initMls = await getTime(() => this.initMls()) + //const initMls = await getTime(() => this.initMls()) check(isDefined(this.decryptionExtensions), 'decryptionExtensions must be defined') check(isDefined(this.syncedStreamsExtensions), 'syncedStreamsExtensions must be defined') - check(isDefined(this.mlsAdapter), 'mlsAdapter must be defined') + //check(isDefined(this.mlsAdapter), 'mlsAdapter must be defined') const [ initUserStream, @@ -437,7 +446,7 @@ export class Client return { initCryptoTime: initCrypto.time, - initMlsTime: initMls.time, + //initMlsTime: initMls.time, initUserStreamTime: initUserStream.time, initUserInboxStreamTime: initUserInboxStream.time, initUserMetadataStreamTime: initUserMetadataStream.time, @@ -1402,6 +1411,7 @@ export class Client async initStream( streamId: string | Uint8Array, allowGetStream: boolean = true, + persistedData?: LoadedStream, ): Promise { const streamIdStr = streamIdAsString(streamId) const existingRequest = this.initStreamRequests.get(streamIdStr) @@ -1409,7 +1419,7 @@ export class Client this.logCall('initStream: had existing request for', streamIdStr, 'returning promise') return existingRequest } - const request = this._initStream(streamId, allowGetStream) + const request = this._initStream(streamId, allowGetStream, persistedData) this.initStreamRequests.set(streamIdStr, request) let stream: Stream try { @@ -1423,6 +1433,7 @@ export class Client private async _initStream( streamId: string | Uint8Array, allowGetStream: boolean = true, + persistedData?: LoadedStream, ): Promise { try { this.logCall('initStream', streamId) @@ -1439,7 +1450,7 @@ export class Client const stream = this.createSyncedStream(streamId) // Try initializing from persistence - if (await stream.initializeFromPersistence()) { + if (await stream.initializeFromPersistence(persistedData)) { if (stream.view.syncCookie) { await this.streams.addStreamToSync(stream.view.syncCookie) } @@ -2543,7 +2554,10 @@ export class Client } public setHighPriorityStreams(streamIds: string[]) { + this.logCall('setHighPriorityStreams', streamIds) this.decryptionExtensions?.setHighPriorityStreams(streamIds) + this.syncedStreamsExtensions?.setHighPriorityStreams(streamIds) + 
this.streams.setHighPriorityStreams(streamIds) } public async ensureOutboundSession( diff --git a/packages/sdk/src/clientDecryptionExtensions.ts b/packages/sdk/src/clientDecryptionExtensions.ts index 8c1b503ea7..21937f1cf8 100644 --- a/packages/sdk/src/clientDecryptionExtensions.ts +++ b/packages/sdk/src/clientDecryptionExtensions.ts @@ -144,7 +144,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { const numMembers = stream.view.getMembers().joinedParticipants().size const maxWaitTimeSeconds = Math.max(5, Math.min(30, numMembers)) const waitTime = maxWaitTimeSeconds * 1000 * Math.random() // this could be much better - this.log.debug('getRespondDelayMSForKeySolicitation', { streamId, userId, waitTime }) + //this.log.debug('getRespondDelayMSForKeySolicitation', { streamId, userId, waitTime }) return waitTime * multiplier } diff --git a/packages/sdk/src/persistenceStore.ts b/packages/sdk/src/persistenceStore.ts index 548d17741e..d2c350556b 100644 --- a/packages/sdk/src/persistenceStore.ts +++ b/packages/sdk/src/persistenceStore.ts @@ -1,19 +1,31 @@ -import { PersistedMiniblock, PersistedSyncedStream, SyncCookie } from '@river-build/proto' +import { PersistedMiniblock, PersistedSyncedStream, Snapshot } from '@river-build/proto' import Dexie, { Table } from 'dexie' -import { ParsedEvent, ParsedMiniblock } from './types' +import { ParsedMiniblock } from './types' import { persistedSyncedStreamToParsedSyncedStream, persistedMiniblockToParsedMiniblock, parsedMiniblockToPersistedMiniblock, + ParsedPersistedSyncedStream, } from './streamUtils' -import { dlog, dlogError } from '@river-build/dlog' +import { bin_toHexString, dlog, dlogError } from '@river-build/dlog' import { isDefined } from './check' +import { isChannelStreamId, isDMChannelStreamId, isGDMChannelStreamId } from './id' +const MAX_CACHED_SCROLLBACK_COUNT = 3 const DEFAULT_RETRY_COUNT = 2 -const log = dlog('csb:persistence') +const log = dlog('csb:persistence', { defaultEnabled: false }) const logError = dlogError('csb:persistence:error') +export interface LoadedStream { + persistedSyncedStream: ParsedPersistedSyncedStream + miniblocks: ParsedMiniblock[] + cleartexts: Record | undefined + snapshot: Snapshot + prependedMiniblocks: ParsedMiniblock[] + prevSnapshotMiniblockNum: bigint +} + async function fnReadRetryer( fn: () => Promise, retries: number, @@ -67,16 +79,13 @@ export interface IPersistenceStore { saveCleartext(eventId: string, cleartext: Uint8Array | string): Promise getCleartext(eventId: string): Promise getCleartexts(eventIds: string[]): Promise | undefined> - getSyncedStream(streamId: string): Promise< - | { - syncCookie: SyncCookie - lastSnapshotMiniblockNum: bigint - minipoolEvents: ParsedEvent[] - lastMiniblockNum: bigint - } - | undefined - > + getSyncedStream(streamId: string): Promise saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream): Promise + loadStream( + streamId: string, + inPersistedSyncedStream?: ParsedPersistedSyncedStream, + ): Promise + loadStreams(streamIds: string[]): Promise> saveMiniblock(streamId: string, miniblock: ParsedMiniblock): Promise saveMiniblocks( streamId: string, @@ -153,7 +162,98 @@ export class PersistenceStore extends Dexie implements IPersistenceStore { return undefined } const cachedSyncedStream = PersistedSyncedStream.fromBinary(record.data) - return persistedSyncedStreamToParsedSyncedStream(cachedSyncedStream) + const psstpss = persistedSyncedStreamToParsedSyncedStream(streamId, cachedSyncedStream) + return psstpss + } + + async 
loadStream( + streamId: string, + inPersistedSyncedStream?: ParsedPersistedSyncedStream, + ): Promise { + const persistedSyncedStream = + inPersistedSyncedStream ?? (await this.getSyncedStream(streamId)) + if (!persistedSyncedStream) { + return undefined + } + + const miniblocks = await this.getMiniblocks( + streamId, + persistedSyncedStream.lastSnapshotMiniblockNum, + persistedSyncedStream.lastMiniblockNum, + ) + if (miniblocks.length === 0) { + return undefined + } + + const snapshot = miniblocks[0].header.snapshot + if (!snapshot) { + return undefined + } + + const isChannelStream = + isChannelStreamId(streamId) || + isDMChannelStreamId(streamId) || + isGDMChannelStreamId(streamId) + const prependedMiniblocks = isChannelStream + ? hasTopLevelRenderableEvent(miniblocks) + ? [] + : await this.cachedScrollback( + streamId, + miniblocks[0].header.prevSnapshotMiniblockNum, + miniblocks[0].header.miniblockNum, + ) + : [] + + const snapshotEventIds = eventIdsFromSnapshot(snapshot) + const eventIds = miniblocks.flatMap((mb) => mb.events.map((e) => e.hashStr)) + const prependedEventIds = prependedMiniblocks.flatMap((mb) => + mb.events.map((e) => e.hashStr), + ) + const cleartexts = await this.getCleartexts([ + ...eventIds, + ...snapshotEventIds, + ...prependedEventIds, + ]) + return { + persistedSyncedStream, + miniblocks, + cleartexts, + snapshot, + prependedMiniblocks, + prevSnapshotMiniblockNum: miniblocks[0].header.prevSnapshotMiniblockNum, + } + } + + async loadStreams(streamIds: string[]) { + const result = await this.transaction( + 'r', + [this.syncedStreams, this.cleartexts, this.miniblocks], + async () => { + const syncedStreams = await this.getSyncedStreams(streamIds) + const retVal: Record = {} + for (const streamId of streamIds) { + retVal[streamId] = await this.loadStream(streamId, syncedStreams[streamId]) + } + return retVal + }, + ) + return result + } + + private async getSyncedStreams(streamIds: string[]) { + const records = await this.syncedStreams.bulkGet(streamIds) + const cachedSyncedStreams = records.map((x) => + x + ? 
{ streamId: x.streamId, data: PersistedSyncedStream.fromBinary(x.data) } + : undefined, + ) + const psstpss = cachedSyncedStreams.reduce((acc, x) => { + if (x) { + acc[x.streamId] = persistedSyncedStreamToParsedSyncedStream(x.streamId, x.data) + } + return acc + }, {} as Record) + return psstpss } async saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream) { @@ -256,6 +356,87 @@ export class PersistenceStore extends Dexie implements IPersistenceStore { log('navigator.storage unavailable') } } + private async cachedScrollback( + streamId: string, + fromInclusive: bigint, + toExclusive: bigint, + ): Promise { + // If this is a channel, DM or GDM, perform a few scrollbacks + if ( + !isChannelStreamId(streamId) && + !isDMChannelStreamId(streamId) && + !isGDMChannelStreamId(streamId) + ) { + return [] + } + let miniblocks: ParsedMiniblock[] = [] + for (let i = 0; i < MAX_CACHED_SCROLLBACK_COUNT; i++) { + if (toExclusive <= 0n) { + break + } + const result = await this.getMiniblocks(streamId, fromInclusive, toExclusive - 1n) + if (result.length > 0) { + miniblocks = [...result, ...miniblocks] + fromInclusive = result[0].header.prevSnapshotMiniblockNum + toExclusive = result[0].header.miniblockNum + if (hasTopLevelRenderableEvent(miniblocks)) { + break + } + } else { + break + } + } + return miniblocks + } +} + +function hasTopLevelRenderableEvent(miniblocks: ParsedMiniblock[]): boolean { + for (const mb of miniblocks) { + if (topLevelRenderableEventInMiniblock(mb)) { + return true + } + } + return false +} + +function topLevelRenderableEventInMiniblock(miniblock: ParsedMiniblock): boolean { + for (const e of miniblock.events) { + switch (e.event.payload.case) { + case 'channelPayload': + case 'gdmChannelPayload': + case 'dmChannelPayload': + switch (e.event.payload.value.content.case) { + case 'message': + if (!e.event.payload.value.content.value.refEventId) { + return true + } + } + } + } + return false +} + +function eventIdsFromSnapshot(snapshot: Snapshot): string[] { + const usernameEventIds = + snapshot.members?.joined + .filter((m) => isDefined(m.username)) + .map((m) => bin_toHexString(m.username!.eventHash)) ?? [] + const displayNameEventIds = + snapshot.members?.joined + .filter((m) => isDefined(m.displayName)) + .map((m) => bin_toHexString(m.displayName!.eventHash)) ?? [] + + switch (snapshot.content.case) { + case 'gdmChannelContent': { + const channelPropertiesEventIds = snapshot.content.value.channelProperties + ? 
[bin_toHexString(snapshot.content.value.channelProperties.eventHash)] + : [] + + return [...usernameEventIds, ...displayNameEventIds, ...channelPropertiesEventIds] + } + default: + return [...usernameEventIds, ...displayNameEventIds] + } } //Linting below is disable as this is a stub class which is used for testing and just follows the interface @@ -277,6 +458,14 @@ export class StubPersistenceStore implements IPersistenceStore { return Promise.resolve(undefined) } + async loadStream(streamId: string, inPersistedSyncedStream?: ParsedPersistedSyncedStream) { + return Promise.resolve(undefined) + } + + async loadStreams(streamIds: string[]) { + return Promise.resolve({}) + } + async saveSyncedStream(streamId: string, syncedStream: PersistedSyncedStream) { return Promise.resolve() } diff --git a/packages/sdk/src/streamUtils.ts b/packages/sdk/src/streamUtils.ts index 3c70122ee9..8855bc224e 100644 --- a/packages/sdk/src/streamUtils.ts +++ b/packages/sdk/src/streamUtils.ts @@ -8,6 +8,14 @@ import { ParsedEvent, ParsedMiniblock } from './types' import { bin_toHexString } from '@river-build/dlog' import { isDefined, logNever } from './check' +export interface ParsedPersistedSyncedStream { + streamId: string + syncCookie: SyncCookie + lastSnapshotMiniblockNum: bigint + minipoolEvents: ParsedEvent[] + lastMiniblockNum: bigint +} + export function isPersistedEvent(event: ParsedEvent, direction: 'forward' | 'backward'): boolean { if (!event.event) { return false @@ -105,18 +113,15 @@ function parsedEventToPersistedEvent(event: ParsedEvent) { }) } -export function persistedSyncedStreamToParsedSyncedStream(stream: PersistedSyncedStream): - | { - syncCookie: SyncCookie - lastSnapshotMiniblockNum: bigint - minipoolEvents: ParsedEvent[] - lastMiniblockNum: bigint - } - | undefined { +export function persistedSyncedStreamToParsedSyncedStream( + streamId: string, + stream: PersistedSyncedStream, +): ParsedPersistedSyncedStream | undefined { if (!stream.syncCookie) { return undefined } return { + streamId, syncCookie: stream.syncCookie, lastSnapshotMiniblockNum: stream.lastSnapshotMiniblockNum, minipoolEvents: stream.minipoolEvents.map(persistedEventToParsedEvent).filter(isDefined), diff --git a/packages/sdk/src/syncedStream.ts b/packages/sdk/src/syncedStream.ts index a7445ebf87..bc8dc207d2 100644 --- a/packages/sdk/src/syncedStream.ts +++ b/packages/sdk/src/syncedStream.ts @@ -4,12 +4,10 @@ import { Stream } from './stream' import { ParsedMiniblock, ParsedEvent, ParsedStreamResponse } from './types' import { DLogger, bin_toHexString, dlog } from '@river-build/dlog' import { isDefined } from './check' -import { IPersistenceStore } from './persistenceStore' +import { IPersistenceStore, LoadedStream } from './persistenceStore' import { StreamEvents } from './streamEvents' -import { isChannelStreamId, isDMChannelStreamId, isGDMChannelStreamId } from './id' import { ISyncedStream } from './syncedStreamsLoop' -const MAX_CACHED_SCROLLBACK_COUNT = 3 export class SyncedStream extends Stream implements ISyncedStream { log: DLogger isUpToDate = false @@ -22,65 +20,26 @@ export class SyncedStream extends Stream implements ISyncedStream { persistenceStore: IPersistenceStore, ) { super(userId, streamId, clientEmitter, logEmitFromStream) - this.log = dlog('csb:syncedStream').extend(userId) + this.log = dlog('csb:syncedStream', { defaultEnabled: false }).extend(userId) this.persistenceStore = persistenceStore } - async initializeFromPersistence(): Promise { - const cachedSyncedStream = await 
this.persistenceStore.getSyncedStream(this.streamId) - if (!cachedSyncedStream) { + async initializeFromPersistence(persistedData?: LoadedStream): Promise { + const loadedStream = + persistedData ?? (await this.persistenceStore.loadStream(this.streamId)) + if (!loadedStream) { + this.log('No persisted data found for stream', this.streamId, persistedData) return false } - const miniblocks = await this.persistenceStore.getMiniblocks( - this.streamId, - cachedSyncedStream.lastSnapshotMiniblockNum, - cachedSyncedStream.lastMiniblockNum, - ) - - if (miniblocks.length === 0) { - return false - } - - const snapshot = miniblocks[0].header.snapshot - if (!snapshot) { - return false - } - - const isChannelStream = - isChannelStreamId(this.streamId) || - isDMChannelStreamId(this.streamId) || - isGDMChannelStreamId(this.streamId) - const prependedMiniblocks = isChannelStream - ? hasTopLevelRenderableEvent(miniblocks) - ? [] - : await this.cachedScrollback( - miniblocks[0].header.prevSnapshotMiniblockNum, - miniblocks[0].header.miniblockNum, - ) - : [] - - const snapshotEventIds = eventIdsFromSnapshot(snapshot) - const eventIds = miniblocks.flatMap((mb) => mb.events.map((e) => e.hashStr)) - const prependedEventIds = prependedMiniblocks.flatMap((mb) => - mb.events.map((e) => e.hashStr), - ) - - const cleartexts = await this.persistenceStore.getCleartexts([ - ...eventIds, - ...snapshotEventIds, - ...prependedEventIds, - ]) - - this.log('Initializing from persistence', this.streamId) try { super.initialize( - cachedSyncedStream.syncCookie, - cachedSyncedStream.minipoolEvents, - snapshot, - miniblocks, - prependedMiniblocks, - miniblocks[0].header.prevSnapshotMiniblockNum, - cleartexts, + loadedStream.persistedSyncedStream.syncCookie, + loadedStream.persistedSyncedStream.minipoolEvents, + loadedStream.snapshot, + loadedStream.miniblocks, + loadedStream.prependedMiniblocks, + loadedStream.miniblocks[0].header.prevSnapshotMiniblockNum, + loadedStream.cleartexts, ) } catch (e) { this.log('Error initializing from persistence', this.streamId, e) @@ -205,42 +164,6 @@ export class SyncedStream extends Stream implements ISyncedStream { await this.persistenceStore.saveSyncedStream(this.streamId, cachedSyncedStream) } - private async cachedScrollback( - fromInclusive: bigint, - toExclusive: bigint, - ): Promise { - // If this is a channel, DM or GDM, perform a few scrollbacks - if ( - !isChannelStreamId(this.streamId) && - !isDMChannelStreamId(this.streamId) && - !isGDMChannelStreamId(this.streamId) - ) { - return [] - } - let miniblocks: ParsedMiniblock[] = [] - for (let i = 0; i < MAX_CACHED_SCROLLBACK_COUNT; i++) { - if (toExclusive <= 0n) { - break - } - const result = await this.persistenceStore.getMiniblocks( - this.streamId, - fromInclusive, - toExclusive - 1n, - ) - if (result.length > 0) { - miniblocks = [...result, ...miniblocks] - fromInclusive = result[0].header.prevSnapshotMiniblockNum - toExclusive = result[0].header.miniblockNum - if (hasTopLevelRenderableEvent(miniblocks)) { - break - } - } else { - break - } - } - return miniblocks - } - private markUpToDate(): void { if (this.isUpToDate) { return @@ -249,52 +172,3 @@ export class SyncedStream extends Stream implements ISyncedStream { this.emit('streamUpToDate', this.streamId) } } - -function eventIdsFromSnapshot(snapshot: Snapshot): string[] { - const usernameEventIds = - snapshot.members?.joined - .filter((m) => isDefined(m.username)) - .map((m) => bin_toHexString(m.username!.eventHash)) ?? 
[] - const displayNameEventIds = - snapshot.members?.joined - .filter((m) => isDefined(m.displayName)) - .map((m) => bin_toHexString(m.displayName!.eventHash)) ?? [] - - switch (snapshot.content.case) { - case 'gdmChannelContent': { - const channelPropertiesEventIds = snapshot.content.value.channelProperties - ? [bin_toHexString(snapshot.content.value.channelProperties.eventHash)] - : [] - - return [...usernameEventIds, ...displayNameEventIds, ...channelPropertiesEventIds] - } - default: - return [...usernameEventIds, ...displayNameEventIds] - } -} - -function hasTopLevelRenderableEvent(miniblocks: ParsedMiniblock[]): boolean { - for (const mb of miniblocks) { - if (topLevelRenderableEventInMiniblock(mb)) { - return true - } - } - return false -} - -function topLevelRenderableEventInMiniblock(miniblock: ParsedMiniblock): boolean { - for (const e of miniblock.events) { - switch (e.event.payload.case) { - case 'channelPayload': - case 'gdmChannelPayload': - case 'dmChannelPayload': - switch (e.event.payload.value.content.case) { - case 'message': - if (!e.event.payload.value.content.value.refEventId) { - return true - } - } - } - } - return false -} diff --git a/packages/sdk/src/syncedStreams.ts b/packages/sdk/src/syncedStreams.ts index cfc82b3602..74f04b4332 100644 --- a/packages/sdk/src/syncedStreams.ts +++ b/packages/sdk/src/syncedStreams.ts @@ -11,6 +11,7 @@ import { UnpackEnvelopeOpts } from './sign' export class SyncedStreams { private syncedStreamsLoop: SyncedStreamsLoop | undefined + private highPriorityIds: Set = new Set() // userId is the current user id private readonly userId: string private readonly logNamespace: string @@ -67,6 +68,11 @@ export class SyncedStreams { this.streams.set(id, stream) } + public setHighPriorityStreams(streamIds: string[]) { + this.highPriorityIds = new Set(streamIds) + this.syncedStreamsLoop?.setHighPriorityStreams(streamIds) + } + public delete(inStreamId: string | Uint8Array): void { const streamId = streamIdAsString(inStreamId) this.streams.get(streamId)?.stop() @@ -98,7 +104,7 @@ export class SyncedStreams { } } - public async startSyncStreams() { + public startSyncStreams() { const streamRecords = Array.from(this.streams.values()) .filter((x) => isDefined(x.syncCookie)) .map((stream) => ({ syncCookie: stream.syncCookie!, stream })) @@ -109,8 +115,9 @@ export class SyncedStreams { streamRecords, this.logNamespace, this.unpackEnvelopeOpts, + this.highPriorityIds, ) - await this.syncedStreamsLoop.start() + this.syncedStreamsLoop.start() } public async stopSync() { diff --git a/packages/sdk/src/syncedStreamsExtension.ts b/packages/sdk/src/syncedStreamsExtension.ts index 1275c81c17..d95d35db9c 100644 --- a/packages/sdk/src/syncedStreamsExtension.ts +++ b/packages/sdk/src/syncedStreamsExtension.ts @@ -5,11 +5,15 @@ import { isUserSettingsStreamId, isUserStreamId, isUserInboxStreamId, + spaceIdFromChannelId, + isDMChannelStreamId, + isGDMChannelStreamId, } from './id' import { check, dlog, dlogError } from '@river-build/dlog' import { Stream } from './stream' import { ClientInitStatus } from './types' import pLimit from 'p-limit' +import { IPersistenceStore, LoadedStream } from './persistenceStore' interface StreamSyncItem { streamId: string @@ -18,20 +22,25 @@ interface StreamSyncItem { interface SyncedStreamsExtensionDelegate { startSyncStreams: () => Promise - initStream(streamId: string, allowGetStream: boolean): Promise + initStream( + streamId: string, + allowGetStream: boolean, + persistedData?: LoadedStream, + ): Promise emitClientInitStatus: 
(status: ClientInitStatus) => void } -const concurrencyLimit = pLimit(50) +const concurrencyLimit = pLimit(20) export class SyncedStreamsExtension { - private log = dlog('csb:syncedStreamsExtension') + private log = dlog('csb:syncedStreamsExtension', { defaultEnabled: true }) + private logDebug = dlog('csb:syncedStreamsExtension:debug', { defaultEnabled: false }) private logError = dlogError('csb:syncedStreamsExtension:error') private readonly delegate: SyncedStreamsExtensionDelegate private readonly tasks = new Array<() => Promise>() private streamIds = new Set() - private highPriorityIds = new Set() + private highPriorityIds: Set private started: boolean = false private inProgressTick?: Promise private timeoutId?: NodeJS.Timeout @@ -54,7 +63,12 @@ export class SyncedStreamsExtension { progress: 0, } - constructor(delegate: SyncedStreamsExtensionDelegate) { + constructor( + highPriorityStreamIds: string[] | undefined, + delegate: SyncedStreamsExtensionDelegate, + private persistenceStore: IPersistenceStore, + ) { + this.highPriorityIds = new Set(highPriorityStreamIds ?? []) this.delegate = delegate } @@ -64,8 +78,7 @@ export class SyncedStreamsExtension { this.totalStreamCount = streamIds.length } - public setHighPriority(streamIds: string[]) { - check(this.highPriorityIds.size === 0, 'setHighPriority called twice') + public setHighPriorityStreams(streamIds: string[]) { this.highPriorityIds = new Set(streamIds) } @@ -83,7 +96,6 @@ export class SyncedStreamsExtension { this.numStreamsLoadedFromNetwork = 0 this.numStreamsFailedToLoad = 0 - this.tasks.push(() => this.loadHighPriorityStreams()) this.tasks.push(() => this.loadStreamsFromPersistence()) this.tasks.push(() => this.loadStreamsFromNetwork()) @@ -131,37 +143,64 @@ export class SyncedStreamsExtension { }, 0) } - private async loadHighPriorityStreams() { - const streamIds = Array.from(this.highPriorityIds) - await Promise.all(streamIds.map((streamId) => this.loadStreamFromPersistence(streamId))) + private async loadStreamsFromPersistence() { + this.log('####loadingStreamsFromPersistence') + const now = performance.now() + // aellis it seems like it would be faster to pull the high priority streams first + // then load the rest of the streams after, but it's not! 
+ // for 300ish streams,loading the rest of the streams after the application has started + // going takes 30-50 seconds,doing it this way takes 4 seconds + const loadedStreams = await this.persistenceStore.loadStreams([ + ...Array.from(this.highPriorityIds), + ...Array.from(this.streamIds), + ]) + const t1 = performance.now() + this.log('####Performance: loaded streams from persistence!!', t1 - now) + + let streamIds = Array.from(this.highPriorityIds) + await Promise.all( + streamIds.map((streamId) => + this.loadStreamFromPersistence(streamId, loadedStreams[streamId]), + ), + ) this.didLoadHighPriorityStreams = true this.emitClientStatus() - } - - private async loadStreamsFromPersistence() { - const syncItems = Array.from(this.streamIds).map((streamId) => { - return { - streamId, - priority: this.priorityFromStreamId(streamId), - } satisfies StreamSyncItem - }) - syncItems.sort((a, b) => a.priority - b.priority) - await Promise.all(syncItems.map((item) => this.loadStreamFromPersistence(item.streamId))) + // wait for 10ms to allow the client to update the status + await new Promise((resolve) => setTimeout(resolve, 10)) + const t2 = performance.now() + this.log('####Performance: loadedHighPriorityStreams!!', t2 - t1) + streamIds = Array.from(this.streamIds).toSorted( + (a, b) => + priorityFromStreamId(a, this.highPriorityIds) - + priorityFromStreamId(b, this.highPriorityIds), + ) + // because of how concurrency limit works, resort the streams on every iteration of the loop + // to allow for updates to the priority of the streams + while (streamIds.length > 0) { + const item = streamIds.shift()! + //this.log('Performance: loading stream from persistence', item) + await this.loadStreamFromPersistence(item, loadedStreams[item]) + } + const t3 = performance.now() + this.log('####Performance: loadedLowPriorityStreams!!', t3 - t2, 'total:', t3 - now) this.didLoadStreamsFromPersistence = true this.emitClientStatus() } - private async loadStreamFromPersistence(streamId: string) { + private async loadStreamFromPersistence( + streamId: string, + persistedData: LoadedStream | undefined, + ) { const allowGetStream = this.highPriorityIds.has(streamId) - return concurrencyLimit(async () => { + await concurrencyLimit(async () => { try { - await this.delegate.initStream(streamId, allowGetStream) + await this.delegate.initStream(streamId, allowGetStream, persistedData) this.loadedStreamCount++ this.numStreamsLoadedFromCache++ this.streamIds.delete(streamId) } catch (err) { this.streamCountRequiringNetworkAccess++ - this.log('Error initializing stream from persistence', streamId, err) + this.logError('Error initializing stream from persistence', streamId, err) } this.emitClientStatus() }) @@ -171,7 +210,7 @@ export class SyncedStreamsExtension { const syncItems = Array.from(this.streamIds).map((streamId) => { return { streamId, - priority: this.priorityFromStreamId(streamId), + priority: priorityFromStreamId(streamId, this.highPriorityIds), } satisfies StreamSyncItem }) syncItems.sort((a, b) => a.priority - b.priority) @@ -180,7 +219,7 @@ export class SyncedStreamsExtension { } private async loadStreamFromNetwork(streamId: string) { - this.log('Performance: adding stream from network', streamId) + this.logDebug('Performance: adding stream from network', streamId) return concurrencyLimit(async () => { try { await this.delegate.initStream(streamId, true) @@ -188,7 +227,6 @@ export class SyncedStreamsExtension { this.streamIds.delete(streamId) } catch (err) { this.logError('Error initializing stream', streamId, 
err) - this.log('Error initializing stream', streamId) this.numStreamsFailedToLoad++ } this.loadedStreamCount++ @@ -245,26 +283,44 @@ export class SyncedStreamsExtension { } } } +} - private priorityFromStreamId(streamId: string) { - if ( - isUserDeviceStreamId(streamId) || - isUserInboxStreamId(streamId) || - isUserStreamId(streamId) || - isUserSettingsStreamId(streamId) - ) { - return 0 - } - if (this.highPriorityIds.has(streamId)) { - return 1 +// priority from stream id for loading, we need spaces to structure the app, dms if that's what we're looking at +// and channels for any high priority spaces +function priorityFromStreamId(streamId: string, highPriorityIds: Set) { + if ( + isUserDeviceStreamId(streamId) || + isUserInboxStreamId(streamId) || + isUserStreamId(streamId) || + isUserSettingsStreamId(streamId) + ) { + return 0 + } + if (highPriorityIds.has(streamId)) { + return 1 + } + // if we're prioritizing dms, load other dms and gdm channels + if (highPriorityIds.size > 0) { + const firstHPI = Array.from(highPriorityIds.values())[0] + if (isDMChannelStreamId(firstHPI) || isGDMChannelStreamId(firstHPI)) { + if (isDMChannelStreamId(streamId) || isGDMChannelStreamId(streamId)) { + return 2 + } } + } - if (isSpaceStreamId(streamId)) { - return 2 - } - if (isChannelStreamId(streamId)) { - return 3 + // we need spaces to structure the app + if (isSpaceStreamId(streamId)) { + return 3 + } + + if (isChannelStreamId(streamId)) { + const spaceId = spaceIdFromChannelId(streamId) + if (highPriorityIds.has(spaceId)) { + return 4 + } else { + return 5 } - return 4 } + return 6 } diff --git a/packages/sdk/src/syncedStreamsLoop.ts b/packages/sdk/src/syncedStreamsLoop.ts index 3751dc8859..44390ed605 100644 --- a/packages/sdk/src/syncedStreamsLoop.ts +++ b/packages/sdk/src/syncedStreamsLoop.ts @@ -1,4 +1,4 @@ -import { Err, SyncCookie, SyncOp, SyncStreamsResponse } from '@river-build/proto' +import { SyncCookie, SyncOp, SyncStreamsResponse } from '@river-build/proto' import { DLogger, dlog, dlogError } from '@river-build/dlog' import { StreamRpcClient } from './makeStreamRpcClient' import { UnpackEnvelopeOpts, unpackStream, unpackStreamAndCookie } from './sign' @@ -6,10 +6,21 @@ import { SyncedStreamEvents } from './streamEvents' import TypedEmitter from 'typed-emitter' import { nanoid } from 'nanoid' import { isMobileSafari } from './utils' -import { streamIdAsBytes, streamIdAsString } from './id' +import { + spaceIdFromChannelId, + isDMChannelStreamId, + isGDMChannelStreamId, + isSpaceStreamId, + isUserDeviceStreamId, + isUserInboxStreamId, + isUserSettingsStreamId, + isUserStreamId, + streamIdAsBytes, + streamIdAsString, + isChannelStreamId, +} from './id' import { ParsedEvent, ParsedStreamResponse } from './types' -import { logNever } from './check' -import { errorContains } from './rpcInterceptors' +import { isDefined, logNever } from './check' export enum SyncState { Canceling = 'Canceling', // syncLoop, maybe syncId if was syncing, not is was starting or retrying @@ -81,6 +92,7 @@ export class SyncedStreamsLoop { private readonly streams: Map // loggers private readonly logSync: DLogger + private readonly logDebug: DLogger private readonly logError: DLogger // clientEmitter is used to proxy the events from the streams to the client private readonly clientEmitter: TypedEmitter @@ -113,6 +125,11 @@ export class SyncedStreamsLoop { // and are cleared when sync stops private responsesQueue: SyncStreamsResponse[] = [] private inProgressTick?: Promise + private pendingSyncCookies: string[] = [] + 
private inFlightSyncCookies = new Set() + private readonly MAX_IN_FLIGHT_COOKIES = 40 + private readonly MIN_IN_FLIGHT_COOKIES = 10 + public pingInfo: PingInfo = { currentSequence: 0, nonces: {}, @@ -124,6 +141,7 @@ export class SyncedStreamsLoop { streams: { syncCookie: SyncCookie; stream: ISyncedStream }[], logNamespace: string, readonly unpackEnvelopeOpts: UnpackEnvelopeOpts | undefined, + private highPriorityIds: Set, ) { this.rpcClient = rpcClient this.clientEmitter = clientEmitter @@ -133,7 +151,8 @@ export class SyncedStreamsLoop { { syncCookie, stream }, ]), ) - this.logSync = dlog('csb:cl:sync').extend(logNamespace) + this.logDebug = dlog('csb:cl:sync:debug').extend(logNamespace) + this.logSync = dlog('csb:cl:sync', { defaultEnabled: true }).extend(logNamespace) this.logError = dlogError('csb:cl:sync:stream').extend(logNamespace) } @@ -154,12 +173,12 @@ export class SyncedStreamsLoop { return this.syncId } - public async start(): Promise { + public start() { if (isMobileSafari()) { document.addEventListener('visibilitychange', this.onMobileSafariBackgrounded) } - await this.createSyncLoop() + this.createSyncLoop() } public async stop() { @@ -205,37 +224,14 @@ export class SyncedStreamsLoop { // adds stream to the sync subscription public async addStreamToSync(syncCookie: SyncCookie, stream: ISyncedStream): Promise { const streamId = streamIdAsString(syncCookie.streamId) - this.log('addStreamToSync', streamId) + this.logDebug('addStreamToSync', streamId) if (this.streams.has(streamId)) { this.log('stream already in sync', streamId) return } this.streams.set(streamId, { syncCookie, stream }) - - if (this.syncState === SyncState.Starting || this.syncState === SyncState.Retrying) { - await this.waitForSyncingState() - } - if (this.syncState === SyncState.Syncing) { - try { - await this.rpcClient.addStreamToSync({ - syncId: this.syncId, - syncPos: syncCookie, - }) - this.log('addStreamToSync complete', syncCookie) - } catch (err) { - // Trigger restart of sync loop - this.log(`addStreamToSync error`, err) - if (errorContains(err, Err.BAD_SYNC_COOKIE)) { - this.log('addStreamToSync BAD_SYNC_COOKIE', syncCookie) - throw err - } - } - } else { - this.log( - 'addStreamToSync: not in "syncing" state; let main sync loop handle this with its streams map', - { streamId: syncCookie.streamId, syncState: this.syncState }, - ) - } + this.pendingSyncCookies.push(streamId) + this.checkStartTicking() } // remove stream from the sync subsbscription @@ -247,6 +243,14 @@ export class SyncedStreamsLoop { // no such stream return } + const pendingIndex = this.pendingSyncCookies.indexOf(streamId) + if (pendingIndex !== -1) { + this.pendingSyncCookies.splice(pendingIndex, 1) + streamRecord.stream.stop() + this.streams.delete(streamId) + this.log('removed stream from pending sync', streamId) + return + } if (this.syncState === SyncState.Starting || this.syncState === SyncState.Retrying) { await this.waitForSyncingState() } @@ -272,170 +276,165 @@ export class SyncedStreamsLoop { } } - private async createSyncLoop() { - return new Promise((resolve, reject) => { - if (stateConstraints[this.syncState].has(SyncState.Starting)) { - this.setSyncState(SyncState.Starting) - this.log('starting sync loop') - } else { - this.log( - 'runSyncLoop: invalid state transition', - this.syncState, - '->', - SyncState.Starting, - ) - reject(new Error('invalid state transition')) - } + public setHighPriorityStreams(streamIds: string[]) { + this.highPriorityIds = new Set(streamIds) + } - if (this.syncLoop) { - reject(new 
Error('createSyncLoop called while a loop exists')) - } + private createSyncLoop() { + if (stateConstraints[this.syncState].has(SyncState.Starting)) { + this.setSyncState(SyncState.Starting) + this.log('starting sync loop') + } else { + this.log( + 'runSyncLoop: invalid state transition', + this.syncState, + '->', + SyncState.Starting, + ) + throw new Error('invalid state transition') + } - this.syncLoop = (async (): Promise => { - let iteration = 0 + if (this.syncLoop) { + throw new Error('createSyncLoop called while a loop exists') + } - this.log('sync loop created') - resolve() + this.syncLoop = (async (): Promise => { + let iteration = 0 - try { - while ( - this.syncState === SyncState.Starting || - this.syncState === SyncState.Syncing || - this.syncState === SyncState.Retrying - ) { - this.log('sync ITERATION start', ++iteration, this.syncState) - if (this.syncState === SyncState.Retrying) { - this.setSyncState(SyncState.Starting) - } + this.log('sync loop created') - // get cookies from all the known streams to sync - const syncCookies = Array.from(this.streams.values()).map( - (streamRecord) => streamRecord.syncCookie, - ) + try { + while ( + this.syncState === SyncState.Starting || + this.syncState === SyncState.Syncing || + this.syncState === SyncState.Retrying + ) { + this.log('sync ITERATION start', ++iteration, this.syncState) + if (this.syncState === SyncState.Retrying) { + this.setSyncState(SyncState.Starting) + } - try { - // syncId needs to be reset before starting a new syncStreams - // syncStreams() should return a new syncId - this.syncId = undefined - const streams = this.rpcClient.syncStreams( - { - syncPos: syncCookies, - }, - { timeoutMs: -1 }, - ) + // get cookies from all the known streams to sync + this.pendingSyncCookies = Array.from(this.streams.keys()) + try { + // syncId needs to be reset before starting a new syncStreams + // syncStreams() should return a new syncId + this.syncId = undefined + const streams = this.rpcClient.syncStreams( + { + syncPos: [], + }, + { timeoutMs: -1 }, + ) - const iterator = streams[Symbol.asyncIterator]() - - while ( - this.syncState === SyncState.Syncing || - this.syncState === SyncState.Starting - ) { - const interruptSyncPromise = new Promise( - (resolve, reject) => { - this.forceStopSyncStreams = () => { - this.log('forceStopSyncStreams called') - resolve() - } - this.interruptSync = (e: unknown) => { - this.logError('sync interrupted', e) - reject(e) - } - }, - ) - const { value, done } = await Promise.race([ - iterator.next(), - interruptSyncPromise.then(() => ({ - value: undefined, - done: true, - })), - ]) - if (done || value === undefined) { - this.log('exiting syncStreams', done, value) - // exit the syncLoop, it's done - this.forceStopSyncStreams = undefined - this.interruptSync = undefined - return iteration - } + const iterator = streams[Symbol.asyncIterator]() - this.log( - 'got syncStreams response', - 'syncOp', - value.syncOp, - 'syncId', - value.syncId, - ) - - if (!value.syncId || !value.syncOp) { - this.log('missing syncId or syncOp', value) - continue + while ( + this.syncState === SyncState.Syncing || + this.syncState === SyncState.Starting + ) { + const interruptSyncPromise = new Promise((resolve, reject) => { + this.forceStopSyncStreams = () => { + this.log('forceStopSyncStreams called') + resolve() } - let pingStats: NonceStats | undefined - switch (value.syncOp) { - case SyncOp.SYNC_NEW: - this.syncStarted(value.syncId) - break - case SyncOp.SYNC_CLOSE: - this.syncClosed() - break - case 
SyncOp.SYNC_UPDATE: - this.responsesQueue.push(value) - this.checkStartTicking() - break - case SyncOp.SYNC_PONG: - pingStats = this.pingInfo.nonces[value.pongNonce] - if (pingStats) { - pingStats.receivedAt = performance.now() - pingStats.duration = - pingStats.receivedAt - pingStats.pingAt - } else { - this.logError('pong nonce not found', value.pongNonce) - this.printNonces() - } - break - case SyncOp.SYNC_DOWN: - this.syncDown(value.streamId) - break - default: - logNever( - value.syncOp, - `unknown syncOp { syncId: ${this.syncId}, syncOp: ${value.syncOp} }`, - ) - break + this.interruptSync = (e: unknown) => { + this.logError('sync interrupted', e) + reject(e) } + }) + const { value, done } = await Promise.race([ + iterator.next(), + interruptSyncPromise.then(() => ({ + value: undefined, + done: true, + })), + ]) + if (done || value === undefined) { + this.log('exiting syncStreams', done, value) + // exit the syncLoop, it's done + this.forceStopSyncStreams = undefined + this.interruptSync = undefined + return iteration + } + + this.logDebug( + 'got syncStreams response', + 'syncOp', + value.syncOp, + 'syncId', + value.syncId, + ) + + if (!value.syncId || !value.syncOp) { + this.log('missing syncId or syncOp', value) + continue + } + let pingStats: NonceStats | undefined + switch (value.syncOp) { + case SyncOp.SYNC_NEW: + this.syncStarted(value.syncId) + break + case SyncOp.SYNC_CLOSE: + this.syncClosed() + break + case SyncOp.SYNC_UPDATE: + this.responsesQueue.push(value) + this.checkStartTicking() + break + case SyncOp.SYNC_PONG: + pingStats = this.pingInfo.nonces[value.pongNonce] + if (pingStats) { + pingStats.receivedAt = performance.now() + pingStats.duration = pingStats.receivedAt - pingStats.pingAt + } else { + this.logError('pong nonce not found', value.pongNonce) + this.printNonces() + } + break + case SyncOp.SYNC_DOWN: + this.syncDown(value.streamId) + break + default: + logNever( + value.syncOp, + `unknown syncOp { syncId: ${this.syncId}, syncOp: ${value.syncOp} }`, + ) + break } - } catch (err) { - this.logError('syncLoop error', err) - await this.attemptRetry() } + } catch (err) { + this.logError('syncLoop error', err) + await this.attemptRetry() } - } finally { - this.log('sync loop stopping ITERATION', { - iteration, - syncState: this.syncState, + } + } finally { + this.log('sync loop stopping ITERATION', { + iteration, + syncState: this.syncState, + }) + this.stopPing() + if (stateConstraints[this.syncState].has(SyncState.NotSyncing)) { + this.setSyncState(SyncState.NotSyncing) + this.streams.forEach((streamRecord) => { + streamRecord.stream.stop() }) - this.stopPing() - if (stateConstraints[this.syncState].has(SyncState.NotSyncing)) { - this.setSyncState(SyncState.NotSyncing) - this.streams.forEach((streamRecord) => { - streamRecord.stream.stop() - }) - this.streams.clear() - this.releaseRetryWait = undefined - this.syncId = undefined - this.clientEmitter.emit('streamSyncActive', false) - } else { - this.log( - 'onStopped: invalid state transition', - this.syncState, - '->', - SyncState.NotSyncing, - ) - } - this.log('sync loop stopped ITERATION', iteration) + this.streams.clear() + this.releaseRetryWait = undefined + this.syncId = undefined + this.clientEmitter.emit('streamSyncActive', false) + } else { + this.log( + 'onStopped: invalid state transition', + this.syncState, + '->', + SyncState.NotSyncing, + ) } - return iteration - })() - }) + this.log('sync loop stopped ITERATION', iteration) + } + return iteration + })() } private onMobileSafariBackgrounded = () => { 
@@ -454,7 +453,7 @@ export class SyncedStreamsLoop { return } - if (this.responsesQueue.length === 0) { + if (this.responsesQueue.length === 0 && this.pendingSyncCookies.length === 0) { return } @@ -473,6 +472,33 @@ export class SyncedStreamsLoop { } private async tick() { + if (this.syncState === SyncState.Syncing) { + if ( + this.inFlightSyncCookies.size <= this.MIN_IN_FLIGHT_COOKIES && + this.pendingSyncCookies.length > 0 + ) { + this.pendingSyncCookies.sort((a, b) => { + const aPriority = priorityFromStreamId(a, this.highPriorityIds) + const bPriority = priorityFromStreamId(b, this.highPriorityIds) + return aPriority - bPriority + }) + const streamsToAdd = this.pendingSyncCookies.splice(0, this.MAX_IN_FLIGHT_COOKIES) + streamsToAdd.forEach((x) => this.inFlightSyncCookies.add(x)) + const syncPos = streamsToAdd.map((x) => this.streams.get(x)?.syncCookie) + this.logSync('tick: modifySync', { + syncId: this.syncId, + addStreams: streamsToAdd, + }) + try { + await this.rpcClient.modifySync({ + syncId: this.syncId, + addStreams: syncPos.filter(isDefined), + }) + } catch (err) { + this.logError('modifySync error', err) + } + } + } const item = this.responsesQueue.shift() if (!item || item.syncId !== this.syncId) { return @@ -574,6 +600,7 @@ export class SyncedStreamsLoop { this.log('syncStarted', 'syncId', this.syncId) this.clientEmitter.emit('streamSyncActive', true) this.log('emitted streamSyncActive', true) + this.checkStartTicking() } else { this.log( 'syncStarted: invalid state transition', @@ -674,6 +701,7 @@ export class SyncedStreamsLoop { */ const streamIdBytes = syncStream.nextSyncCookie?.streamId ?? Uint8Array.from([]) const streamId = streamIdAsString(streamIdBytes) + this.inFlightSyncCookies.delete(streamId) const streamRecord = this.streams.get(streamId) if (streamRecord === undefined) { this.log('sync got stream', streamId, 'NOT FOUND') @@ -789,3 +817,45 @@ export class SyncedStreamsLoop { this.logSync(...args) } } + +// priority from stream id for syncing, dms if that's what we're looking at +// channels for any high priority spaces are more important than spaces we're not looking at +// then spaces, then other channels +function priorityFromStreamId(streamId: string, highPriorityIds: Set) { + if ( + isUserDeviceStreamId(streamId) || + isUserInboxStreamId(streamId) || + isUserStreamId(streamId) || + isUserSettingsStreamId(streamId) + ) { + return 0 + } + if (highPriorityIds.has(streamId)) { + return 1 + } + + if (isChannelStreamId(streamId)) { + const spaceId = spaceIdFromChannelId(streamId) + if (highPriorityIds.has(spaceId)) { + return 2 + } else { + return 3 + } + } + // if we're prioritizing dms, load other dms and gdm channels + if (highPriorityIds.size > 0) { + const firstHPI = Array.from(highPriorityIds.values())[0] + if (isDMChannelStreamId(firstHPI) || isGDMChannelStreamId(firstHPI)) { + if (isDMChannelStreamId(streamId) || isGDMChannelStreamId(streamId)) { + return 4 + } + } + } + + // we need spaces to structure the app + if (isSpaceStreamId(streamId)) { + return 5 + } + + return 6 +} diff --git a/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts b/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts index cfcbeb8797..e0caeba6df 100644 --- a/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts +++ b/packages/sdk/src/tests/multi_ne/syncedStreams.test.ts @@ -94,7 +94,7 @@ describe('syncStreams', () => { ) await userInboxStream.initializeFromResponse(userInboxStreamResponse) - await alicesSyncedStreams.startSyncStreams() + alicesSyncedStreams.startSyncStreams() 
await done1.promise alicesSyncedStreams.set(alicesUserInboxStreamIdStr, userInboxStream)
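
The core change in this patch replaces the per-stream addStreamToSync RPC with batched modifySync requests gated on an in-flight window, which is what avoids the "buffer full" messages called out in the commit message. A minimal sketch of that pattern in isolation is shown below; MiniSyncClient, SyncCookie, BatchedStreamAdder, and the priority callback are illustrative stand-ins rather than the SDK's real types, and the 40/10 thresholds mirror MAX_IN_FLIGHT_COOKIES and MIN_IN_FLIGHT_COOKIES from the diff above.

    // Illustrative sketch only: these names are stand-ins, not the SDK's real API.
    // It mirrors the batching idea from the patch: sort pending streams by priority,
    // keep a bounded number "in flight", and top up with a modifySync-style call
    // only once the backlog has drained below the minimum threshold.

    interface SyncCookie {
        streamId: string
    }

    interface MiniSyncClient {
        // assumed shape, loosely based on the patch's rpcClient.modifySync usage
        modifySync(req: { syncId: string; addStreams: SyncCookie[] }): Promise<void>
    }

    const MAX_IN_FLIGHT = 40 // cap per batch, as in the patch
    const MIN_IN_FLIGHT = 10 // refill threshold, as in the patch

    class BatchedStreamAdder {
        private pending: string[] = []
        private inFlight = new Set<string>()

        constructor(
            private readonly client: MiniSyncClient,
            private readonly syncId: string,
            private readonly cookies: Map<string, SyncCookie>,
            private readonly priority: (streamId: string) => number,
        ) {}

        // queue a stream to be added to the sync at some later tick
        enqueue(streamId: string) {
            this.pending.push(streamId)
        }

        // called whenever the sync loop ticks or a stream update arrives
        async maybeAddBatch(): Promise<void> {
            if (this.inFlight.size > MIN_IN_FLIGHT || this.pending.length === 0) {
                return
            }
            // re-sort every time so priority changes take effect between batches
            this.pending.sort((a, b) => this.priority(a) - this.priority(b))
            const batch = this.pending.splice(0, MAX_IN_FLIGHT)
            batch.forEach((id) => this.inFlight.add(id))
            const addStreams = batch
                .map((id) => this.cookies.get(id))
                .filter((c): c is SyncCookie => c !== undefined)
            await this.client.modifySync({ syncId: this.syncId, addStreams })
        }

        // called when the first update for a stream comes back on the sync
        markArrived(streamId: string) {
            this.inFlight.delete(streamId)
        }
    }

Capping each modifySync request at a few dozen cookies, and only topping up once the in-flight set drains, keeps any single request small while still letting high-priority streams jump the queue on every re-sort, which is the behavior the new pendingSyncCookies / inFlightSyncCookies fields in syncedStreamsLoop.ts implement.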