Skip to content

Commit

Permalink
Restore main file. Add CLI for BigQuery schema generation.
Browse files Browse the repository at this point in the history
  • Loading branch information
0xJem committed Nov 1, 2022
1 parent 0e49121 commit dc1c74a
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 58 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,5 @@ dist

# TernJS port file
.tern-port

tmp/
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"scripts": {
"test": "jest",
"codegen": "graphql-codegen",
"codegen:bq": "ts-node src/bigquery.ts",
"execute": "ts-node src/index.ts",
"lint": "eslint --config ./.eslintrc.js ./src/ --ext .jsx,.js,.tsx,.ts",
"lint:fix": "prettier --write ./src/ & yarn lint --fix"
Expand Down Expand Up @@ -47,4 +48,4 @@
"graphql-tag": "^2.12.6",
"jsonl-parse-stringify": "^1.0.3"
}
}
}
19 changes: 19 additions & 0 deletions src/bigquery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { getBigQuerySchema } from "./helpers/bigquerySchema";
import { writeFile } from "./helpers/fs";
import { generateJSONSchema } from "./helpers/jsonSchema";

const writeBigQuerySchema = async (object: string, typesFilePath: string): Promise<void> => {
const schema = await generateJSONSchema(object, typesFilePath);

const bqSchema = await getBigQuerySchema(schema);
writeFile(`./tmp/${object}_schema.json`, bqSchema);
};

// Runs via CLI
if (require.main === module) {
if (process.argv.length !== 4) {
throw new Error(`2 arguments are required, but the app received: ${JSON.stringify(process.argv.slice(2))}`);
}

writeBigQuerySchema(process.argv[2], process.argv[3]);
}
10 changes: 10 additions & 0 deletions src/helpers/fs.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { existsSync, mkdirSync, writeFileSync } from "fs";
import { dirname } from "path";
import path = require("path");

export const extractPartitionKey = (filePath: string): string => {
Expand All @@ -8,3 +10,11 @@ export const extractPartitionKey = (filePath: string): string => {
// Split into ["token-balances/dt=", "2021-01-01"]
return directoryPath.split("dt=")[1];
};

export const writeFile = (filePath: string, content: string): void => {
if (!existsSync(dirname(filePath))) {
mkdirSync(dirname(filePath));
}

writeFileSync(filePath, content);
};
107 changes: 50 additions & 57 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { createClient } from "@urql/core";
import fetch from "cross-fetch";

import { getBigQuerySchema } from "./helpers/bigquerySchema";
import { generateJSONSchema } from "./helpers/jsonSchema";
import { generateQuery } from "./helpers/subgraphQuery";
import { getISO8601DateString } from "./helpers/date";
import { getLatestFinishDate, sendPubSubMessage } from "./helpers/pubsub";
import { getFinalDate, getRecords, getRecordsFetchStartDate } from "./records";
import { getEarliestTransactionDateStart } from "./subgraph";

const SUBGRAPH_URL = "https://api.studio.thegraph.com/query/28103/token-holders/0.0.40";

Expand Down Expand Up @@ -35,64 +36,56 @@ export const handler = async (
fetch,
});

const schema = await generateJSONSchema("TokenHolderTransaction", "./src/graphql/generated.ts");

console.log(`query = ${generateQuery(schema, "TokenHolderTransaction")}`);

console.log(`bq = ${await getBigQuerySchema(schema)}`);

return;

// // Get the earliest date in the subgraph
// const subgraphEarliestDate: Date = await getEarliestTransactionDateStart(client);
// // Final date in the subgraph
// const subgraphFinalDate: Date = finalDateOverride ? new Date(finalDateOverride) : await getFinalDate(client);
// // Date up to which records have been cached
// const recordsFetchedUpToDate: Date | null = await getRecordsFetchStartDate(storagePrefix, bucketName);

// const getFetchStartDate = async (): Promise<Date> => {
// // Check for PubSub messages
// const pubSubFinishDate: Date | null = await getLatestFinishDate(pubSubSubscriptionId);

// // If there is a finishDate in the PubSub messages, then we proceed from there
// // This is because there may have been a refresh of records
// if (pubSubFinishDate) {
// console.log(
// `getFetchStartDate: Using finish date from PubSub message queue: ${getISO8601DateString(pubSubFinishDate)}`,
// );
// return pubSubFinishDate;
// }

// // If records exist, continue from where we left off
// if (recordsFetchedUpToDate) {
// console.log(
// `getFetchStartDate: Using latest cached records date: ${getISO8601DateString(recordsFetchedUpToDate)}`,
// );
// return recordsFetchedUpToDate;
// }
// Get the earliest date in the subgraph
const subgraphEarliestDate: Date = await getEarliestTransactionDateStart(client);
// Final date in the subgraph
const subgraphFinalDate: Date = finalDateOverride ? new Date(finalDateOverride) : await getFinalDate(client);
// Date up to which records have been cached
const recordsFetchedUpToDate: Date | null = await getRecordsFetchStartDate(storagePrefix, bucketName);

const getFetchStartDate = async (): Promise<Date> => {
// Check for PubSub messages
const pubSubFinishDate: Date | null = await getLatestFinishDate(pubSubSubscriptionId);

// If there is a finishDate in the PubSub messages, then we proceed from there
// This is because there may have been a refresh of records
if (pubSubFinishDate) {
console.log(
`getFetchStartDate: Using finish date from PubSub message queue: ${getISO8601DateString(pubSubFinishDate)}`,
);
return pubSubFinishDate;
}

// // Otherwise proceed from the start of the subgraph
// console.log(`getFetchStartDate: Using subgraph start date: ${getISO8601DateString(subgraphEarliestDate)}`);
// return subgraphEarliestDate;
// };
// If records exist, continue from where we left off
if (recordsFetchedUpToDate) {
console.log(
`getFetchStartDate: Using latest cached records date: ${getISO8601DateString(recordsFetchedUpToDate)}`,
);
return recordsFetchedUpToDate;
}

// const startDate = await getFetchStartDate();
// console.log(`Transactions will be fetched from ${startDate.toISOString()}`);
// Otherwise proceed from the start of the subgraph
console.log(`getFetchStartDate: Using subgraph start date: ${getISO8601DateString(subgraphEarliestDate)}`);
return subgraphEarliestDate;
};

// // Get and write records
// const fetchedUpTo: Date = await getRecords(
// client,
// storagePrefix,
// bucketName,
// startDate,
// subgraphFinalDate,
// shouldTerminate,
// );
const startDate = await getFetchStartDate();
console.log(`Transactions will be fetched from ${startDate.toISOString()}`);

// Get and write records
const fetchedUpTo: Date = await getRecords(
client,
storagePrefix,
bucketName,
startDate,
subgraphFinalDate,
shouldTerminate,
);

// /**
// * Publish the start and finish dates for what was fetched.
// */
// await sendPubSubMessage(pubSubTopic, startDate, fetchedUpTo);
/**
* Publish the start and finish dates for what was fetched.
*/
await sendPubSubMessage(pubSubTopic, startDate, fetchedUpTo);
};

// Run locally using `yarn execute`. Inputs may need to be changed if re-deployments occur.
Expand Down

0 comments on commit dc1c74a

Please sign in to comment.