Skip to content

Commit

Permalink
Merge pull request #5515 from WalletConnect/fix/single-connection
Browse files Browse the repository at this point in the history
fix:  connection robustness and flaky tests
  • Loading branch information
ganchoradkov authored Dec 5, 2024
2 parents 7579dd9 + cfb6723 commit 4596280
Show file tree
Hide file tree
Showing 23 changed files with 552 additions and 457 deletions.
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"build:types": "tsc",
"build:source": "rollup --config rollup.config.js",
"build": "npm run build:pre; npm run build:source; npm run build:types",
"test:pre": "rm -rf ./test/*.db",
"test:pre": "rm -rf ./test/tmp",
"test:run": "vitest run --dir test",
"test": "npm run test:pre; npm run test:run",
"test:ignoreUnhandled": "npm run test:pre; npm run test:run -- --dangerouslyIgnoreUnhandledErrors",
Expand Down
129 changes: 82 additions & 47 deletions packages/core/src/controllers/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat";
import { JsonRpcPayload, RequestArguments } from "@walletconnect/jsonrpc-types";
import { generateChildLogger, getLoggerContext, Logger } from "@walletconnect/logger";
import { RelayJsonRpc } from "@walletconnect/relay-api";
import { IPublisher, IRelayer, PublisherTypes, RelayerTypes } from "@walletconnect/types";
import { IPublisher, IRelayer, PublisherTypes } from "@walletconnect/types";
import {
getRelayProtocolApi,
getRelayProtocolName,
Expand All @@ -15,13 +15,17 @@ import { PUBLISHER_CONTEXT, PUBLISHER_DEFAULT_TTL, RELAYER_EVENTS } from "../con
import { getBigIntRpcId } from "@walletconnect/jsonrpc-utils";
import { ONE_MINUTE, ONE_SECOND, toMiliseconds } from "@walletconnect/time";

type IPublishType = PublisherTypes.Params & {
attestation?: string;
attempt: number;
};
export class Publisher extends IPublisher {
public events = new EventEmitter();
public name = PUBLISHER_CONTEXT;
public queue = new Map<string, PublisherTypes.Params>();
public queue = new Map<string, IPublishType>();

private publishTimeout = toMiliseconds(ONE_MINUTE);
private failedPublishTimeout = toMiliseconds(ONE_SECOND);
private initialPublishTimeout = toMiliseconds(ONE_SECOND * 15);
private needsTransportRestart = false;

constructor(public relayer: IRelayer, public logger: Logger) {
Expand Down Expand Up @@ -57,52 +61,65 @@ export class Publisher extends IPublisher {
},
};
const failedPublishMessage = `Failed to publish payload, please try again. id:${id} tag:${tag}`;
const startPublish = Date.now();
let result;
let attempts = 1;

try {
/**
* Loop until the publish is successful or the timeout is reached
* The loop allows to retry to retry the publish in case of disconnect
* attempt to publish the payload for <initialPublishTimeout> seconds,
* if the publish fails, add the payload to the queue and it will be retried on every pulse
* until it is successfully published or <publishTimeout> seconds have passed
*/
while (result === undefined) {
// Terminate the publishing attempts if publisTimeout has been exceeded
if (Date.now() - startPublish > this.publishTimeout) {
throw new Error(failedPublishMessage);
}

this.logger.trace({ id, attempts }, `publisher.publish - attempt ${attempts}`);
const publish = await createExpiringPromise(
this.rpcPublish(topic, message, ttl, relay, prompt, tag, id, opts?.attestation).catch(
(e) => this.logger.warn(e),
),
this.publishTimeout,
failedPublishMessage,
const publishPromise = new Promise(async (resolve) => {
const onPublish = ({ id }: { id: string }) => {
if (params.opts.id === id) {
this.removeRequestFromQueue(id);
this.relayer.events.removeListener(RELAYER_EVENTS.publish, onPublish);
resolve(params);
}
};
this.relayer.events.on(RELAYER_EVENTS.publish, onPublish);
const initialPublish = createExpiringPromise(
new Promise((resolve, reject) => {
this.rpcPublish({
topic,
message,
ttl,
prompt,
tag,
id,
attestation: opts?.attestation,
})
.then(resolve)
.catch((e) => {
this.logger.warn(e, e?.message);
reject(e);
});
}),
this.initialPublishTimeout,
`Failed initial publish, retrying.... id:${id} tag:${tag}`,
);
result = await publish;
attempts++;

if (!result) {
// small delay before retrying so we can limit retry to max 1 time per second
// if network is down `rpcPublish` will throw immediately
await new Promise((resolve) => setTimeout(resolve, this.failedPublishTimeout));
try {
await initialPublish;
this.events.removeListener(RELAYER_EVENTS.publish, onPublish);
} catch (e) {
this.queue.set(id, { ...params, attempt: 1 });
this.logger.warn(e, (e as Error)?.message);
}
}
this.relayer.events.emit(RELAYER_EVENTS.publish, params);
this.logger.debug(`Successfully Published Payload`);
});
this.logger.trace({
type: "method",
method: "publish",
params: { id, topic, message, opts },
});

await createExpiringPromise(publishPromise, this.publishTimeout, failedPublishMessage);
} catch (e) {
this.logger.debug(`Failed to Publish Payload`);
this.logger.error(e as any);
if (opts?.internal?.throwOnFailedPublish) {
throw e;
}
this.queue.set(id, params);
} finally {
this.queue.delete(id);
}
};

Expand All @@ -124,17 +141,17 @@ export class Publisher extends IPublisher {

// ---------- Private ----------------------------------------------- //

private rpcPublish(
topic: string,
message: string,
ttl: number,
relay: RelayerTypes.ProtocolOptions,
prompt?: boolean,
tag?: number,
id?: number,
attestation?: string,
) {
const api = getRelayProtocolApi(relay.protocol);
private async rpcPublish(params: {
topic: string;
message: string;
ttl?: number;
prompt?: boolean;
tag?: number;
id?: number;
attestation?: string;
}) {
const { topic, message, ttl = PUBLISHER_DEFAULT_TTL, prompt, tag, id, attestation } = params;
const api = getRelayProtocolApi(getRelayProtocolName().protocol);
const request: RequestArguments<RelayJsonRpc.PublishParams> = {
method: api.publish,
params: {
Expand All @@ -151,17 +168,35 @@ export class Publisher extends IPublisher {
if (isUndefined(request.params?.tag)) delete request.params?.tag;
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "message", direction: "outgoing", request });
return this.relayer.request(request);
const result = await this.relayer.request(request);
this.relayer.events.emit(RELAYER_EVENTS.publish, params);
this.logger.debug(`Successfully Published Payload`);
return result;
}

private removeRequestFromQueue(id: string) {
this.queue.delete(id);
}

private checkQueue() {
this.queue.forEach(async (params) => {
const { topic, message, opts } = params;
await this.publish(topic, message, opts);
this.queue.forEach(async (params, id) => {
const attempt = params.attempt + 1;
this.queue.set(id, { ...params, attempt });
const { topic, message, opts, attestation } = params;
this.logger.warn(
{},
`Publisher: queue->publishing: ${params.opts.id}, tag: ${params.opts.tag}, attempt: ${attempt}`,
);
await this.rpcPublish({
topic,
message,
ttl: opts.ttl,
prompt: opts.prompt,
tag: opts.tag,
id: opts.id,
attestation,
});
this.logger.warn({}, `Publisher: queue->published: ${params.opts.id}`);
});
}

Expand Down
Loading

0 comments on commit 4596280

Please sign in to comment.