Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add drizzle persistence plugin #122

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: add drizzle persistence plugin",
"packageName": "@apibara/indexer",
"email": "[email protected]",
"dependentChangeType": "patch"
}
46 changes: 45 additions & 1 deletion examples/cli/indexers/2-starknet.indexer.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
import { useLogger } from "@apibara/indexer/plugins/logger";
import { sqlite } from "@apibara/indexer/sinks/sqlite";
import { StarknetStream } from "@apibara/starknet";
import type { ApibaraRuntimeConfig } from "apibara/types";
import Database from "better-sqlite3";
import { sql } from "drizzle-orm";
import { drizzle } from "drizzle-orm/node-postgres";
import { Client } from "pg";
import { hash } from "starknet";

export default function (runtimeConfig: ApibaraRuntimeConfig) {
console.log("--> Starknet Indexer Runtime Config: ", runtimeConfig);
const database = new Database(runtimeConfig.databasePath);

// Sink Database
const database = new Database(runtimeConfig.databasePath);
database.exec("DROP TABLE IF EXISTS test");
database.exec(
"CREATE TABLE IF NOT EXISTS test (number TEXT, hash TEXT, _cursor BIGINT)",
);

// Persistence Database
const client = new Client({
connectionString: "postgres://postgres:postgres@localhost:5432/postgres",
});
const persistDatabase = drizzle(client);
Comment on lines +23 to +27
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security & Configuration Issues in Database Setup

Several critical issues need to be addressed:

  1. Hardcoded database credentials in the connection string pose a security risk
  2. No error handling for database connection failures
  3. Missing configuration options for different environments (dev/staging/prod)

Consider implementing the following changes:

- const client = new Client({
-   connectionString: "postgres://postgres:postgres@localhost:5432/postgres",
- });
+ const client = new Client({
+   connectionString: runtimeConfig.persistenceConnectionString ?? 
+     process.env.PERSISTENCE_DB_URL ??
+     throw new Error("Database connection string not configured"),
+ });

Also, consider adding error handling:

try {
  await client.connect();
} catch (error) {
  logger.error("Failed to connect to persistence database:", error);
  throw error;
}


return defineIndexer(StarknetStream)({
streamUrl: "https://starknet.preview.apibara.org",
finality: "accepted",
startingCursor: {
orderKey: 800_000n,
},
plugins: [
drizzlePersistence({
database: persistDatabase,
indexerName: "2-starknet",
}),
],
sink: sqlite({ database, tableName: "test" }),
filter: {
events: [
Expand All @@ -42,5 +59,32 @@ export default function (runtimeConfig: ApibaraRuntimeConfig) {
// hash: header?.blockHash,
// }])
},
hooks: {
async "run:before"() {
await client.connect();

// Normally user will do migrations of both tables, which are defined in
// ```
// import { checkpoints, filters } from "@apibara/indexer/plugins/drizzle-persistence"
// ```,
// but just for quick testing and example we create them here directly

await persistDatabase.execute(sql`
CREATE TABLE IF NOT EXISTS checkpoints (
id TEXT NOT NULL PRIMARY KEY,
order_key INTEGER NOT NULL,
unique_key TEXT
);

CREATE TABLE IF NOT EXISTS filters (
id TEXT NOT NULL,
filter TEXT NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER,
PRIMARY KEY (id, from_block)
);
`);
Comment on lines +72 to +86
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve Database Schema Management

Several improvements are recommended for the schema management:

  1. Use Drizzle's schema definitions instead of raw SQL
  2. Implement proper migrations
  3. Add indexes for better query performance

Consider refactoring to use Drizzle's schema definitions:

import { pgTable, text, integer } from 'drizzle-orm/pg-core';

export const checkpoints = pgTable('checkpoints', {
  id: text('id').primaryKey(),
  orderKey: integer('order_key').notNull(),
  uniqueKey: text('unique_key')
});

export const filters = pgTable('filters', {
  id: text('id').notNull(),
  filter: text('filter').notNull(),
  fromBlock: integer('from_block').notNull(),
  toBlock: integer('to_block'),
  // Add composite primary key
}, (table) => ({
  pk: primaryKey(table.id, table.fromBlock)
}));

},
},
});
}
4 changes: 4 additions & 0 deletions examples/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"devDependencies": {
"@types/better-sqlite3": "^7.6.11",
"@types/node": "^20.5.2",
"@types/pg": "^8.11.10",
"typescript": "^5.6.2",
"vitest": "^1.6.0"
},
Expand All @@ -25,8 +26,11 @@
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*",
"@apibara/starknet": "workspace:*",
"@electric-sql/pglite": "^0.2.14",
"apibara": "workspace:*",
"better-sqlite3": "^11.5.0",
"drizzle-orm": "^0.35.2",
"pg": "^8.12.0",
"starknet": "^6.11.0"
}
}
1 change: 1 addition & 0 deletions packages/indexer/build.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export default defineBuildConfig({
"./src/plugins/kv.ts",
"./src/plugins/logger.ts",
"./src/plugins/persistence.ts",
"./src/plugins/drizzle-persistence.ts",
],
clean: true,
outDir: "./dist",
Expand Down
8 changes: 8 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
"import": "./dist/plugins/persistence.mjs",
"require": "./dist/plugins/persistence.cjs",
"default": "./dist/plugins/persistence.mjs"
},
"./plugins/drizzle-persistence": {
"types": "./dist/plugins/drizzle-persistence.d.ts",
"import": "./dist/plugins/drizzle-persistence.mjs",
"require": "./dist/plugins/drizzle-persistence.cjs",
"default": "./dist/plugins/drizzle-persistence.mjs"
}
},
"scripts": {
Expand All @@ -80,6 +86,7 @@
"test:ci": "vitest run"
},
"devDependencies": {
"@electric-sql/pglite": "^0.2.14",
"@types/better-sqlite3": "^7.6.11",
"@types/node": "^20.14.0",
"@types/pg": "^8.11.10",
Expand All @@ -102,6 +109,7 @@
"unctx": "^2.3.1"
},
"peerDependencies": {
"@electric-sql/pglite": "^0.2.14",
"better-sqlite3": "^11.5.0",
"csv-stringify": "^6.5.0",
"drizzle-orm": "^0.35.2",
Expand Down
192 changes: 192 additions & 0 deletions packages/indexer/src/plugins/drizzle-persistence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import type { Cursor } from "@apibara/protocol";
import {
type ExtractTablesWithRelations,
type TablesRelationalConfig,
and,
eq,
isNull,
} from "drizzle-orm";
import {
type PgDatabase,
type PgQueryResultHKT,
integer,
pgTable,
primaryKey,
text,
} from "drizzle-orm/pg-core";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";

export const checkpoints = pgTable("checkpoints", {
id: text("id").notNull().primaryKey(),
orderKey: integer("order_key").notNull(),
uniqueKey: text("unique_key")
.$type<`0x${string}` | undefined>()
.notNull()
.default(undefined),
Comment on lines +24 to +26
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Inconsistency in Column Definition: uniqueKey

The uniqueKey column in the checkpoints table is defined with .notNull().default(undefined). Setting a default value of undefined for a NOT NULL column may not behave as intended. In SQL, undefined is not a valid default value.

Consider adjusting the column definition to allow uniqueKey to be nullable if appropriate:

-    .$type<`0x${string}` | undefined>()
-    .notNull()
-    .default(undefined),
+    .$type<`0x${string}` | null>()
+    .default(null),

Alternatively, if uniqueKey should always have a value, provide a valid default or ensure it's populated correctly when inserting records.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.$type<`0x${string}` | undefined>()
.notNull()
.default(undefined),
.$type<`0x${string}` | null>()
.default(null),

});

export const filters = pgTable(
"filters",
{
id: text("id").notNull(),
filter: text("filter").notNull(),
fromBlock: integer("from_block").notNull(),
toBlock: integer("to_block"),
},
(table) => ({
pk: primaryKey({ columns: [table.id, table.fromBlock] }),
}),
);

export function drizzlePersistence<
TFilter,
TBlock,
TTxnParams,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
>({
database,
indexerName = "default",
}: {
database: PgDatabase<TQueryResult, TFullSchema, TSchema>;
indexerName?: string;
}) {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
let store: DrizzlePersistence<TFilter, TQueryResult, TFullSchema, TSchema>;

indexer.hooks.hook("run:before", async () => {
store = new DrizzlePersistence(database, indexerName);
// Tables are created by user via migrations in Drizzle
});

indexer.hooks.hook("connect:before", async ({ request }) => {
const { cursor, filter } = await store.get();

if (cursor) {
request.startingCursor = cursor;
}

if (filter) {
request.filter[1] = filter;
}
});

indexer.hooks.hook("transaction:commit", async ({ endCursor }) => {
if (endCursor) {
await store.put({ cursor: endCursor });
}
});

indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => {
if (request.filter[1]) {
await store.put({ cursor: endCursor, filter: request.filter[1] });
}
});
});
}

