Skip to content

Commit

Permalink
plugin-drizzle: add persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Dec 30, 2024
1 parent 1d65169 commit c132911
Showing 2 changed files with 187 additions and 145 deletions.
292 changes: 147 additions & 145 deletions packages/plugin-drizzle/src/persistence.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
/*
import type { Cursor } from "@apibara/protocol";
import {
type ExtractTablesWithRelations,
type TablesRelationalConfig,
and,
eq,
isNull,
import { and, eq, gt, isNull, lt } from "drizzle-orm";
import type {
ExtractTablesWithRelations,
TablesRelationalConfig,
} from "drizzle-orm";
import {
type PgDatabase,
type PgQueryResultHKT,
integer,
pgTable,
primaryKey,
text,
} from "drizzle-orm/pg-core";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";
import type { PgQueryResultHKT, PgTransaction } from "drizzle-orm/pg-core";
import { integer, pgTable, primaryKey, text } from "drizzle-orm/pg-core";
import { DrizzleStorageError, deserialize, serialize } from "./utils";

export const checkpoints = pgTable("checkpoints", {
id: text("id").notNull().primaryKey(),
@@ -33,162 +23,174 @@ export const filters = pgTable(
id: text("id").notNull(),
filter: text("filter").notNull(),
fromBlock: integer("from_block").notNull(),
toBlock: integer("to_block"),
toBlock: integer("to_block").$type<number | null>().default(null),
},
(table) => ({
pk: primaryKey({ columns: [table.id, table.fromBlock] }),
}),
(table) => [
{
pk: primaryKey({ columns: [table.id, table.fromBlock] }),
},
],
);

export function drizzlePersistence<
TFilter,
TBlock,
TTxnParams,
export async function initializePersistentState<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>({
database,
indexerName = "default",
}: {
database: PgDatabase<TQueryResult, TFullSchema, TSchema>;
indexerName?: string;
}) {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
let store: DrizzlePersistence<TFilter, TQueryResult, TFullSchema, TSchema>;
indexer.hooks.hook("run:before", async () => {
store = new DrizzlePersistence(database, indexerName);
// Tables are created by user via migrations in Drizzle
});
indexer.hooks.hook("connect:before", async ({ request }) => {
const { cursor, filter } = await store.get();
if (cursor) {
request.startingCursor = cursor;
}
if (filter) {
request.filter[1] = filter;
}
});
indexer.hooks.hook("transaction:commit", async ({ endCursor }) => {
if (endCursor) {
await store.put({ cursor: endCursor });
}
});
indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => {
if (request.filter[1]) {
await store.put({ cursor: endCursor, filter: request.filter[1] });
}
});
});
>(tx: PgTransaction<TQueryResult, TFullSchema, TSchema>) {
try {
// Try to query both tables
await tx.select().from(checkpoints).limit(1);
await tx.select().from(filters).limit(1);
} catch (error) {
throw new DrizzleStorageError(
"Required tables 'checkpoints' and 'filters' not found for persistence.\nPlease run migrations with 'checkpoints' and 'filters' tables before initializing the plugin with persistence.",
);
}
}

export class DrizzlePersistence<
export async function persistState<
TFilter,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
> {
constructor(
private _db: PgDatabase<TQueryResult, TFullSchema, TSchema>,
private _indexerName: string,
) {}
public async get(): Promise<{ cursor?: Cursor; filter?: TFilter }> {
const cursor = await this._getCheckpoint();
const filter = await this._getFilter();
return { cursor, filter };
}
public async put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) {
if (cursor) {
await this._putCheckpoint(cursor);
if (filter) {
await this._putFilter(filter, cursor);
}
}
}
// --- CHECKPOINTS TABLE METHODS ---
private async _getCheckpoint(): Promise<Cursor | undefined> {
const rows = await this._db
.select()
.from(checkpoints)
.where(eq(checkpoints.id, this._indexerName));
const row = rows[0];
if (!row) return undefined;
return {
orderKey: BigInt(row.orderKey),
uniqueKey: row.uniqueKey,
};
}
>(props: {
tx: PgTransaction<TQueryResult, TFullSchema, TSchema>;
endCursor: Cursor;
filter?: TFilter;
indexerName?: string;
}) {
const { tx, endCursor, filter, indexerName = "default" } = props;

private async _putCheckpoint(cursor: Cursor) {
await this._db
if (endCursor) {
await tx
.insert(checkpoints)
.values({
id: this._indexerName,
orderKey: Number(cursor.orderKey),
uniqueKey: cursor.uniqueKey,
id: indexerName,
orderKey: Number(endCursor.orderKey),
uniqueKey: endCursor.uniqueKey,
})
.onConflictDoUpdate({
target: checkpoints.id,
set: {
orderKey: Number(cursor.orderKey),
uniqueKey: cursor.uniqueKey,
orderKey: Number(endCursor.orderKey),
uniqueKey: endCursor.uniqueKey,
},
});
}

// --- FILTERS TABLE METHODS ---
if (filter) {
await tx
.update(filters)
.set({ toBlock: Number(endCursor.orderKey) })
.where(and(eq(filters.id, indexerName), isNull(filters.toBlock)));

await tx
.insert(filters)
.values({
id: indexerName,
filter: serialize(filter),
fromBlock: Number(endCursor.orderKey),
toBlock: null,
})
.onConflictDoUpdate({
target: [filters.id, filters.fromBlock],
set: {
filter: serialize(filter),
fromBlock: Number(endCursor.orderKey),
toBlock: null,
},
});
}
}
}

private async _getFilter(): Promise<TFilter | undefined> {
const rows = await this._db
.select()
.from(filters)
.where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock)));
export async function getState<
TFilter,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(props: {
tx: PgTransaction<TQueryResult, TFullSchema, TSchema>;
indexerName?: string;
}): Promise<{ cursor?: Cursor; filter?: TFilter }> {
const { tx, indexerName = "default" } = props;

const checkpointRows = await tx
.select()
.from(checkpoints)
.where(eq(checkpoints.id, indexerName));

const cursor = checkpointRows[0]
? {
orderKey: BigInt(checkpointRows[0].orderKey),
uniqueKey: checkpointRows[0].uniqueKey,
}
: undefined;

const row = rows[0];
const filterRows = await tx
.select()
.from(filters)
.where(and(eq(filters.id, indexerName), isNull(filters.toBlock)));

if (!row) return undefined;
const filter = filterRows[0]
? deserialize<TFilter>(filterRows[0].filter)
: undefined;

return deserialize(row.filter) as TFilter;
}
return { cursor, filter };
}

private async _putFilter(filter: TFilter, endCursor: Cursor) {
// Update existing filter's to_block
await this._db
.update(filters)
.set({ toBlock: Number(endCursor.orderKey) })
.where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock)));
export async function invalidateState<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(props: {
tx: PgTransaction<TQueryResult, TFullSchema, TSchema>;
cursor: Cursor;
indexerName?: string;
}) {
const { tx, cursor, indexerName = "default" } = props;

await tx
.delete(filters)
.where(
and(
eq(filters.id, indexerName),
gt(filters.fromBlock, Number(cursor.orderKey)),
),
);

await tx
.update(filters)
.set({ toBlock: null })
.where(
and(
eq(filters.id, indexerName),
gt(filters.toBlock, Number(cursor.orderKey)),
),
);
}

// Insert new filter
await this._db
.insert(filters)
.values({
id: this._indexerName,
filter: serialize(filter as Record<string, unknown>),
fromBlock: Number(endCursor.orderKey),
})
.onConflictDoUpdate({
target: [filters.id, filters.fromBlock],
set: {
filter: serialize(filter as Record<string, unknown>),
fromBlock: Number(endCursor.orderKey),
},
});
}
export async function finalizeState<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(props: {
tx: PgTransaction<TQueryResult, TFullSchema, TSchema>;
cursor: Cursor;
indexerName?: string;
}) {
const { tx, cursor, indexerName = "default" } = props;

await tx
.delete(filters)
.where(
and(
eq(filters.id, indexerName),
lt(filters.toBlock, Number(cursor.orderKey)),
),
);
}
*/
40 changes: 40 additions & 0 deletions packages/plugin-drizzle/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
import type {
ExtractTablesWithRelations,
TablesRelationalConfig,
} from "drizzle-orm";
import type {
PgDatabase,
PgQueryResultHKT,
PgTransaction,
} from "drizzle-orm/pg-core";

export class DrizzleStorageError extends Error {
constructor(message: string) {
super(message);
this.name = "DrizzleStorageError";
}
}

export async function withTransaction<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(
db: PgDatabase<TQueryResult, TFullSchema, TSchema>,
cb: (db: PgTransaction<TQueryResult, TFullSchema, TSchema>) => Promise<void>,
) {
return await db.transaction(async (txnDb) => {
return await cb(txnDb);
});
}

export function deserialize<T>(str: string): T {
return JSON.parse(str, (_, value) =>
typeof value === "string" && value.match(/^\d+n$/)
? BigInt(value.slice(0, -1))
: value,
) as T;
}

export function serialize<T>(obj: T): string {
return JSON.stringify(
obj,
(_, value) => (typeof value === "bigint" ? `${value.toString()}n` : value),
"\t",
);
}

0 comments on commit c132911

Please sign in to comment.