Skip to content

Commit

Permalink
sandbox cache records from userland
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Jun 11, 2024
1 parent 1f0fb11 commit d741660
Showing 1 changed file with 76 additions and 96 deletions.
172 changes: 76 additions & 96 deletions packages/core/src/indexing-store/historical.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ export const getHistoricalStore = ({
/** True if the cache contains the complete state of the store. */
let isCacheFull = _isCacheFull;

/** Number of rows in cache. */
let cacheSize = 0;
/** Estimated number of bytes used by cache. */
let cacheSizeBytes = 0;
/** LRU counter. */
let totalCacheOps = 0;

for (const tableName of Object.keys(tables)) {
Expand All @@ -91,48 +94,8 @@ export const getHistoricalStore = ({
};
}

const getRecordSize = (record: UserRecord) => {
// size of metadata
let size = 16;
for (const col of Object.values(record)) {
if (typeof col === "number") {
// p.float, p.int
size += 8;
} else if (typeof col === "string") {
// p.hex, p.string, p.enum
size += 2 * col.length;
} else if (typeof col === "boolean") {
// p.boolean
size += 4;
} else if (typeof col === "bigint") {
// p.bigint
size += 48;
} else if (Array.isArray(col)) {
// p.list
for (const e of col) {
size += getRecordSize(e);
}
} else if (col === null || col === undefined) {
size += 8;
} else {
// p.json
for (const e of Object.values(col)) {
size += getRecordSize(e);
}
}
}
return size;
};

const isIdColumnHex = Object.entries(tables).reduce<{
[tableName: string]: boolean;
}>((acc, [tableName, { table }]) => {
acc[tableName] = table.id[" scalar"] === "hex";
return acc;
}, {});

const getCacheId = (id: UserId, tableName: string): string | number => {
if (isIdColumnHex[tableName])
if (tables[tableName].table.id[" scalar"] === "hex")
return padHex(id as Hex, {
size: Math.ceil(((id as Hex).length - 2) / 2),
dir: "left",
Expand All @@ -142,9 +105,17 @@ export const getHistoricalStore = ({
};

/**
* Note: record is validated before passed to this function
* Validates and transforms a record to be equivalent to a record
* written and read from the db.
*/
const sanitizeRecord = (record: UserRecord, tableName: string) => {
validateRecord({
record,
table: tables[tableName].table,
schema,
skipValidate: false,
});

for (const [columnName, column] of Object.entries(
tables[tableName].table,
)) {
Expand All @@ -164,6 +135,8 @@ export const getHistoricalStore = ({
}).toLowerCase();
}
}

return getRecordSize(record);
};

const shouldFlush = () => cacheSizeBytes > maxSizeBytes;
Expand Down Expand Up @@ -355,7 +328,7 @@ export const getHistoricalStore = ({
storeCache[tableName].insert[encodedId]?.record ??
storeCache[tableName].update[encodedId]?.record;

if (cacheRecord !== undefined) return cacheRecord;
if (cacheRecord !== undefined) return structuredClone(cacheRecord);
if (isCacheFull) return null;

// TODO(kyle) load result into cache
Expand Down Expand Up @@ -391,14 +364,9 @@ export const getHistoricalStore = ({
throw new UniqueConstraintError();
}

const record = { ...data, id };

validateRecord({
record,
table: tables[tableName].table,
schema,
skipValidate: false,
});
// copy user-land record
const record = structuredClone(data) as UserRecord;
record.id = id;

sanitizeRecord(record, tableName);

Expand All @@ -415,7 +383,7 @@ export const getHistoricalStore = ({

if (shouldFlush()) await flush(false);

return record;
return structuredClone(record);
},
createMany: async ({
tableName,
Expand All @@ -424,8 +392,8 @@ export const getHistoricalStore = ({
tableName: string;
data: UserRecord[];
}) => {
for (const record of data) {
const encodedId = getCacheId(record.id, tableName);
for (const _record of data) {
const encodedId = getCacheId(_record.id, tableName);

if (
storeCache[tableName].insert[encodedId] !== undefined ||
Expand All @@ -434,12 +402,8 @@ export const getHistoricalStore = ({
throw new UniqueConstraintError();
}

validateRecord({
record,
table: tables[tableName].table,
schema,
skipValidate: false,
});
// copy user-land record
const record = structuredClone(_record);

sanitizeRecord(record, tableName);

Expand All @@ -458,7 +422,7 @@ export const getHistoricalStore = ({

if (shouldFlush()) await flush(false);

return data;
return structuredClone(data);
},
update: async ({
tableName,
Expand Down Expand Up @@ -501,20 +465,14 @@ export const getHistoricalStore = ({

const update =
typeof data === "function"
? data({ current: cacheEntry.record })
? data({ current: structuredClone(cacheEntry.record) })
: data;

const record: UserRecord = {
...cacheEntry.record,
...update,
};

validateRecord({
record,
table: tables[tableName].table,
schema,
skipValidate: false,
});
// copy user-land record
const record = cacheEntry.record;
for (const [key, value] of Object.entries(structuredClone(update))) {
record[key] = value;
}

sanitizeRecord(record, tableName);

Expand All @@ -524,7 +482,7 @@ export const getHistoricalStore = ({
cacheEntry.opIndex = totalCacheOps++;
cacheEntry.bytes = bytes;

return record;
return structuredClone(record);
},
updateMany: async ({
tableName,
Expand Down Expand Up @@ -704,14 +662,9 @@ export const getHistoricalStore = ({
if (cacheEntry === undefined) {
// insert/create branch

const record = { ...create, id };

validateRecord({
record,
table: tables[tableName].table,
schema,
skipValidate: false,
});
// copy user-land record
const record = structuredClone(create) as UserRecord;
record.id = id;

sanitizeRecord(record, tableName);

Expand All @@ -728,26 +681,20 @@ export const getHistoricalStore = ({

if (shouldFlush()) await flush(false);

return record;
return structuredClone(record);
} else {
// update branch

const _update =
typeof update === "function"
? update({ current: cacheEntry.record })
? update({ current: structuredClone(cacheEntry.record) })
: update;

const record: UserRecord = {
...cacheEntry.record,
..._update,
};

validateRecord({
record,
table: tables[tableName].table,
schema,
skipValidate: false,
});
// copy user-land record
const record = cacheEntry.record;
for (const [key, value] of Object.entries(structuredClone(_update))) {
record[key] = value;
}

sanitizeRecord(record, tableName);

Expand All @@ -757,7 +704,7 @@ export const getHistoricalStore = ({
cacheEntry.opIndex = totalCacheOps++;
cacheEntry.bytes = bytes;

return record;
return structuredClone(record);
}
},
delete: async ({
Expand Down Expand Up @@ -813,3 +760,36 @@ export const getHistoricalStore = ({
flush,
};
};

const getRecordSize = (record: UserRecord) => {
// size of metadata
let size = 16;
for (const col of Object.values(record)) {
if (typeof col === "number") {
// p.float, p.int
size += 8;
} else if (typeof col === "string") {
// p.hex, p.string, p.enum
size += 2 * col.length;
} else if (typeof col === "boolean") {
// p.boolean
size += 4;
} else if (typeof col === "bigint") {
// p.bigint
size += 48;
} else if (Array.isArray(col)) {
// p.list
for (const e of col) {
size += getRecordSize(e);
}
} else if (col === null || col === undefined) {
size += 8;
} else {
// p.json
for (const e of Object.values(col)) {
size += getRecordSize(e);
}
}
}
return size;
};

0 comments on commit d741660

Please sign in to comment.