diff --git a/package-lock.json b/package-lock.json index bcabfed20..e2b756ed1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7955,17 +7955,13 @@ "link": true }, "node_modules/@walletconnect/relay-api": { - "version": "1.0.9", - "license": "MIT", + "version": "1.0.10", + "resolved": "https://registry.npmjs.org/@walletconnect/relay-api/-/relay-api-1.0.10.tgz", + "integrity": "sha512-tqrdd4zU9VBNqUaXXQASaexklv6A54yEyQQEXYOCr+Jz8Ket0dmPBDyg19LVSNUN2cipAghQc45/KVmfFJ0cYw==", "dependencies": { - "@walletconnect/jsonrpc-types": "^1.0.2", - "tslib": "1.14.1" + "@walletconnect/jsonrpc-types": "^1.0.2" } }, - "node_modules/@walletconnect/relay-api/node_modules/tslib": { - "version": "1.14.1", - "license": "0BSD" - }, "node_modules/@walletconnect/relay-auth": { "version": "1.0.4", "license": "MIT", @@ -27484,7 +27480,7 @@ "@walletconnect/jsonrpc-ws-connection": "1.0.14", "@walletconnect/keyvaluestorage": "1.1.1", "@walletconnect/logger": "2.1.2", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/relay-auth": "1.0.4", "@walletconnect/safe-json": "1.0.2", "@walletconnect/time": "1.0.2", @@ -27566,7 +27562,7 @@ "@aws-sdk/client-cloudwatch": "3.450.0", "@walletconnect/jsonrpc-provider": "1.0.13", "@walletconnect/jsonrpc-ws-connection": "1.0.14", - "@walletconnect/relay-api": "1.0.9" + "@walletconnect/relay-api": "1.0.10" } }, "packages/types": { @@ -27592,7 +27588,7 @@ "@stablelib/random": "1.0.2", "@stablelib/sha256": "1.0.1", "@stablelib/x25519": "1.0.3", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/safe-json": "1.0.2", "@walletconnect/time": "1.0.2", "@walletconnect/types": "2.12.2", @@ -33641,7 +33637,7 @@ "@walletconnect/jsonrpc-ws-connection": "1.0.14", "@walletconnect/keyvaluestorage": "1.1.1", "@walletconnect/logger": "2.1.2", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/relay-auth": "1.0.4", "@walletconnect/safe-json": "1.0.2", "@walletconnect/time": "1.0.2", @@ -33860,15 +33856,11 @@ } }, "@walletconnect/relay-api": { - "version": "1.0.9", + "version": "1.0.10", + "resolved": "https://registry.npmjs.org/@walletconnect/relay-api/-/relay-api-1.0.10.tgz", + "integrity": "sha512-tqrdd4zU9VBNqUaXXQASaexklv6A54yEyQQEXYOCr+Jz8Ket0dmPBDyg19LVSNUN2cipAghQc45/KVmfFJ0cYw==", "requires": { - "@walletconnect/jsonrpc-types": "^1.0.2", - "tslib": "1.14.1" - }, - "dependencies": { - "tslib": { - "version": "1.14.1" - } + "@walletconnect/jsonrpc-types": "^1.0.2" } }, "@walletconnect/relay-auth": { @@ -33909,7 +33901,7 @@ "@walletconnect/jsonrpc-utils": "1.0.8", "@walletconnect/jsonrpc-ws-connection": "1.0.14", "@walletconnect/logger": "2.1.2", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/time": "1.0.2", "@walletconnect/types": "2.12.2", "@walletconnect/utils": "2.12.2", @@ -34156,7 +34148,7 @@ "@stablelib/sha256": "1.0.1", "@stablelib/x25519": "1.0.3", "@types/lodash.isequal": "4.5.6", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/safe-json": "1.0.2", "@walletconnect/time": "1.0.2", "@walletconnect/types": "2.12.2", diff --git a/packages/core/package.json b/packages/core/package.json index 7a2319183..de84c6326 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -38,7 +38,7 @@ "@walletconnect/jsonrpc-ws-connection": "1.0.14", "@walletconnect/keyvaluestorage": "1.1.1", "@walletconnect/logger": "2.1.2", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/relay-auth": "1.0.4", "@walletconnect/safe-json": "1.0.2", "@walletconnect/time": "1.0.2", diff --git a/packages/core/src/controllers/relayer.ts b/packages/core/src/controllers/relayer.ts index 0df202d78..32f6f2538 100644 --- a/packages/core/src/controllers/relayer.ts +++ b/packages/core/src/controllers/relayer.ts @@ -334,6 +334,23 @@ export class Relayer extends IRelayer { throw new Error("No internet connection detected. Please restart your network and try again."); } + public async handleBatchMessageEvents(messages: RelayerTypes.MessageEvent[]) { + if (messages?.length === 0) { + this.logger.trace("Batch message events is empty. Ignoring..."); + return; + } + const sortedMessages = messages.sort((a, b) => a.publishedAt - b.publishedAt); + this.logger.trace(`Batch of ${sortedMessages.length} message events sorted`); + for (const message of sortedMessages) { + try { + await this.onMessageEvent(message); + } catch (e) { + this.logger.warn(e); + } + } + this.logger.trace(`Batch of ${sortedMessages.length} message events processed`); + } + // ---------- Private ----------------------------------------------- // /* * In Node, we must detect when the connection is stalled and terminate it. diff --git a/packages/core/src/controllers/subscriber.ts b/packages/core/src/controllers/subscriber.ts index a5e15528f..e327906a6 100644 --- a/packages/core/src/controllers/subscriber.ts +++ b/packages/core/src/controllers/subscriber.ts @@ -47,6 +47,8 @@ export class Subscriber extends ISubscriber { private restartInProgress = false; private clientId: string; private batchSubscribeTopicsLimit = 500; + private pendingBatchMessages: RelayerTypes.MessageEvent[] = []; + constructor(public relayer: IRelayer, public logger: Logger) { super(relayer, logger); this.relayer = relayer; @@ -263,6 +265,33 @@ export class Subscriber extends ISubscriber { } } + private async rpcBatchFetchMessages(subscriptions: SubscriberTypes.Params[]) { + if (!subscriptions.length) return; + const relay = subscriptions[0].relay; + const api = getRelayProtocolApi(relay.protocol); + const request: RequestArguments = { + method: api.batchFetchMessages, + params: { + topics: subscriptions.map((s) => s.topic), + }, + }; + this.logger.debug(`Outgoing Relay Payload`); + this.logger.trace({ type: "payload", direction: "outgoing", request }); + let result; + try { + const fetchMessagesPromise = await createExpiringPromise( + this.relayer.request(request).catch((e) => this.logger.warn(e)), + this.subscribeTimeout, + ); + result = (await fetchMessagesPromise) as { + messages: RelayerTypes.MessageEvent[]; + }; + } catch (err) { + this.relayer.events.emit(RELAYER_EVENTS.connection_stalled); + } + return result; + } + private rpcUnsubscribe(topic: string, id: string, relay: RelayerTypes.ProtocolOptions) { const api = getRelayProtocolApi(relay.protocol); const request: RequestArguments = { @@ -361,9 +390,10 @@ export class Subscriber extends ISubscriber { private async reset() { if (this.cached.length) { - const batches = Math.ceil(this.cached.length / this.batchSubscribeTopicsLimit); - for (let i = 0; i < batches; i++) { + const numOfBatches = Math.ceil(this.cached.length / this.batchSubscribeTopicsLimit); + for (let i = 0; i < numOfBatches; i++) { const batch = this.cached.splice(0, this.batchSubscribeTopicsLimit); + await this.batchFetchMessages(batch); await this.batchSubscribe(batch); } } @@ -399,6 +429,15 @@ export class Subscriber extends ISubscriber { this.onBatchSubscribe(result.map((id, i) => ({ ...subscriptions[i], id }))); } + private async batchFetchMessages(subscriptions: SubscriberTypes.Params[]) { + if (!subscriptions.length) return; + this.logger.trace(`Fetching batch messages for ${subscriptions.length} subscriptions`); + const response = await this.rpcBatchFetchMessages(subscriptions); + if (response && response.messages) { + this.pendingBatchMessages = this.pendingBatchMessages.concat(response.messages); + } + } + private async onConnect() { await this.restart(); this.onEnable(); @@ -416,6 +455,11 @@ export class Subscriber extends ISubscriber { pendingSubscriptions.push(params); }); await this.batchSubscribe(pendingSubscriptions); + + if (this.pendingBatchMessages.length) { + await this.relayer.handleBatchMessageEvents(this.pendingBatchMessages); + this.pendingBatchMessages = []; + } } private registerEventListeners() { diff --git a/packages/core/test/subscriber.spec.ts b/packages/core/test/subscriber.spec.ts index a4cbf99ba..7b236f93a 100644 --- a/packages/core/test/subscriber.spec.ts +++ b/packages/core/test/subscriber.spec.ts @@ -36,6 +36,55 @@ describe("Subscriber", () => { await disconnectSocket(core.relayer); }); + describe("init", () => { + it("should call batch fetch messages on init when it has cached topics", async () => { + const requestSpy: Sinon.SinonSpy = Sinon.spy(() => { + return {}; + }); + subscriber.relayer.provider.request = requestSpy; + + const topic = generateRandomBytes32(); + // manually switch off the subscriber + // @ts-expect-error + subscriber.onDisconnect(); + // add a topic to the subscriber as if it was loaded from persistence + // @ts-expect-error + subscriber.cached = [{ topic, relay: { protocol: "irn" } }]; + + // restart the subscriber + // @ts-expect-error + subscriber.onConnect(); + + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // first req should be the batch fetch messages call followed by the batch subscribe call + expect(requestSpy.getCalls().length).toBe(2); + expect(requestSpy.getCalls()[0].args[0].method).toBe("irn_batchFetchMessages"); + expect(requestSpy.getCalls()[1].args[0].method).toBe("irn_batchSubscribe"); + expect( + requestSpy.calledWith( + Sinon.match({ + method: "irn_batchFetchMessages", + params: { + topics: [topic], + }, + }), + ), + ).to.be.true; + + expect( + requestSpy.calledWith( + Sinon.match({ + method: "irn_batchSubscribe", + params: { + topics: [topic], + }, + }), + ), + ).to.be.true; + }); + }); + describe("storageKey", () => { it("provides the expected default `storageKey` format", () => { const subscriber = new Subscriber(relayer, logger); diff --git a/packages/sign-client/package.json b/packages/sign-client/package.json index f1f954e1f..e41b18a2e 100644 --- a/packages/sign-client/package.json +++ b/packages/sign-client/package.json @@ -52,6 +52,6 @@ "@aws-sdk/client-cloudwatch": "3.450.0", "@walletconnect/jsonrpc-provider": "1.0.13", "@walletconnect/jsonrpc-ws-connection": "1.0.14", - "@walletconnect/relay-api": "1.0.9" + "@walletconnect/relay-api": "1.0.10" } } diff --git a/packages/types/src/core/relayer.ts b/packages/types/src/core/relayer.ts index 9ae9a94c4..316d12fbb 100644 --- a/packages/types/src/core/relayer.ts +++ b/packages/types/src/core/relayer.ts @@ -118,4 +118,5 @@ export abstract class IRelayer extends IEvents { public abstract transportOpen(relayUrl?: string): Promise; public abstract restartTransport(relayUrl?: string): Promise; public abstract confirmOnlineStateOrThrow(): Promise; + public abstract handleBatchMessageEvents(messages: RelayerTypes.MessageEvent[]): Promise; } diff --git a/packages/utils/package.json b/packages/utils/package.json index 691177bfc..d0bdfa1b6 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -36,7 +36,7 @@ "@stablelib/random": "1.0.2", "@stablelib/sha256": "1.0.1", "@stablelib/x25519": "1.0.3", - "@walletconnect/relay-api": "1.0.9", + "@walletconnect/relay-api": "1.0.10", "@walletconnect/safe-json": "1.0.2", "@walletconnect/time": "1.0.2", "@walletconnect/types": "2.12.2",