Skip to content

Commit

Permalink
explicit flush arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Jun 12, 2024
1 parent b60df84 commit b5a0c0e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 35 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/bin/utils/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ export async function run({
}
}

await historicalStore.flush();
await historicalStore.flush({ isFullFlush: true });

// Become healthy
common.logger.info({
Expand Down
34 changes: 14 additions & 20 deletions packages/core/src/indexing-store/historical.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test("findUnique() w/ cache miss", async (context) => {
data: { name: "Skip", age: 12 },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const instance = await indexingStore.findUnique({
tableName: "Pet",
Expand Down Expand Up @@ -465,7 +465,7 @@ test("update() w/ cache miss", async (context) => {
data: { name: "Skip", bigAge: 100n },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const updatedInstance = await indexingStore.update({
tableName: "Pet",
Expand All @@ -490,7 +490,7 @@ test("update() w/ find cache", async (context) => {
data: { name: "Skip", bigAge: 100n },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

await indexingStore.findUnique({ tableName: "Pet", id: "id1" });

Expand All @@ -509,8 +509,7 @@ test("update() w/ find cache", async (context) => {

expect(findInstance).toMatchObject({ id: "id1", name: "Peanut Butter" });

// @ts-ignore
await indexingStore.flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const rows = await database.indexingDb
.withSchema(namespaceInfo.userNamespace)
Expand Down Expand Up @@ -700,7 +699,7 @@ test("upsert() w/ cache miss", async (context) => {
data: { name: "Skip", age: 12 },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const updatedInstance = await indexingStore.upsert({
tableName: "Pet",
Expand All @@ -722,8 +721,7 @@ test("upsert() w/ find cache", async (context) => {

// add pet.id1 to find cache

// @ts-ignore
await indexingStore.flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

await indexingStore.findUnique({
tableName: "Pet",
Expand All @@ -748,8 +746,7 @@ test("upsert() w/ find cache", async (context) => {

// add pet.id1 to find cache, remove from create cache

// @ts-ignore
await indexingStore.flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

await indexingStore.findUnique({
tableName: "Pet",
Expand All @@ -772,8 +769,7 @@ test("upsert() w/ find cache", async (context) => {

expect(findInstance).toMatchObject({ id: "id1", name: "Kevin" });

// @ts-ignore
await indexingStore.flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const rows = await database.indexingDb
.withSchema(namespaceInfo.userNamespace)
Expand Down Expand Up @@ -832,7 +828,7 @@ test("delete() w/ find cache", async (context) => {
data: { name: "Skip", age: 12 },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

await indexingStore.findUnique({ tableName: "Pet", id: "id1" });

Expand All @@ -849,8 +845,7 @@ test("delete() w/ find cache", async (context) => {
});
expect(deletedInstance).toBe(null);

// @ts-ignore
await indexingStore.flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const rows = await database.indexingDb
.withSchema(namespaceInfo.userNamespace)
Expand Down Expand Up @@ -1065,7 +1060,7 @@ test("flush() insert", async (context) => {
data: { name: "Skip", age: 12 },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const rows = await database.indexingDb
.withSchema(namespaceInfo.userNamespace)
Expand Down Expand Up @@ -1095,15 +1090,15 @@ test("flush() update", async (context) => {
data: { name: "Skip", age: 12 },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

await indexingStore.update({
tableName: "Pet",
id: "id1",
data: { name: "Peanut Butter" },
});

await (indexingStore as HistoricalStore).flush();
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const rows = await database.indexingDb
.withSchema(namespaceInfo.userNamespace)
Expand Down Expand Up @@ -1143,8 +1138,7 @@ test("flush() partial", async (context) => {
],
});

// @ts-ignore
await indexingStore.flush(false);
await (indexingStore as HistoricalStore).flush({ isFullFlush: false });

const rows = await database.indexingDb
.withSchema(namespaceInfo.userNamespace)
Expand Down
20 changes: 10 additions & 10 deletions packages/core/src/indexing-store/historical.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ export const getHistoricalStore = ({

const shouldFlush = () => cacheSizeBytes > maxSizeBytes;

const flush = createQueue<void, boolean | void>({
const flush = createQueue<void, { isFullFlush: boolean }>({
concurrency: 1,
initialStart: true,
browser: false,
worker: async (fullFlush = true) => {
worker: async ({ isFullFlush }: { isFullFlush: boolean }) => {
const flushIndex = totalCacheOps - cacheSize * (1 - CACHE_FLUSH);

await Promise.all(
Expand All @@ -179,7 +179,7 @@ export const getHistoricalStore = ({

const cacheEntries = Object.values(tableStoreCache);

if (fullFlush) {
if (isFullFlush) {
insertRecords = cacheEntries
.filter(({ type }) => type === "insert")
.map(({ record }) => record!);
Expand Down Expand Up @@ -302,7 +302,7 @@ export const getHistoricalStore = ({
),
);

if (fullFlush) {
if (isFullFlush) {
for (const tableName of Object.keys(tables)) {
storeCache[tableName] = {};
}
Expand Down Expand Up @@ -361,7 +361,7 @@ export const getHistoricalStore = ({
cacheSizeBytes += bytes;
cacheSize++;

if (shouldFlush()) await flush(false);
if (shouldFlush()) await flush({ isFullFlush: false });

return structuredClone(record);
},
Expand All @@ -373,7 +373,7 @@ export const getHistoricalStore = ({
after?: string | null;
limit?: number;
}) => {
await flush(true);
await flush({ isFullFlush: true });

return readonlyStore.findMany(arg);
},
Expand Down Expand Up @@ -416,7 +416,7 @@ export const getHistoricalStore = ({
cacheSizeBytes += bytes;
cacheSize++;

if (shouldFlush()) await flush(false);
if (shouldFlush()) await flush({ isFullFlush: false });

return structuredClone(record);
},
Expand Down Expand Up @@ -458,7 +458,7 @@ export const getHistoricalStore = ({

cacheSize += data.length;

if (shouldFlush()) await flush(false);
if (shouldFlush()) await flush({ isFullFlush: false });

const returnData = structuredClone(data);
for (const record of data) {
Expand Down Expand Up @@ -544,7 +544,7 @@ export const getHistoricalStore = ({
| Partial<Omit<UserRecord, "id">>
| ((args: { current: UserRecord }) => Partial<Omit<UserRecord, "id">>);
}) => {
await flush(true);
await flush({ isFullFlush: true });
const table = (schema[tableName] as { table: Table }).table;

if (typeof data === "function") {
Expand Down Expand Up @@ -749,7 +749,7 @@ export const getHistoricalStore = ({
cacheSize++;
cacheSizeBytes += bytes;

if (shouldFlush()) await flush(false);
if (shouldFlush()) await flush({ isFullFlush: false });

return structuredClone(record);
}
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/indexing-store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ export type WriteStore<
export type RealtimeStore = ReadonlyStore & WriteStore<"realtime">;

export type HistoricalStore = ReadonlyStore &
WriteStore<"historical"> & { flush: () => Promise<void> };
WriteStore<"historical"> & {
flush: (arg: { isFullFlush: boolean }) => Promise<void>;
};

export type IndexingStore<
env extends "historical" | "realtime" = "historical" | "realtime",
Expand Down
5 changes: 2 additions & 3 deletions packages/core/src/server/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
setupDatabaseServices,
setupIsolatedDatabase,
} from "@/_test/setup.js";
import type { ReadonlyStore } from "@/indexing-store/store.js";
import type { HistoricalStore, ReadonlyStore } from "@/indexing-store/store.js";
import { createSchema } from "@/schema/schema.js";
import { encodeCheckpoint, zeroCheckpoint } from "@/utils/checkpoint.js";
import type { GraphQLSchema } from "graphql";
Expand Down Expand Up @@ -161,8 +161,7 @@ test("graphql", async (context) => {
},
});

// @ts-ignore
await indexingStore.flush(true);
await (indexingStore as HistoricalStore).flush({ isFullFlush: true });

const graphqlSchema = buildGraphqlSchema(schema);

Expand Down

0 comments on commit b5a0c0e

Please sign in to comment.