Skip to content

Commit

Permalink
plugin-drizzle: add drizzle plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jaipaljadeja committed Dec 30, 2024
1 parent 03729dd commit 1d65169
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 21 deletions.
9 changes: 9 additions & 0 deletions packages/plugin-drizzle/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: "3"
services:
timescaledb:
image: timescale/timescaledb-ha:pg14-latest
restart: always
environment:
POSTGRES_PASSWORD: postgres
ports:
- "5432:5432"
9 changes: 7 additions & 2 deletions packages/plugin-drizzle/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@
"test": "vitest",
"test:ci": "vitest run"
},
"peerDependencies": {
"drizzle-orm": "^0.37.0",
"pg": "^8.13.1"
},
"devDependencies": {
"@electric-sql/pglite": "^0.2.14",
"@types/node": "^20.14.0",
"@types/pg": "^8.11.10",
"drizzle-orm": "^0.37.0",
"pg": "^8.13.1",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
"dependencies": {
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*",
"drizzle-orm": "^0.37.0",
"pg": "^8.13.1",
"postgres-range": "^1.1.4"
}
}
224 changes: 221 additions & 3 deletions packages/plugin-drizzle/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,223 @@
import { DrizzleStorageError } from "./utils";
import { useIndexerContext } from "@apibara/indexer";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";

export function drizzleStorage<TFilter, TBlock>() {
throw new DrizzleStorageError("Not implemented");
import type {
ExtractTablesWithRelations,
TablesRelationalConfig,
} from "drizzle-orm";

import type {
PgDatabase,
PgQueryResultHKT,
PgTransaction,
} from "drizzle-orm/pg-core";
import {
finalizeState,
getState,
initializePersistentState,
invalidateState,
persistState,
} from "./persistence";
import {
initializeReorgRollbackTable,
registerTriggers,
removeTriggers,
} from "./storage";
import { DrizzleStorageError, withTransaction } from "./utils";

const DRIZZLE_PROPERTY = "_drizzle";

export type DrizzleStorage<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
> = {
db: PgTransaction<TQueryResult, TFullSchema, TSchema>;
};

export function useDrizzleStorage<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(
_db: PgDatabase<TQueryResult, TFullSchema, TSchema>,
): DrizzleStorage<TQueryResult, TFullSchema, TSchema> {
const context = useIndexerContext();

if (!context[DRIZZLE_PROPERTY]) {
throw new DrizzleStorageError(
"drizzle storage is not available. Did you register the plugin?",
);
}

return context[DRIZZLE_PROPERTY];
}

export interface DrizzleStorageOptions<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
> {
db: PgDatabase<TQueryResult, TFullSchema, TSchema>;
persistState?: boolean;
indexerName?: string;
schema?: Record<string, unknown>;
idColumn?: string;
}

export function drizzleStorage<
TFilter,
TBlock,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>({
db,
persistState: enablePersistence = true,
indexerName,
schema = {},
idColumn = "id",
}: DrizzleStorageOptions<TQueryResult, TFullSchema, TSchema>) {
return defineIndexerPlugin<TFilter, TBlock>((indexer) => {
const tableNames = Object.keys(schema ?? db._.schema);

indexer.hooks.hook("run:before", async () => {
await withTransaction(db, async (tx) => {
await initializeReorgRollbackTable(tx);
if (enablePersistence) {
await initializePersistentState(tx);
}
});
});

indexer.hooks.hook("connect:before", async ({ request }) => {
if (!enablePersistence) {
return;
}

await withTransaction(db, async (tx) => {
const { cursor, filter } = await getState<
TFilter,
TQueryResult,
TFullSchema,
TSchema
>({
tx,
indexerName,
});
if (cursor) {
request.startingCursor = cursor;
}
if (filter) {
request.filter[1] = filter;
}
});
});

indexer.hooks.hook("connect:after", async ({ request }) => {
// On restart, we need to invalidate data for blocks that were processed but not persisted.
const cursor = request.startingCursor;

if (!cursor) {
return;
}

await withTransaction(db, async (tx) => {
// TODO: Implement invalidate for drizzle/reorg table maybe?
// await invalidate(db, cursor, tables);

if (enablePersistence) {
await invalidateState({ tx, cursor, indexerName });
}
});
});

indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => {
if (!enablePersistence) {
return;
}
await withTransaction(db, async (tx) => {
if (endCursor && request.filter[1]) {
await persistState({
tx,
endCursor,
filter: request.filter[1],
indexerName,
});
}
});
});

indexer.hooks.hook("message:finalize", async ({ message }) => {
const { cursor } = message.finalize;

if (!cursor) {
throw new DrizzleStorageError("finalized cursor is undefined");
}

await withTransaction(db, async (tx) => {
// TODO: Implement finalize for drizzle/reorg table maybe?
// await finalize(db, cursor, tables);

if (enablePersistence) {
await finalizeState({ tx, cursor, indexerName });
}
});
});

indexer.hooks.hook("message:invalidate", async ({ message }) => {
const { cursor } = message.invalidate;

if (!cursor) {
throw new DrizzleStorageError("invalidate cursor is undefined");
}

await withTransaction(db, async (tx) => {
// TODO: Implement invalidate for drizzle/reorg table maybe?
// await invalidate(db, cursor, tables);

if (enablePersistence) {
await invalidateState({ tx, cursor, indexerName });
}
});
});

indexer.hooks.hook("handler:middleware", async ({ use }) => {
use(async (context, next) => {
const { endCursor } = context;

if (!endCursor) {
throw new DrizzleStorageError("end cursor is undefined");
}

await withTransaction(db, async (tx) => {
context[DRIZZLE_PROPERTY] = { db: tx } as DrizzleStorage<
TQueryResult,
TFullSchema,
TSchema
>;

await registerTriggers(tx, tableNames, endCursor, idColumn);

await next();
delete context[DRIZZLE_PROPERTY];

if (enablePersistence) {
await persistState({
tx,
endCursor,
indexerName,
});
}
});

// remove trigger outside of the transaction or it won't be triggered.
await removeTriggers(db, tableNames);
});
});
});
}
7 changes: 0 additions & 7 deletions packages/plugin-drizzle/src/persistence.test.ts

