Skip to content

Commit

Permalink
feat: add finalize & invalidation implementations and mongo persisten…
Browse files Browse the repository at this point in the history
…ce (#129)
fracek authored Dec 30, 2024
2 parents f878894 + 2b6d1df commit 3b77ab7
Showing 14 changed files with 2,174 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "option to get invalidate messages from mock generator",
"packageName": "@apibara/indexer",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "finalize and invalidation with tests",
"packageName": "@apibara/plugin-mongo",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "finalize and invalidation with tests",
"packageName": "@apibara/plugin-sqlite",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
49 changes: 39 additions & 10 deletions packages/indexer/src/internal/testing.ts
Original file line number Diff line number Diff line change
@@ -10,16 +10,45 @@ import { useIndexerContext } from "../context";
import { type IndexerConfig, createIndexer, defineIndexer } from "../indexer";
import { type IndexerPlugin, defineIndexerPlugin } from "../plugins";

export function generateMockMessages(count = 10): MockStreamResponse[] {
return [...Array(count)].map((_, i) => ({
_tag: "data",
data: {
cursor: { orderKey: BigInt(5_000_000 + i - 1) },
finality: "accepted",
data: [{ data: `${5_000_000 + i}` }],
endCursor: { orderKey: BigInt(5_000_000 + i) },
},
}));
export type MockMessagesOptions = {
invalidate?: {
invalidateFromIndex: number;
invalidateTriggerIndex: number;
};
};

export function generateMockMessages(
count = 10,
options?: MockMessagesOptions,
): MockStreamResponse[] {
const invalidateAt = options?.invalidate;

const messages: MockStreamResponse[] = [];

for (let i = 0; i < count; i++) {
if (invalidateAt && i === invalidateAt.invalidateTriggerIndex) {
messages.push({
_tag: "invalidate",
invalidate: {
cursor: {
orderKey: BigInt(5_000_000 + invalidateAt.invalidateFromIndex),
},
},
});
} else {
messages.push({
_tag: "data",
data: {
cursor: { orderKey: BigInt(5_000_000 + i - 1) },
finality: "accepted",
data: [{ data: `${5_000_000 + i}` }],
endCursor: { orderKey: BigInt(5_000_000 + i) },
},
});
}
}

return messages;
}

export function getMockIndexer({
110 changes: 95 additions & 15 deletions packages/plugin-mongo/src/index.ts
Original file line number Diff line number Diff line change
@@ -3,6 +3,13 @@ import { defineIndexerPlugin } from "@apibara/indexer/plugins";
import type { DbOptions, MongoClient } from "mongodb";

import { finalize, invalidate } from "./mongo";
import {
finalizeState,
getState,
initializePersistentState,
invalidateState,
persistState,
} from "./persistence";
import { MongoStorage } from "./storage";
import { MongoStorageError, withTransaction } from "./utils";

@@ -28,53 +35,127 @@ export interface MongoStorageOptions {
dbOptions?: DbOptions;
collections: string[];
persistState?: boolean;
indexerName?: string;
}

/**
* Creates a plugin that uses MongoDB as the storage layer.
*
* Supports storing the indexer's state and provides a simple Key-Value store.
* @param options.client - The MongoDB client instance.
* @param options.dbName - The name of the database.
* @param options.dbOptions - The database options.
* @param options.collections - The collections to use.
* @param options.persistState - Whether to persist the indexer's state. Defaults to true.
* @param options.indexerName - The name of the indexer. Defaults value is 'default'.
*/
export function mongoStorage<TFilter, TBlock>({
client,
dbName,
dbOptions,
collections,
persistState: enablePersistence = true,
indexerName,
}: MongoStorageOptions) {
return defineIndexerPlugin<TFilter, TBlock>((indexer) => {
indexer.hooks.hook("message:finalize", async ({ message }) => {
const { cursor } = message.finalize;
indexer.hooks.hook("run:before", async () => {
await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
if (enablePersistence) {
await initializePersistentState(db, session);
}
});
});

if (!cursor) {
throw new MongoStorageError("finalized cursor is undefined");
indexer.hooks.hook("connect:before", async ({ request }) => {
if (!enablePersistence) {
return;
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await finalize(db, session, cursor, collections);
const { cursor, filter } = await getState<TFilter>({
db,
session,
indexerName,
});

if (cursor) {
request.startingCursor = cursor;
}

if (filter) {
request.filter[1] = filter;
}
});
});

indexer.hooks.hook("message:invalidate", async ({ message }) => {
const { cursor } = message.invalidate;
indexer.hooks.hook("connect:after", async ({ request }) => {
// On restart, we need to invalidate data for blocks that were processed but not persisted.
const cursor = request.startingCursor;

if (!cursor) {
throw new MongoStorageError("invalidate cursor is undefined");
return;
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await invalidate(db, session, cursor, collections);

if (enablePersistence) {
await invalidateState({ db, session, cursor, indexerName });
}
});
});

indexer.hooks.hook("connect:after", async ({ request }) => {
// On restart, we need to invalidate data for blocks that were processed but not persisted.
const cursor = request.startingCursor;
indexer.hooks.hook("connect:factory", async ({ request, endCursor }) => {
if (!enablePersistence) {
return;
}
await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
if (endCursor && request.filter[1]) {
await persistState({
db,
endCursor,
session,
filter: request.filter[1],
indexerName,
});
}
});
});

indexer.hooks.hook("message:finalize", async ({ message }) => {
const { cursor } = message.finalize;

if (!cursor) {
return;
throw new MongoStorageError("finalized cursor is undefined");
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await finalize(db, session, cursor, collections);

if (enablePersistence) {
await finalizeState({ db, session, cursor, indexerName });
}
});
});

indexer.hooks.hook("message:invalidate", async ({ message }) => {
const { cursor } = message.invalidate;

if (!cursor) {
throw new MongoStorageError("invalidate cursor is undefined");
}

await withTransaction(client, async (session) => {
const db = client.db(dbName, dbOptions);
await invalidate(db, session, cursor, collections);

if (enablePersistence) {
await invalidateState({ db, session, cursor, indexerName });
}
});
});

@@ -91,11 +172,10 @@ export function mongoStorage<TFilter, TBlock>({
context[MONGO_PROPERTY] = new MongoStorage(db, session, endCursor);

await next();

delete context[MONGO_PROPERTY];

if (enablePersistence) {
// TODO: persist state
await persistState({ db, endCursor, session, indexerName });
}
});
});
Loading

0 comments on commit 3b77ab7

Please sign in to comment.