From c132911f62aa9a7a7434238a686274622f42dd13 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Mon, 30 Dec 2024 15:23:29 +0530 Subject: [PATCH] plugin-drizzle: add persistence --- packages/plugin-drizzle/src/persistence.ts | 292 +++++++++++---------- packages/plugin-drizzle/src/utils.ts | 40 +++ 2 files changed, 187 insertions(+), 145 deletions(-) diff --git a/packages/plugin-drizzle/src/persistence.ts b/packages/plugin-drizzle/src/persistence.ts index 03e96a64..72a367b7 100644 --- a/packages/plugin-drizzle/src/persistence.ts +++ b/packages/plugin-drizzle/src/persistence.ts @@ -1,22 +1,12 @@ -/* import type { Cursor } from "@apibara/protocol"; -import { - type ExtractTablesWithRelations, - type TablesRelationalConfig, - and, - eq, - isNull, +import { and, eq, gt, isNull, lt } from "drizzle-orm"; +import type { + ExtractTablesWithRelations, + TablesRelationalConfig, } from "drizzle-orm"; -import { - type PgDatabase, - type PgQueryResultHKT, - integer, - pgTable, - primaryKey, - text, -} from "drizzle-orm/pg-core"; -import { deserialize, serialize } from "../vcr"; -import { defineIndexerPlugin } from "./config"; +import type { PgQueryResultHKT, PgTransaction } from "drizzle-orm/pg-core"; +import { integer, pgTable, primaryKey, text } from "drizzle-orm/pg-core"; +import { DrizzleStorageError, deserialize, serialize } from "./utils"; export const checkpoints = pgTable("checkpoints", { id: text("id").notNull().primaryKey(), @@ -33,162 +23,174 @@ export const filters = pgTable( id: text("id").notNull(), filter: text("filter").notNull(), fromBlock: integer("from_block").notNull(), - toBlock: integer("to_block"), + toBlock: integer("to_block").$type().default(null), }, - (table) => ({ - pk: primaryKey({ columns: [table.id, table.fromBlock] }), - }), + (table) => [ + { + pk: primaryKey({ columns: [table.id, table.fromBlock] }), + }, + ], ); -export function drizzlePersistence< - TFilter, - TBlock, - TTxnParams, +export async function initializePersistentState< TQueryResult extends PgQueryResultHKT, TFullSchema extends Record = Record, TSchema extends TablesRelationalConfig = ExtractTablesWithRelations, ->({ - database, - indexerName = "default", -}: { - database: PgDatabase; - indexerName?: string; -}) { - return defineIndexerPlugin((indexer) => { - let store: DrizzlePersistence; - - indexer.hooks.hook("run:before", async () => { - store = new DrizzlePersistence(database, indexerName); - // Tables are created by user via migrations in Drizzle - }); - - indexer.hooks.hook("connect:before", async ({ request }) => { - const { cursor, filter } = await store.get(); - - if (cursor) { - request.startingCursor = cursor; - } - - if (filter) { - request.filter[1] = filter; - } - }); - - indexer.hooks.hook("transaction:commit", async ({ endCursor }) => { - if (endCursor) { - await store.put({ cursor: endCursor }); - } - }); - - indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => { - if (request.filter[1]) { - await store.put({ cursor: endCursor, filter: request.filter[1] }); - } - }); - }); +>(tx: PgTransaction) { + try { + // Try to query both tables + await tx.select().from(checkpoints).limit(1); + await tx.select().from(filters).limit(1); + } catch (error) { + throw new DrizzleStorageError( + "Required tables 'checkpoints' and 'filters' not found for persistence.\nPlease run migrations with 'checkpoints' and 'filters' tables before initializing the plugin with persistence.", + ); + } } -export class DrizzlePersistence< +export async function persistState< TFilter, TQueryResult extends PgQueryResultHKT, TFullSchema extends Record = Record, TSchema extends TablesRelationalConfig = ExtractTablesWithRelations, -> { - constructor( - private _db: PgDatabase, - private _indexerName: string, - ) {} - - public async get(): Promise<{ cursor?: Cursor; filter?: TFilter }> { - const cursor = await this._getCheckpoint(); - const filter = await this._getFilter(); - - return { cursor, filter }; - } - - public async put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) { - if (cursor) { - await this._putCheckpoint(cursor); - - if (filter) { - await this._putFilter(filter, cursor); - } - } - } - - // --- CHECKPOINTS TABLE METHODS --- - - private async _getCheckpoint(): Promise { - const rows = await this._db - .select() - .from(checkpoints) - .where(eq(checkpoints.id, this._indexerName)); - - const row = rows[0]; - if (!row) return undefined; - - return { - orderKey: BigInt(row.orderKey), - uniqueKey: row.uniqueKey, - }; - } +>(props: { + tx: PgTransaction; + endCursor: Cursor; + filter?: TFilter; + indexerName?: string; +}) { + const { tx, endCursor, filter, indexerName = "default" } = props; - private async _putCheckpoint(cursor: Cursor) { - await this._db + if (endCursor) { + await tx .insert(checkpoints) .values({ - id: this._indexerName, - orderKey: Number(cursor.orderKey), - uniqueKey: cursor.uniqueKey, + id: indexerName, + orderKey: Number(endCursor.orderKey), + uniqueKey: endCursor.uniqueKey, }) .onConflictDoUpdate({ target: checkpoints.id, set: { - orderKey: Number(cursor.orderKey), - uniqueKey: cursor.uniqueKey, + orderKey: Number(endCursor.orderKey), + uniqueKey: endCursor.uniqueKey, }, }); - } - // --- FILTERS TABLE METHODS --- + if (filter) { + await tx + .update(filters) + .set({ toBlock: Number(endCursor.orderKey) }) + .where(and(eq(filters.id, indexerName), isNull(filters.toBlock))); + + await tx + .insert(filters) + .values({ + id: indexerName, + filter: serialize(filter), + fromBlock: Number(endCursor.orderKey), + toBlock: null, + }) + .onConflictDoUpdate({ + target: [filters.id, filters.fromBlock], + set: { + filter: serialize(filter), + fromBlock: Number(endCursor.orderKey), + toBlock: null, + }, + }); + } + } +} - private async _getFilter(): Promise { - const rows = await this._db - .select() - .from(filters) - .where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock))); +export async function getState< + TFilter, + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +>(props: { + tx: PgTransaction; + indexerName?: string; +}): Promise<{ cursor?: Cursor; filter?: TFilter }> { + const { tx, indexerName = "default" } = props; + + const checkpointRows = await tx + .select() + .from(checkpoints) + .where(eq(checkpoints.id, indexerName)); + + const cursor = checkpointRows[0] + ? { + orderKey: BigInt(checkpointRows[0].orderKey), + uniqueKey: checkpointRows[0].uniqueKey, + } + : undefined; - const row = rows[0]; + const filterRows = await tx + .select() + .from(filters) + .where(and(eq(filters.id, indexerName), isNull(filters.toBlock))); - if (!row) return undefined; + const filter = filterRows[0] + ? deserialize(filterRows[0].filter) + : undefined; - return deserialize(row.filter) as TFilter; - } + return { cursor, filter }; +} - private async _putFilter(filter: TFilter, endCursor: Cursor) { - // Update existing filter's to_block - await this._db - .update(filters) - .set({ toBlock: Number(endCursor.orderKey) }) - .where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock))); +export async function invalidateState< + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +>(props: { + tx: PgTransaction; + cursor: Cursor; + indexerName?: string; +}) { + const { tx, cursor, indexerName = "default" } = props; + + await tx + .delete(filters) + .where( + and( + eq(filters.id, indexerName), + gt(filters.fromBlock, Number(cursor.orderKey)), + ), + ); + + await tx + .update(filters) + .set({ toBlock: null }) + .where( + and( + eq(filters.id, indexerName), + gt(filters.toBlock, Number(cursor.orderKey)), + ), + ); +} - // Insert new filter - await this._db - .insert(filters) - .values({ - id: this._indexerName, - filter: serialize(filter as Record), - fromBlock: Number(endCursor.orderKey), - }) - .onConflictDoUpdate({ - target: [filters.id, filters.fromBlock], - set: { - filter: serialize(filter as Record), - fromBlock: Number(endCursor.orderKey), - }, - }); - } +export async function finalizeState< + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +>(props: { + tx: PgTransaction; + cursor: Cursor; + indexerName?: string; +}) { + const { tx, cursor, indexerName = "default" } = props; + + await tx + .delete(filters) + .where( + and( + eq(filters.id, indexerName), + lt(filters.toBlock, Number(cursor.orderKey)), + ), + ); } -*/ diff --git a/packages/plugin-drizzle/src/utils.ts b/packages/plugin-drizzle/src/utils.ts index 7af22ac2..c7eb26ef 100644 --- a/packages/plugin-drizzle/src/utils.ts +++ b/packages/plugin-drizzle/src/utils.ts @@ -1,6 +1,46 @@ +import type { + ExtractTablesWithRelations, + TablesRelationalConfig, +} from "drizzle-orm"; +import type { + PgDatabase, + PgQueryResultHKT, + PgTransaction, +} from "drizzle-orm/pg-core"; + export class DrizzleStorageError extends Error { constructor(message: string) { super(message); this.name = "DrizzleStorageError"; } } + +export async function withTransaction< + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +>( + db: PgDatabase, + cb: (db: PgTransaction) => Promise, +) { + return await db.transaction(async (txnDb) => { + return await cb(txnDb); + }); +} + +export function deserialize(str: string): T { + return JSON.parse(str, (_, value) => + typeof value === "string" && value.match(/^\d+n$/) + ? BigInt(value.slice(0, -1)) + : value, + ) as T; +} + +export function serialize(obj: T): string { + return JSON.stringify( + obj, + (_, value) => (typeof value === "bigint" ? `${value.toString()}n` : value), + "\t", + ); +}