This file was deleted.

87 changes: 87 additions & 0 deletions packages/plugin-drizzle/src/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type { Cursor } from "@apibara/protocol";
import type {
ExtractTablesWithRelations,
TablesRelationalConfig,
} from "drizzle-orm";
import type {
PgDatabase,
PgQueryResultHKT,
PgTransaction,
} from "drizzle-orm/pg-core";

export async function initializeReorgRollbackTable<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(tx: PgTransaction<TQueryResult, TFullSchema, TSchema>) {
// Create the audit log table
await tx.execute(`
CREATE TABLE IF NOT EXISTS __reorg_rollback(
n SERIAL PRIMARY KEY,
op CHAR(1) NOT NULL,
table_name TEXT NOT NULL,
cursor INTEGER NOT NULL,
row_id TEXT,
row_value JSONB
);
`);

// Create the trigger function
await tx.execute(`
CREATE OR REPLACE FUNCTION reorg_checkpoint()
RETURNS TRIGGER AS $$
DECLARE
id_col TEXT := TG_ARGV[0]::TEXT;
order_key INTEGER := TG_ARGV[1]::INTEGER;
new_id_value TEXT := row_to_json(NEW.*)->>id_col;
old_id_value TEXT := row_to_json(OLD.*)->>id_col;
BEGIN
IF (TG_OP = 'DELETE') THEN
INSERT INTO __reorg_rollback(op, table_name, cursor, row_id, row_value)
SELECT 'D', TG_TABLE_NAME, order_key, old_id_value, row_to_json(OLD.*);
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO __reorg_rollback(op, table_name, cursor, row_id, row_value)
SELECT 'U', TG_TABLE_NAME, order_key, new_id_value, row_to_json(OLD.*);
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO __reorg_rollback(op, table_name, cursor, row_id, row_value)
SELECT 'I', TG_TABLE_NAME, order_key, new_id_value, null;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`);
}

export async function registerTriggers<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(
tx: PgTransaction<TQueryResult, TFullSchema, TSchema>,
tables: string[],
endCursor: Cursor,
idColumn: string,
) {
for (const table of tables) {
await tx.execute(`
DROP TRIGGER IF EXISTS ${table}_reorg ON ${table};
CREATE CONSTRAINT TRIGGER ${table}_reorg
AFTER INSERT OR UPDATE OR DELETE ON ${table}
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE FUNCTION reorg_checkpoint('${idColumn}', ${Number(endCursor.orderKey)});
`);
}
}

export async function removeTriggers<
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>(db: PgDatabase<TQueryResult, TFullSchema, TSchema>, tables: string[]) {
for (const table of tables) {
await db.execute(`DROP TRIGGER IF EXISTS ${table}_reorg ON ${table};`);
}
}
4 changes: 1 addition & 3 deletions packages/plugin-drizzle/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
"outDir": "dist",
"declarationDir": "dist",
"noEmit": false,
"rootDir": "src",
"types": ["node"]
},
"include": ["src/"]
}
}
15 changes: 9 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1d65169

Please sign in to comment.