diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 35752844..14bc7622 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -33,6 +33,12 @@ jobs:
         uses: arduino/setup-protoc@v1
         with:
           repo-token: ${{ secrets.GITHUB_TOKEN }}
+      - name: Start MongoDB
+        uses: supercharge/mongodb-github-action@1.11.0
+        with:
+          mongodb-version: '7.0'
+          mongodb-replica-set: rs0
+          mongodb-port: 27017
       - name: Install dependencies
         run: pnpm install --strict-peer-dependencies=false
       - name: Run lint
diff --git a/change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json b/change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json
new file mode 100644
index 00000000..c865174e
--- /dev/null
+++ b/change/@apibara-indexer-b2b2208a-7a4e-4779-826d-c246b023891a.json
@@ -0,0 +1,7 @@
+{
+  "type": "prerelease",
+  "comment": "indexer: rename drizzle sink factory and fix update method",
+  "packageName": "@apibara/indexer",
+  "email": "jadejajaipal5@gmail.com",
+  "dependentChangeType": "patch"
+}
diff --git a/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json b/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json
new file mode 100644
index 00000000..09bc3e73
--- /dev/null
+++ b/change/@apibara-sink-mongo-0e59869f-bf56-417c-b432-622a2c1bb1da.json
@@ -0,0 +1,7 @@
+{
+  "type": "prerelease",
+  "comment": "sink-mongo: add mongodb sink",
+  "packageName": "@apibara/sink-mongo",
+  "email": "jadejajaipal5@gmail.com",
+  "dependentChangeType": "patch"
+}
diff --git a/examples/cli/indexers/1-evm.indexer.ts b/examples/cli/indexers/1-evm.indexer.ts
index 19042c0e..9d064606 100644
--- a/examples/cli/indexers/1-evm.indexer.ts
+++ b/examples/cli/indexers/1-evm.indexer.ts
@@ -2,7 +2,7 @@ import { EvmStream } from "@apibara/evm";
 import { defineIndexer, useSink } from "@apibara/indexer";
 import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
 import { useLogger } from "@apibara/indexer/plugins/logger";
-import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
+import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
 import type { ApibaraRuntimeConfig } from "apibara/types";
 import type {
diff --git a/examples/cli/indexers/2-starknet.indexer.ts b/examples/cli/indexers/2-starknet.indexer.ts
index b6f052d4..28dc3853 100644
--- a/examples/cli/indexers/2-starknet.indexer.ts
+++ b/examples/cli/indexers/2-starknet.indexer.ts
@@ -1,7 +1,7 @@
 import { defineIndexer, useSink } from "@apibara/indexer";
 import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
 import { useLogger } from "@apibara/indexer/plugins/logger";
-import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
+import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
 import { StarknetStream } from "@apibara/starknet";
 import type { ApibaraRuntimeConfig } from "apibara/types";
diff --git a/examples/indexer/src/indexer.ts b/examples/indexer/src/indexer.ts
index 274c8948..1ac23b46 100644
--- a/examples/indexer/src/indexer.ts
+++ b/examples/indexer/src/indexer.ts
@@ -1,6 +1,6 @@
 import { EvmStream } from "@apibara/evm";
 import { defineIndexer, useSink } from "@apibara/indexer";
-import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
+import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
 import consola from "consola";
 import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core";
 import { drizzle } from "drizzle-orm/postgres-js";
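A note on the rename above: the drizzle sink factory was previously exported as `drizzle`, which collided with drizzle-orm's own `drizzle()` helper and forced an alias at every call site. A minimal before/after sketch of the import change (illustrative only):

```typescript
// Before: the factory shadowed drizzle-orm's `drizzle()`, so call sites aliased it.
// import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";

// After: the factory ships under its own name, and drizzle-orm's helper can be
// imported unaliased alongside it.
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzle } from "drizzle-orm/postgres-js";
```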
diff --git a/examples/starknet-indexer/src/indexer.ts b/examples/starknet-indexer/src/indexer.ts
index f1ebc774..f48a0c33 100644
--- a/examples/starknet-indexer/src/indexer.ts
+++ b/examples/starknet-indexer/src/indexer.ts
@@ -1,8 +1,5 @@
 import { defineIndexer, useSink } from "@apibara/indexer";
-import {
-  drizzle as drizzleSink,
-  pgIndexerTable,
-} from "@apibara/indexer/sinks/drizzle";
+import { drizzleSink, pgIndexerTable } from "@apibara/indexer/sinks/drizzle";
 import { StarknetStream } from "@apibara/starknet";
 import consola from "consola";
 import { bigint } from "drizzle-orm/pg-core";
diff --git a/packages/indexer/build.config.ts b/packages/indexer/build.config.ts
index 964198df..01fb35c3 100644
--- a/packages/indexer/build.config.ts
+++ b/packages/indexer/build.config.ts
@@ -13,6 +13,7 @@ export default defineBuildConfig({
     "./src/plugins/logger.ts",
     "./src/plugins/persistence.ts",
     "./src/plugins/drizzle-persistence.ts",
+    "./src/internal/testing.ts",
   ],
   clean: true,
   outDir: "./dist",
diff --git a/packages/indexer/docker-compose.yaml b/packages/indexer/docker-compose.yaml
new file mode 100644
index 00000000..5246acac
--- /dev/null
+++ b/packages/indexer/docker-compose.yaml
@@ -0,0 +1,10 @@
+version: "3.8"
+
+services:
+  postgres:
+    image: postgres:16
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+    ports:
+      - 5432:5432
diff --git a/packages/indexer/package.json b/packages/indexer/package.json
index 5bebbbe8..435fbd07 100644
--- a/packages/indexer/package.json
+++ b/packages/indexer/package.json
@@ -75,6 +75,12 @@
       "import": "./dist/plugins/drizzle-persistence.mjs",
       "require": "./dist/plugins/drizzle-persistence.cjs",
       "default": "./dist/plugins/drizzle-persistence.mjs"
+    },
+    "./internal": {
+      "types": "./dist/internal/testing.d.ts",
+      "import": "./dist/internal/testing.mjs",
+      "require": "./dist/internal/testing.cjs",
+      "default": "./dist/internal/testing.mjs"
     }
   },
   "scripts": {
diff --git a/packages/indexer/src/sinks/drizzle/drizzle.test.ts b/packages/indexer/src/sinks/drizzle/drizzle.test.ts
index 39a8acb2..f2c2ff67 100644
--- a/packages/indexer/src/sinks/drizzle/drizzle.test.ts
+++ b/packages/indexer/src/sinks/drizzle/drizzle.test.ts
@@ -13,7 +13,7 @@ import { run } from "../../indexer";
 import { generateMockMessages, getMockIndexer } from "../../internal/testing";
 import { useSink } from "../../sink";
 import type { Int8Range } from "./Int8Range";
-import { drizzle as drizzleSink } from "./drizzle";
+import { drizzleSink } from "./drizzle";
 import { getDrizzleCursor, pgIndexerTable } from "./utils";
 
 const testTable = pgIndexerTable("test_table", {
@@ -35,7 +35,7 @@ describe("Drizzle Test", () => {
     await db.execute(sql`DROP TABLE IF EXISTS test_table`);
     // create test_table with db
     await db.execute(
-      sql`CREATE TABLE test_table (id SERIAL PRIMARY KEY, data TEXT, _cursor INT8RANGE)`,
+      sql`CREATE TABLE test_table (id SERIAL, data TEXT, _cursor INT8RANGE)`,
     );
   });
@@ -108,11 +108,14 @@ describe("Drizzle Test", () => {
     const result = await db.select().from(testTable).orderBy(asc(testTable.id));
 
-    expect(result).toHaveLength(5);
-    expect(result[2].data).toBe("0000000");
+    expect(result).toHaveLength(6);
+    expect(
+      result.find((r) => r.id === 5000002 && r._cursor?.range.upper === null)
+        ?.data,
+    ).toBe("0000000");
   });
 
-  it("should delete data", async () => {
+  it("should soft delete data", async () => {
     const client = new MockClient((request, options) => {
       return generateMockMessages(5);
     });
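The test changes above track the new update semantics: an update now keeps the superseded row as a closed version instead of mutating it in place, so `id` is no longer unique (hence `SERIAL` without `PRIMARY KEY`) and the updated dataset holds six rows instead of five. A sketch of the two versions the update at block 5000004 is expected to leave behind (illustrative values, mirroring the assertions above):

```typescript
// Hypothetical row states for id 5000002 after an update at block 5000004.
// `_cursor` is an INT8RANGE of block numbers: [lower, upper).
const versions = [
  { id: 5000002, data: "5000002", cursor: "[5000002,5000004)" }, // superseded
  { id: 5000002, data: "0000000", cursor: "[5000004,)" }, // current (upper is open)
];

// The assertion therefore picks the version whose upper bound is still open:
// result.find((r) => r.id === 5000002 && r._cursor?.range.upper === null)
```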
diff --git a/packages/indexer/src/sinks/drizzle/drizzle.ts b/packages/indexer/src/sinks/drizzle/drizzle.ts
index 03e1597b..79cc8bf5 100644
--- a/packages/indexer/src/sinks/drizzle/drizzle.ts
+++ b/packages/indexer/src/sinks/drizzle/drizzle.ts
@@ -121,7 +121,7 @@ export class DrizzleSink<
   }
 }
 
-export const drizzle = <
+export const drizzleSink = <
   TQueryResult extends PgQueryResultHKT,
   TFullSchema extends Record<string, unknown> = Record<string, never>,
   TSchema extends
diff --git a/packages/indexer/src/sinks/drizzle/update.ts b/packages/indexer/src/sinks/drizzle/update.ts
index 24537fcd..3cea116f 100644
--- a/packages/indexer/src/sinks/drizzle/update.ts
+++ b/packages/indexer/src/sinks/drizzle/update.ts
@@ -12,6 +12,8 @@ import type {
   PgUpdateBase,
   PgUpdateSetSource,
 } from "drizzle-orm/pg-core";
+import type { Int8Range } from "./Int8Range";
+import { getDrizzleCursor } from "./utils";
 
 export class DrizzleSinkUpdate<
   TTable extends PgTable,
@@ -32,15 +34,36 @@ export class DrizzleSinkUpdate<
     return {
       ...originalSet,
       where: async (where: SQL | undefined) => {
-        await this.db
-          .update(this.table)
-          .set({
-            _cursor: sql`int8range(lower(_cursor), ${Number(this.endCursor?.orderKey!)}, '[)')`,
-          } as PgUpdateSetSource<TTable>)
+        // 1. Find and store old versions of matching records
+        const oldRecords = await this.db
+          .select()
+          .from(this.table)
           .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`)
           .execute();
 
-        return originalSet.where(where);
+        // 2. Insert old versions with an updated upper-bound cursor
+        if (oldRecords.length > 0) {
+          const oldRecordsWithNewCursor = oldRecords.map((record) => ({
+            ...record,
+            _cursor: getDrizzleCursor([
+              BigInt((record._cursor as Int8Range).range.lower!),
+              this.endCursor?.orderKey,
+            ]),
+          }));
+
+          await this.db
+            .insert(this.table)
+            .values(oldRecordsWithNewCursor)
+            .execute();
+        }
+
+        // 3. Update matching records with the new values and a new lower-bound cursor
+        return originalUpdate
+          .set({
+            ...values,
+            _cursor: sql`int8range(${Number(this.endCursor?.orderKey!)}, NULL, '[)')`,
+          } as PgUpdateSetSource<TTable>)
+          .where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`);
       },
     } as PgUpdateBase<TTable, TQueryResult>;
   }
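The rewritten `where` handler turns a destructive UPDATE into a version bump: close the current row at `endCursor`, then write the new values into a row that opens at `endCursor`. A small in-memory model of that effect (hypothetical helper, not part of the package):

```typescript
// Illustrative model of steps 1-3 above, where `at` stands for endCursor.orderKey.
type Versioned = { data: string; lower: bigint; upper: bigint | null };

function versionedUpdate(rows: Versioned[], data: string, at: bigint): Versioned[] {
  const current = rows.find((r) => r.upper === null); // upper_inf(_cursor)
  if (!current) return rows;
  return [
    ...rows.filter((r) => r !== current),
    { ...current, upper: at }, // step 2: old version, closed at `at`
    { data, lower: at, upper: null }, // step 3: new version, open from `at`
  ];
}

// versionedUpdate([{ data: "old", lower: 100n, upper: null }], "new", 120n)
// => [{ data: "old", lower: 100n, upper: 120n },
//     { data: "new", lower: 120n, upper: null }]
```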
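Both compose files, like the CI step at the top of this diff, start MongoDB as a single-node replica set (`rs0`) rather than a standalone server. That is a hard requirement: the sink wraps writes in multi-document transactions, which MongoDB only supports on replica sets and sharded clusters. A minimal connection sketch against either compose file:

```typescript
import { MongoClient } from "mongodb";

// `?replicaSet=rs0` matches the replica set initiated by the healthcheck;
// without it the driver treats the node as standalone and transactions fail.
const client = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
await client.connect();

await client.withSession((session) =>
  session.withTransaction(async () => {
    // transactional writes go here
  }),
);

await client.close();
```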
"@apibara/indexer": "workspace:*", + "@apibara/protocol": "workspace:*" + } +} diff --git a/packages/sink-mongo/src/collection.ts b/packages/sink-mongo/src/collection.ts new file mode 100644 index 00000000..f0dc3969 --- /dev/null +++ b/packages/sink-mongo/src/collection.ts @@ -0,0 +1,235 @@ +import type { Cursor } from "@apibara/protocol"; +import type { + BulkWriteOptions, + ClientSession, + Collection, + DeleteOptions, + Document, + Filter, + FindCursor, + FindOneAndUpdateOptions, + FindOptions, + InsertManyResult, + InsertOneOptions, + InsertOneResult, + MatchKeysAndValues, + OptionalUnlessRequiredId, + UpdateFilter, + UpdateOptions, + UpdateResult, + WithId, +} from "mongodb"; + +export type MongoCursor = { + from: number | null; + to: number | null; +}; + +export type CursoredSchema = TSchema & { + _cursor: MongoCursor; +}; + +export class MongoSinkCollection { + constructor( + private session: ClientSession, + private collection: Collection, + private endCursor?: Cursor, + ) {} + + async insertOne( + doc: OptionalUnlessRequiredId, + options?: InsertOneOptions, + ): Promise> { + return await this.collection.insertOne( + { + ...doc, + _cursor: { + from: Number(this.endCursor?.orderKey), + to: null, + } as MongoCursor, + }, + { ...options, session: this.session }, + ); + } + + async insertMany( + docs: ReadonlyArray>, + options?: BulkWriteOptions, + ): Promise> { + return await this.collection.insertMany( + docs.map((doc) => ({ + ...doc, + _cursor: { + from: Number(this.endCursor?.orderKey), + to: null, + } as MongoCursor, + })), + { ...options, session: this.session }, + ); + } + + async updateOne( + filter: Filter, + update: UpdateFilter, + options?: UpdateOptions, + ): Promise> { + // 1. Find and update the document, getting the old version + const oldDoc = await this.collection.findOneAndUpdate( + { + ...filter, + "_cursor.to": null, + }, + { + ...update, + $set: { + ...update.$set, + "_cursor.from": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { + ...options, + session: this.session, + returnDocument: "before", + } as FindOneAndUpdateOptions, + ); + + // 2. If we found and updated a document, insert its old version + if (oldDoc) { + const { _id, ...doc } = oldDoc; + await this.collection.insertOne( + { + ...doc, + _cursor: { + ...oldDoc._cursor, + to: Number(this.endCursor?.orderKey), + }, + } as unknown as OptionalUnlessRequiredId, + { session: this.session }, + ); + } + + // 3. Return an UpdateResult-compatible object + return { + acknowledged: true, + modifiedCount: oldDoc ? 1 : 0, + upsertedId: null, + upsertedCount: 0, + matchedCount: oldDoc ? 1 : 0, + }; + } + + async updateMany( + filter: Filter, + update: UpdateFilter, + options?: UpdateOptions, + ): Promise> { + // 1. Find all documents matching the filter that are latest (to: null) + const oldDocs = await this.collection + .find( + { + ...filter, + "_cursor.to": null, + }, + { session: this.session }, + ) + .toArray(); + + // 2. Update to the new values with updateMany + // (setting _cursor.from to endCursor, leaving _cursor.to unchanged) + const updateResult = await this.collection.updateMany( + { + ...filter, + "_cursor.to": null, + }, + { + ...update, + $set: { + ...update.$set, + "_cursor.from": Number(this.endCursor?.orderKey), + } as unknown as MatchKeysAndValues, + }, + { ...options, session: this.session }, + ); + + // 3. 
+
+  async updateMany(
+    filter: Filter<TSchema>,
+    update: UpdateFilter<TSchema>,
+    options?: UpdateOptions,
+  ): Promise<UpdateResult<TSchema>> {
+    // 1. Find all documents matching the filter that are latest (to: null)
+    const oldDocs = await this.collection
+      .find(
+        {
+          ...filter,
+          "_cursor.to": null,
+        },
+        { session: this.session },
+      )
+      .toArray();
+
+    // 2. Update to the new values with updateMany
+    // (setting _cursor.from to endCursor, leaving _cursor.to unchanged)
+    const updateResult = await this.collection.updateMany(
+      {
+        ...filter,
+        "_cursor.to": null,
+      },
+      {
+        ...update,
+        $set: {
+          ...update.$set,
+          "_cursor.from": Number(this.endCursor?.orderKey),
+        } as unknown as MatchKeysAndValues<TSchema>,
+      },
+      { ...options, session: this.session },
+    );
+
+    // 3. Adjust the _cursor.to of the old values
+    const oldDocsWithUpdatedCursor = oldDocs.map(({ _id, ...doc }) => ({
+      ...doc,
+      _cursor: {
+        ...doc._cursor,
+        to: Number(this.endCursor?.orderKey),
+      },
+    }));
+
+    // 4. Insert the old values back into the db
+    if (oldDocsWithUpdatedCursor.length > 0) {
+      await this.collection.insertMany(
+        oldDocsWithUpdatedCursor as unknown as OptionalUnlessRequiredId<TSchema>[],
+        { session: this.session },
+      );
+    }
+
+    return updateResult;
+  }
+
+  async deleteOne(
+    filter: Filter<TSchema>,
+    options?: DeleteOptions,
+  ): Promise<UpdateResult<TSchema>> {
+    return await this.collection.updateOne(
+      {
+        ...filter,
+        "_cursor.to": null,
+      },
+      {
+        $set: {
+          "_cursor.to": Number(this.endCursor?.orderKey),
+        } as unknown as MatchKeysAndValues<TSchema>,
+      },
+      { ...options, session: this.session },
+    );
+  }
+
+  async deleteMany(
+    filter?: Filter<TSchema>,
+    options?: DeleteOptions,
+  ): Promise<UpdateResult<TSchema>> {
+    return await this.collection.updateMany(
+      {
+        ...((filter ?? {}) as Filter<TSchema>),
+        "_cursor.to": null,
+      },
+      {
+        $set: {
+          "_cursor.to": Number(this.endCursor?.orderKey),
+        } as unknown as MatchKeysAndValues<TSchema>,
+      },
+      { ...options, session: this.session },
+    );
+  }
+
+  async findOne(
+    filter: Filter<TSchema>,
+    options?: Omit<FindOptions, "timeoutMode">,
+  ): Promise<WithId<TSchema> | null> {
+    return await this.collection.findOne(
+      {
+        ...filter,
+        "_cursor.to": null,
+      },
+      { ...options, session: this.session },
+    );
+  }
+
+  find(
+    filter: Filter<TSchema>,
+    options?: FindOptions,
+  ): FindCursor<WithId<TSchema>> {
+    return this.collection.find(
+      {
+        ...filter,
+        "_cursor.to": null,
+      },
+      { ...options, session: this.session },
+    );
+  }
+}
diff --git a/packages/sink-mongo/src/index.ts b/packages/sink-mongo/src/index.ts
new file mode 100644
index 00000000..69f0b58f
--- /dev/null
+++ b/packages/sink-mongo/src/index.ts
@@ -0,0 +1,3 @@
+export * from "./mongo";
+export * from "./transaction";
+export * from "./collection";
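Every wrapper in `collection.ts` rewrites the caller's filter to touch only "current" documents (`"_cursor.to": null`) and turns deletes into cursor closes, so a collection accumulates history rather than losing it. One consequence, sketched below: reads that bypass the sink must apply the same filter themselves, or they will see every historical version (connection string as used by the tests that follow):

```typescript
import { MongoClient } from "mongodb";

const client = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
const docs = client.db("test").collection("test_collection");

// Every version, including superseded and soft-deleted documents:
const history = await docs.find({}).toArray();

// Only the "current" view — the filter MongoSinkCollection adds implicitly:
const current = await docs.find({ "_cursor.to": null }).toArray();
```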
diff --git a/packages/sink-mongo/src/mongo.test.ts b/packages/sink-mongo/src/mongo.test.ts
new file mode 100644
index 00000000..21555ce1
--- /dev/null
+++ b/packages/sink-mongo/src/mongo.test.ts
@@ -0,0 +1,279 @@
+import { run, useSink } from "@apibara/indexer";
+import {
+  generateMockMessages,
+  getMockIndexer,
+} from "@apibara/indexer/internal";
+import type { Cursor } from "@apibara/protocol";
+import {
+  type MockBlock,
+  MockClient,
+  type MockFilter,
+} from "@apibara/protocol/testing";
+import { MongoClient, type WithId } from "mongodb";
+import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
+import { mongoSink } from "./mongo";
+
+const TEST_COLLECTION = "test_collection";
+const mongoClient = new MongoClient(
+  "mongodb://localhost:27017/?replicaSet=rs0",
+);
+
+interface Schema {
+  blockNumber: number;
+  data: string | undefined;
+  _cursor?: {
+    from: number;
+    to: number | null;
+  };
+}
+
+const db = mongoClient.db("test");
+
+describe("MongoDB Sink Test", () => {
+  beforeAll(async () => {
+    await mongoClient.connect();
+  });
+
+  beforeEach(async () => {
+    await db.collection(TEST_COLLECTION).deleteMany({});
+  });
+
+  it("should insert data", async () => {
+    const client = new MockClient<MockFilter, MockBlock>((request, options) => {
+      return generateMockMessages(5);
+    });
+
+    const sink = mongoSink({
+      client: mongoClient,
+      dbName: "test",
+      collections: [TEST_COLLECTION],
+    });
+
+    const indexer = getMockIndexer({
+      sink,
+      override: {
+        transform: async ({ context, endCursor, block: { data } }) => {
+          const { db } = useSink({ context });
+          await db.collection<Schema>(TEST_COLLECTION).insertOne({
+            blockNumber: Number(endCursor?.orderKey),
+            data,
+          });
+        },
+      },
+    });
+
+    await run(client, indexer);
+
+    const result = await db
+      .collection<Schema>(TEST_COLLECTION)
+      .find({}, { sort: { blockNumber: 1 } })
+      .toArray();
+
+    expect(result).toHaveLength(5);
+    expect(result[0].data).toBe("5000000");
+    expect(result[2].data).toBe("5000002");
+  });
+
+  it("should update data", async () => {
+    const client = new MockClient<MockFilter, MockBlock>((request, options) => {
+      return generateMockMessages(5);
+    });
+
+    const sink = mongoSink({
+      client: mongoClient,
+      dbName: "test",
+      collections: [TEST_COLLECTION],
+    });
+
+    const indexer = getMockIndexer({
+      sink,
+      override: {
+        transform: async ({ context, endCursor, block: { data } }) => {
+          const { db } = useSink({ context });
+
+          // insert data for each message in db
+          await db.collection<Schema>(TEST_COLLECTION).insertOne({
+            blockNumber: Number(endCursor?.orderKey),
+            data,
+          });
+
+          // update data for blockNumber 5000002 when orderKey is 5000004
+          // this is to test if the update query is working
+          if (endCursor?.orderKey === 5000004n) {
+            // Find the document and update it, creating a new version
+            await db
+              .collection<Schema>(TEST_COLLECTION)
+              .updateOne(
+                { blockNumber: 5000002 },
+                { $set: { data: "0000000" } },
+              );
+          }
+        },
+      },
+    });
+
+    await run(client, indexer);
+
+    const result = await db
+      .collection<Schema>(TEST_COLLECTION)
+      .find()
+      .sort({ blockNumber: 1 })
+      .toArray();
+
+    expect(result).toHaveLength(6);
+    expect(
+      result.find((r) => r.blockNumber === 5000002 && r._cursor?.to === null)
+        ?.data,
+    ).toBe("0000000");
+  });
+
+  it("should handle soft deletes", async () => {
+    const client = new MockClient<MockFilter, MockBlock>((request, options) => {
+      return generateMockMessages(5);
+    });
+
+    const sink = mongoSink({
+      client: mongoClient,
+      dbName: "test",
+      collections: [TEST_COLLECTION],
+    });
+
+    const indexer = getMockIndexer({
+      sink,
+      override: {
+        transform: async ({ context, endCursor, block: { data } }) => {
+          const { db } = useSink({ context });
+
+          // insert data for each message in db
+          await db.collection<Schema>(TEST_COLLECTION).insertOne({
+            blockNumber: Number(endCursor?.orderKey),
+            data,
+          });
+
+          // delete data for blockNumber 5000002 when orderKey is 5000004
+          // this is to test if the delete query is working
+          if (endCursor?.orderKey === 5000004n) {
+            await db
+              .collection<Schema>(TEST_COLLECTION)
+              .deleteOne({ blockNumber: 5000002 });
+          }
+        },
+      },
+    });
+
+    await run(client, indexer);
+
+    const result = await db
+      .collection<Schema>(TEST_COLLECTION)
+      .find()
+      .sort({ blockNumber: 1 })
+      .toArray();
+
+    expect(result).toHaveLength(5);
+
+    // a delete query doesn't literally remove the document from the db;
+    // instead, the upper bound of its _cursor is set to the current cursor.
+    // check that the upper bound has been set correctly
+    expect(result.find((r) => r.blockNumber === 5000002)?._cursor?.to).toBe(
+      5000004,
+    );
+  });
+
+  it("should select data", async () => {
+    const client = new MockClient<MockFilter, MockBlock>((request, options) => {
+      return generateMockMessages(5);
+    });
+
+    const sink = mongoSink({
+      client: mongoClient,
+      dbName: "test",
+      collections: [TEST_COLLECTION],
+    });
+
+    let result: WithId<Schema>[] = [];
+
+    const indexer = getMockIndexer({
+      sink,
+      override: {
+        transform: async ({ context, endCursor, block: { data } }) => {
+          const { db } = useSink({ context });
+
+          // insert data for each message in db
+          await db.collection<Schema>(TEST_COLLECTION).insertOne({
+            blockNumber: Number(endCursor?.orderKey),
+            data,
+          });
+
+          // delete data for blockNumber 5000002 when orderKey is 5000003
+          // this will update the upper bound of the row with blockNumber 5000002
+          // from null to 5000003, so when we select all rows, the row with
+          // blockNumber 5000002 will not be included: a select only returns
+          // rows whose upper bound is null
+          if (endCursor?.orderKey === 5000003n) {
+            await db
+              .collection<Schema>(TEST_COLLECTION)
+              .deleteOne({ blockNumber: 5000002 });
+          }
+
+          // when on the last message of the mock stream, select all rows from db
+          if (endCursor?.orderKey === 5000004n) {
+            result = await db
+              .collection<Schema>(TEST_COLLECTION)
+              .find({})
+              .sort({ blockNumber: 1 })
+              .toArray();
+          }
+        },
+      },
+    });
+
+    await run(client, indexer);
+
+    expect(result).toHaveLength(4);
+    expect(result.find((r) => r.blockNumber === 5000002)).toBeUndefined();
+    // check that all rows are still in the db
+    const allRows = await db
+      .collection<Schema>(TEST_COLLECTION)
+      .find()
+      .toArray();
+    expect(allRows).toHaveLength(5);
+  });
+
+  it("should invalidate data correctly", async () => {
+    const sink = mongoSink({
+      client: mongoClient,
+      dbName: "test",
+      collections: [TEST_COLLECTION],
+    });
+
+    // Insert test data
+    await db.collection<Schema>(TEST_COLLECTION).insertMany([
+      { blockNumber: 1, data: "data1", _cursor: { from: 1, to: 5 } },
+      { blockNumber: 2, data: "data2", _cursor: { from: 2, to: 5 } },
+      { blockNumber: 3, data: "data3", _cursor: { from: 3, to: null } },
+      { blockNumber: 4, data: "data4", _cursor: { from: 4, to: null } },
+      { blockNumber: 5, data: "data5", _cursor: { from: 5, to: null } },
+    ]);
+
+    // Create a cursor at position 3
+    const cursor: Cursor = { orderKey: 3n };
+
+    // Invalidate data
+    await sink.invalidate(cursor);
+
+    // Check the results
+    const result = await db
+      .collection<Schema>(TEST_COLLECTION)
+      .find()
+      .sort({ blockNumber: 1 })
+      .toArray();
+
+    expect(result).toHaveLength(3);
+    expect(result[0]._cursor?.to).toBe(null);
+    expect(result[1]._cursor?.to).toBe(null);
+    expect(result[2]._cursor?.to).toBe(null);
+  });
+
+  afterAll(async () => {
+    await mongoClient.close();
+  });
+});
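The invalidate test above pins down the sink's reorg rules. Restated as plain functions over the `_cursor` interval (an illustrative model of `MongoSink.invalidate` and `finalize`, not the package's code):

```typescript
type Doc = { _cursor: { from: number; to: number | null } };

// invalidate(cursor): drop versions created after the cursor, and reopen
// versions that were closed after it.
function invalidate(docs: Doc[], cursor: number): Doc[] {
  return docs
    .filter((d) => d._cursor.from <= cursor) // "_cursor.from" > cursor is deleted
    .map((d) =>
      d._cursor.to !== null && d._cursor.to > cursor
        ? { ...d, _cursor: { ...d._cursor, to: null } } // reopened as current
        : d,
    );
}

// finalize(cursor): versions closed before the finalized cursor can never be
// rolled back to, so they are physically removed.
function finalize(docs: Doc[], cursor: number): Doc[] {
  return docs.filter((d) => d._cursor.to === null || d._cursor.to >= cursor);
}
```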
diff --git a/packages/sink-mongo/src/mongo.ts b/packages/sink-mongo/src/mongo.ts
new file mode 100644
index 00000000..4a56dda3
--- /dev/null
+++ b/packages/sink-mongo/src/mongo.ts
@@ -0,0 +1,102 @@
+import { Sink, type SinkCursorParams } from "@apibara/indexer";
+import type { Cursor } from "@apibara/protocol";
+import type { ClientSession, DbOptions, MongoClient } from "mongodb";
+import { MongoSinkTransactionDb } from "./transaction";
+
+export interface MongoSinkOptions {
+  client: MongoClient;
+  dbName: string;
+  dbOptions?: DbOptions;
+  collections: string[];
+}
+
+export class MongoSink extends Sink {
+  constructor(
+    private client: MongoClient,
+    private config: Omit<MongoSinkOptions, "client">,
+  ) {
+    super();
+  }
+
+  async transaction(
+    { cursor, endCursor, finality }: SinkCursorParams,
+    cb: (params: {
+      db: MongoSinkTransactionDb;
+      session: ClientSession;
+    }) => Promise<void>,
+  ): Promise<void> {
+    await this.client.withSession(async (session) =>
+      session.withTransaction(async (session) => {
+        const db = this.client.db(this.config.dbName, this.config.dbOptions);
+        await cb({
+          db: new MongoSinkTransactionDb(db, session, endCursor),
+          session,
+        });
+      }),
+    );
+  }
+
+  async finalize(cursor?: Cursor) {
+    if (cursor?.orderKey === undefined) return;
+
+    await this.client.withSession(async (session) =>
+      session.withTransaction(async (session) => {
+        const db = this.client.db(this.config.dbName, this.config.dbOptions);
+        const orderKeyValue = Number(cursor.orderKey);
+
+        for (const collection of this.config.collections) {
+          // Delete documents where the upper bound of _cursor is less than the finalize cursor
+          await db.collection(collection).deleteMany(
+            {
+              "_cursor.to": { $lt: orderKeyValue },
+            },
+            { session },
+          );
+        }
+      }),
+    );
+  }
+
+  async invalidate(cursor?: Cursor) {
+    if (cursor?.orderKey === undefined) return;
+
+    await this.client.withSession(async (session) =>
+      session.withTransaction(async (session) => {
+        const db = this.client.db(this.config.dbName, this.config.dbOptions);
+        const orderKeyValue = Number(cursor.orderKey);
+
+        for (const collection of this.config.collections) {
+          // Delete documents where the lower bound of _cursor is greater than the invalidate cursor
+          await db.collection(collection).deleteMany(
+            {
+              "_cursor.from": {
+                $gt: orderKeyValue,
+              },
+            },
+            { session },
+          );
+
+          // Update documents where the upper bound of _cursor is greater than the invalidate cursor
+          await db.collection(collection).updateMany(
+            { "_cursor.to": { $gt: orderKeyValue } },
+            {
+              $set: {
+                "_cursor.to": null,
+              },
+            },
+            { session },
+          );
+        }
+      }),
+    );
+  }
+
+  async invalidateOnRestart(cursor?: Cursor) {
+    await this.invalidate(cursor);
+  }
+}
+
+export const mongoSink = (args: MongoSinkOptions) => {
+  const { client, ...rest } = args;
+  return new MongoSink(client, rest);
+};
diff --git a/packages/sink-mongo/src/transaction.ts b/packages/sink-mongo/src/transaction.ts
new file mode 100644
index 00000000..e385fb73
--- /dev/null
+++ b/packages/sink-mongo/src/transaction.ts
@@ -0,0 +1,24 @@
+import type { Cursor } from "@apibara/protocol";
+import type { ClientSession, CollectionOptions, Db, Document } from "mongodb";
+import { MongoSinkCollection } from "./collection";
+
+export class MongoSinkTransactionDb {
+  constructor(
+    private db: Db,
+    private session: ClientSession,
+    private endCursor?: Cursor,
+  ) {}
+
+  collection<TSchema extends Document = Document>(
+    name: string,
+    options?: CollectionOptions,
+  ) {
+    const collection = this.db.collection<TSchema>(name, options);
+
+    return new MongoSinkCollection<TSchema>(
+      this.session,
+      collection,
+      this.endCursor,
+    );
+  }
+}
diff --git a/packages/sink-mongo/tsconfig.json b/packages/sink-mongo/tsconfig.json
new file mode 100644
index 00000000..9d059740
--- /dev/null
+++ b/packages/sink-mongo/tsconfig.json
@@ -0,0 +1,11 @@
+{
+  "extends": "../../tsconfig.json",
+  "compilerOptions": {
+    "outDir": "dist",
+    "declarationDir": "dist",
+    "noEmit": false,
+    "rootDir": "src",
+    "types": ["node"]
+  },
+  "include": ["src/"]
+}
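Putting the pieces together: `MongoSink.transaction` opens a session and a transaction per batch, hands the transform a `MongoSinkTransactionDb`, and every write inside it is tagged with the batch's end cursor. A condensed usage sketch based on the tests above (stream and indexer configuration elided; database and collection names are illustrative):

```typescript
import { useSink } from "@apibara/indexer";
import { mongoSink } from "@apibara/sink-mongo";
import { MongoClient } from "mongodb";

const client = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");

export const sink = mongoSink({
  client,
  dbName: "indexer",
  collections: ["transfers"], // collections covered by finalize/invalidate
});

// Inside an indexer's transform handler, the sink context exposes the
// transactional db wrapper:
//
// transform: async ({ context, endCursor, block }) => {
//   const { db } = useSink({ context });
//   await db.collection("transfers").insertOne({
//     blockNumber: Number(endCursor?.orderKey),
//   });
// },
```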
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 42b75cfb..1fe9a556 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -585,6 +585,28 @@ importers:
         specifier: ^1.6.0
         version: 1.6.0(@types/node@20.12.13)
 
+  packages/sink-mongo:
+    dependencies:
+      '@apibara/indexer':
+        specifier: workspace:*
+        version: link:../indexer
+      '@apibara/protocol':
+        specifier: workspace:*
+        version: link:../protocol
+    devDependencies:
+      '@types/node':
+        specifier: ^20.14.0
+        version: 20.14.0
+      mongodb:
+        specifier: ^6.12.0
+        version: 6.12.0
+      unbuild:
+        specifier: ^2.0.0
+        version: 2.0.0(typescript@5.6.2)
+      vitest:
+        specifier: ^1.6.0
+        version: 1.6.0(@types/node@20.14.0)
+
   packages/starknet:
     dependencies:
       '@apibara/protocol':
@@ -1484,6 +1506,9 @@ packages:
   '@jridgewell/trace-mapping@0.3.25':
     resolution: {integrity: sha512-vNk6aEwybGtawWmy/PzwnGDOjCkLWSD2wqvjGGAgOAwCGWySYXfYoxt00IJkTF+8Lb57DwOb3Aa0o9CApepiYQ==}
 
+  '@mongodb-js/saslprep@1.1.9':
+    resolution: {integrity: sha512-tVkljjeEaAhCqTzajSdgbQ6gE6f3oneVwa3iXR6csiEwXXOFsiC6Uh9iAjAhXPtqa/XMDHWjjeNH/77m/Yq2dw==}
+
   '@noble/curves@1.2.0':
     resolution: {integrity: sha512-oYclrNgRaM9SsBUBVbb8M6DTV7ZHRTKugureoYEncY5c65HOmRzvSiTE3y5CYaPYJA/GVkrhXEoF0M3Ya9PMnw==}
@@ -1913,6 +1938,12 @@
   '@types/shimmer@1.0.5':
     resolution: {integrity: sha512-9Hp0ObzwwO57DpLFF0InUjUm/II8GmKAvzbefxQTihCb7KI6yc9yzf0nLc4mVdby5N4DRCgQM2wCup9KTieeww==}
 
+  '@types/webidl-conversions@7.0.3':
+    resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==}
+
+  '@types/whatwg-url@11.0.5':
+    resolution: {integrity: sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==}
+
   '@vitest/expect@1.6.0':
     resolution: {integrity: sha512-ixEvFVQjycy/oNgHjqsL6AZCDduC+tflRluaHIzKIsdbzkLn2U/iBnVeJwB6HsIjQBdfMR8Z0tRxKUsvFJEeWQ==}
@@ -2064,6 +2095,10 @@
     engines: {node: ^6 || ^7 || ^8 || ^9 || ^10 || ^11 || ^12 || >=13.7}
     hasBin: true
 
+  bson@6.10.1:
+    resolution: {integrity: sha512-P92xmHDQjSKPLHqFxefqMxASNq/aWJMEZugpCjf+AF/pgcUpMMQCg7t7+ewko0/u8AapvF3luf/FoehddEK+sA==}
+    engines: {node: '>=16.20.1'}
+
   buffer-from@1.1.2:
     resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==}
@@ -2825,6 +2860,9 @@
   mdn-data@2.0.30:
     resolution: {integrity: sha512-GaqWWShW4kv/G9IEucWScBx9G1/vsFZZJUO+tD26M8J8z3Kw5RDQjaoZe03YAClgeS/SWPOcb4nkFBTEi5DUEA==}
 
+  memory-pager@1.5.0:
+    resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==}
+
   merge-stream@2.0.0:
     resolution: {integrity: sha512-abv/qOcuPfk3URPfDzmZU1LKmuw8kT+0nIHvKrKgFrwifol/doWcdA4ZqsWQ8ENrFKkd67Mfpo/LovbIUsbt3w==}
@@ -2910,6 +2948,36 @@
   module-details-from-path@1.0.3:
     resolution: {integrity: sha512-ySViT69/76t8VhE1xXHK6Ch4NcDd26gx0MzKXLO+F7NOtnqH68d9zF94nT8ZWSxXh8ELOERsnJO/sWt1xZYw5A==}
 
+  mongodb-connection-string-url@3.0.1:
+    resolution: {integrity: sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==}
+
+  mongodb@6.12.0:
+    resolution: {integrity: sha512-RM7AHlvYfS7jv7+BXund/kR64DryVI+cHbVAy9P61fnb1RcWZqOW1/Wj2YhqMCx+MuYhqTRGv7AwHBzmsCKBfA==}
+    engines: {node: '>=16.20.1'}
+    peerDependencies:
+      '@aws-sdk/credential-providers': ^3.188.0
+      '@mongodb-js/zstd': ^1.1.0 || ^2.0.0
+      gcp-metadata: ^5.2.0
+      kerberos: ^2.0.1
+      mongodb-client-encryption: '>=6.0.0 <7'
+      snappy: ^7.2.2
+      socks: ^2.7.1
+    peerDependenciesMeta:
+      '@aws-sdk/credential-providers':
+        optional: true
+      '@mongodb-js/zstd':
+        optional: true
+      gcp-metadata:
+        optional: true
+      kerberos:
+        optional: true
+      mongodb-client-encryption:
+        optional: true
+      snappy:
+        optional: true
+      socks:
+        optional: true
+
   mri@1.2.0:
     resolution: {integrity: sha512-tzzskb3bG8LvYGFF/mDTpq3jpI6Q9wc3LEmBaghu+DdCssd1FakN7Bc0hVNmEyGq1bq3RgfkCb3cmQLpNPOroA==}
     engines: {node: '>=4'}
@@ -3524,6 +3592,9 @@
     resolution: {integrity: sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==}
     engines: {node: '>=0.10.0'}
 
+  sparse-bitfield@3.0.3:
+    resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==}
+
   split2@4.2.0:
     resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==}
     engines: {node: '>= 10.x'}
@@ -3633,6 +3704,10 @@
   tr46@0.0.3:
     resolution: {integrity: sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==}
 
+  tr46@4.1.1:
+    resolution: {integrity: sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==}
+    engines: {node: '>=14'}
+
   ts-error@1.0.6:
     resolution: {integrity: sha512-tLJxacIQUM82IR7JO1UUkKlYuUTmoY9HBJAmNWFzheSlDS5SPMcNIepejHJa4BpPQLAcbRhRf3GDJzyj6rbKvA==}
@@ -3859,6 +3934,10 @@
   webidl-conversions@3.0.1:
     resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==}
 
+  webidl-conversions@7.0.0:
+    resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==}
+    engines: {node: '>=12'}
+
   webpack-sources@3.2.3:
     resolution: {integrity: sha512-/DyMEOrDgLKKIG0fmvtz+4dUX/3Ghozwgm6iPp8KRhvn+eQf9+Q7GWxVNMk3+uCPWfdXYC4ExGBckIXdFEfH1w==}
     engines: {node: '>=10.13.0'}
@@ -3869,6 +3948,10 @@
   whatwg-fetch@3.6.20:
     resolution: {integrity: sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg==}
 
+  whatwg-url@13.0.0:
+    resolution: {integrity: sha512-9WWbymnqj57+XEuqADHrCJ2eSXzn8WXIW/YSGaZtb2WKAInQ6CHfaUUcTyyver0p8BDg5StLQq8h1vtZuwmOig==}
+    engines: {node: '>=16'}
+
   whatwg-url@5.0.0:
     resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==}
@@ -4531,6 +4614,10 @@ snapshots:
       '@jridgewell/resolve-uri': 3.1.1
       '@jridgewell/sourcemap-codec': 1.4.15
 
+  '@mongodb-js/saslprep@1.1.9':
+    dependencies:
+      sparse-bitfield: 3.0.3
+
   '@noble/curves@1.2.0':
     dependencies:
       '@noble/hashes': 1.3.2
@@ -4975,6 +5062,12 @@
   '@types/shimmer@1.0.5': {}
 
+  '@types/webidl-conversions@7.0.3': {}
+
+  '@types/whatwg-url@11.0.5':
+    dependencies:
+      '@types/webidl-conversions': 7.0.3
+
   '@vitest/expect@1.6.0':
     dependencies:
       '@vitest/spy': 1.6.0
@@ -5132,6 +5225,8 @@
       node-releases: 2.0.14
       update-browserslist-db: 1.0.16(browserslist@4.23.0)
 
+  bson@6.10.1: {}
+
   buffer-from@1.1.2: {}
 
   buffer@5.7.1:
@@ -5914,6 +6009,8 @@
   mdn-data@2.0.30: {}
 
+  memory-pager@1.5.0: {}
+
   merge-stream@2.0.0: {}
 
   merge2@1.4.1: {}
@@ -5996,6 +6093,17 @@
   module-details-from-path@1.0.3: {}
 
+  mongodb-connection-string-url@3.0.1:
+    dependencies:
+      '@types/whatwg-url': 11.0.5
+      whatwg-url: 13.0.0
+
+  mongodb@6.12.0:
+    dependencies:
+      '@mongodb-js/saslprep': 1.1.9
+      bson: 6.10.1
+      mongodb-connection-string-url: 3.0.1
+
   mri@1.2.0: {}
 
   ms@2.1.2: {}
@@ -6608,6 +6716,10 @@
   source-map@0.6.1: {}
 
+  sparse-bitfield@3.0.3:
+    dependencies:
+      memory-pager: 1.5.0
+
   split2@4.2.0: {}
 
   sqlite@5.1.1: {}
@@ -6737,6 +6849,10 @@
   tr46@0.0.3: {}
 
+  tr46@4.1.1:
+    dependencies:
+      punycode: 2.3.1
+
   ts-error@1.0.6: {}
 
   ts-mixer@6.0.4: {}
@@ -7132,12 +7248,19 @@
   webidl-conversions@3.0.1: {}
 
+  webidl-conversions@7.0.0: {}
+
   webpack-sources@3.2.3: {}
 
   webpack-virtual-modules@0.6.1: {}
 
   whatwg-fetch@3.6.20: {}
 
+  whatwg-url@13.0.0:
+    dependencies:
+      tr46: 4.1.1
+      webidl-conversions: 7.0.0
+
   whatwg-url@5.0.0:
     dependencies:
       tr46: 0.0.3