export class DrizzlePersistence<
TFilter,
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
TablesRelationalConfig = ExtractTablesWithRelations<TFullSchema>,
> {
constructor(
private _db: PgDatabase<TQueryResult, TFullSchema, TSchema>,
private _indexerName: string,
) {}

public async get(): Promise<{ cursor?: Cursor; filter?: TFilter }> {
const cursor = await this._getCheckpoint();
const filter = await this._getFilter();

return { cursor, filter };
}

public async put({ cursor, filter }: { cursor?: Cursor; filter?: TFilter }) {
if (cursor) {
await this._putCheckpoint(cursor);

if (filter) {
await this._putFilter(filter, cursor);
}
}
}

// --- CHECKPOINTS TABLE METHODS ---

private async _getCheckpoint(): Promise<Cursor | undefined> {
const rows = await this._db
.select()
.from(checkpoints)
.where(eq(checkpoints.id, this._indexerName));

const row = rows[0];
if (!row) return undefined;

return {
orderKey: BigInt(row.orderKey),
uniqueKey: row.uniqueKey,
};
}

private async _putCheckpoint(cursor: Cursor) {
await this._db
.insert(checkpoints)
.values({
id: this._indexerName,
orderKey: Number(cursor.orderKey),
uniqueKey: cursor.uniqueKey,
})
.onConflictDoUpdate({
target: checkpoints.id,
set: {
orderKey: Number(cursor.orderKey),
uniqueKey: cursor.uniqueKey,
},
});
}

// --- FILTERS TABLE METHODS ---

private async _getFilter(): Promise<TFilter | undefined> {
const rows = await this._db
.select()
.from(filters)
.where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock)));

const row = rows[0];

if (!row) return undefined;

return deserialize(row.filter) as TFilter;
}

private async _putFilter(filter: TFilter, endCursor: Cursor) {
// Update existing filter's to_block
await this._db
.update(filters)
.set({ toBlock: Number(endCursor.orderKey) })
.where(and(eq(filters.id, this._indexerName), isNull(filters.toBlock)));

// Insert new filter
await this._db
.insert(filters)
.values({
id: this._indexerName,
filter: serialize(filter as Record<string, unknown>),
fromBlock: Number(endCursor.orderKey),
})
.onConflictDoUpdate({
target: [filters.id, filters.fromBlock],
set: {
filter: serialize(filter as Record<string, unknown>),
fromBlock: Number(endCursor.orderKey),
},
});
}
}
Loading