Skip to content

Commit

Permalink
Cover edge case where we lose sync and recover and re-initialize the …
Browse files Browse the repository at this point in the history
…user inbox stream (#2267)

if lots of keys have come in in the meantime we will not download them
until the user refreshes the page. If a key comes in via sync, we would
reset the indexes in the stream and never download them.

This is probably the long standing bug where some random message doesn’t
sync randomly.
  • Loading branch information
texuf authored Feb 6, 2025
1 parent ed85fca commit 74afbfa
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
32 changes: 26 additions & 6 deletions packages/encryption/src/decryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ class StreamTasks {
this.keySolicitations.sort((a, b) => a.respondAfter - b.respondAfter)
this.keySolicitationsNeedsSort = false
}
isEmpty() {
return (
this.encryptedContent.length === 0 &&
this.keySolicitations.length === 0 &&
!this.isMissingKeys
)
}
}

class StreamQueues {
Expand All @@ -124,11 +131,7 @@ class StreamQueues {
}
isEmpty() {
for (const tasks of this.streams.values()) {
if (
tasks.encryptedContent.length > 0 ||
tasks.keySolicitations.length > 0 ||
tasks.isMissingKeys
) {
if (!tasks.isEmpty()) {
return false
}
}
Expand Down Expand Up @@ -394,11 +397,16 @@ export abstract class BaseDecryptionExtensions {
// enqueue a task to upload device keys
this.mainQueues.priorityTasks.push(() => this.uploadDeviceKeys())
// enqueue a task to download new to-device messages
this.mainQueues.priorityTasks.push(() => this.downloadNewMessages())
this.enqueueNewMessageDownload()
// start the tick loop
this.checkStartTicking()
}

// enqueue a task to download new to-device messages, should be safe to call multiple times
public enqueueNewMessageDownload() {
this.mainQueues.priorityTasks.push(() => this.downloadNewMessages())
}

public onStart(): void {
// let the subclass override and do any custom startup tasks
}
Expand Down Expand Up @@ -454,6 +462,18 @@ export abstract class BaseDecryptionExtensions {
.map(([key, q]) => `${key}: ${q.length}`)
.join(', ')} ${this.streamQueues.toString()}`,
)
this.log.info(
`priorityTasks: ${Array.from(this.streamQueues.streams.entries())
.filter(([_, value]) => !value.isEmpty())
.map(([key, _]) => key)
.sort(
(a, b) =>
this.getPriorityForStream(a, this.highPriorityIds) -
this.getPriorityForStream(b, this.highPriorityIds),
)
.slice(0, 4)
.join(', ')}`,
)
this.lastPrintedAt = Date.now()
}

Expand Down
9 changes: 9 additions & 0 deletions packages/sdk/src/clientDecryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,20 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
}[],
) => this.enqueueInitKeySolicitations(streamId, members)

const onStreamInitialized = (streamId: string) => {
if (isUserInboxStreamId(streamId)) {
this.enqueueNewMessageDownload()
}
}

client.on('streamUpToDate', onStreamUpToDate)
client.on('newGroupSessions', onNewGroupSessions)
client.on('newEncryptedContent', onNewEncryptedContent)
client.on('newKeySolicitation', onKeySolicitation)
client.on('updatedKeySolicitation', onKeySolicitation)
client.on('initKeySolicitations', onInitKeySolicitations)
client.on('streamNewUserJoined', onMembershipChange)
client.on('streamInitialized', onStreamInitialized)

this._onStopFn = () => {
client.off('streamUpToDate', onStreamUpToDate)
Expand All @@ -105,6 +112,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
client.off('updatedKeySolicitation', onKeySolicitation)
client.off('initKeySolicitations', onInitKeySolicitations)
client.off('streamNewUserJoined', onMembershipChange)
client.off('streamInitialized', onStreamInitialized)
}
this.log.debug('new ClientDecryptionExtensions', { userDevice })
}
Expand Down Expand Up @@ -135,6 +143,7 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
}

public downloadNewMessages(): Promise<void> {
this.log.info('downloadNewInboxMessages')
return this.client.downloadNewInboxMessages()
}

Expand Down

0 comments on commit 74afbfa

Please sign in to comment.