diff --git a/packages/encryption/src/decryptionExtensions.ts b/packages/encryption/src/decryptionExtensions.ts index 2686733b0..0bef0d781 100644 --- a/packages/encryption/src/decryptionExtensions.ts +++ b/packages/encryption/src/decryptionExtensions.ts @@ -107,6 +107,13 @@ class StreamTasks { this.keySolicitations.sort((a, b) => a.respondAfter - b.respondAfter) this.keySolicitationsNeedsSort = false } + isEmpty() { + return ( + this.encryptedContent.length === 0 && + this.keySolicitations.length === 0 && + !this.isMissingKeys + ) + } } class StreamQueues { @@ -124,11 +131,7 @@ class StreamQueues { } isEmpty() { for (const tasks of this.streams.values()) { - if ( - tasks.encryptedContent.length > 0 || - tasks.keySolicitations.length > 0 || - tasks.isMissingKeys - ) { + if (!tasks.isEmpty()) { return false } } @@ -394,11 +397,16 @@ export abstract class BaseDecryptionExtensions { // enqueue a task to upload device keys this.mainQueues.priorityTasks.push(() => this.uploadDeviceKeys()) // enqueue a task to download new to-device messages - this.mainQueues.priorityTasks.push(() => this.downloadNewMessages()) + this.enqueueNewMessageDownload() // start the tick loop this.checkStartTicking() } + // enqueue a task to download new to-device messages, should be safe to call multiple times + public enqueueNewMessageDownload() { + this.mainQueues.priorityTasks.push(() => this.downloadNewMessages()) + } + public onStart(): void { // let the subclass override and do any custom startup tasks } @@ -454,6 +462,18 @@ export abstract class BaseDecryptionExtensions { .map(([key, q]) => `${key}: ${q.length}`) .join(', ')} ${this.streamQueues.toString()}`, ) + this.log.info( + `priorityTasks: ${Array.from(this.streamQueues.streams.entries()) + .filter(([_, value]) => !value.isEmpty()) + .map(([key, _]) => key) + .sort( + (a, b) => + this.getPriorityForStream(a, this.highPriorityIds) - + this.getPriorityForStream(b, this.highPriorityIds), + ) + .slice(0, 4) + .join(', ')}`, + ) this.lastPrintedAt = Date.now() } diff --git a/packages/sdk/src/clientDecryptionExtensions.ts b/packages/sdk/src/clientDecryptionExtensions.ts index e081971be..0784803c4 100644 --- a/packages/sdk/src/clientDecryptionExtensions.ts +++ b/packages/sdk/src/clientDecryptionExtensions.ts @@ -89,6 +89,12 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { }[], ) => this.enqueueInitKeySolicitations(streamId, members) + const onStreamInitialized = (streamId: string) => { + if (isUserInboxStreamId(streamId)) { + this.enqueueNewMessageDownload() + } + } + client.on('streamUpToDate', onStreamUpToDate) client.on('newGroupSessions', onNewGroupSessions) client.on('newEncryptedContent', onNewEncryptedContent) @@ -96,6 +102,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { client.on('updatedKeySolicitation', onKeySolicitation) client.on('initKeySolicitations', onInitKeySolicitations) client.on('streamNewUserJoined', onMembershipChange) + client.on('streamInitialized', onStreamInitialized) this._onStopFn = () => { client.off('streamUpToDate', onStreamUpToDate) @@ -105,6 +112,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { client.off('updatedKeySolicitation', onKeySolicitation) client.off('initKeySolicitations', onInitKeySolicitations) client.off('streamNewUserJoined', onMembershipChange) + client.off('streamInitialized', onStreamInitialized) } this.log.debug('new ClientDecryptionExtensions', { userDevice }) } @@ -135,6 +143,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { } public downloadNewMessages(): Promise { + this.log.info('downloadNewInboxMessages') return this.client.downloadNewInboxMessages() }