Skip to content

Commit

Permalink
fix: try to restart websocket streams
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Trost <[email protected]>
  • Loading branch information
galexrt committed Aug 28, 2024
1 parent 86874fa commit 080f368
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4,914 deletions.
22 changes: 19 additions & 3 deletions app/composables/grpcws/transports/websocket/websocketChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ export function WebsocketChannelTransport(logger: ILogger, webSocket: UseWebSock
opts.debug && logger.debug('Websocket factory triggered');
if (webSocket.status.value === 'CLOSED') {
webSocket.open();

if (wsChannel.activeStreams.size > 0) {
wsChannel.activeStreams.forEach((stream) => {
if (!stream[1].isStream) {
return;
}

stream[1].start(new Metadata());
});
}
}

return wsChannel.getStream(opts);
Expand All @@ -22,6 +32,9 @@ export function WebsocketChannelTransport(logger: ILogger, webSocket: UseWebSock

interface GrpcStream extends Transport {
readonly streamId: number;
readonly service: string;
readonly method: string;
readonly isStream: boolean;
}

interface WebsocketChannel {
Expand All @@ -33,7 +46,7 @@ class WebsocketChannelImpl implements WebsocketChannel {
private logger: ILogger;
protected ws: UseWebSocketReturn<any>;
readonly activeStreams = new Map<number, [TransportOptions, GrpcStream]>();
protected streamId = 1;
protected lastStreamId = 1;

constructor(logger: ILogger, ws: UseWebSocketReturn<any>) {
this.logger = logger;
Expand Down Expand Up @@ -127,7 +140,7 @@ class WebsocketChannelImpl implements WebsocketChannel {
}

getStream(opts: TransportOptions): GrpcStream {
let currentStreamId = this.streamId++;
let currentStreamId = this.lastStreamId++;
const self = this;

async function sendToWebsocket(toSend: GrpcFrame): Promise<void> {
Expand All @@ -149,9 +162,12 @@ class WebsocketChannelImpl implements WebsocketChannel {
return frame;
}

//question: can this structure be reused or is it one time use?
// Question: can this structure be reused or is it one time use?
const stream = {
streamId: currentStreamId,
service: opts.methodDefinition.service.typeName,
method: opts.methodDefinition.name,
isStream: opts.methodDefinition.serverStreaming || opts.methodDefinition.clientStreaming,

start: (metadata: Metadata) => {
opts.debug &&
Expand Down
3 changes: 2 additions & 1 deletion app/lang/de/de.json
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@
"pin": "Anheften",
"unpin": "Lösen",
"pinned_document": "Angeheftetes Dokument | Angeheftete Dokumente",
"or": "ODER"
"or": "ODER",
"retrying": "Erneuter Versuch..."
},
"components": {
"partials": {
Expand Down
Loading

0 comments on commit 080f368

Please sign in to comment.