From 7fe7d76dc80dffdb5741bd41d952a89f78a2be8c Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Sat, 18 Jan 2025 15:31:33 +0100 Subject: [PATCH 1/3] protocol: add data production field --- packages/indexer/src/indexer.test.ts | 16 +++++ packages/indexer/src/internal/testing.ts | 1 + packages/plugin-drizzle/tests/storage.test.ts | 28 +++++++++ packages/plugin-mongo/tests/storage.test.ts | 28 +++++++++ packages/plugin-sqlite/tests/kv.test.ts | 18 ++++++ .../plugin-sqlite/tests/persistence.test.ts | 28 +++++++++ packages/protocol/proto/stream.proto | 11 ++++ packages/protocol/src/proto/stream.ts | 61 ++++++++++++++++++- packages/protocol/src/stream.test.ts | 2 + packages/protocol/src/stream.ts | 31 ++++++++++ packages/protocol/src/testing/client.test.ts | 14 ++++- pnpm-lock.yaml | 8 ++- 12 files changed, 242 insertions(+), 4 deletions(-) diff --git a/packages/indexer/src/indexer.test.ts b/packages/indexer/src/indexer.test.ts index ef0b6455..9461aa3c 100644 --- a/packages/indexer/src/indexer.test.ts +++ b/packages/indexer/src/indexer.test.ts @@ -131,6 +131,7 @@ describe("Run Test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -140,6 +141,7 @@ describe("Run Test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -149,6 +151,7 @@ describe("Run Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -165,6 +168,7 @@ describe("Run Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -174,6 +178,7 @@ describe("Run Test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -183,6 +188,7 @@ describe("Run Test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -192,6 +198,7 @@ describe("Run Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -208,6 +215,7 @@ describe("Run Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -217,6 +225,7 @@ describe("Run Test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -226,6 +235,7 @@ describe("Run Test", () => { cursor: { orderKey: 107n }, endCursor: { orderKey: 108n }, data: [null, { data: "108BC" }], + production: "backfill", }, }, ]; @@ -318,6 +328,7 @@ describe("Run Test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -327,6 +338,7 @@ describe("Run Test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -336,6 +348,7 @@ describe("Run Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, Error("this error should not occurr!"), @@ -354,6 +367,7 @@ describe("Run Test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -363,6 +377,7 @@ describe("Run Test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -372,6 +387,7 @@ describe("Run Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; diff --git a/packages/indexer/src/internal/testing.ts b/packages/indexer/src/internal/testing.ts index e9513f3b..beef3dd9 100644 --- a/packages/indexer/src/internal/testing.ts +++ b/packages/indexer/src/internal/testing.ts @@ -56,6 +56,7 @@ export function generateMockMessages( finality: "accepted", data: [{ data: `${5_000_000 + i}` }], endCursor: { orderKey: BigInt(5_000_000 + i) }, + production: "backfill", }, }); } diff --git a/packages/plugin-drizzle/tests/storage.test.ts b/packages/plugin-drizzle/tests/storage.test.ts index 8aba7a4e..2a9c7eae 100644 --- a/packages/plugin-drizzle/tests/storage.test.ts +++ b/packages/plugin-drizzle/tests/storage.test.ts @@ -699,6 +699,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -708,6 +709,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -717,6 +719,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -733,6 +736,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -742,6 +746,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -751,6 +756,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -760,6 +766,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -776,6 +783,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -785,6 +793,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -794,6 +803,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 107n }, endCursor: { orderKey: 108n }, data: [null, { data: "108BC" }], + production: "backfill", }, }, ]; @@ -877,6 +887,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -886,6 +897,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -895,6 +907,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -911,6 +924,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -920,6 +934,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -929,6 +944,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -938,6 +954,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -954,6 +971,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -963,6 +981,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -1046,6 +1065,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -1055,6 +1075,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -1064,6 +1085,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -1080,6 +1102,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -1089,6 +1112,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -1098,6 +1122,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -1107,6 +1132,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -1123,6 +1149,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -1132,6 +1159,7 @@ describe("Drizzle test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { diff --git a/packages/plugin-mongo/tests/storage.test.ts b/packages/plugin-mongo/tests/storage.test.ts index 224f60b1..87dc06d4 100644 --- a/packages/plugin-mongo/tests/storage.test.ts +++ b/packages/plugin-mongo/tests/storage.test.ts @@ -453,6 +453,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -462,6 +463,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -471,6 +473,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -487,6 +490,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -496,6 +500,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -505,6 +510,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -514,6 +520,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -530,6 +537,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -539,6 +547,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -548,6 +557,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 107n }, endCursor: { orderKey: 108n }, data: [null, { data: "108BC" }], + production: "backfill", }, }, ]; @@ -646,6 +656,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -655,6 +666,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -664,6 +676,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -680,6 +693,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -689,6 +703,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -698,6 +713,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -707,6 +723,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -723,6 +740,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -732,6 +750,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -830,6 +849,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -839,6 +859,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -848,6 +869,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -864,6 +886,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -873,6 +896,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -882,6 +906,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -891,6 +916,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -907,6 +933,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -916,6 +943,7 @@ describe("MongoDB Test", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { diff --git a/packages/plugin-sqlite/tests/kv.test.ts b/packages/plugin-sqlite/tests/kv.test.ts index a48fd1a9..94fadda8 100644 --- a/packages/plugin-sqlite/tests/kv.test.ts +++ b/packages/plugin-sqlite/tests/kv.test.ts @@ -124,6 +124,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -133,6 +134,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -142,6 +144,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -158,6 +161,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -167,6 +171,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -176,6 +181,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -185,6 +191,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -201,6 +208,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -210,6 +218,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -293,6 +302,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -302,6 +312,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -311,6 +322,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -327,6 +339,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -336,6 +349,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -345,6 +359,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -354,6 +369,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -370,6 +386,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -379,6 +396,7 @@ describe("SQLite key-value store", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { diff --git a/packages/plugin-sqlite/tests/persistence.test.ts b/packages/plugin-sqlite/tests/persistence.test.ts index 2469fe03..f7ad57b0 100644 --- a/packages/plugin-sqlite/tests/persistence.test.ts +++ b/packages/plugin-sqlite/tests/persistence.test.ts @@ -61,6 +61,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -70,6 +71,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -79,6 +81,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -95,6 +98,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -104,6 +108,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -113,6 +118,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -122,6 +128,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -138,6 +145,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -147,6 +155,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -156,6 +165,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 107n }, endCursor: { orderKey: 108n }, data: [null, { data: "108BC" }], + production: "backfill", }, }, ]; @@ -290,6 +300,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -299,6 +310,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -308,6 +320,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -324,6 +337,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -333,6 +347,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -342,6 +357,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -351,6 +367,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -367,6 +384,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -376,6 +394,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { @@ -457,6 +476,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 100n }, endCursor: { orderKey: 101n }, data: [null, null], + production: "backfill", }, }, { @@ -466,6 +486,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 101n }, endCursor: { orderKey: 102n }, data: [null, null], + production: "backfill", }, }, { @@ -475,6 +496,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, null], + production: "backfill", }, }, ]; @@ -491,6 +513,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 102n }, endCursor: { orderKey: 103n }, data: [{ data: "B" }, { data: "103B" }], + production: "backfill", }, }, { @@ -500,6 +523,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 103n }, endCursor: { orderKey: 104n }, data: [null, { data: "104B" }], + production: "backfill", }, }, { @@ -509,6 +533,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 104n }, endCursor: { orderKey: 105n }, data: [null, { data: "105B" }], + production: "backfill", }, }, { @@ -518,6 +543,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106B" }], + production: "backfill", }, }, ]; @@ -534,6 +560,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 105n }, endCursor: { orderKey: 106n }, data: [{ data: "C" }, { data: "106BC" }], + production: "backfill", }, }, { @@ -543,6 +570,7 @@ describe("SQLite persistence", () => { cursor: { orderKey: 106n }, endCursor: { orderKey: 107n }, data: [null, { data: "107BC" }], + production: "backfill", }, }, { diff --git a/packages/protocol/proto/stream.proto b/packages/protocol/proto/stream.proto index adb38907..f98e06a3 100644 --- a/packages/protocol/proto/stream.proto +++ b/packages/protocol/proto/stream.proto @@ -106,6 +106,8 @@ message Data { // // This message contains chain-specific data serialized using protobuf. repeated bytes data = 4; + // The production mode of the block. + DataProduction production = 5; } // Sent to clients to check if stream is still connected. @@ -131,3 +133,12 @@ enum DataFinality { // Data is finalized and cannot be invalidated. DATA_FINALITY_FINALIZED = 3; } + +// Data production mode. +enum DataProduction { + DATA_PRODUCTION_UNKNOWN = 0; + // Data is for a backfilled block. + DATA_PRODUCTION_BACKFILL = 1; + // Data is for a live block. + DATA_PRODUCTION_LIVE = 2; +} diff --git a/packages/protocol/src/proto/stream.ts b/packages/protocol/src/proto/stream.ts index d43ac790..692277db 100644 --- a/packages/protocol/src/proto/stream.ts +++ b/packages/protocol/src/proto/stream.ts @@ -63,6 +63,48 @@ export function dataFinalityToJSON(object: DataFinality): string { } } +/** Data production mode. */ +export enum DataProduction { + UNKNOWN = 0, + /** BACKFILL - Data is for a backfilled block. */ + BACKFILL = 1, + /** LIVE - Data is for a live block. */ + LIVE = 2, + UNRECOGNIZED = -1, +} + +export function dataProductionFromJSON(object: any): DataProduction { + switch (object) { + case 0: + case "DATA_PRODUCTION_UNKNOWN": + return DataProduction.UNKNOWN; + case 1: + case "DATA_PRODUCTION_BACKFILL": + return DataProduction.BACKFILL; + case 2: + case "DATA_PRODUCTION_LIVE": + return DataProduction.LIVE; + case -1: + case "UNRECOGNIZED": + default: + return DataProduction.UNRECOGNIZED; + } +} + +export function dataProductionToJSON(object: DataProduction): string { + switch (object) { + case DataProduction.UNKNOWN: + return "DATA_PRODUCTION_UNKNOWN"; + case DataProduction.BACKFILL: + return "DATA_PRODUCTION_BACKFILL"; + case DataProduction.LIVE: + return "DATA_PRODUCTION_LIVE"; + case DataProduction.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + /** A cursor over the stream content. */ export interface Cursor { /** @@ -192,6 +234,8 @@ export interface Data { * This message contains chain-specific data serialized using protobuf. */ readonly data: readonly Uint8Array[]; + /** The production mode of the block. */ + readonly production: DataProduction; } /** Sent to clients to check if stream is still connected. */ @@ -838,7 +882,7 @@ export const Finalize = { }; function createBaseData(): Data { - return { cursor: undefined, endCursor: undefined, finality: 0, data: [] }; + return { cursor: undefined, endCursor: undefined, finality: 0, data: [], production: 0 }; } export const Data = { @@ -855,6 +899,9 @@ export const Data = { for (const v of message.data) { writer.uint32(34).bytes(v!); } + if (message.production !== 0) { + writer.uint32(40).int32(message.production); + } return writer; }, @@ -893,6 +940,13 @@ export const Data = { message.data.push(reader.bytes()); continue; + case 5: + if (tag !== 40) { + break; + } + + message.production = reader.int32() as any; + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -908,6 +962,7 @@ export const Data = { endCursor: isSet(object.endCursor) ? Cursor.fromJSON(object.endCursor) : undefined, finality: isSet(object.finality) ? dataFinalityFromJSON(object.finality) : 0, data: globalThis.Array.isArray(object?.data) ? object.data.map((e: any) => bytesFromBase64(e)) : [], + production: isSet(object.production) ? dataProductionFromJSON(object.production) : 0, }; }, @@ -925,6 +980,9 @@ export const Data = { if (message.data?.length) { obj.data = message.data.map((e) => base64FromBytes(e)); } + if (message.production !== 0) { + obj.production = dataProductionToJSON(message.production); + } return obj; }, @@ -941,6 +999,7 @@ export const Data = { : undefined; message.finality = object.finality ?? 0; message.data = object.data?.map((e) => e) || []; + message.production = object.production ?? 0; return message; }, }; diff --git a/packages/protocol/src/stream.test.ts b/packages/protocol/src/stream.test.ts index 06542965..e9920d17 100644 --- a/packages/protocol/src/stream.test.ts +++ b/packages/protocol/src/stream.test.ts @@ -95,6 +95,7 @@ describe("StreamDataResponse", () => { data: { finality: "accepted", data: [{ value: "hello" }, { value: "world" }], + production: "backfill", }, } as const; @@ -120,6 +121,7 @@ describe("StreamDataResponse", () => { ], ], "finality": 2, + "production": 1, }, } `); diff --git a/packages/protocol/src/stream.ts b/packages/protocol/src/stream.ts index 815db6c4..24d13147 100644 --- a/packages/protocol/src/stream.ts +++ b/packages/protocol/src/stream.ts @@ -34,6 +34,35 @@ export const DataFinality = Schema.transform( export type DataFinality = typeof DataFinality.Type; +/** Data production mode. */ +export const DataProduction = Schema.transform( + Schema.Enums(proto.stream.DataProduction), + Schema.Literal("backfill", "live", "unknown"), + { + decode(value) { + const enumMap = { + [proto.stream.DataProduction.BACKFILL]: "backfill", + [proto.stream.DataProduction.LIVE]: "live", + [proto.stream.DataProduction.UNKNOWN]: "unknown", + [proto.stream.DataProduction.UNRECOGNIZED]: "unknown", + } as const; + + return enumMap[value] ?? "unknown"; + }, + encode(value) { + const enumMap = { + backfill: proto.stream.DataProduction.BACKFILL, + live: proto.stream.DataProduction.LIVE, + unknown: proto.stream.DataProduction.UNKNOWN, + }; + + return enumMap[value] ?? proto.stream.DataProduction.UNKNOWN; + }, + }, +); + +export type DataProduction = typeof DataProduction.Type; + export const Duration = Schema.Struct({ seconds: Schema.BigIntFromSelf, nanos: Schema.Number, @@ -112,6 +141,7 @@ export const Data = ( cursor: Schema.optional(Cursor), endCursor: Schema.optional(Cursor), finality: DataFinality, + production: DataProduction, data: Schema.Array(schema), }), }); @@ -136,6 +166,7 @@ export type StreamDataResponse = cursor?: Cursor | undefined; endCursor?: Cursor | undefined; finality: DataFinality; + production: DataProduction; data: readonly (TA | null)[]; }; }; diff --git a/packages/protocol/src/testing/client.test.ts b/packages/protocol/src/testing/client.test.ts index a45c92ef..b8d9d343 100644 --- a/packages/protocol/src/testing/client.test.ts +++ b/packages/protocol/src/testing/client.test.ts @@ -9,7 +9,11 @@ describe("MockClient", () => { return [ { _tag: "data", - data: { finality: "finalized", data: [{ data: "hello" }] }, + data: { + finality: "finalized", + data: [{ data: "hello" }], + production: "backfill", + }, }, ]; }); @@ -30,6 +34,7 @@ describe("MockClient", () => { }, ], "finality": "finalized", + "production": "backfill", }, }, ] @@ -41,7 +46,11 @@ describe("MockClient", () => { return [ { _tag: "data", - data: { finality: "finalized", data: [{ data: "hello" }, null] }, + data: { + finality: "finalized", + data: [{ data: "hello" }, null], + production: "backfill", + }, }, ]; }); @@ -63,6 +72,7 @@ describe("MockClient", () => { null, ], "finality": "finalized", + "production": "backfill", }, }, ] diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2cac0153..c3046d93 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2305,6 +2305,10 @@ packages: engines: {node: '>=18'} hasBin: true + escalade@3.1.1: + resolution: {integrity: sha512-k0er2gUkLf8O0zKJiAhmkTnJlTvINGv7ygDNPbeIsX/TJjGJZHuh9B2UxbsaEkmlEo9MfhrSzmhIlhRlI2GXnw==} + engines: {node: '>=6'} + escalade@3.1.2: resolution: {integrity: sha512-ErCHMCae19vR8vQGe50xIsVomy19rg6gFu3+r3jkEO46suLMWBksvVyoGgQV+jOfl84ZSOSlmv6Gxa89PmTGmA==} engines: {node: '>=6'} @@ -5308,6 +5312,8 @@ snapshots: '@esbuild/win32-ia32': 0.23.0 '@esbuild/win32-x64': 0.23.0 + escalade@3.1.1: {} + escalade@3.1.2: {} escape-string-regexp@1.0.5: {} @@ -6993,7 +6999,7 @@ snapshots: yargs@17.7.2: dependencies: cliui: 8.0.1 - escalade: 3.1.2 + escalade: 3.1.1 get-caller-file: 2.0.5 require-directory: 2.1.1 string-width: 4.2.3 From c48a86805b6dbf48836aef2dace3f12f21d78894 Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Sat, 18 Jan 2025 15:53:55 +0100 Subject: [PATCH 2/3] protocol: add timeout to stream --- examples/starknet-client/src/main.ts | 6 ++- packages/protocol/src/client.ts | 74 +++++++++++++++++++--------- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/examples/starknet-client/src/main.ts b/examples/starknet-client/src/main.ts index 761a65db..5f8a7b78 100644 --- a/examples/starknet-client/src/main.ts +++ b/examples/starknet-client/src/main.ts @@ -75,11 +75,13 @@ const command = defineCommand({ filter: [filter], finality: "accepted", startingCursor: { - orderKey: 800_000n, + orderKey: 1_078_335n, }, }); - for await (const message of client.streamData(request)) { + for await (const message of client.streamData(request, { + timeout: 40_000, + })) { switch (message._tag) { case "data": { consola.info("Data", message.data.endCursor?.orderKey); diff --git a/packages/protocol/src/client.ts b/packages/protocol/src/client.ts index 0947ed24..5fb3876f 100644 --- a/packages/protocol/src/client.ts +++ b/packages/protocol/src/client.ts @@ -23,14 +23,25 @@ import { type StreamDataRequest, StreamDataResponse } from "./stream"; export { ClientError, Status } from "nice-grpc"; +const DEFAULT_TIMEOUT_MS = 45_000; + +export class TimeoutError extends Error { + constructor(timeout: number) { + super(`No message received in ${timeout}ms`); + this.name = "TimeoutError"; + } +} + /** Client call options. */ export interface ClientCallOptions { signal?: AbortSignal; } export interface StreamDataOptions extends ClientCallOptions { - /** Stop at the specified cursor (inclusive) */ + /** Stop at the specified cursor (inclusive). */ endingCursor?: Cursor; + /** Timeout between messages, in milliseconds. */ + timeout?: number; } /** DNA client. */ @@ -112,44 +123,61 @@ export class StreamDataIterable { const inner = this.it[Symbol.asyncIterator](); const schema = StreamDataResponse(this.schema); const decoder = Schema.decodeSync(schema); - const { endingCursor } = this.options ?? {}; + const { endingCursor, timeout = DEFAULT_TIMEOUT_MS } = this.options ?? {}; let shouldStop = false; + let clock: string | number | NodeJS.Timeout | undefined; + return { async next() { if (shouldStop) { return { done: true, value: undefined }; } - const { done, value } = await inner.next(); + // biome-ignore lint/suspicious/noExplicitAny: any is ok + const t: Promise<{ done: boolean; value: any }> = new Promise( + (_, reject) => { + clock = setTimeout(() => { + reject(new TimeoutError(timeout)); + }, timeout); + }, + ); - if (done || value.message === undefined) { - return { done: true, value: undefined }; - } + try { + const { done, value } = await Promise.race([inner.next(), t]); + + clearTimeout(clock); + + if (done || value.message === undefined) { + return { done: true, value: undefined }; + } - const decodedMessage = decoder(value.message); + const decodedMessage = decoder(value.message); - if (endingCursor) { - assert(value.message.$case === "data"); - assert(decodedMessage._tag === "data"); + if (endingCursor) { + assert(value.message.$case === "data"); + assert(decodedMessage._tag === "data"); - const { orderKey, uniqueKey } = endingCursor; - const endCursor = decodedMessage.data.endCursor; + const { orderKey, uniqueKey } = endingCursor; + const endCursor = decodedMessage.data.endCursor; - // Check if the orderKey matches - if (orderKey === endCursor?.orderKey) { - // If a uniqueKey is specified, it must also match - if (!uniqueKey || uniqueKey === endCursor.uniqueKey) { - shouldStop = true; - return { done: false, value: decodedMessage }; + // Check if the orderKey matches + if (orderKey === endCursor?.orderKey) { + // If a uniqueKey is specified, it must also match + if (!uniqueKey || uniqueKey === endCursor.uniqueKey) { + shouldStop = true; + return { done: false, value: decodedMessage }; + } } } - } - return { - done: false, - value: decodedMessage, - }; + return { + done: false, + value: decodedMessage, + }; + } finally { + clearTimeout(clock); + } }, }; } From 4f827bad3a85304b80dd3d82931fced1cd009e1a Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Sat, 18 Jan 2025 15:54:30 +0100 Subject: [PATCH 3/3] Change files --- ...ibara-indexer-17e06f60-8a2d-4dd6-bc1f-a4b2fb4d3b3a.json | 7 +++++++ ...lugin-drizzle-f060747e-3aa6-4501-a250-dd8d12dbae1a.json | 7 +++++++ ...-plugin-mongo-3fbd3cc4-77af-4525-9192-0cd635c71b48.json | 7 +++++++ ...plugin-sqlite-8891e8b0-1000-4500-8f2b-84a0c119f61e.json | 7 +++++++ ...bara-protocol-70715784-8c5b-4a67-8852-e732f96e76d6.json | 7 +++++++ 5 files changed, 35 insertions(+) create mode 100644 change/@apibara-indexer-17e06f60-8a2d-4dd6-bc1f-a4b2fb4d3b3a.json create mode 100644 change/@apibara-plugin-drizzle-f060747e-3aa6-4501-a250-dd8d12dbae1a.json create mode 100644 change/@apibara-plugin-mongo-3fbd3cc4-77af-4525-9192-0cd635c71b48.json create mode 100644 change/@apibara-plugin-sqlite-8891e8b0-1000-4500-8f2b-84a0c119f61e.json create mode 100644 change/@apibara-protocol-70715784-8c5b-4a67-8852-e732f96e76d6.json diff --git a/change/@apibara-indexer-17e06f60-8a2d-4dd6-bc1f-a4b2fb4d3b3a.json b/change/@apibara-indexer-17e06f60-8a2d-4dd6-bc1f-a4b2fb4d3b3a.json new file mode 100644 index 00000000..24afbbf5 --- /dev/null +++ b/change/@apibara-indexer-17e06f60-8a2d-4dd6-bc1f-a4b2fb4d3b3a.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "protocol: add data production field", + "packageName": "@apibara/indexer", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-drizzle-f060747e-3aa6-4501-a250-dd8d12dbae1a.json b/change/@apibara-plugin-drizzle-f060747e-3aa6-4501-a250-dd8d12dbae1a.json new file mode 100644 index 00000000..0866e40a --- /dev/null +++ b/change/@apibara-plugin-drizzle-f060747e-3aa6-4501-a250-dd8d12dbae1a.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "protocol: add data production field", + "packageName": "@apibara/plugin-drizzle", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-mongo-3fbd3cc4-77af-4525-9192-0cd635c71b48.json b/change/@apibara-plugin-mongo-3fbd3cc4-77af-4525-9192-0cd635c71b48.json new file mode 100644 index 00000000..d4153dc0 --- /dev/null +++ b/change/@apibara-plugin-mongo-3fbd3cc4-77af-4525-9192-0cd635c71b48.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "protocol: add data production field", + "packageName": "@apibara/plugin-mongo", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-sqlite-8891e8b0-1000-4500-8f2b-84a0c119f61e.json b/change/@apibara-plugin-sqlite-8891e8b0-1000-4500-8f2b-84a0c119f61e.json new file mode 100644 index 00000000..5c45614a --- /dev/null +++ b/change/@apibara-plugin-sqlite-8891e8b0-1000-4500-8f2b-84a0c119f61e.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "protocol: add data production field", + "packageName": "@apibara/plugin-sqlite", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-protocol-70715784-8c5b-4a67-8852-e732f96e76d6.json b/change/@apibara-protocol-70715784-8c5b-4a67-8852-e732f96e76d6.json new file mode 100644 index 00000000..ddb6ef6e --- /dev/null +++ b/change/@apibara-protocol-70715784-8c5b-4a67-8852-e732f96e76d6.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "Add data production field to data messages", + "packageName": "@apibara/protocol", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +}