Skip to content

Commit

Permalink
Update synced streams loop to “modify stream” and add a few streams a…
Browse files Browse the repository at this point in the history
…t a time (#2205)

keeps us from getting the “buffer full” messages
  • Loading branch information
texuf authored Feb 4, 2025
1 parent 8598b59 commit 30ed4aa
Show file tree
Hide file tree
Showing 10 changed files with 630 additions and 415 deletions.
8 changes: 4 additions & 4 deletions packages/encryption/src/decryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export abstract class BaseDecryptionExtensions {
keySolicitation: KeySolicitationContent,
): void {
if (keySolicitation.deviceKey === this.userDevice.deviceKey) {
this.log.debug('ignoring key solicitation for our own device')
//this.log.debug('ignoring key solicitation for our own device')
return
}
const selectedQueue =
Expand All @@ -309,7 +309,7 @@ export abstract class BaseDecryptionExtensions {
selectedQueue.splice(index, 1)
}
if (keySolicitation.sessionIds.length > 0 || keySolicitation.isNewDevice) {
this.log.debug('new key solicitation', { fromUserId, streamId, keySolicitation })
//this.log.debug('new key solicitation', { fromUserId, streamId, keySolicitation })
this.keySolicitationsNeedsSort = true
selectedQueue.push({
streamId,
Expand All @@ -322,12 +322,12 @@ export abstract class BaseDecryptionExtensions {
} satisfies KeySolicitationItem)
this.checkStartTicking()
} else if (index > -1) {
this.log.debug('cleared key solicitation', keySolicitation)
//this.log.debug('cleared key solicitation', keySolicitation)
}
}

public setStreamUpToDate(streamId: string): void {
this.log.debug('streamUpToDate', streamId)
//this.log.debug('streamUpToDate', streamId)
this.upToDateStreams.add(streamId)
this.checkStartTicking()
}
Expand Down
46 changes: 30 additions & 16 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,12 @@ import { Stream } from './stream'
import { getTime, usernameChecksum } from './utils'
import { isEncryptedContentKind, toDecryptedContent } from './encryptedContentTypes'
import { ClientDecryptionExtensions } from './clientDecryptionExtensions'
import { PersistenceStore, IPersistenceStore, StubPersistenceStore } from './persistenceStore'
import {
PersistenceStore,
IPersistenceStore,
StubPersistenceStore,
LoadedStream,
} from './persistenceStore'
import { SyncedStreams } from './syncedStreams'
import { SyncState } from './syncedStreamsLoop'
import { SyncedStream } from './syncedStream'
Expand Down Expand Up @@ -265,17 +270,21 @@ export class Client
}

this.streams = new SyncedStreams(this.userId, this.rpcClient, this, this.unpackEnvelopeOpts)
this.syncedStreamsExtensions = new SyncedStreamsExtension({
startSyncStreams: async () => {
await this.streams.startSyncStreams()
this.decryptionExtensions?.start()
this.mlsAdapter?.start()
this.syncedStreamsExtensions = new SyncedStreamsExtension(
highPriorityStreamIds,
{
startSyncStreams: async () => {
this.mlsAdapter?.start()
this.streams.startSyncStreams()
this.decryptionExtensions?.start()
},
initStream: (streamId, allowGetStream, persistedData) =>
this.initStream(streamId, allowGetStream, persistedData),
emitClientInitStatus: (status) => this.emit('clientInitStatusUpdated', status),
},
initStream: (streamId, allowGetStream) => this.initStream(streamId, allowGetStream),
emitClientInitStatus: (status) => this.emit('clientInitStatusUpdated', status),
})
this.persistenceStore,
)

this.syncedStreamsExtensions.setHighPriority(highPriorityStreamIds ?? [])
this.logCall('new Client')
}

Expand Down Expand Up @@ -391,7 +400,7 @@ export class Client
encryptionDeviceInit?: EncryptionDeviceInitOpts
}): Promise<{
initCryptoTime: number
initMlsTime: number
//initMlsTime: number
initUserStreamTime: number
initUserInboxStreamTime: number
initUserMetadataStreamTime: number
Expand All @@ -407,11 +416,11 @@ export class Client
this.logCall('initializeUser', this.userId)
assert(this.userStreamId === undefined, 'already initialized')
const initCrypto = await getTime(() => this.initCrypto(opts?.encryptionDeviceInit))
const initMls = await getTime(() => this.initMls())
//const initMls = await getTime(() => this.initMls())

check(isDefined(this.decryptionExtensions), 'decryptionExtensions must be defined')
check(isDefined(this.syncedStreamsExtensions), 'syncedStreamsExtensions must be defined')
check(isDefined(this.mlsAdapter), 'mlsAdapter must be defined')
//check(isDefined(this.mlsAdapter), 'mlsAdapter must be defined')

const [
initUserStream,
Expand All @@ -437,7 +446,7 @@ export class Client

return {
initCryptoTime: initCrypto.time,
initMlsTime: initMls.time,
//initMlsTime: initMls.time,
initUserStreamTime: initUserStream.time,
initUserInboxStreamTime: initUserInboxStream.time,
initUserMetadataStreamTime: initUserMetadataStream.time,
Expand Down Expand Up @@ -1402,14 +1411,15 @@ export class Client
async initStream(
streamId: string | Uint8Array,
allowGetStream: boolean = true,
persistedData?: LoadedStream,
): Promise<Stream> {
const streamIdStr = streamIdAsString(streamId)
const existingRequest = this.initStreamRequests.get(streamIdStr)
if (existingRequest) {
this.logCall('initStream: had existing request for', streamIdStr, 'returning promise')
return existingRequest
}
const request = this._initStream(streamId, allowGetStream)
const request = this._initStream(streamId, allowGetStream, persistedData)
this.initStreamRequests.set(streamIdStr, request)
let stream: Stream
try {
Expand All @@ -1423,6 +1433,7 @@ export class Client
private async _initStream(
streamId: string | Uint8Array,
allowGetStream: boolean = true,
persistedData?: LoadedStream,
): Promise<Stream> {
try {
this.logCall('initStream', streamId)
Expand All @@ -1439,7 +1450,7 @@ export class Client
const stream = this.createSyncedStream(streamId)

// Try initializing from persistence
if (await stream.initializeFromPersistence()) {
if (await stream.initializeFromPersistence(persistedData)) {
if (stream.view.syncCookie) {
await this.streams.addStreamToSync(stream.view.syncCookie)
}
Expand Down Expand Up @@ -2543,7 +2554,10 @@ export class Client
}

public setHighPriorityStreams(streamIds: string[]) {
this.logCall('setHighPriorityStreams', streamIds)
this.decryptionExtensions?.setHighPriorityStreams(streamIds)
this.syncedStreamsExtensions?.setHighPriorityStreams(streamIds)
this.streams.setHighPriorityStreams(streamIds)
}

public async ensureOutboundSession(
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/clientDecryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
const numMembers = stream.view.getMembers().joinedParticipants().size
const maxWaitTimeSeconds = Math.max(5, Math.min(30, numMembers))
const waitTime = maxWaitTimeSeconds * 1000 * Math.random() // this could be much better
this.log.debug('getRespondDelayMSForKeySolicitation', { streamId, userId, waitTime })
//this.log.debug('getRespondDelayMSForKeySolicitation', { streamId, userId, waitTime })
return waitTime * multiplier
}

Expand Down
Loading

0 comments on commit 30ed4aa

Please sign in to comment.