Skip to content

Commit

Permalink
batch query status
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Jan 9, 2025
1 parent 9001a85 commit af25a96
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions packages/core/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@ export const client = ({ db }: { db: ReadonlyDrizzle<Schema> }) => {
// @ts-ignore
const session: PgSession = db._.session;
const listenConnection = global.PONDER_LISTEN_CONNECTION;
let statusResolver = promiseWithResolvers<void>();
let statusResolver = promiseWithResolvers<(typeof status.$inferSelect)[]>();

let queryPromise: Promise<any>;

if (listenConnection instanceof PGlite) {
queryPromise = listenConnection.query("LISTEN status_update_channel");

listenConnection.onNotification(() => {
statusResolver.resolve();
listenConnection.onNotification(async () => {
const result = await db.select().from(status);
statusResolver.resolve(result);
statusResolver = promiseWithResolvers();
});
} else {
queryPromise = listenConnection.query("LISTEN status_update_channel");

listenConnection.on("notification", () => {
statusResolver.resolve();
listenConnection.on("notification", async () => {
const result = await db.select().from(status);
statusResolver.resolve(result);
statusResolver = promiseWithResolvers();
});
}
Expand Down Expand Up @@ -66,12 +68,13 @@ export const client = ({ db }: { db: ReadonlyDrizzle<Schema> }) => {

await queryPromise;

let statusResult = await db.select().from(status);

return streamSSE(c, async (stream) => {
while (stream.closed === false) {
try {
const result = await db.select().from(status);
await stream.writeSSE({
data: JSON.stringify({ status: "success", result }),
data: JSON.stringify({ status: "success", result: statusResult }),
});
} catch (error) {
await stream.writeSSE({
Expand All @@ -81,7 +84,7 @@ export const client = ({ db }: { db: ReadonlyDrizzle<Schema> }) => {
}),
});
}
await statusResolver.promise;
statusResult = await statusResolver.promise;
}
});
}
Expand Down

0 comments on commit af25a96

Please sign in to comment.