From 74afbfac93e4708aa1e8f8416deeb901f476fc39 Mon Sep 17 00:00:00 2001 From: texuf Date: Wed, 5 Feb 2025 20:44:50 -0800 Subject: [PATCH] Cover edge case where we lose sync and recover and re-initialize the user inbox stream (#2267) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit if lots of keys have come in in the meantime we will not download them until the user refreshes the page. If a key comes in via sync, we would reset the indexes in the stream and never download them. This is probably the long standing bug where some random message doesn’t sync randomly. --- .../encryption/src/decryptionExtensions.ts | 32 +++++++++++++++---- .../sdk/src/clientDecryptionExtensions.ts | 9 ++++++ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/packages/encryption/src/decryptionExtensions.ts b/packages/encryption/src/decryptionExtensions.ts index 2686733b0..0bef0d781 100644 --- a/packages/encryption/src/decryptionExtensions.ts +++ b/packages/encryption/src/decryptionExtensions.ts @@ -107,6 +107,13 @@ class StreamTasks { this.keySolicitations.sort((a, b) => a.respondAfter - b.respondAfter) this.keySolicitationsNeedsSort = false } + isEmpty() { + return ( + this.encryptedContent.length === 0 && + this.keySolicitations.length === 0 && + !this.isMissingKeys + ) + } } class StreamQueues { @@ -124,11 +131,7 @@ class StreamQueues { } isEmpty() { for (const tasks of this.streams.values()) { - if ( - tasks.encryptedContent.length > 0 || - tasks.keySolicitations.length > 0 || - tasks.isMissingKeys - ) { + if (!tasks.isEmpty()) { return false } } @@ -394,11 +397,16 @@ export abstract class BaseDecryptionExtensions { // enqueue a task to upload device keys this.mainQueues.priorityTasks.push(() => this.uploadDeviceKeys()) // enqueue a task to download new to-device messages - this.mainQueues.priorityTasks.push(() => this.downloadNewMessages()) + this.enqueueNewMessageDownload() // start the tick loop this.checkStartTicking() } + // enqueue a task to download new to-device messages, should be safe to call multiple times + public enqueueNewMessageDownload() { + this.mainQueues.priorityTasks.push(() => this.downloadNewMessages()) + } + public onStart(): void { // let the subclass override and do any custom startup tasks } @@ -454,6 +462,18 @@ export abstract class BaseDecryptionExtensions { .map(([key, q]) => `${key}: ${q.length}`) .join(', ')} ${this.streamQueues.toString()}`, ) + this.log.info( + `priorityTasks: ${Array.from(this.streamQueues.streams.entries()) + .filter(([_, value]) => !value.isEmpty()) + .map(([key, _]) => key) + .sort( + (a, b) => + this.getPriorityForStream(a, this.highPriorityIds) - + this.getPriorityForStream(b, this.highPriorityIds), + ) + .slice(0, 4) + .join(', ')}`, + ) this.lastPrintedAt = Date.now() } diff --git a/packages/sdk/src/clientDecryptionExtensions.ts b/packages/sdk/src/clientDecryptionExtensions.ts index e081971be..0784803c4 100644 --- a/packages/sdk/src/clientDecryptionExtensions.ts +++ b/packages/sdk/src/clientDecryptionExtensions.ts @@ -89,6 +89,12 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { }[], ) => this.enqueueInitKeySolicitations(streamId, members) + const onStreamInitialized = (streamId: string) => { + if (isUserInboxStreamId(streamId)) { + this.enqueueNewMessageDownload() + } + } + client.on('streamUpToDate', onStreamUpToDate) client.on('newGroupSessions', onNewGroupSessions) client.on('newEncryptedContent', onNewEncryptedContent) @@ -96,6 +102,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { client.on('updatedKeySolicitation', onKeySolicitation) client.on('initKeySolicitations', onInitKeySolicitations) client.on('streamNewUserJoined', onMembershipChange) + client.on('streamInitialized', onStreamInitialized) this._onStopFn = () => { client.off('streamUpToDate', onStreamUpToDate) @@ -105,6 +112,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { client.off('updatedKeySolicitation', onKeySolicitation) client.off('initKeySolicitations', onInitKeySolicitations) client.off('streamNewUserJoined', onMembershipChange) + client.off('streamInitialized', onStreamInitialized) } this.log.debug('new ClientDecryptionExtensions', { userDevice }) } @@ -135,6 +143,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { } public downloadNewMessages(): Promise { + this.log.info('downloadNewInboxMessages') return this.client.downloadNewInboxMessages() }