From 16c807e00154aeaeca948f8f9b94c0a1638e5a73 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Fri, 10 Jan 2025 17:33:58 +0100 Subject: [PATCH 01/17] Fleshing out QueueService --- .../sdk/src/mls/epoch/epochSecretService.ts | 13 ++- .../src/mls/externalGroup/externalCrypto.ts | 9 ++ .../mls/externalGroup/externalGroupService.ts | 16 +++ packages/sdk/src/mls/group/crypto.ts | 4 + packages/sdk/src/mls/group/groupService.ts | 4 + packages/sdk/src/mls/queue/queueService.ts | 109 ++++++++++++++++++ 6 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 packages/sdk/src/mls/queue/queueService.ts diff --git a/packages/sdk/src/mls/epoch/epochSecretService.ts b/packages/sdk/src/mls/epoch/epochSecretService.ts index 712ccd55e1..ce4e1ec002 100644 --- a/packages/sdk/src/mls/epoch/epochSecretService.ts +++ b/packages/sdk/src/mls/epoch/epochSecretService.ts @@ -7,11 +7,14 @@ import { } from '@river-build/mls-rs-wasm' import { bin_toHexString, DLogger, shortenHexString } from '@river-build/dlog' import { DerivedKeys, EpochSecret, EpochSecretId, epochSecretId } from './epochSecret' -import { EncryptedData } from '@river-build/proto' +import { EncryptedData, MemberPayload_Mls_EpochSecrets } from '@river-build/proto' import { IEpochSecretStore } from './epochSecretStore' +import { PlainMessage } from '@bufbuild/protobuf' const MLS_ALGORITHM = 'mls_0.0.1' +type EpochSecretsMessage = PlainMessage + export class EpochSecretService { private epochSecretStore: IEpochSecretStore private cipherSuite: MlsCipherSuite @@ -226,4 +229,12 @@ export class EpochSecretService { const ciphertext_ = HpkeCiphertext.fromBytes(ciphertext) return await this.cipherSuite.open(ciphertext_, secretKey_, publicKey_) } + + public async handleEpochSecrets(_streamId: string, _message: EpochSecretsMessage) { + throw new Error('Not implemented') + } + + public epochSecretMessage(streamId: string): EpochSecretsMessage { + throw new Error('Not implemented') + } } diff --git a/packages/sdk/src/mls/externalGroup/externalCrypto.ts b/packages/sdk/src/mls/externalGroup/externalCrypto.ts index 02bb1618a6..461197dfac 100644 --- a/packages/sdk/src/mls/externalGroup/externalCrypto.ts +++ b/packages/sdk/src/mls/externalGroup/externalCrypto.ts @@ -23,4 +23,13 @@ export class ExternalCrypto { public exportTree(group: ExternalGroup): Uint8Array { return group.externalGroup.exportTree() } + + public async externalGroupSnapshot( + groupInfoMessage: Uint8Array, + exportedTree: Uint8Array, + ): Promise { + const externalClient = new MlsExternalClient() + const externalGroup = await externalClient.observeGroup(groupInfoMessage, exportedTree) + return externalGroup.snapshot().toBytes() + } } diff --git a/packages/sdk/src/mls/externalGroup/externalGroupService.ts b/packages/sdk/src/mls/externalGroup/externalGroupService.ts index ebbc551e69..66511899e1 100644 --- a/packages/sdk/src/mls/externalGroup/externalGroupService.ts +++ b/packages/sdk/src/mls/externalGroup/externalGroupService.ts @@ -42,6 +42,7 @@ export class ExternalGroupService { this.externalGroupCache.delete(streamId) } + // change it so it accepts maybe undefined exernal group public async handleInitializeGroup(streamId: string, message: InitializeGroupMessage) { this.log.debug('handleInitializeGroup', { streamId }) @@ -59,6 +60,7 @@ export class ExternalGroupService { this.externalGroupCache.set(streamId, group) } + // TODO: change it so it accepts maybe undefined external group public async handleExternalJoin(streamId: string, message: ExternalJoinMessage) { this.log.debug('handleExternalJoin', { streamId }) @@ -77,4 +79,18 @@ export class ExternalGroupService { return this.crypto.exportTree(group) } + + public externalGroupSnapshot( + streamId: string, + groupInfoMessage: Uint8Array, + exportedTree: Uint8Array, + ): Promise { + this.log.debug('externalGroupSnapshot', { streamId }) + + return this.crypto.externalGroupSnapshot(groupInfoMessage, exportedTree) + } + + public latestGroupInfo(externalGroup: ExternalGroup): Uint8Array { + throw new Error('Not implemented') + } } diff --git a/packages/sdk/src/mls/group/crypto.ts b/packages/sdk/src/mls/group/crypto.ts index 3f5e099562..6ae58797cd 100644 --- a/packages/sdk/src/mls/group/crypto.ts +++ b/packages/sdk/src/mls/group/crypto.ts @@ -100,6 +100,10 @@ export class Crypto { return group.group.currentEpoch } + public exportTree(group: Group): Uint8Array { + return group.group.exportTree().toBytes() + } + public signaturePublicKey(): Uint8Array { if (!this.client) { this.log.error('signaturePublicKey: Client not initialized') diff --git a/packages/sdk/src/mls/group/groupService.ts b/packages/sdk/src/mls/group/groupService.ts index fffaadd5de..ead5188ad0 100644 --- a/packages/sdk/src/mls/group/groupService.ts +++ b/packages/sdk/src/mls/group/groupService.ts @@ -212,6 +212,10 @@ export class GroupService { } } + public exportTree(group: Group): Uint8Array { + return this.crypto.exportTree(group) + } + private getSignaturePublicKey(): Uint8Array { return this.crypto.signaturePublicKey() } diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts new file mode 100644 index 0000000000..b4bf68147d --- /dev/null +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -0,0 +1,109 @@ +import { PlainMessage, Message } from '@bufbuild/protobuf' +import { + EncryptedData, + MemberPayload_Mls_EpochSecrets, + MemberPayload_Mls_ExternalJoin, + MemberPayload_Mls_InitializeGroup, +} from '@river-build/proto' +import { GroupService, IGroupServiceCoordinator } from '../group' +import { EpochSecretService } from '../epoch' +import { ExternalGroupService } from '../externalGroup' + +type InitializeGroupMessage = PlainMessage +type ExternalJoinMessage = PlainMessage +type EpochSecretsMessage = PlainMessage + +// This feels more like a coordinator +export class QueueService implements IGroupServiceCoordinator { + private epochSecretService!: EpochSecretService + private groupService!: GroupService + private externalGroupService!: ExternalGroupService + + constructor() { + // nop + } + + // IGroupServiceCoordinator + public joinOrCreateGroup(_streamId: string): void { + throw new Error('Method not implemented.') + } + + // IGroupServiceCoordinator + public groupActive(_streamId: string): void { + throw new Error('Method not implemented.') + } + + // IGroupServiceCoordinator + public newEpochSecret(_streamId: string, _epoch: bigint): void { + throw new Error('Method not implemented.') + } + + // API needed by the client + public encryptGroupEventEpochSecret( + _streamId: string, + _event: Message, + ): Promise { + throw new Error('Not implemented') + } + + public async handleInitializeGroup(_streamId: string, _message: InitializeGroupMessage) { + const group = this.groupService.getGroup(_streamId) + if (group) { + await this.groupService.handleInitializeGroup(group, _message) + } + + const groupServiceHasGroup = this.groupService.getGroup(_streamId) !== undefined + if (!groupServiceHasGroup) { + const externalGroup = this.externalGroupService.getExternalGroup(_streamId) + // TODO: change first arg to externalGroup + await this.externalGroupService.handleInitializeGroup(externalGroup!.streamId, _message) + } + } + + public async handleExternalJoin(_streamId: string, _message: ExternalJoinMessage) { + const group = this.groupService.getGroup(_streamId) + if (group) { + await this.groupService.handleExternalJoin(group, _message) + } + + const groupServiceHasGroup = this.groupService.getGroup(_streamId) !== undefined + if (!groupServiceHasGroup) { + const externalGroup = this.externalGroupService.getExternalGroup(_streamId) + // TODO: change first arg to externalGroup + await this.externalGroupService.handleExternalJoin(externalGroup!.streamId, _message) + } + } + + public async handleEpochSecrets(_streamId: string, _message: EpochSecretsMessage) { + return this.epochSecretService.handleEpochSecrets(_streamId, _message) + } + + public async initializeGroupMessage(streamId: string): Promise { + // TODO: Check preconditions + // TODO: Change this API to return group as well + const message = await this.groupService.initializeGroupMessage(streamId) + const group = this.groupService.getGroup(streamId)! + const exportedTree = this.groupService.exportTree(group) + const externalGroupSnapshot = await this.externalGroupService.externalGroupSnapshot( + streamId, + message.groupInfoMessage, + exportedTree, + ) + + return { ...message, externalGroupSnapshot } + } + + public async externalJoinMessage(streamId: string): Promise { + // TODO: Check preconditions + const externalGroup = this.externalGroupService.getExternalGroup('streamId')! + const exportedTree = this.externalGroupService.exportTree(externalGroup) + const latestGroupInfo = this.externalGroupService.latestGroupInfo(externalGroup) + + return this.groupService.externalJoinMessage(streamId, latestGroupInfo, exportedTree) + } + + public epochSecretsMessage(streamId: string): EpochSecretsMessage { + // TODO: Check preconditions + return this.epochSecretService.epochSecretMessage(streamId) + } +} From b06701e1c60bf4c18c4b0eac2bb137852f639340 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 09:19:14 +0100 Subject: [PATCH 02/17] Specify public API of ExternalGroupService --- .../mls/externalGroup/externalGroupService.ts | 71 ++----------------- 1 file changed, 7 insertions(+), 64 deletions(-) diff --git a/packages/sdk/src/mls/externalGroup/externalGroupService.ts b/packages/sdk/src/mls/externalGroup/externalGroupService.ts index 66511899e1..8aa4bd29e8 100644 --- a/packages/sdk/src/mls/externalGroup/externalGroupService.ts +++ b/packages/sdk/src/mls/externalGroup/externalGroupService.ts @@ -1,19 +1,10 @@ import { ExternalGroup } from './externalGroup' -import { PlainMessage } from '@bufbuild/protobuf' -import { - MemberPayload_Mls_ExternalJoin, - MemberPayload_Mls_InitializeGroup, -} from '@river-build/proto' import { ExternalCrypto } from './externalCrypto' import { dlog, DLogger } from '@river-build/dlog' -type InitializeGroupMessage = PlainMessage -type ExternalJoinMessage = PlainMessage - const defaultLogger = dlog('csb:mls:externalGroupService') export class ExternalGroupService { - private externalGroupCache: Map = new Map() private log: { debug: DLogger error: DLogger @@ -30,67 +21,19 @@ export class ExternalGroupService { } } - public getExternalGroup(streamId: string): ExternalGroup | undefined { - this.log.debug('getExternalGroup', { streamId }) - - return this.externalGroupCache.get(streamId) - } - - public deleteExternalGroup(streamId: string) { - this.log.debug('deleteExternalGroup', { streamId }) - - this.externalGroupCache.delete(streamId) - } - - // change it so it accepts maybe undefined exernal group - public async handleInitializeGroup(streamId: string, message: InitializeGroupMessage) { - this.log.debug('handleInitializeGroup', { streamId }) - - if (this.externalGroupCache.has(streamId)) { - const message = `group already present: ${streamId}` - this.log.error(`handleInitializeGroup: ${message}`) - throw new Error(message) - } - - const group = await this.crypto.loadExternalGroupFromSnapshot( - streamId, - message.externalGroupSnapshot, - ) - - this.externalGroupCache.set(streamId, group) - } - - // TODO: change it so it accepts maybe undefined external group - public async handleExternalJoin(streamId: string, message: ExternalJoinMessage) { - this.log.debug('handleExternalJoin', { streamId }) - - const group = this.externalGroupCache.get(streamId) - if (!group) { - const message = `group not found: ${streamId}` - this.log.error(`handleExternalJoin: ${message}`) - throw new Error(message) - } - - await this.crypto.processCommit(group, message.commit) + public async getExternalGroup(_streamId: string): Promise { + throw new Error('Not implemented') } - public exportTree(group: ExternalGroup): Uint8Array { - this.log.debug('exportTree', { streamId: group.streamId }) - - return this.crypto.exportTree(group) + public exportTree(_group: ExternalGroup): Uint8Array { + throw new Error('Not implemented') } - public externalGroupSnapshot( - streamId: string, - groupInfoMessage: Uint8Array, - exportedTree: Uint8Array, - ): Promise { - this.log.debug('externalGroupSnapshot', { streamId }) - - return this.crypto.externalGroupSnapshot(groupInfoMessage, exportedTree) + public snapshot(_group: ExternalGroup): Promise { + throw new Error('Not implemented') } - public latestGroupInfo(externalGroup: ExternalGroup): Uint8Array { + public latestGroupInfo(_group: ExternalGroup): Uint8Array { throw new Error('Not implemented') } } From 526dac537235e4aa0d4cd925226361c9ea69cff1 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 09:41:58 +0100 Subject: [PATCH 03/17] Replace exportTree by exportGroupSnapshot --- packages/sdk/src/mls/externalGroup/externalGroupService.ts | 4 ---- packages/sdk/src/mls/group/groupService.ts | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/sdk/src/mls/externalGroup/externalGroupService.ts b/packages/sdk/src/mls/externalGroup/externalGroupService.ts index 8aa4bd29e8..1cf8ff7c51 100644 --- a/packages/sdk/src/mls/externalGroup/externalGroupService.ts +++ b/packages/sdk/src/mls/externalGroup/externalGroupService.ts @@ -29,10 +29,6 @@ export class ExternalGroupService { throw new Error('Not implemented') } - public snapshot(_group: ExternalGroup): Promise { - throw new Error('Not implemented') - } - public latestGroupInfo(_group: ExternalGroup): Uint8Array { throw new Error('Not implemented') } diff --git a/packages/sdk/src/mls/group/groupService.ts b/packages/sdk/src/mls/group/groupService.ts index ead5188ad0..7a87283833 100644 --- a/packages/sdk/src/mls/group/groupService.ts +++ b/packages/sdk/src/mls/group/groupService.ts @@ -212,8 +212,8 @@ export class GroupService { } } - public exportTree(group: Group): Uint8Array { - return this.crypto.exportTree(group) + public exportGroupSnapshot(_group: Group): Uint8Array { + throw new Error('Not implemented') } private getSignaturePublicKey(): Uint8Array { From 145efe74e929e0094d28544c7dc1c165f98cb284 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 09:46:37 +0100 Subject: [PATCH 04/17] Use externalGroupSnapshot API --- packages/sdk/src/mls/group/groupService.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/sdk/src/mls/group/groupService.ts b/packages/sdk/src/mls/group/groupService.ts index 7a87283833..c0743a5073 100644 --- a/packages/sdk/src/mls/group/groupService.ts +++ b/packages/sdk/src/mls/group/groupService.ts @@ -8,10 +8,7 @@ import { PlainMessage } from '@bufbuild/protobuf' import { Crypto } from './crypto' import { DLogger, dlog } from '@river-build/dlog' -type InitializeGroupMessage = Omit< - PlainMessage, - 'externalGroupSnapshot' -> +type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage // Placeholder for a coordinator @@ -180,11 +177,14 @@ export class GroupService { const group = await this.crypto.createGroup(streamId) await this.saveGroup(group) + const externalGroupSnapshot = this.exportGroupSnapshot(group) + const signaturePublicKey = this.getSignaturePublicKey() return { groupInfoMessage: group.groupInfoWithExternalKey!, signaturePublicKey, + externalGroupSnapshot, } } From cc01f537dcb6c61f0256277ef379d964312d5151 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 09:47:18 +0100 Subject: [PATCH 05/17] Use bin_equal from @river-build/dlog --- packages/sdk/src/mls/group/groupService.ts | 27 +++++----------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/packages/sdk/src/mls/group/groupService.ts b/packages/sdk/src/mls/group/groupService.ts index c0743a5073..4dc371884d 100644 --- a/packages/sdk/src/mls/group/groupService.ts +++ b/packages/sdk/src/mls/group/groupService.ts @@ -6,7 +6,7 @@ import { } from '@river-build/proto' import { PlainMessage } from '@bufbuild/protobuf' import { Crypto } from './crypto' -import { DLogger, dlog } from '@river-build/dlog' +import { DLogger, dlog, bin_equal } from '@river-build/dlog' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage @@ -115,8 +115,8 @@ export class GroupService { const wasInitializeGroupOurOwn = group.status === 'GROUP_PENDING_CREATE' && group.groupInfoWithExternalKey !== undefined && - uint8ArrayEqual(_message.groupInfoMessage, group.groupInfoWithExternalKey) && - uint8ArrayEqual(_message.signaturePublicKey, this.getSignaturePublicKey()) + bin_equal(_message.groupInfoMessage, group.groupInfoWithExternalKey) && + bin_equal(_message.signaturePublicKey, this.getSignaturePublicKey()) if (!wasInitializeGroupOurOwn) { await this.clearGroup(group.streamId) @@ -147,10 +147,10 @@ export class GroupService { const wasExternalJoinOurOwn = group.status === 'GROUP_PENDING_JOIN' && group.groupInfoWithExternalKey !== undefined && - uint8ArrayEqual(message.groupInfoMessage, group.groupInfoWithExternalKey) && + bin_equal(message.groupInfoMessage, group.groupInfoWithExternalKey) && group.commit !== undefined && - uint8ArrayEqual(message.commit, group.commit) && - uint8ArrayEqual(message.signaturePublicKey, this.getSignaturePublicKey()) + bin_equal(message.commit, group.commit) && + bin_equal(message.signaturePublicKey, this.getSignaturePublicKey()) if (!wasExternalJoinOurOwn) { await this.clearGroup(group.streamId) @@ -220,18 +220,3 @@ export class GroupService { return this.crypto.signaturePublicKey() } } - -function uint8ArrayEqual(a: Uint8Array, b: Uint8Array): boolean { - if (a === b) { - return true - } - if (a.length !== b.length) { - return false - } - for (const [i, byte] of a.entries()) { - if (byte !== b[i]) { - return false - } - } - return true -} From 445c8e7fe5e6038a0fa8bea61b64ba105bbda5a5 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 11:18:46 +0100 Subject: [PATCH 06/17] Queue is roughly finished --- packages/sdk/src/mls/queue/queueService.ts | 273 +++++++++++++++++---- 1 file changed, 227 insertions(+), 46 deletions(-) diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts index b4bf68147d..fd088d80a8 100644 --- a/packages/sdk/src/mls/queue/queueService.ts +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -1,4 +1,4 @@ -import { PlainMessage, Message } from '@bufbuild/protobuf' +import { Message, PlainMessage } from '@bufbuild/protobuf' import { EncryptedData, MemberPayload_Mls_EpochSecrets, @@ -8,37 +8,81 @@ import { import { GroupService, IGroupServiceCoordinator } from '../group' import { EpochSecretService } from '../epoch' import { ExternalGroupService } from '../externalGroup' +import { DLogger } from '@river-build/dlog' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage type EpochSecretsMessage = PlainMessage +// Commands, which are internal commands of the Queue + +type JoinOrCreateGroupCommand = { + tag: 'joinOrCreateGroup' + streamId: string +} + +type GroupActiveCommand = { + tag: 'groupActive' + streamId: string +} + +type NewEpochSecretCommand = { + tag: 'newEpochSecrets' + streamId: string + epoch: bigint +} + +type QueueCommand = JoinOrCreateGroupCommand | GroupActiveCommand | NewEpochSecretCommand + +// Events, which we are processing from outside +type InitializeGroupEvent = { + tag: 'initializeGroup' + streamId: string + message: InitializeGroupMessage +} + +type ExternalJoinEvent = { + tag: 'externalJoin' + streamId: string + message: ExternalJoinMessage +} + +type EpochSecretsEvent = { + tag: 'epochSecrets' + streamId: string + message: EpochSecretsMessage +} + +// TODO: Should encrypted content get its own queue? +type EncryptedContentEvent = { + tag: 'encryptedContent' + streamId: string + message: EncryptedData +} + +type QueueEvent = + | InitializeGroupEvent + | ExternalJoinEvent + | EpochSecretsEvent + | EncryptedContentEvent + // This feels more like a coordinator -export class QueueService implements IGroupServiceCoordinator { +export class QueueService { private epochSecretService!: EpochSecretService private groupService!: GroupService private externalGroupService!: ExternalGroupService - - constructor() { - // nop - } - - // IGroupServiceCoordinator - public joinOrCreateGroup(_streamId: string): void { - throw new Error('Method not implemented.') + private log!: { + error: DLogger + debug: DLogger } - // IGroupServiceCoordinator - public groupActive(_streamId: string): void { - throw new Error('Method not implemented.') - } - // IGroupServiceCoordinator - public newEpochSecret(_streamId: string, _epoch: bigint): void { - throw new Error('Method not implemented.') + constructor() { + // nop } // API needed by the client + // TODO: How long will be the timeout here? public encryptGroupEventEpochSecret( _streamId: string, _event: Message, @@ -46,32 +90,20 @@ export class QueueService implements IGroupServiceCoordinator { throw new Error('Not implemented') } + // # MLS Coordinator # + public async handleInitializeGroup(_streamId: string, _message: InitializeGroupMessage) { const group = this.groupService.getGroup(_streamId) - if (group) { + if (group !== undefined) { await this.groupService.handleInitializeGroup(group, _message) } - - const groupServiceHasGroup = this.groupService.getGroup(_streamId) !== undefined - if (!groupServiceHasGroup) { - const externalGroup = this.externalGroupService.getExternalGroup(_streamId) - // TODO: change first arg to externalGroup - await this.externalGroupService.handleInitializeGroup(externalGroup!.streamId, _message) - } } public async handleExternalJoin(_streamId: string, _message: ExternalJoinMessage) { const group = this.groupService.getGroup(_streamId) - if (group) { + if (group !== undefined) { await this.groupService.handleExternalJoin(group, _message) } - - const groupServiceHasGroup = this.groupService.getGroup(_streamId) !== undefined - if (!groupServiceHasGroup) { - const externalGroup = this.externalGroupService.getExternalGroup(_streamId) - // TODO: change first arg to externalGroup - await this.externalGroupService.handleExternalJoin(externalGroup!.streamId, _message) - } } public async handleEpochSecrets(_streamId: string, _message: EpochSecretsMessage) { @@ -80,22 +112,17 @@ export class QueueService implements IGroupServiceCoordinator { public async initializeGroupMessage(streamId: string): Promise { // TODO: Check preconditions - // TODO: Change this API to return group as well - const message = await this.groupService.initializeGroupMessage(streamId) - const group = this.groupService.getGroup(streamId)! - const exportedTree = this.groupService.exportTree(group) - const externalGroupSnapshot = await this.externalGroupService.externalGroupSnapshot( - streamId, - message.groupInfoMessage, - exportedTree, - ) - - return { ...message, externalGroupSnapshot } + // TODO: Catch the error + return this.groupService.initializeGroupMessage(streamId) } public async externalJoinMessage(streamId: string): Promise { - // TODO: Check preconditions - const externalGroup = this.externalGroupService.getExternalGroup('streamId')! + const externalGroup = await this.externalGroupService.getExternalGroup('streamId') + if (externalGroup === undefined) { + this.log.error('External group not found', { streamId }) + throw new Error('External group not found') + } + const exportedTree = this.externalGroupService.exportTree(externalGroup) const latestGroupInfo = this.externalGroupService.latestGroupInfo(externalGroup) @@ -106,4 +133,158 @@ export class QueueService implements IGroupServiceCoordinator { // TODO: Check preconditions return this.epochSecretService.epochSecretMessage(streamId) } + + public async joinOrCreateGroup(_streamId: string): Promise { + throw new Error('Not implemented') + } + + public async groupActive(_streamId: string): Promise { + throw new Error('Not implemented') + } + + public async newEpochSecrets(_streamId: string, _epoch: bigint): Promise { + throw new Error('Not implemented') + } + + // # Queue-related operations # + + // Queue-related fields + private commandQueue: QueueCommand[] = [] + private eventQueue: QueueEvent[] = [] + private delayMs = 15 + private started: boolean = false + private stopping: boolean = false + private timeoutId?: NodeJS.Timeout + private inProgressTick?: Promise + + public enqueueCommand(command: QueueCommand) { + this.commandQueue.push(command) + } + + private dequeueCommand(): QueueCommand | undefined { + return this.commandQueue.shift() + } + + public enqueueEvent(event: QueueEvent) { + this.eventQueue.push(event) + } + + private dequeueEvent(): QueueEvent | undefined { + return this.eventQueue.shift() + } + + getDelayMs(): number { + return this.delayMs + } + + public start() { + // nop + this.started = true + this.checkStartTicking() + } + + public async stop(): Promise { + this.started = false + await this.stopTicking() + // nop + } + + private checkStartTicking() { + // TODO: pause if take mobile safari is backgrounded (idb issue) + + if (this.stopping) { + this.log.debug('ticking is being stopped') + return + } + + if (!this.started || this.timeoutId) { + this.log.debug('previous tick is still running') + return + } + + // TODO: should this have any timeout? + this.timeoutId = setTimeout(() => { + this.inProgressTick = this.tick() + .catch((e) => this.log.error('MLS ProcessTick Error', e)) + .finally(() => { + this.timeoutId = undefined + this.checkStartTicking() + }) + }, this.getDelayMs()) + } + + private async stopTicking() { + if (this.stopping) { + return + } + this.stopping = true + + if (this.timeoutId) { + clearTimeout(this.timeoutId) + this.timeoutId = undefined + } + if (this.inProgressTick) { + try { + await this.inProgressTick + } catch (e) { + this.log.error('ProcessTick Error while stopping', e) + } finally { + this.inProgressTick = undefined + } + } + this.stopping = false + } + + public async tick() { + // noop + const command = this.dequeueCommand() + if (command !== undefined) { + return this.processCommand(command) + } + + const event = this.dequeueEvent() + if (event !== undefined) { + return this.processEvent(event) + } + } + + public async processCommand(command: QueueCommand): Promise { + switch (command.tag) { + case 'joinOrCreateGroup': + return this.joinOrCreateGroup(command.streamId) + case 'groupActive': + return this.groupActive(command.streamId) + case 'newEpochSecrets': + return this.newEpochSecrets(command.streamId, command.epoch) + } + } + + public async processEvent(event: QueueEvent): Promise { + switch (event.tag) { + case 'initializeGroup': + return this.handleInitializeGroup(event.streamId, event.message) + case 'externalJoin': + return this.handleExternalJoin(event.streamId, event.message) + case 'epochSecrets': + return this.handleEpochSecrets(event.streamId, event.message) + } + } +} + +export class GroupServiceCoordinatorAdapter implements IGroupServiceCoordinator { + public readonly queueService: QueueService + + constructor(queueService: QueueService) { + this.queueService = queueService + } + + joinOrCreateGroup(streamId: string): void { + this.queueService.enqueueCommand({ tag: 'joinOrCreateGroup', streamId }) + } + groupActive(streamId: string): void { + this.queueService.enqueueCommand({ tag: 'groupActive', streamId }) + } + newEpochSecret(streamId: string, epoch: bigint): void { + this.queueService.enqueueCommand({ tag: 'newEpochSecrets', streamId, epoch }) + } } From 5b95fc192bbcaf23abb4b1fb69454ca6cbc38606 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 12:28:50 +0100 Subject: [PATCH 07/17] Add currentEpoch to GroupService --- packages/sdk/src/mls/group/groupService.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/sdk/src/mls/group/groupService.ts b/packages/sdk/src/mls/group/groupService.ts index 4dc371884d..ee9021c7f5 100644 --- a/packages/sdk/src/mls/group/groupService.ts +++ b/packages/sdk/src/mls/group/groupService.ts @@ -216,6 +216,10 @@ export class GroupService { throw new Error('Not implemented') } + public currentEpoch(group: Group): bigint { + return this.crypto.currentEpoch(group) + } + private getSignaturePublicKey(): Uint8Array { return this.crypto.signaturePublicKey() } From 8bdfdba521462ebc9ca34f5f79bd5882dd875bff Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 12:29:11 +0100 Subject: [PATCH 08/17] Add Awaiter --- packages/sdk/src/mls/queue/awaiter.ts | 45 +++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 packages/sdk/src/mls/queue/awaiter.ts diff --git a/packages/sdk/src/mls/queue/awaiter.ts b/packages/sdk/src/mls/queue/awaiter.ts new file mode 100644 index 0000000000..651c2fd31d --- /dev/null +++ b/packages/sdk/src/mls/queue/awaiter.ts @@ -0,0 +1,45 @@ +export interface IAwaiter { + promise: Promise + resolve: () => void +} + +export class NoopAwaiter implements IAwaiter { + public promise = Promise.resolve() + public resolve = () => {} +} + +export class IndefiniteAwaiter implements IAwaiter { + public promise: Promise + public resolve!: () => void + + public constructor() { + this.promise = new Promise((resolve) => { + this.resolve = resolve + }) + } +} + +export class TimeoutAwaiter implements IAwaiter { + // top level promise + public promise: Promise + // resolve handler to the inner promise + public resolve!: () => void + public constructor(timeoutMS: number, msg: string = 'Awaiter timed out') { + let timeout: NodeJS.Timeout + const timeoutPromise = new Promise((_resolve, reject) => { + timeout = setTimeout(() => { + reject(new Error(msg)) + }, timeoutMS) + }) + const internalPromise: Promise = new Promise( + (resolve: (value: void) => void, _reject) => { + this.resolve = () => { + resolve() + } + }, + ).finally(() => { + clearTimeout(timeout) + }) + this.promise = Promise.race([internalPromise, timeoutPromise]) + } +} From ba2d0a4a2ae1674a12ef9b530ce886e293edec19 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 12:30:07 +0100 Subject: [PATCH 09/17] Almost finished queue --- packages/sdk/src/mls/queue/queueService.ts | 205 +++++++++++++++++++-- 1 file changed, 185 insertions(+), 20 deletions(-) diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts index fd088d80a8..4b06267c52 100644 --- a/packages/sdk/src/mls/queue/queueService.ts +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -6,9 +6,19 @@ import { MemberPayload_Mls_InitializeGroup, } from '@river-build/proto' import { GroupService, IGroupServiceCoordinator } from '../group' -import { EpochSecretService } from '../epoch' +import { EpochSecret, EpochSecretService } from '../epoch' import { ExternalGroupService } from '../externalGroup' -import { DLogger } from '@river-build/dlog' +import { check, DLogger } from '@river-build/dlog' +import { isDefined, logNever } from '../../check' +import { EncryptedContentItem } from '@river-build/encryption' +import { + EncryptedContent, + isEncryptedContentKind, + toDecryptedContent, +} from '../../encryptedContentTypes' +import { Client } from '../../client' +import { IPersistenceStore } from '../../persistenceStore' +import { IAwaiter, IndefiniteAwaiter } from './awaiter' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage @@ -57,7 +67,8 @@ type EpochSecretsEvent = { type EncryptedContentEvent = { tag: 'encryptedContent' streamId: string - message: EncryptedData + eventId: string + message: EncryptedContent } type QueueEvent = @@ -66,48 +77,167 @@ type QueueEvent = | EpochSecretsEvent | EncryptedContentEvent +type MlsEncryptedContentItem = { + streamId: string + eventId: string + kind: string + encryptedData: EncryptedData +} + +const textEncoder = new TextEncoder() +const textDecoder = new TextDecoder() + +function encode(value: string): Uint8Array { + return textEncoder.encode(value) +} + +function decode(value: Uint8Array): string { + return textDecoder.decode(value) +} + // This feels more like a coordinator export class QueueService { private epochSecretService!: EpochSecretService private groupService!: GroupService private externalGroupService!: ExternalGroupService + private decryptionFailures: Map> = new Map() + private client!: Client + private persistenceStore!: IPersistenceStore + private awaitingGroupActive: Map = new Map() + private log!: { error: DLogger debug: DLogger } - constructor() { // nop } // API needed by the client // TODO: How long will be the timeout here? - public encryptGroupEventEpochSecret( - _streamId: string, - _event: Message, + public async encryptGroupEventEpochSecret( + streamId: string, + event: Message, ): Promise { - throw new Error('Not implemented') + const hasGroup = this.groupService.getGroup(streamId) !== undefined + if (!hasGroup) { + // No group so we request joining + // NOTE: We are enqueueing command instead of doing the async call + this.enqueueCommand({ tag: 'joinOrCreateGroup', streamId }) + } + // Wait for the group to become active + await this.awaitGroupActive(streamId) + const activeGroup = this.groupService.getGroup(streamId) + if (activeGroup === undefined) { + throw new Error('Fatal: no group after awaitGroupActive') + } + + if (activeGroup.status !== 'GROUP_ACTIVE') { + throw new Error('Fatal: group is not active') + } + + const epoch = this.groupService.currentEpoch(activeGroup) + + const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + + if (epochSecret === undefined) { + throw new Error('Fatal: no epoch secret for active group current epoch') + } + + const plaintext_ = event.toJsonString() + const plaintext = encode(plaintext_) + + return this.epochSecretService.encryptMessage(epochSecret, plaintext) + } + + // TODO: Maybe this could be refactored into a separate class + private async decryptGroupEvent( + epochSecret: EpochSecret, + streamId: string, + eventId: string, + kind: string, // kind of data + encryptedData: EncryptedData, + ) { + // this.logCall('decryptGroupEvent', streamId, eventId, kind, + // encryptedData) + const stream = this.client.stream(streamId) + check(isDefined(stream), 'stream not found') + check(isEncryptedContentKind(kind), `invalid kind ${kind}`) + + // check cache + let cleartext = await this.persistenceStore.getCleartext(eventId) + if (cleartext === undefined) { + const cleartext_ = await this.epochSecretService.decryptMessage( + epochSecret, + encryptedData, + ) + cleartext = decode(cleartext_) + } + const decryptedContent = toDecryptedContent(kind, cleartext) + + stream.updateDecryptedContent(eventId, decryptedContent) } // # MLS Coordinator # - public async handleInitializeGroup(_streamId: string, _message: InitializeGroupMessage) { - const group = this.groupService.getGroup(_streamId) + public async handleInitializeGroup(streamId: string, message: InitializeGroupMessage) { + const group = this.groupService.getGroup(streamId) if (group !== undefined) { - await this.groupService.handleInitializeGroup(group, _message) + await this.groupService.handleInitializeGroup(group, message) } } - public async handleExternalJoin(_streamId: string, _message: ExternalJoinMessage) { - const group = this.groupService.getGroup(_streamId) + public async handleExternalJoin(streamId: string, message: ExternalJoinMessage) { + const group = this.groupService.getGroup(streamId) if (group !== undefined) { - await this.groupService.handleExternalJoin(group, _message) + await this.groupService.handleExternalJoin(group, message) } } - public async handleEpochSecrets(_streamId: string, _message: EpochSecretsMessage) { - return this.epochSecretService.handleEpochSecrets(_streamId, _message) + public async handleEpochSecrets(streamId: string, message: EpochSecretsMessage) { + return this.epochSecretService.handleEpochSecrets(streamId, message) + } + + public async handleEncryptedContent( + streamId: string, + eventId: string, + message: EncryptedContent, + ) { + const encryptedData = message.content + // TODO: Check if message was encrypted with MLS + // const ciphertext = encryptedData.mls!.ciphertext + const epoch = encryptedData.mls!.epoch + const kind = message.kind + + const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + if (epochSecret === undefined) { + this.log.debug('Epoch secret not found', { streamId, epoch }) + this.enqueueDecryptionFailure(streamId, epoch, { + streamId, + eventId, + kind, + encryptedData, + }) + return + } + + // Decrypt immediately + return this.decryptGroupEvent(epochSecret, streamId, eventId, kind, encryptedData) + } + + private enqueueDecryptionFailure(streamId: string, epoch: bigint, item: EncryptedContentItem) { + let perStream = this.decryptionFailures.get(streamId) + if (perStream === undefined) { + perStream = new Map() + this.decryptionFailures.set(item.streamId, perStream) + } + let perEpoch = perStream.get(epoch) + if (perEpoch === undefined) { + perEpoch = [] + perStream.set(epoch, perEpoch) + } + perEpoch.push(item) } public async initializeGroupMessage(streamId: string): Promise { @@ -138,11 +268,40 @@ export class QueueService { throw new Error('Not implemented') } - public async groupActive(_streamId: string): Promise { - throw new Error('Not implemented') + // NOTE: Critical section, no awaits permitted + public awaitGroupActive(streamId: string): Promise { + // this.log(`awaitGroupActive ${streamId}`) + if (this.groupService.getGroup(streamId)?.status === 'GROUP_ACTIVE') { + return Promise.resolve() + } + + let awaiter = this.awaitingGroupActive.get(streamId) + if (awaiter === undefined) { + const internalAwaiter = new IndefiniteAwaiter() + // NOTE: we clear after the promise has resolved + const promise = internalAwaiter.promise.finally(() => { + this.awaitingGroupActive.delete(streamId) + }) + awaiter = { + promise, + resolve: internalAwaiter.resolve, + } + this.awaitingGroupActive.set(streamId, awaiter) + } + + return awaiter.promise + } + + public groupActive(streamId: string): void { + const awaiter = this.awaitingGroupActive.get(streamId) + if (awaiter !== undefined) { + awaiter.resolve() + } } - public async newEpochSecrets(_streamId: string, _epoch: bigint): Promise { + public async newEpochSecret(_streamId: string, _epoch: bigint): Promise { + // TODO: Decrypt all messages for that particular epoch secret + // TODO: Try opening a new epoch throw new Error('Not implemented') } @@ -255,7 +414,9 @@ export class QueueService { case 'groupActive': return this.groupActive(command.streamId) case 'newEpochSecrets': - return this.newEpochSecrets(command.streamId, command.epoch) + return this.newEpochSecret(command.streamId, command.epoch) + default: + logNever(command) } } @@ -267,6 +428,10 @@ export class QueueService { return this.handleExternalJoin(event.streamId, event.message) case 'epochSecrets': return this.handleEpochSecrets(event.streamId, event.message) + case 'encryptedContent': + return this.handleEncryptedContent(event.streamId, event.eventId, event.message) + default: + logNever(event) } } } From 221d81b69b895645a9a888e02a49cd21b7136f38 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 14:31:37 +0100 Subject: [PATCH 10/17] WIP --- .../sdk/src/mls/epoch/epochSecretService.ts | 22 +- packages/sdk/src/mls/group/crypto.ts | 5 + packages/sdk/src/mls/group/groupService.ts | 12 +- packages/sdk/src/mls/queue/queueService.ts | 209 ++++++++++++++++-- 4 files changed, 223 insertions(+), 25 deletions(-) diff --git a/packages/sdk/src/mls/epoch/epochSecretService.ts b/packages/sdk/src/mls/epoch/epochSecretService.ts index ce4e1ec002..da4297f91e 100644 --- a/packages/sdk/src/mls/epoch/epochSecretService.ts +++ b/packages/sdk/src/mls/epoch/epochSecretService.ts @@ -15,20 +15,28 @@ const MLS_ALGORITHM = 'mls_0.0.1' type EpochSecretsMessage = PlainMessage +export interface IEpochSecretServiceCoordinator { + newOpenEpochSecret(streamId: string, epoch: bigint): void + newSealedEpochSecret(streamId: string, epoch: bigint): void +} + export class EpochSecretService { private epochSecretStore: IEpochSecretStore private cipherSuite: MlsCipherSuite private cache: Map = new Map() + private coordinator?: IEpochSecretServiceCoordinator log: DLogger public constructor( cipherSuite: MlsCipherSuite, epochSecretStore: IEpochSecretStore, log: DLogger, + coordinator?: IEpochSecretServiceCoordinator, ) { this.log = log this.cipherSuite = cipherSuite this.epochSecretStore = epochSecretStore + this.coordinator = coordinator } /// Gets epochKey from the cache @@ -107,6 +115,7 @@ export class EpochSecretService { } // TODO: Should this method store epochKey? await this.saveEpochSecret(epochSecret) + this.coordinator?.newSealedEpochSecret(streamId, epoch) } // TODO: Should this method persist the epoch secret? @@ -140,6 +149,7 @@ export class EpochSecretService { } // TODO: Should this method store epochKey await this.saveEpochSecret(epochSecret) + this.coordinator?.newOpenEpochSecret(streamId, epoch) } public async openSealedEpochSecret( @@ -234,7 +244,17 @@ export class EpochSecretService { throw new Error('Not implemented') } - public epochSecretMessage(streamId: string): EpochSecretsMessage { + public epochSecretMessage(_epochSecret: EpochSecret): EpochSecretsMessage { throw new Error('Not implemented') } + + public isOpen(epochSecret: EpochSecret): boolean { + return epochSecret.openEpochSecret !== undefined + } + + public canBeOpened(epochSecret: EpochSecret): boolean { + return ( + epochSecret.openEpochSecret === undefined && epochSecret.sealedEpochSecret !== undefined + ) + } } diff --git a/packages/sdk/src/mls/group/crypto.ts b/packages/sdk/src/mls/group/crypto.ts index 6ae58797cd..14bbcae4aa 100644 --- a/packages/sdk/src/mls/group/crypto.ts +++ b/packages/sdk/src/mls/group/crypto.ts @@ -111,4 +111,9 @@ export class Crypto { } return this.client.signaturePublicKey() } + + public async exportEpochSecret(group: Group): Promise { + const secret = await group.group.currentEpochSecret() + return secret.toBytes() + } } diff --git a/packages/sdk/src/mls/group/groupService.ts b/packages/sdk/src/mls/group/groupService.ts index ee9021c7f5..7de7ce2d9b 100644 --- a/packages/sdk/src/mls/group/groupService.ts +++ b/packages/sdk/src/mls/group/groupService.ts @@ -15,7 +15,7 @@ type ExternalJoinMessage = PlainMessage export interface IGroupServiceCoordinator { joinOrCreateGroup(streamId: string): void groupActive(streamId: string): void - newEpochSecret(streamId: string, epoch: bigint): void + newEpoch(streamId: string, epoch: bigint): void } const defaultLogger = dlog('csb:mls:groupService') @@ -129,7 +129,7 @@ export class GroupService { this.coordinator?.groupActive(group.streamId) const epoch = this.crypto.currentEpoch(group) - this.coordinator?.newEpochSecret(group.streamId, epoch) + this.coordinator?.newEpoch(group.streamId, epoch) } public async handleExternalJoin(group: Group, message: ExternalJoinMessage) { @@ -140,7 +140,7 @@ export class GroupService { await this.crypto.processCommit(group, message.commit) await this.saveGroup(group) const epoch = this.crypto.currentEpoch(group) - this.coordinator?.newEpochSecret(group.streamId, epoch) + this.coordinator?.newEpoch(group.streamId, epoch) return } @@ -163,7 +163,7 @@ export class GroupService { this.coordinator?.groupActive(group.streamId) const epoch = this.crypto.currentEpoch(group) - this.coordinator?.newEpochSecret(group.streamId, epoch) + this.coordinator?.newEpoch(group.streamId, epoch) } public async initializeGroupMessage(streamId: string): Promise { @@ -223,4 +223,8 @@ export class GroupService { private getSignaturePublicKey(): Uint8Array { return this.crypto.signaturePublicKey() } + + public async exportEpochSecret(group: Group): Promise { + return this.crypto.exportEpochSecret(group) + } } diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts index 4b06267c52..9c445107c7 100644 --- a/packages/sdk/src/mls/queue/queueService.ts +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -6,7 +6,7 @@ import { MemberPayload_Mls_InitializeGroup, } from '@river-build/proto' import { GroupService, IGroupServiceCoordinator } from '../group' -import { EpochSecret, EpochSecretService } from '../epoch' +import { EpochSecret, EpochSecretService, IEpochSecretServiceCoordinator } from '../epoch' import { ExternalGroupService } from '../externalGroup' import { check, DLogger } from '@river-build/dlog' import { isDefined, logNever } from '../../check' @@ -36,13 +36,37 @@ type GroupActiveCommand = { streamId: string } -type NewEpochSecretCommand = { - tag: 'newEpochSecrets' +type NewEpochCommand = { + tag: 'newEpoch' streamId: string epoch: bigint } -type QueueCommand = JoinOrCreateGroupCommand | GroupActiveCommand | NewEpochSecretCommand +type NewOpenEpochSecretCommand = { + tag: 'newOpenEpochSecret' + streamId: string + epoch: bigint +} + +type NewSealedEpochSecretCommand = { + tag: 'newSealedEpochSecret' + streamId: string + epoch: bigint +} + +type AnnounceEpochSecretCommand = { + tag: 'announceEpochSecret' + streamId: string + epoch: bigint +} + +type QueueCommand = + | JoinOrCreateGroupCommand + | GroupActiveCommand + | NewEpochCommand + | NewOpenEpochSecretCommand + | NewSealedEpochSecretCommand + | AnnounceEpochSecretCommand // Events, which we are processing from outside type InitializeGroupEvent = { @@ -139,10 +163,15 @@ export class QueueService { const epoch = this.groupService.currentEpoch(activeGroup) - const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + let epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) if (epochSecret === undefined) { - throw new Error('Fatal: no epoch secret for active group current epoch') + // NOTE: queue has not processed new epoch event yet, force it manually + await this.newEpoch(streamId, epoch) + epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: epoch secret not found') + } } const plaintext_ = event.toJsonString() @@ -258,11 +287,11 @@ export class QueueService { return this.groupService.externalJoinMessage(streamId, latestGroupInfo, exportedTree) } - - public epochSecretsMessage(streamId: string): EpochSecretsMessage { - // TODO: Check preconditions - return this.epochSecretService.epochSecretMessage(streamId) - } + // + // public epochSecretsMessage(streamId: string): EpochSecretsMessage { + // // TODO: Check preconditions + // return this.epochSecretService.epochSecretMessage(epochSecret) + // } public async joinOrCreateGroup(_streamId: string): Promise { throw new Error('Not implemented') @@ -299,10 +328,128 @@ export class QueueService { } } - public async newEpochSecret(_streamId: string, _epoch: bigint): Promise { + public async newEpoch(streamId: string, epoch: bigint): Promise { + const epochAlreadyProcessed = + this.epochSecretService.getEpochSecret(streamId, epoch) !== undefined + if (epochAlreadyProcessed) { + return + } + + const group = this.groupService.getGroup(streamId) + if (group === undefined) { + throw new Error('Fatal: newEpoch called for missing group') + } + + if (group.status !== 'GROUP_ACTIVE') { + throw new Error('Fatal: newEpoch called for non-active group') + } + + if (this.groupService.currentEpoch(group) !== epoch) { + throw new Error('Fatal: newEpoch called for wrong epoch') + } + + const epochSecret = await this.groupService.exportEpochSecret(group) + await this.epochSecretService.addOpenEpochSecret(streamId, epoch, epochSecret) + this.enqueueCommand({ tag: 'announceEpochSecret', streamId, epoch }) + } + + public async newOpenEpochSecret(streamId: string, _epoch: bigint): Promise { + const epochSecret = this.epochSecretService.getEpochSecret(streamId, _epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: newEpochSecret called for missing epoch secret') + } + + if (epochSecret.derivedKeys === undefined) { + throw new Error('Fatal: missing derived keys for open secret') + } + // TODO: Decrypt all messages for that particular epoch secret - // TODO: Try opening a new epoch - throw new Error('Not implemented') + const perStream = this.decryptionFailures.get(streamId) + if (perStream !== undefined) { + const perEpoch = perStream.get(_epoch) + if (perEpoch !== undefined) { + perStream.delete(_epoch) + // TODO: Can this be Promise.all? + for (const decryptionFailure of perEpoch) { + await this.decryptGroupEvent( + epochSecret, + decryptionFailure.streamId, + decryptionFailure.eventId, + decryptionFailure.kind, + decryptionFailure.encryptedData, + ) + } + } + } + + const previousEpochSecret = this.epochSecretService.getEpochSecret(streamId, _epoch - 1n) + if ( + previousEpochSecret !== undefined && + this.epochSecretService.canBeOpened(previousEpochSecret) + ) { + await this.epochSecretService.openSealedEpochSecret( + previousEpochSecret, + epochSecret.derivedKeys, + ) + } + } + + private async newSealedEpochSecret(streamId: string, epoch: bigint): Promise { + const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: newSealedEpochSecret called for missing epoch secret') + } + + if (epochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: missing sealed secret for sealed secret') + } + + // TODO: Maybe this can be Promise.all? + await this.tryOpeningSealedEpochSecret(epochSecret) + await this.tryAnnouncingSealedEpochSecret(epochSecret) + } + + private async tryOpeningSealedEpochSecret(sealedEpochSecret: EpochSecret): Promise { + if (sealedEpochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: tryOpeningSealedEpochSecret called for missing sealed secret') + } + + // Already open + if (sealedEpochSecret.openEpochSecret !== undefined) { + return + } + + // Missing derived keys needed to open + const nextEpochSecret = this.epochSecretService.getEpochSecret( + sealedEpochSecret.streamId, + sealedEpochSecret.epoch + 1n, + ) + if (nextEpochSecret?.derivedKeys === undefined) { + return + } + + return this.epochSecretService.openSealedEpochSecret( + sealedEpochSecret, + nextEpochSecret.derivedKeys, + ) + } + + public async announceEpochSecret(_streamId: string, _epoch: bigint) { + // NOP + } + + private async tryAnnouncingSealedEpochSecret(epochSecret: EpochSecret): Promise { + if (epochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: announceSealedEpoch called for missing sealed secret') + } + + if (epochSecret.announced) { + return + } + + const message = this.epochSecretService.epochSecretMessage(epochSecret) + // TODO: Client sends message to the stream + throw new Error('Not finished') } // # Queue-related operations # @@ -413,8 +560,14 @@ export class QueueService { return this.joinOrCreateGroup(command.streamId) case 'groupActive': return this.groupActive(command.streamId) - case 'newEpochSecrets': - return this.newEpochSecret(command.streamId, command.epoch) + case 'newEpoch': + return this.newOpenEpochSecret(command.streamId, command.epoch) + case 'newOpenEpochSecret': + return this.newOpenEpochSecret(command.streamId, command.epoch) + case 'newSealedEpochSecret': + return this.newSealedEpochSecret(command.streamId, command.epoch) + case 'announceEpochSecret': + return this.announceEpochSecret(command.streamId, command.epoch) default: logNever(command) } @@ -434,6 +587,7 @@ export class QueueService { logNever(event) } } + } export class GroupServiceCoordinatorAdapter implements IGroupServiceCoordinator { @@ -443,13 +597,28 @@ export class GroupServiceCoordinatorAdapter implements IGroupServiceCoordinator this.queueService = queueService } - joinOrCreateGroup(streamId: string): void { + public joinOrCreateGroup(streamId: string): void { this.queueService.enqueueCommand({ tag: 'joinOrCreateGroup', streamId }) } - groupActive(streamId: string): void { + public groupActive(streamId: string): void { this.queueService.enqueueCommand({ tag: 'groupActive', streamId }) } - newEpochSecret(streamId: string, epoch: bigint): void { - this.queueService.enqueueCommand({ tag: 'newEpochSecrets', streamId, epoch }) + public newEpoch(streamId: string, epoch: bigint): void { + this.queueService.enqueueCommand({ tag: 'newEpoch', streamId, epoch }) + } +} + +export class EpochSecretServiceCoordinatorAdapter implements IEpochSecretServiceCoordinator { + public readonly queueService: QueueService + + constructor(queueService: QueueService) { + this.queueService = queueService + } + + public newOpenEpochSecret(streamId: string, epoch: bigint): void { + this.queueService.enqueueCommand({ tag: 'newOpenEpochSecret', streamId, epoch }) + } + public newSealedEpochSecret(streamId: string, epoch: bigint): void { + this.queueService.enqueueCommand({ tag: 'newSealedEpochSecret', streamId, epoch }) } } From e77ccf05e2c52a28bfab6e1e5132a88983e3e771 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 15:01:49 +0100 Subject: [PATCH 11/17] Split Coordinator and Queue --- .../src/mls/{queue => coordinator}/awaiter.ts | 0 .../sdk/src/mls/coordinator/coordinator.ts | 396 ++++++++++++++++++ packages/sdk/src/mls/queue/queueService.ts | 394 ++--------------- 3 files changed, 424 insertions(+), 366 deletions(-) rename packages/sdk/src/mls/{queue => coordinator}/awaiter.ts (100%) create mode 100644 packages/sdk/src/mls/coordinator/coordinator.ts diff --git a/packages/sdk/src/mls/queue/awaiter.ts b/packages/sdk/src/mls/coordinator/awaiter.ts similarity index 100% rename from packages/sdk/src/mls/queue/awaiter.ts rename to packages/sdk/src/mls/coordinator/awaiter.ts diff --git a/packages/sdk/src/mls/coordinator/coordinator.ts b/packages/sdk/src/mls/coordinator/coordinator.ts new file mode 100644 index 0000000000..2393577400 --- /dev/null +++ b/packages/sdk/src/mls/coordinator/coordinator.ts @@ -0,0 +1,396 @@ +import { Message, PlainMessage } from '@bufbuild/protobuf' +import { + EncryptedData, + MemberPayload_Mls_EpochSecrets, + MemberPayload_Mls_ExternalJoin, + MemberPayload_Mls_InitializeGroup, +} from '@river-build/proto' +import { GroupService } from '../group' +import { EpochSecret, EpochSecretService } from '../epoch' +import { ExternalGroupService } from '../externalGroup' +import { check, DLogger } from '@river-build/dlog' +import { isDefined } from '../../check' +import { EncryptedContentItem } from '@river-build/encryption' +import { + EncryptedContent, + isEncryptedContentKind, + toDecryptedContent, +} from '../../encryptedContentTypes' +import { Client } from '../../client' +import { IPersistenceStore } from '../../persistenceStore' +import { IAwaiter, IndefiniteAwaiter } from './awaiter' +import { IQueueService } from '../queue/queueService' + +type InitializeGroupMessage = PlainMessage +type ExternalJoinMessage = PlainMessage +type EpochSecretsMessage = PlainMessage + +type MlsEncryptedContentItem = { + streamId: string + eventId: string + kind: string + encryptedData: EncryptedData +} + +const encoder = new TextEncoder() +const decoder = new TextDecoder() + +function encode(text: string): Uint8Array { + return encoder.encode(text) +} + +function decode(bytes: Uint8Array): string { + return decoder.decode(bytes) +} + +export interface ICoordinator { + // Commands + joinOrCreateGroup(streamId: string): Promise + groupActive(streamId: string): void + newOpenEpochSecret(streamId: string, epoch: bigint): Promise + newSealedEpochSecret(streamId: string, epoch: bigint): Promise + announceEpochSecret(streamId: string, epoch: bigint): Promise + // Events + handleInitializeGroup(streamId: string, message: InitializeGroupMessage): Promise + handleExternalJoin(streamId: string, message: ExternalJoinMessage): Promise + handleEpochSecrets(streamId: string, message: EpochSecretsMessage): Promise + handleEncryptedContent( + streamId: string, + eventId: string, + message: EncryptedContent, + ): Promise +} + +export class Coordinator implements ICoordinator { + private epochSecretService!: EpochSecretService + private groupService!: GroupService + private externalGroupService!: ExternalGroupService + private decryptionFailures: Map> = new Map() + private client!: Client + private persistenceStore!: IPersistenceStore + private awaitingGroupActive: Map = new Map() + private queueService: IQueueService | undefined + + private log!: { + error: DLogger + debug: DLogger + } + + constructor() { + // nop + } + + // API needed by the client + // TODO: How long will be the timeout here? + public async encryptGroupEventEpochSecret( + streamId: string, + event: Message, + ): Promise { + const hasGroup = this.groupService.getGroup(streamId) !== undefined + if (!hasGroup) { + // No group so we request joining + // NOTE: We are enqueueing command instead of doing the async call + this.queueService?.enqueueCommand({ tag: 'joinOrCreateGroup', streamId }) + } + // Wait for the group to become active + await this.awaitGroupActive(streamId) + const activeGroup = this.groupService.getGroup(streamId) + if (activeGroup === undefined) { + throw new Error('Fatal: no group after awaitGroupActive') + } + + if (activeGroup.status !== 'GROUP_ACTIVE') { + throw new Error('Fatal: group is not active') + } + + const epoch = this.groupService.currentEpoch(activeGroup) + + let epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + + if (epochSecret === undefined) { + // NOTE: queue has not processed new epoch event yet, force it manually + await this.newEpoch(streamId, epoch) + epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: epoch secret not found') + } + } + + const plaintext_ = event.toJsonString() + const plaintext = encode(plaintext_) + + return this.epochSecretService.encryptMessage(epochSecret, plaintext) + } + + // TODO: Maybe this could be refactored into a separate class + private async decryptGroupEvent( + epochSecret: EpochSecret, + streamId: string, + eventId: string, + kind: string, // kind of data + encryptedData: EncryptedData, + ) { + // this.logCall('decryptGroupEvent', streamId, eventId, kind, + // encryptedData) + const stream = this.client.stream(streamId) + check(isDefined(stream), 'stream not found') + check(isEncryptedContentKind(kind), `invalid kind ${kind}`) + + // check cache + let cleartext = await this.persistenceStore.getCleartext(eventId) + if (cleartext === undefined) { + const cleartext_ = await this.epochSecretService.decryptMessage( + epochSecret, + encryptedData, + ) + cleartext = decode(cleartext_) + } + const decryptedContent = toDecryptedContent(kind, cleartext) + + stream.updateDecryptedContent(eventId, decryptedContent) + } + + // # MLS Coordinator # + + public async handleInitializeGroup(streamId: string, message: InitializeGroupMessage) { + const group = this.groupService.getGroup(streamId) + if (group !== undefined) { + await this.groupService.handleInitializeGroup(group, message) + } + } + + public async handleExternalJoin(streamId: string, message: ExternalJoinMessage) { + const group = this.groupService.getGroup(streamId) + if (group !== undefined) { + await this.groupService.handleExternalJoin(group, message) + } + } + + public async handleEpochSecrets(streamId: string, message: EpochSecretsMessage) { + return this.epochSecretService.handleEpochSecrets(streamId, message) + } + + public async handleEncryptedContent( + streamId: string, + eventId: string, + message: EncryptedContent, + ) { + const encryptedData = message.content + // TODO: Check if message was encrypted with MLS + // const ciphertext = encryptedData.mls!.ciphertext + const epoch = encryptedData.mls!.epoch + const kind = message.kind + + const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + if (epochSecret === undefined) { + this.log.debug('Epoch secret not found', { streamId, epoch }) + this.enqueueDecryptionFailure(streamId, epoch, { + streamId, + eventId, + kind, + encryptedData, + }) + return + } + + // Decrypt immediately + return this.decryptGroupEvent(epochSecret, streamId, eventId, kind, encryptedData) + } + + private enqueueDecryptionFailure(streamId: string, epoch: bigint, item: EncryptedContentItem) { + let perStream = this.decryptionFailures.get(streamId) + if (perStream === undefined) { + perStream = new Map() + this.decryptionFailures.set(item.streamId, perStream) + } + let perEpoch = perStream.get(epoch) + if (perEpoch === undefined) { + perEpoch = [] + perStream.set(epoch, perEpoch) + } + perEpoch.push(item) + } + + public async initializeGroupMessage(streamId: string): Promise { + // TODO: Check preconditions + // TODO: Catch the error + return this.groupService.initializeGroupMessage(streamId) + } + + public async externalJoinMessage(streamId: string): Promise { + const externalGroup = await this.externalGroupService.getExternalGroup('streamId') + if (externalGroup === undefined) { + this.log.error('External group not found', { streamId }) + throw new Error('External group not found') + } + + const exportedTree = this.externalGroupService.exportTree(externalGroup) + const latestGroupInfo = this.externalGroupService.latestGroupInfo(externalGroup) + + return this.groupService.externalJoinMessage(streamId, latestGroupInfo, exportedTree) + } + // + // public epochSecretsMessage(streamId: string): EpochSecretsMessage { + // // TODO: Check preconditions + // return this.epochSecretService.epochSecretMessage(epochSecret) + // } + + public async joinOrCreateGroup(_streamId: string): Promise { + throw new Error('Not implemented') + } + + // NOTE: Critical section, no awaits permitted + public awaitGroupActive(streamId: string): Promise { + // this.log(`awaitGroupActive ${streamId}`) + if (this.groupService.getGroup(streamId)?.status === 'GROUP_ACTIVE') { + return Promise.resolve() + } + + let awaiter = this.awaitingGroupActive.get(streamId) + if (awaiter === undefined) { + const internalAwaiter = new IndefiniteAwaiter() + // NOTE: we clear after the promise has resolved + const promise = internalAwaiter.promise.finally(() => { + this.awaitingGroupActive.delete(streamId) + }) + awaiter = { + promise, + resolve: internalAwaiter.resolve, + } + this.awaitingGroupActive.set(streamId, awaiter) + } + + return awaiter.promise + } + + public groupActive(streamId: string): void { + const awaiter = this.awaitingGroupActive.get(streamId) + if (awaiter !== undefined) { + awaiter.resolve() + } + } + + public async newEpoch(streamId: string, epoch: bigint): Promise { + const epochAlreadyProcessed = + this.epochSecretService.getEpochSecret(streamId, epoch) !== undefined + if (epochAlreadyProcessed) { + return + } + + const group = this.groupService.getGroup(streamId) + if (group === undefined) { + throw new Error('Fatal: newEpoch called for missing group') + } + + if (group.status !== 'GROUP_ACTIVE') { + throw new Error('Fatal: newEpoch called for non-active group') + } + + if (this.groupService.currentEpoch(group) !== epoch) { + throw new Error('Fatal: newEpoch called for wrong epoch') + } + + const epochSecret = await this.groupService.exportEpochSecret(group) + await this.epochSecretService.addOpenEpochSecret(streamId, epoch, epochSecret) + this.queueService?.enqueueCommand({ tag: 'announceEpochSecret', streamId, epoch }) + } + + public async newOpenEpochSecret(streamId: string, _epoch: bigint): Promise { + const epochSecret = this.epochSecretService.getEpochSecret(streamId, _epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: newEpochSecret called for missing epoch secret') + } + + if (epochSecret.derivedKeys === undefined) { + throw new Error('Fatal: missing derived keys for open secret') + } + + // TODO: Decrypt all messages for that particular epoch secret + const perStream = this.decryptionFailures.get(streamId) + if (perStream !== undefined) { + const perEpoch = perStream.get(_epoch) + if (perEpoch !== undefined) { + perStream.delete(_epoch) + // TODO: Can this be Promise.all? + for (const decryptionFailure of perEpoch) { + await this.decryptGroupEvent( + epochSecret, + decryptionFailure.streamId, + decryptionFailure.eventId, + decryptionFailure.kind, + decryptionFailure.encryptedData, + ) + } + } + } + + const previousEpochSecret = this.epochSecretService.getEpochSecret(streamId, _epoch - 1n) + if ( + previousEpochSecret !== undefined && + this.epochSecretService.canBeOpened(previousEpochSecret) + ) { + await this.epochSecretService.openSealedEpochSecret( + previousEpochSecret, + epochSecret.derivedKeys, + ) + } + } + + public async newSealedEpochSecret(streamId: string, epoch: bigint): Promise { + const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: newSealedEpochSecret called for missing epoch secret') + } + + if (epochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: missing sealed secret for sealed secret') + } + + // TODO: Maybe this can be Promise.all? + await this.tryOpeningSealedEpochSecret(epochSecret) + await this.tryAnnouncingSealedEpochSecret(epochSecret) + } + + private async tryOpeningSealedEpochSecret(sealedEpochSecret: EpochSecret): Promise { + if (sealedEpochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: tryOpeningSealedEpochSecret called for missing sealed secret') + } + + // Already open + if (sealedEpochSecret.openEpochSecret !== undefined) { + return + } + + // Missing derived keys needed to open + const nextEpochSecret = this.epochSecretService.getEpochSecret( + sealedEpochSecret.streamId, + sealedEpochSecret.epoch + 1n, + ) + if (nextEpochSecret?.derivedKeys === undefined) { + return + } + + return this.epochSecretService.openSealedEpochSecret( + sealedEpochSecret, + nextEpochSecret.derivedKeys, + ) + } + + public async announceEpochSecret(_streamId: string, _epoch: bigint) { + // NOP + } + + private async tryAnnouncingSealedEpochSecret(epochSecret: EpochSecret): Promise { + if (epochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: announceSealedEpoch called for missing sealed secret') + } + + if (epochSecret.announced) { + return + } + + const _message = this.epochSecretService.epochSecretMessage(epochSecret) + // TODO: Client sends message to the stream + throw new Error('Not finished') + } +} diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts index 9c445107c7..57240d6b46 100644 --- a/packages/sdk/src/mls/queue/queueService.ts +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -1,24 +1,15 @@ -import { Message, PlainMessage } from '@bufbuild/protobuf' +import { PlainMessage } from '@bufbuild/protobuf' import { - EncryptedData, MemberPayload_Mls_EpochSecrets, MemberPayload_Mls_ExternalJoin, MemberPayload_Mls_InitializeGroup, } from '@river-build/proto' -import { GroupService, IGroupServiceCoordinator } from '../group' -import { EpochSecret, EpochSecretService, IEpochSecretServiceCoordinator } from '../epoch' -import { ExternalGroupService } from '../externalGroup' -import { check, DLogger } from '@river-build/dlog' -import { isDefined, logNever } from '../../check' -import { EncryptedContentItem } from '@river-build/encryption' -import { - EncryptedContent, - isEncryptedContentKind, - toDecryptedContent, -} from '../../encryptedContentTypes' -import { Client } from '../../client' -import { IPersistenceStore } from '../../persistenceStore' -import { IAwaiter, IndefiniteAwaiter } from './awaiter' +import { IGroupServiceCoordinator } from '../group' +import { IEpochSecretServiceCoordinator } from '../epoch' +import { DLogger } from '@river-build/dlog' +import { logNever } from '../../check' +import { EncryptedContent } from '../../encryptedContentTypes' +import { ICoordinator } from '../coordinator/coordinator' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage @@ -101,357 +92,25 @@ type QueueEvent = | EpochSecretsEvent | EncryptedContentEvent -type MlsEncryptedContentItem = { - streamId: string - eventId: string - kind: string - encryptedData: EncryptedData -} - -const textEncoder = new TextEncoder() -const textDecoder = new TextDecoder() - -function encode(value: string): Uint8Array { - return textEncoder.encode(value) -} - -function decode(value: Uint8Array): string { - return textDecoder.decode(value) +export interface IQueueService { + enqueueCommand(command: QueueCommand): void + enqueueEvent(event: QueueEvent): void } // This feels more like a coordinator -export class QueueService { - private epochSecretService!: EpochSecretService - private groupService!: GroupService - private externalGroupService!: ExternalGroupService - private decryptionFailures: Map> = new Map() - private client!: Client - private persistenceStore!: IPersistenceStore - private awaitingGroupActive: Map = new Map() +export class QueueService implements IQueueService { + private coordinator: ICoordinator private log!: { error: DLogger debug: DLogger } - constructor() { + constructor(coordinator: ICoordinator) { + this.coordinator = coordinator // nop } - // API needed by the client - // TODO: How long will be the timeout here? - public async encryptGroupEventEpochSecret( - streamId: string, - event: Message, - ): Promise { - const hasGroup = this.groupService.getGroup(streamId) !== undefined - if (!hasGroup) { - // No group so we request joining - // NOTE: We are enqueueing command instead of doing the async call - this.enqueueCommand({ tag: 'joinOrCreateGroup', streamId }) - } - // Wait for the group to become active - await this.awaitGroupActive(streamId) - const activeGroup = this.groupService.getGroup(streamId) - if (activeGroup === undefined) { - throw new Error('Fatal: no group after awaitGroupActive') - } - - if (activeGroup.status !== 'GROUP_ACTIVE') { - throw new Error('Fatal: group is not active') - } - - const epoch = this.groupService.currentEpoch(activeGroup) - - let epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) - - if (epochSecret === undefined) { - // NOTE: queue has not processed new epoch event yet, force it manually - await this.newEpoch(streamId, epoch) - epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) - if (epochSecret === undefined) { - throw new Error('Fatal: epoch secret not found') - } - } - - const plaintext_ = event.toJsonString() - const plaintext = encode(plaintext_) - - return this.epochSecretService.encryptMessage(epochSecret, plaintext) - } - - // TODO: Maybe this could be refactored into a separate class - private async decryptGroupEvent( - epochSecret: EpochSecret, - streamId: string, - eventId: string, - kind: string, // kind of data - encryptedData: EncryptedData, - ) { - // this.logCall('decryptGroupEvent', streamId, eventId, kind, - // encryptedData) - const stream = this.client.stream(streamId) - check(isDefined(stream), 'stream not found') - check(isEncryptedContentKind(kind), `invalid kind ${kind}`) - - // check cache - let cleartext = await this.persistenceStore.getCleartext(eventId) - if (cleartext === undefined) { - const cleartext_ = await this.epochSecretService.decryptMessage( - epochSecret, - encryptedData, - ) - cleartext = decode(cleartext_) - } - const decryptedContent = toDecryptedContent(kind, cleartext) - - stream.updateDecryptedContent(eventId, decryptedContent) - } - - // # MLS Coordinator # - - public async handleInitializeGroup(streamId: string, message: InitializeGroupMessage) { - const group = this.groupService.getGroup(streamId) - if (group !== undefined) { - await this.groupService.handleInitializeGroup(group, message) - } - } - - public async handleExternalJoin(streamId: string, message: ExternalJoinMessage) { - const group = this.groupService.getGroup(streamId) - if (group !== undefined) { - await this.groupService.handleExternalJoin(group, message) - } - } - - public async handleEpochSecrets(streamId: string, message: EpochSecretsMessage) { - return this.epochSecretService.handleEpochSecrets(streamId, message) - } - - public async handleEncryptedContent( - streamId: string, - eventId: string, - message: EncryptedContent, - ) { - const encryptedData = message.content - // TODO: Check if message was encrypted with MLS - // const ciphertext = encryptedData.mls!.ciphertext - const epoch = encryptedData.mls!.epoch - const kind = message.kind - - const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) - if (epochSecret === undefined) { - this.log.debug('Epoch secret not found', { streamId, epoch }) - this.enqueueDecryptionFailure(streamId, epoch, { - streamId, - eventId, - kind, - encryptedData, - }) - return - } - - // Decrypt immediately - return this.decryptGroupEvent(epochSecret, streamId, eventId, kind, encryptedData) - } - - private enqueueDecryptionFailure(streamId: string, epoch: bigint, item: EncryptedContentItem) { - let perStream = this.decryptionFailures.get(streamId) - if (perStream === undefined) { - perStream = new Map() - this.decryptionFailures.set(item.streamId, perStream) - } - let perEpoch = perStream.get(epoch) - if (perEpoch === undefined) { - perEpoch = [] - perStream.set(epoch, perEpoch) - } - perEpoch.push(item) - } - - public async initializeGroupMessage(streamId: string): Promise { - // TODO: Check preconditions - // TODO: Catch the error - return this.groupService.initializeGroupMessage(streamId) - } - - public async externalJoinMessage(streamId: string): Promise { - const externalGroup = await this.externalGroupService.getExternalGroup('streamId') - if (externalGroup === undefined) { - this.log.error('External group not found', { streamId }) - throw new Error('External group not found') - } - - const exportedTree = this.externalGroupService.exportTree(externalGroup) - const latestGroupInfo = this.externalGroupService.latestGroupInfo(externalGroup) - - return this.groupService.externalJoinMessage(streamId, latestGroupInfo, exportedTree) - } - // - // public epochSecretsMessage(streamId: string): EpochSecretsMessage { - // // TODO: Check preconditions - // return this.epochSecretService.epochSecretMessage(epochSecret) - // } - - public async joinOrCreateGroup(_streamId: string): Promise { - throw new Error('Not implemented') - } - - // NOTE: Critical section, no awaits permitted - public awaitGroupActive(streamId: string): Promise { - // this.log(`awaitGroupActive ${streamId}`) - if (this.groupService.getGroup(streamId)?.status === 'GROUP_ACTIVE') { - return Promise.resolve() - } - - let awaiter = this.awaitingGroupActive.get(streamId) - if (awaiter === undefined) { - const internalAwaiter = new IndefiniteAwaiter() - // NOTE: we clear after the promise has resolved - const promise = internalAwaiter.promise.finally(() => { - this.awaitingGroupActive.delete(streamId) - }) - awaiter = { - promise, - resolve: internalAwaiter.resolve, - } - this.awaitingGroupActive.set(streamId, awaiter) - } - - return awaiter.promise - } - - public groupActive(streamId: string): void { - const awaiter = this.awaitingGroupActive.get(streamId) - if (awaiter !== undefined) { - awaiter.resolve() - } - } - - public async newEpoch(streamId: string, epoch: bigint): Promise { - const epochAlreadyProcessed = - this.epochSecretService.getEpochSecret(streamId, epoch) !== undefined - if (epochAlreadyProcessed) { - return - } - - const group = this.groupService.getGroup(streamId) - if (group === undefined) { - throw new Error('Fatal: newEpoch called for missing group') - } - - if (group.status !== 'GROUP_ACTIVE') { - throw new Error('Fatal: newEpoch called for non-active group') - } - - if (this.groupService.currentEpoch(group) !== epoch) { - throw new Error('Fatal: newEpoch called for wrong epoch') - } - - const epochSecret = await this.groupService.exportEpochSecret(group) - await this.epochSecretService.addOpenEpochSecret(streamId, epoch, epochSecret) - this.enqueueCommand({ tag: 'announceEpochSecret', streamId, epoch }) - } - - public async newOpenEpochSecret(streamId: string, _epoch: bigint): Promise { - const epochSecret = this.epochSecretService.getEpochSecret(streamId, _epoch) - if (epochSecret === undefined) { - throw new Error('Fatal: newEpochSecret called for missing epoch secret') - } - - if (epochSecret.derivedKeys === undefined) { - throw new Error('Fatal: missing derived keys for open secret') - } - - // TODO: Decrypt all messages for that particular epoch secret - const perStream = this.decryptionFailures.get(streamId) - if (perStream !== undefined) { - const perEpoch = perStream.get(_epoch) - if (perEpoch !== undefined) { - perStream.delete(_epoch) - // TODO: Can this be Promise.all? - for (const decryptionFailure of perEpoch) { - await this.decryptGroupEvent( - epochSecret, - decryptionFailure.streamId, - decryptionFailure.eventId, - decryptionFailure.kind, - decryptionFailure.encryptedData, - ) - } - } - } - - const previousEpochSecret = this.epochSecretService.getEpochSecret(streamId, _epoch - 1n) - if ( - previousEpochSecret !== undefined && - this.epochSecretService.canBeOpened(previousEpochSecret) - ) { - await this.epochSecretService.openSealedEpochSecret( - previousEpochSecret, - epochSecret.derivedKeys, - ) - } - } - - private async newSealedEpochSecret(streamId: string, epoch: bigint): Promise { - const epochSecret = this.epochSecretService.getEpochSecret(streamId, epoch) - if (epochSecret === undefined) { - throw new Error('Fatal: newSealedEpochSecret called for missing epoch secret') - } - - if (epochSecret.sealedEpochSecret === undefined) { - throw new Error('Fatal: missing sealed secret for sealed secret') - } - - // TODO: Maybe this can be Promise.all? - await this.tryOpeningSealedEpochSecret(epochSecret) - await this.tryAnnouncingSealedEpochSecret(epochSecret) - } - - private async tryOpeningSealedEpochSecret(sealedEpochSecret: EpochSecret): Promise { - if (sealedEpochSecret.sealedEpochSecret === undefined) { - throw new Error('Fatal: tryOpeningSealedEpochSecret called for missing sealed secret') - } - - // Already open - if (sealedEpochSecret.openEpochSecret !== undefined) { - return - } - - // Missing derived keys needed to open - const nextEpochSecret = this.epochSecretService.getEpochSecret( - sealedEpochSecret.streamId, - sealedEpochSecret.epoch + 1n, - ) - if (nextEpochSecret?.derivedKeys === undefined) { - return - } - - return this.epochSecretService.openSealedEpochSecret( - sealedEpochSecret, - nextEpochSecret.derivedKeys, - ) - } - - public async announceEpochSecret(_streamId: string, _epoch: bigint) { - // NOP - } - - private async tryAnnouncingSealedEpochSecret(epochSecret: EpochSecret): Promise { - if (epochSecret.sealedEpochSecret === undefined) { - throw new Error('Fatal: announceSealedEpoch called for missing sealed secret') - } - - if (epochSecret.announced) { - return - } - - const message = this.epochSecretService.epochSecretMessage(epochSecret) - // TODO: Client sends message to the stream - throw new Error('Not finished') - } - // # Queue-related operations # // Queue-related fields @@ -557,17 +216,17 @@ export class QueueService { public async processCommand(command: QueueCommand): Promise { switch (command.tag) { case 'joinOrCreateGroup': - return this.joinOrCreateGroup(command.streamId) + return this.coordinator.joinOrCreateGroup(command.streamId) case 'groupActive': - return this.groupActive(command.streamId) + return this.coordinator.groupActive(command.streamId) case 'newEpoch': - return this.newOpenEpochSecret(command.streamId, command.epoch) + return this.coordinator.newOpenEpochSecret(command.streamId, command.epoch) case 'newOpenEpochSecret': - return this.newOpenEpochSecret(command.streamId, command.epoch) + return this.coordinator.newOpenEpochSecret(command.streamId, command.epoch) case 'newSealedEpochSecret': - return this.newSealedEpochSecret(command.streamId, command.epoch) + return this.coordinator.newSealedEpochSecret(command.streamId, command.epoch) case 'announceEpochSecret': - return this.announceEpochSecret(command.streamId, command.epoch) + return this.coordinator.announceEpochSecret(command.streamId, command.epoch) default: logNever(command) } @@ -576,18 +235,21 @@ export class QueueService { public async processEvent(event: QueueEvent): Promise { switch (event.tag) { case 'initializeGroup': - return this.handleInitializeGroup(event.streamId, event.message) + return this.coordinator.handleInitializeGroup(event.streamId, event.message) case 'externalJoin': - return this.handleExternalJoin(event.streamId, event.message) + return this.coordinator.handleExternalJoin(event.streamId, event.message) case 'epochSecrets': - return this.handleEpochSecrets(event.streamId, event.message) + return this.coordinator.handleEpochSecrets(event.streamId, event.message) case 'encryptedContent': - return this.handleEncryptedContent(event.streamId, event.eventId, event.message) + return this.coordinator.handleEncryptedContent( + event.streamId, + event.eventId, + event.message, + ) default: logNever(event) } } - } export class GroupServiceCoordinatorAdapter implements IGroupServiceCoordinator { From 6b2d67e5e94e9ad597fa086cf4167a68ea9385ba Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 15:06:58 +0100 Subject: [PATCH 12/17] Add index to queue and coordinator --- packages/sdk/src/mls/coordinator/coordinator.ts | 2 +- packages/sdk/src/mls/coordinator/index.ts | 2 ++ packages/sdk/src/mls/queue/index.ts | 1 + packages/sdk/src/mls/queue/queueService.ts | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 packages/sdk/src/mls/coordinator/index.ts create mode 100644 packages/sdk/src/mls/queue/index.ts diff --git a/packages/sdk/src/mls/coordinator/coordinator.ts b/packages/sdk/src/mls/coordinator/coordinator.ts index 2393577400..e384310057 100644 --- a/packages/sdk/src/mls/coordinator/coordinator.ts +++ b/packages/sdk/src/mls/coordinator/coordinator.ts @@ -19,7 +19,7 @@ import { import { Client } from '../../client' import { IPersistenceStore } from '../../persistenceStore' import { IAwaiter, IndefiniteAwaiter } from './awaiter' -import { IQueueService } from '../queue/queueService' +import { IQueueService } from '../queue' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage diff --git a/packages/sdk/src/mls/coordinator/index.ts b/packages/sdk/src/mls/coordinator/index.ts new file mode 100644 index 0000000000..df7c80d9f0 --- /dev/null +++ b/packages/sdk/src/mls/coordinator/index.ts @@ -0,0 +1,2 @@ +export * from './awaiter' +export * from './coordinator' diff --git a/packages/sdk/src/mls/queue/index.ts b/packages/sdk/src/mls/queue/index.ts new file mode 100644 index 0000000000..eb682cb75e --- /dev/null +++ b/packages/sdk/src/mls/queue/index.ts @@ -0,0 +1 @@ +export * from './queueService' diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts index 57240d6b46..182c3d9790 100644 --- a/packages/sdk/src/mls/queue/queueService.ts +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -9,7 +9,7 @@ import { IEpochSecretServiceCoordinator } from '../epoch' import { DLogger } from '@river-build/dlog' import { logNever } from '../../check' import { EncryptedContent } from '../../encryptedContentTypes' -import { ICoordinator } from '../coordinator/coordinator' +import { ICoordinator } from '../coordinator' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage From 0ca7d0028ce4b4532cac613a6e94d99c6e75224e Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 16:29:46 +0100 Subject: [PATCH 13/17] Fix the constructor for Coordinator --- .../sdk/src/mls/coordinator/coordinator.ts | 70 +++++++++++++++---- .../sdk/src/mls/epoch/epochSecretService.ts | 31 +++++--- .../sdk/src/mls/epoch/epochSecretStore.ts | 6 -- packages/sdk/src/mls/group/crypto.ts | 5 ++ packages/sdk/src/mls/group/groupStore.ts | 6 +- packages/sdk/src/mls/group/index.ts | 1 + packages/sdk/src/mls/queue/queueService.ts | 8 +-- 7 files changed, 90 insertions(+), 37 deletions(-) diff --git a/packages/sdk/src/mls/coordinator/coordinator.ts b/packages/sdk/src/mls/coordinator/coordinator.ts index e384310057..a76c17e19b 100644 --- a/packages/sdk/src/mls/coordinator/coordinator.ts +++ b/packages/sdk/src/mls/coordinator/coordinator.ts @@ -5,10 +5,10 @@ import { MemberPayload_Mls_ExternalJoin, MemberPayload_Mls_InitializeGroup, } from '@river-build/proto' -import { GroupService } from '../group' -import { EpochSecret, EpochSecretService } from '../epoch' -import { ExternalGroupService } from '../externalGroup' -import { check, DLogger } from '@river-build/dlog' +import { GroupService, InMemoryGroupStore, Crypto } from '../group' +import { EpochSecret, EpochSecretService, InMemoryEpochSecretStore } from '../epoch' +import { ExternalCrypto, ExternalGroupService } from '../externalGroup' +import { check, dlog, DLogger } from '@river-build/dlog' import { isDefined } from '../../check' import { EncryptedContentItem } from '@river-build/encryption' import { @@ -19,7 +19,13 @@ import { import { Client } from '../../client' import { IPersistenceStore } from '../../persistenceStore' import { IAwaiter, IndefiniteAwaiter } from './awaiter' -import { IQueueService } from '../queue' +import { + IQueueService, + QueueService, + EpochSecretServiceCoordinatorAdapter, + GroupServiceCoordinatorAdapter, +} from '../queue' +import { addressFromUserId } from '../../id' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage @@ -61,23 +67,63 @@ export interface ICoordinator { ): Promise } +const defaultLogger = dlog('csb:mls:coordinator') + export class Coordinator implements ICoordinator { - private epochSecretService!: EpochSecretService - private groupService!: GroupService - private externalGroupService!: ExternalGroupService + private userId: string + private readonly userAddress: Uint8Array + private readonly deviceKey: Uint8Array + private epochSecretService: EpochSecretService + private groupService: GroupService + private externalGroupService: ExternalGroupService private decryptionFailures: Map> = new Map() private client!: Client private persistenceStore!: IPersistenceStore private awaitingGroupActive: Map = new Map() - private queueService: IQueueService | undefined + private queueService: IQueueService - private log!: { + private log: { error: DLogger debug: DLogger } - constructor() { - // nop + constructor( + userId: string, + deviceKey: Uint8Array, + client: Client, + persistenceStore: IPersistenceStore, + opts?: { log: DLogger }, + ) { + this.client = client + this.persistenceStore = persistenceStore + const logger = opts?.log ?? defaultLogger + this.log = { + debug: logger.extend('debug'), + error: logger.extend('error'), + } + + this.userId = userId + this.userAddress = addressFromUserId(userId) + this.deviceKey = deviceKey + + // Composing all the dependencies + this.queueService = new QueueService(this) + this.externalGroupService = new ExternalGroupService(new ExternalCrypto(), opts) + const groupStore = new InMemoryGroupStore() + const crypto = new Crypto(this.userAddress, this.deviceKey, opts) + const groupServiceCoordinator = new GroupServiceCoordinatorAdapter(this.queueService) + + this.groupService = new GroupService(groupStore, crypto, groupServiceCoordinator, opts) + const epochSecretStore = new InMemoryEpochSecretStore() + const epochSecretServiceCoordinator = new EpochSecretServiceCoordinatorAdapter( + this.queueService, + ) + this.epochSecretService = new EpochSecretService( + crypto.ciphersuite(), + epochSecretStore, + epochSecretServiceCoordinator, + opts, + ) } // API needed by the client diff --git a/packages/sdk/src/mls/epoch/epochSecretService.ts b/packages/sdk/src/mls/epoch/epochSecretService.ts index da4297f91e..2b46a46820 100644 --- a/packages/sdk/src/mls/epoch/epochSecretService.ts +++ b/packages/sdk/src/mls/epoch/epochSecretService.ts @@ -5,7 +5,7 @@ import { HpkeSecretKey, Secret as MlsSecret, } from '@river-build/mls-rs-wasm' -import { bin_toHexString, DLogger, shortenHexString } from '@river-build/dlog' +import { bin_toHexString, dlog, DLogger, shortenHexString } from '@river-build/dlog' import { DerivedKeys, EpochSecret, EpochSecretId, epochSecretId } from './epochSecret' import { EncryptedData, MemberPayload_Mls_EpochSecrets } from '@river-build/proto' import { IEpochSecretStore } from './epochSecretStore' @@ -20,23 +20,32 @@ export interface IEpochSecretServiceCoordinator { newSealedEpochSecret(streamId: string, epoch: bigint): void } +const defaultLogger = dlog('csb:mls:epochSecretService') + export class EpochSecretService { private epochSecretStore: IEpochSecretStore private cipherSuite: MlsCipherSuite private cache: Map = new Map() private coordinator?: IEpochSecretServiceCoordinator - log: DLogger + private log: { + error: DLogger + debug: DLogger + } public constructor( cipherSuite: MlsCipherSuite, epochSecretStore: IEpochSecretStore, - log: DLogger, coordinator?: IEpochSecretServiceCoordinator, + opts?: { log: DLogger }, ) { - this.log = log this.cipherSuite = cipherSuite this.epochSecretStore = epochSecretStore this.coordinator = coordinator + const logger = opts?.log ?? defaultLogger + this.log = { + debug: logger.extend('debug'), + error: logger.extend('error'), + } } /// Gets epochKey from the cache @@ -61,7 +70,7 @@ export class EpochSecretService { } private async saveEpochSecret(epochSecret: EpochSecret): Promise { - this.log('saveEpochSecret', { + this.log.debug('saveEpochSecret', { streamId: epochSecret.streamId, epoch: epochSecret.epoch, }) @@ -74,7 +83,7 @@ export class EpochSecretService { epochKey: EpochSecret, { publicKey }: { publicKey: Uint8Array }, ): Promise { - this.log('sealEpochSecret', { + this.log.debug('sealEpochSecret', { streamId: epochKey.streamId, epoch: epochKey.epoch, publicKey: shortenHexString(bin_toHexString(publicKey)), @@ -102,7 +111,7 @@ export class EpochSecretService { epoch: bigint, sealedEpochSecret: Uint8Array, ): Promise { - this.log('addSealedEpochSecret', { + this.log.debug('addSealedEpochSecret', { streamId, epoch, sealedEpochSecretBytes: shortenHexString(bin_toHexString(sealedEpochSecret)), @@ -124,7 +133,7 @@ export class EpochSecretService { epoch: bigint, openEpochSecret: Uint8Array, ): Promise { - this.log('addOpenEpochSecret', { + this.log.debug('addOpenEpochSecret', { streamId, epoch, openEpochSecret: shortenHexString(bin_toHexString(openEpochSecret)), @@ -156,7 +165,7 @@ export class EpochSecretService { epochSecret: EpochSecret, nextEpochKeys: DerivedKeys, ): Promise { - this.log('openSealedEpochSecret', { + this.log.debug('openSealedEpochSecret', { streamId: epochSecret.streamId, epoch: epochSecret.epoch, }) @@ -186,7 +195,7 @@ export class EpochSecretService { epochSecret: EpochSecret, message: Uint8Array, ): Promise { - this.log('encryptMessage', { + this.log.debug('encryptMessage', { streamId: epochSecret.streamId, epoch: epochSecret.epoch, }) @@ -212,7 +221,7 @@ export class EpochSecretService { epochSecret: EpochSecret, message: EncryptedData, ): Promise { - this.log('decryptMessage', { + this.log.debug('decryptMessage', { streamId: epochSecret.streamId, epoch: epochSecret.epoch, }) diff --git a/packages/sdk/src/mls/epoch/epochSecretStore.ts b/packages/sdk/src/mls/epoch/epochSecretStore.ts index c3783b9095..753f508d5d 100644 --- a/packages/sdk/src/mls/epoch/epochSecretStore.ts +++ b/packages/sdk/src/mls/epoch/epochSecretStore.ts @@ -1,5 +1,4 @@ import { EpochSecret, epochSecretId, EpochSecretId } from './epochSecret' -import { DLogger } from '@river-build/dlog' export interface IEpochSecretStore { getEpochSecret(streamId: string, epoch: bigint): Promise @@ -9,11 +8,6 @@ export interface IEpochSecretStore { export class InMemoryEpochSecretStore implements IEpochSecretStore { private epochKeySates: Map = new Map() - log: DLogger - - constructor(log: DLogger) { - this.log = log - } public async getEpochSecret(streamId: string, epoch: bigint): Promise { const epochId: EpochSecretId = epochSecretId(streamId, epoch) diff --git a/packages/sdk/src/mls/group/crypto.ts b/packages/sdk/src/mls/group/crypto.ts index 14bbcae4aa..0e12eae681 100644 --- a/packages/sdk/src/mls/group/crypto.ts +++ b/packages/sdk/src/mls/group/crypto.ts @@ -2,6 +2,7 @@ import { Client as MlsClient, ExportedTree as MlsExportedTree, Group as MlsGroup, + CipherSuite as MlsCipherSuite, MlsMessage, } from '@river-build/mls-rs-wasm' import { dlog, DLogger } from '@river-build/dlog' @@ -116,4 +117,8 @@ export class Crypto { const secret = await group.group.currentEpochSecret() return secret.toBytes() } + + public ciphersuite(): MlsCipherSuite { + return new MlsCipherSuite() + } } diff --git a/packages/sdk/src/mls/group/groupStore.ts b/packages/sdk/src/mls/group/groupStore.ts index fb4727a287..f93858210b 100644 --- a/packages/sdk/src/mls/group/groupStore.ts +++ b/packages/sdk/src/mls/group/groupStore.ts @@ -1,4 +1,3 @@ -import { DLogger } from '@river-build/dlog' import { Group } from './group' // Group DTO replaces group with groupId @@ -11,12 +10,11 @@ export interface IGroupStore { clearGroup(streamId: string): Promise } + export class InMemoryGroupStore implements IGroupStore { private groups: Map = new Map() - log: DLogger - constructor(log: DLogger) { - this.log = log + constructor() { } public async hasGroup(streamId: string): Promise { diff --git a/packages/sdk/src/mls/group/index.ts b/packages/sdk/src/mls/group/index.ts index 3a057edb3f..8e03f36eda 100644 --- a/packages/sdk/src/mls/group/index.ts +++ b/packages/sdk/src/mls/group/index.ts @@ -1,4 +1,5 @@ // Re-export everything for now +export * from './crypto' export * from './group' export * from './groupService' export * from './groupStore' diff --git a/packages/sdk/src/mls/queue/queueService.ts b/packages/sdk/src/mls/queue/queueService.ts index 182c3d9790..0c91d62e42 100644 --- a/packages/sdk/src/mls/queue/queueService.ts +++ b/packages/sdk/src/mls/queue/queueService.ts @@ -253,9 +253,9 @@ export class QueueService implements IQueueService { } export class GroupServiceCoordinatorAdapter implements IGroupServiceCoordinator { - public readonly queueService: QueueService + public readonly queueService: IQueueService - constructor(queueService: QueueService) { + constructor(queueService: IQueueService) { this.queueService = queueService } @@ -271,9 +271,9 @@ export class GroupServiceCoordinatorAdapter implements IGroupServiceCoordinator } export class EpochSecretServiceCoordinatorAdapter implements IEpochSecretServiceCoordinator { - public readonly queueService: QueueService + public readonly queueService: IQueueService - constructor(queueService: QueueService) { + constructor(queueService: IQueueService) { this.queueService = queueService } From 67fc977a97c0ab06f57e93af1054b2c0815e741d Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 16:42:41 +0100 Subject: [PATCH 14/17] Start working on joinOrCreateGroup flow --- packages/sdk/src/mls/coordinator/coordinator.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/sdk/src/mls/coordinator/coordinator.ts b/packages/sdk/src/mls/coordinator/coordinator.ts index a76c17e19b..fa52c9ad82 100644 --- a/packages/sdk/src/mls/coordinator/coordinator.ts +++ b/packages/sdk/src/mls/coordinator/coordinator.ts @@ -282,6 +282,11 @@ export class Coordinator implements ICoordinator { // } public async joinOrCreateGroup(_streamId: string): Promise { + const hasGroup = this.groupService.getGroup(_streamId) !== undefined + if (hasGroup) { + return + } + throw new Error('Not implemented') } From f61c6fd989562d292a72a76a5b3c26a9f4db2e86 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 17:42:21 +0100 Subject: [PATCH 15/17] Final set of changes for Coordinator --- packages/sdk/src/client.ts | 10 +- .../sdk/src/mls/coordinator/coordinator.ts | 121 ++++++++++++++---- .../sdk/src/mls/epoch/epochSecretService.ts | 22 +++- .../mls/externalGroup/externalGroupService.ts | 12 +- packages/sdk/src/mls/utils/mlsutils.ts | 13 +- packages/sdk/src/tests/multi_ne/mls.test.ts | 2 +- packages/sdk/src/tests/unit/mls_epoch.test.ts | 10 +- 7 files changed, 146 insertions(+), 44 deletions(-) diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 9158ff171e..28ce628d42 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -152,7 +152,7 @@ import { SignerContext } from './signerContext' import { decryptAESGCM, deriveKeyAndIV, encryptAESGCM, uint8ArrayToBase64 } from './crypto_utils' import { makeTags, makeTipTags } from './tags' import { TipEventObject } from '@river-build/generated/dev/typings/ITipping' -import { extractMlsExternalGroup } from './mls/utils/mlsutils' +import {extractMlsExternalGroup, ExtractMlsExternalGroupResult} from './mls/utils/mlsutils' export type ClientEvents = StreamEvents & DecryptionEvents @@ -2527,11 +2527,9 @@ export class Client }) } - public async getMlsExternalGroupInfo(streamId: string): Promise<{ - externalGroupSnapshot: Uint8Array - groupInfoMessage: Uint8Array - commits: { commit: Uint8Array; groupInfoMessage: Uint8Array }[] - }> { + public async getMlsExternalGroupInfo( + streamId: string, + ): Promise { let streamView = this.stream(streamId)?.view if (!streamView || !streamView.isInitialized) { streamView = await this.getStream(streamId) diff --git a/packages/sdk/src/mls/coordinator/coordinator.ts b/packages/sdk/src/mls/coordinator/coordinator.ts index fa52c9ad82..e767044009 100644 --- a/packages/sdk/src/mls/coordinator/coordinator.ts +++ b/packages/sdk/src/mls/coordinator/coordinator.ts @@ -4,6 +4,7 @@ import { MemberPayload_Mls_EpochSecrets, MemberPayload_Mls_ExternalJoin, MemberPayload_Mls_InitializeGroup, + StreamEvent, } from '@river-build/proto' import { GroupService, InMemoryGroupStore, Crypto } from '../group' import { EpochSecret, EpochSecretService, InMemoryEpochSecretStore } from '../epoch' @@ -26,6 +27,7 @@ import { GroupServiceCoordinatorAdapter, } from '../queue' import { addressFromUserId } from '../../id' +import { make_MemberPayload_Mls } from '../../types' type InitializeGroupMessage = PlainMessage type ExternalJoinMessage = PlainMessage @@ -80,7 +82,7 @@ export class Coordinator implements ICoordinator { private client!: Client private persistenceStore!: IPersistenceStore private awaitingGroupActive: Map = new Map() - private queueService: IQueueService + private readonly queueService: IQueueService private log: { error: DLogger @@ -257,37 +259,74 @@ export class Coordinator implements ICoordinator { perEpoch.push(item) } - public async initializeGroupMessage(streamId: string): Promise { + private async initializeGroupMessage(streamId: string): Promise { // TODO: Check preconditions // TODO: Catch the error return this.groupService.initializeGroupMessage(streamId) } - public async externalJoinMessage(streamId: string): Promise { - const externalGroup = await this.externalGroupService.getExternalGroup('streamId') - if (externalGroup === undefined) { - this.log.error('External group not found', { streamId }) - throw new Error('External group not found') + private async externalJoinMessage( + streamId: string, + externalInfo: { + externalGroupSnapshot: Uint8Array + groupInfoMessage: Uint8Array + commits: { commit: Uint8Array; groupInfoMessage: Uint8Array }[] + }, + ): Promise { + const externalGroup = await this.externalGroupService.loadSnapshot( + streamId, + externalInfo.externalGroupSnapshot, + ) + for (const commit of externalInfo.commits) { + await this.externalGroupService.processCommit(externalGroup, commit.commit) } - const exportedTree = this.externalGroupService.exportTree(externalGroup) - const latestGroupInfo = this.externalGroupService.latestGroupInfo(externalGroup) + const latestGroupInfo = externalInfo.groupInfoMessage return this.groupService.externalJoinMessage(streamId, latestGroupInfo, exportedTree) } - // - // public epochSecretsMessage(streamId: string): EpochSecretsMessage { - // // TODO: Check preconditions - // return this.epochSecretService.epochSecretMessage(epochSecret) - // } - - public async joinOrCreateGroup(_streamId: string): Promise { - const hasGroup = this.groupService.getGroup(_streamId) !== undefined + + private async epochSecretsMessage(epochSecret: EpochSecret): Promise { + // TODO: Check preconditions + return this.epochSecretService.epochSecretMessage(epochSecret) + } + + public async joinOrCreateGroup(streamId: string): Promise { + const hasGroup = this.groupService.getGroup(streamId) !== undefined if (hasGroup) { return } + const externalInfo = await this.client.getMlsExternalGroupInfo(streamId) - throw new Error('Not implemented') + let joinOrCreateGroupMessage: PlainMessage['payload'] + + if (externalInfo === undefined) { + const initializeGroupMessage = await this.initializeGroupMessage(streamId) + joinOrCreateGroupMessage = make_MemberPayload_Mls({ + content: { + case: 'initializeGroup', + value: initializeGroupMessage, + }, + }) + } else { + const externalJoinMessage = await this.externalJoinMessage(streamId, externalInfo) + joinOrCreateGroupMessage = make_MemberPayload_Mls({ + content: { + case: 'externalJoin', + value: externalJoinMessage, + }, + }) + } + + // Send message to the node + try { + await this.client.makeEventAndAddToStream(streamId, joinOrCreateGroupMessage) + } catch (e) { + this.log.error('Failed to join or create group', { streamId, error: e }) + if (this.groupService.getGroup(streamId) !== undefined) { + await this.groupService.clearGroup(streamId) + } + } } // NOTE: Critical section, no awaits permitted @@ -428,10 +467,35 @@ export class Coordinator implements ICoordinator { } public async announceEpochSecret(_streamId: string, _epoch: bigint) { - // NOP + let epochSecret = this.epochSecretService.getEpochSecret(_streamId, _epoch) + if (epochSecret === undefined) { + throw new Error('Fatal: announceEpochSecret called for missing epoch secret') + } + + if (epochSecret.sealedEpochSecret === undefined) { + const nextEpochKey = this.epochSecretService.getEpochSecret( + epochSecret.streamId, + epochSecret.epoch + 1n, + ) + if (nextEpochKey?.derivedKeys !== undefined) { + await this.epochSecretService.sealEpochSecret(epochSecret, nextEpochKey.derivedKeys) + epochSecret = this.epochSecretService.getEpochSecret( + epochSecret.streamId, + epochSecret.epoch, + ) + if (epochSecret === undefined) { + throw new Error('Fatal: epoch secret not found after sealing') + } + } + } + + return this.tryAnnouncingSealedEpochSecret(epochSecret) } private async tryAnnouncingSealedEpochSecret(epochSecret: EpochSecret): Promise { + const streamId = epochSecret.streamId + const epoch = epochSecret.epoch + if (epochSecret.sealedEpochSecret === undefined) { throw new Error('Fatal: announceSealedEpoch called for missing sealed secret') } @@ -440,8 +504,21 @@ export class Coordinator implements ICoordinator { return } - const _message = this.epochSecretService.epochSecretMessage(epochSecret) - // TODO: Client sends message to the stream - throw new Error('Not finished') + const epochSecretsMessage = await this.epochSecretsMessage(epochSecret) + + try { + await this.client.makeEventAndAddToStream( + streamId, + make_MemberPayload_Mls({ + content: { + case: 'epochSecrets', + value: epochSecretsMessage, + }, + }), + ) + } catch (e) { + this.log.error('Failed to announce epoch secret', { streamId, epoch, error: e }) + this.queueService.enqueueCommand({ tag: 'announceEpochSecret', streamId, epoch }) + } } } diff --git a/packages/sdk/src/mls/epoch/epochSecretService.ts b/packages/sdk/src/mls/epoch/epochSecretService.ts index 2b46a46820..bebbc4b1db 100644 --- a/packages/sdk/src/mls/epoch/epochSecretService.ts +++ b/packages/sdk/src/mls/epoch/epochSecretService.ts @@ -250,11 +250,29 @@ export class EpochSecretService { } public async handleEpochSecrets(_streamId: string, _message: EpochSecretsMessage) { - throw new Error('Not implemented') + for (const epochSecret of _message.secrets) { + await this.addAnnouncedSealedEpochSecret( + _streamId, + epochSecret.epoch, + epochSecret.secret, + ) + this.coordinator?.newSealedEpochSecret(_streamId, epochSecret.epoch) + } } public epochSecretMessage(_epochSecret: EpochSecret): EpochSecretsMessage { - throw new Error('Not implemented') + if (_epochSecret.sealedEpochSecret === undefined) { + throw new Error('Fatal: epoch secret not sealed') + } + + return { + secrets: [ + { + epoch: _epochSecret.epoch, + secret: _epochSecret.sealedEpochSecret, + }, + ], + } } public isOpen(epochSecret: EpochSecret): boolean { diff --git a/packages/sdk/src/mls/externalGroup/externalGroupService.ts b/packages/sdk/src/mls/externalGroup/externalGroupService.ts index 1cf8ff7c51..efc2b53a72 100644 --- a/packages/sdk/src/mls/externalGroup/externalGroupService.ts +++ b/packages/sdk/src/mls/externalGroup/externalGroupService.ts @@ -21,15 +21,15 @@ export class ExternalGroupService { } } - public async getExternalGroup(_streamId: string): Promise { - throw new Error('Not implemented') + public exportTree(group: ExternalGroup): Uint8Array { + return this.crypto.exportTree(group) } - public exportTree(_group: ExternalGroup): Uint8Array { - throw new Error('Not implemented') + public async loadSnapshot(streamId: string, snapshot: Uint8Array): Promise { + return this.crypto.loadExternalGroupFromSnapshot(streamId, snapshot) } - public latestGroupInfo(_group: ExternalGroup): Uint8Array { - throw new Error('Not implemented') + public async processCommit(externalGroup: ExternalGroup, commit: Uint8Array): Promise { + return this.crypto.processCommit(externalGroup, commit) } } diff --git a/packages/sdk/src/mls/utils/mlsutils.ts b/packages/sdk/src/mls/utils/mlsutils.ts index f5b5ed7b23..121a48edcd 100644 --- a/packages/sdk/src/mls/utils/mlsutils.ts +++ b/packages/sdk/src/mls/utils/mlsutils.ts @@ -1,11 +1,20 @@ import { check } from '@river-build/dlog' import { IStreamStateView } from '../../streamStateView' -export function extractMlsExternalGroup(streamView: IStreamStateView): { +export type ExtractMlsExternalGroupResult = { externalGroupSnapshot: Uint8Array groupInfoMessage: Uint8Array commits: { commit: Uint8Array; groupInfoMessage: Uint8Array }[] -} { +} + +export function extractMlsExternalGroup( + streamView: IStreamStateView, +): ExtractMlsExternalGroupResult | undefined { + // check if there is group info at all + if (streamView.snapshot?.members?.mls?.groupInfoMessage === undefined) { + return undefined + } + const indexOfLastSnapshot = streamView.timeline.findLastIndex((event) => { const payload = event.remoteEvent?.event.payload if (payload?.case !== 'miniblockHeader') { diff --git a/packages/sdk/src/tests/multi_ne/mls.test.ts b/packages/sdk/src/tests/multi_ne/mls.test.ts index 54bee57168..134e92fb75 100644 --- a/packages/sdk/src/tests/multi_ne/mls.test.ts +++ b/packages/sdk/src/tests/multi_ne/mls.test.ts @@ -590,7 +590,7 @@ describe('mlsTests', () => { }) test('correct external group info is returned', async () => { - const externalGroupInfo = await bobClient.getMlsExternalGroupInfo(streamId) + const externalGroupInfo = (await bobClient.getMlsExternalGroupInfo(streamId))! const externalClient = new ExternalClient() const externalGroupSnapshot = ExternalSnapshot.fromBytes( externalGroupInfo.externalGroupSnapshot, diff --git a/packages/sdk/src/tests/unit/mls_epoch.test.ts b/packages/sdk/src/tests/unit/mls_epoch.test.ts index 2f601d3f89..23a0b2e462 100644 --- a/packages/sdk/src/tests/unit/mls_epoch.test.ts +++ b/packages/sdk/src/tests/unit/mls_epoch.test.ts @@ -26,8 +26,8 @@ describe('mlsEpochTests', () => { const streamId = 'stream' beforeEach(() => { - epochStore = new InMemoryEpochSecretStore(log.extend('store')) - epochService = new EpochSecretService(cipherSuite, epochStore, log.extend('service')) + epochStore = new InMemoryEpochSecretStore() + epochService = new EpochSecretService(cipherSuite, epochStore) }) it('shouldCreateEpochSecretService', () => { @@ -102,8 +102,8 @@ describe('mlsEpochTests', () => { // Create another service that seals it's epoch secret and gives us beforeEach(async () => { - epochStore2 = new InMemoryEpochSecretStore(log.extend('store2')) - epochService2 = new EpochSecretService(cipherSuite, epochStore2, log.extend('service2')) + epochStore2 = new InMemoryEpochSecretStore() + epochService2 = new EpochSecretService(cipherSuite, epochStore2) await epochService2.addOpenEpochSecret(streamId, epoch, secret) await epochService2.addOpenEpochSecret(streamId, epoch2, secret2) @@ -189,7 +189,7 @@ describe('mlsEpochTests', () => { describe('epochSecretStorage', () => { beforeEach(async () => { await epochService.addOpenEpochSecret(streamId, epoch, secret) - epochService = new EpochSecretService(cipherSuite, epochStore, log.extend('service2')) + epochService = new EpochSecretService(cipherSuite, epochStore) }) it('shouldLoadEpochSecretFromStorage', async () => { From 7258839dad764dd6a9386ae47c09156438ac1443 Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 18:34:50 +0100 Subject: [PATCH 16/17] Fix prettier issues --- packages/sdk/src/client.ts | 2 +- packages/sdk/src/mls/group/groupStore.ts | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 28ce628d42..8bd78c3f5a 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -152,7 +152,7 @@ import { SignerContext } from './signerContext' import { decryptAESGCM, deriveKeyAndIV, encryptAESGCM, uint8ArrayToBase64 } from './crypto_utils' import { makeTags, makeTipTags } from './tags' import { TipEventObject } from '@river-build/generated/dev/typings/ITipping' -import {extractMlsExternalGroup, ExtractMlsExternalGroupResult} from './mls/utils/mlsutils' +import { extractMlsExternalGroup, ExtractMlsExternalGroupResult } from './mls/utils/mlsutils' export type ClientEvents = StreamEvents & DecryptionEvents diff --git a/packages/sdk/src/mls/group/groupStore.ts b/packages/sdk/src/mls/group/groupStore.ts index f93858210b..f0ceef39ec 100644 --- a/packages/sdk/src/mls/group/groupStore.ts +++ b/packages/sdk/src/mls/group/groupStore.ts @@ -10,13 +10,9 @@ export interface IGroupStore { clearGroup(streamId: string): Promise } - export class InMemoryGroupStore implements IGroupStore { private groups: Map = new Map() - constructor() { - } - public async hasGroup(streamId: string): Promise { return this.groups.has(streamId) } From 716ecf8b7b43d44f3ee77935652963d618b26d0d Mon Sep 17 00:00:00 2001 From: Jakub Zalewski Date: Mon, 13 Jan 2025 18:36:17 +0100 Subject: [PATCH 17/17] Fix yarn lint issues --- packages/sdk/src/tests/unit/mls_epoch.test.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/sdk/src/tests/unit/mls_epoch.test.ts b/packages/sdk/src/tests/unit/mls_epoch.test.ts index 23a0b2e462..4c0003d08f 100644 --- a/packages/sdk/src/tests/unit/mls_epoch.test.ts +++ b/packages/sdk/src/tests/unit/mls_epoch.test.ts @@ -4,7 +4,6 @@ import { beforeEach, describe, expect } from 'vitest' import { CipherSuite } from '@river-build/mls-rs-wasm' -import { dlog } from '@river-build/dlog' import { EpochSecret, EpochSecretService, @@ -13,8 +12,6 @@ import { } from '../../mls/epoch' import { EncryptedData } from '@river-build/proto' -const log = dlog('test:mls:epoch') - const encoder = new TextEncoder() describe('mlsEpochTests', () => {