diff --git a/.gitignore b/.gitignore index 6704566..356de79 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,5 @@ dist # TernJS port file .tern-port + +tmp/ \ No newline at end of file diff --git a/package.json b/package.json index 05be689..f4652bd 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "scripts": { "test": "jest", "codegen": "graphql-codegen", + "codegen:bq": "ts-node src/bigquery.ts", "execute": "ts-node src/index.ts", "lint": "eslint --config ./.eslintrc.js ./src/ --ext .jsx,.js,.tsx,.ts", "lint:fix": "prettier --write ./src/ & yarn lint --fix" @@ -47,4 +48,4 @@ "graphql-tag": "^2.12.6", "jsonl-parse-stringify": "^1.0.3" } -} +} \ No newline at end of file diff --git a/src/bigquery.ts b/src/bigquery.ts new file mode 100644 index 0000000..5e891c4 --- /dev/null +++ b/src/bigquery.ts @@ -0,0 +1,19 @@ +import { getBigQuerySchema } from "./helpers/bigquerySchema"; +import { writeFile } from "./helpers/fs"; +import { generateJSONSchema } from "./helpers/jsonSchema"; + +const writeBigQuerySchema = async (object: string, typesFilePath: string): Promise => { + const schema = await generateJSONSchema(object, typesFilePath); + + const bqSchema = await getBigQuerySchema(schema); + writeFile(`./tmp/${object}_schema.json`, bqSchema); +}; + +// Runs via CLI +if (require.main === module) { + if (process.argv.length !== 4) { + throw new Error(`2 arguments are required, but the app received: ${JSON.stringify(process.argv.slice(2))}`); + } + + writeBigQuerySchema(process.argv[2], process.argv[3]); +} diff --git a/src/helpers/fs.ts b/src/helpers/fs.ts index 7e2d6c9..bfdbe3f 100644 --- a/src/helpers/fs.ts +++ b/src/helpers/fs.ts @@ -1,3 +1,5 @@ +import { existsSync, mkdirSync, writeFileSync } from "fs"; +import { dirname } from "path"; import path = require("path"); export const extractPartitionKey = (filePath: string): string => { @@ -8,3 +10,11 @@ export const extractPartitionKey = (filePath: string): string => { // Split into ["token-balances/dt=", "2021-01-01"] return directoryPath.split("dt=")[1]; }; + +export const writeFile = (filePath: string, content: string): void => { + if (!existsSync(dirname(filePath))) { + mkdirSync(dirname(filePath)); + } + + writeFileSync(filePath, content); +}; diff --git a/src/index.ts b/src/index.ts index 7fc0ce8..aa006de 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,10 @@ import { createClient } from "@urql/core"; import fetch from "cross-fetch"; -import { getBigQuerySchema } from "./helpers/bigquerySchema"; -import { generateJSONSchema } from "./helpers/jsonSchema"; -import { generateQuery } from "./helpers/subgraphQuery"; +import { getISO8601DateString } from "./helpers/date"; +import { getLatestFinishDate, sendPubSubMessage } from "./helpers/pubsub"; +import { getFinalDate, getRecords, getRecordsFetchStartDate } from "./records"; +import { getEarliestTransactionDateStart } from "./subgraph"; const SUBGRAPH_URL = "https://api.studio.thegraph.com/query/28103/token-holders/0.0.40"; @@ -35,64 +36,56 @@ export const handler = async ( fetch, }); - const schema = await generateJSONSchema("TokenHolderTransaction", "./src/graphql/generated.ts"); - - console.log(`query = ${generateQuery(schema, "TokenHolderTransaction")}`); - - console.log(`bq = ${await getBigQuerySchema(schema)}`); - - return; - - // // Get the earliest date in the subgraph - // const subgraphEarliestDate: Date = await getEarliestTransactionDateStart(client); - // // Final date in the subgraph - // const subgraphFinalDate: Date = finalDateOverride ? new Date(finalDateOverride) : await getFinalDate(client); - // // Date up to which records have been cached - // const recordsFetchedUpToDate: Date | null = await getRecordsFetchStartDate(storagePrefix, bucketName); - - // const getFetchStartDate = async (): Promise => { - // // Check for PubSub messages - // const pubSubFinishDate: Date | null = await getLatestFinishDate(pubSubSubscriptionId); - - // // If there is a finishDate in the PubSub messages, then we proceed from there - // // This is because there may have been a refresh of records - // if (pubSubFinishDate) { - // console.log( - // `getFetchStartDate: Using finish date from PubSub message queue: ${getISO8601DateString(pubSubFinishDate)}`, - // ); - // return pubSubFinishDate; - // } - - // // If records exist, continue from where we left off - // if (recordsFetchedUpToDate) { - // console.log( - // `getFetchStartDate: Using latest cached records date: ${getISO8601DateString(recordsFetchedUpToDate)}`, - // ); - // return recordsFetchedUpToDate; - // } + // Get the earliest date in the subgraph + const subgraphEarliestDate: Date = await getEarliestTransactionDateStart(client); + // Final date in the subgraph + const subgraphFinalDate: Date = finalDateOverride ? new Date(finalDateOverride) : await getFinalDate(client); + // Date up to which records have been cached + const recordsFetchedUpToDate: Date | null = await getRecordsFetchStartDate(storagePrefix, bucketName); + + const getFetchStartDate = async (): Promise => { + // Check for PubSub messages + const pubSubFinishDate: Date | null = await getLatestFinishDate(pubSubSubscriptionId); + + // If there is a finishDate in the PubSub messages, then we proceed from there + // This is because there may have been a refresh of records + if (pubSubFinishDate) { + console.log( + `getFetchStartDate: Using finish date from PubSub message queue: ${getISO8601DateString(pubSubFinishDate)}`, + ); + return pubSubFinishDate; + } - // // Otherwise proceed from the start of the subgraph - // console.log(`getFetchStartDate: Using subgraph start date: ${getISO8601DateString(subgraphEarliestDate)}`); - // return subgraphEarliestDate; - // }; + // If records exist, continue from where we left off + if (recordsFetchedUpToDate) { + console.log( + `getFetchStartDate: Using latest cached records date: ${getISO8601DateString(recordsFetchedUpToDate)}`, + ); + return recordsFetchedUpToDate; + } - // const startDate = await getFetchStartDate(); - // console.log(`Transactions will be fetched from ${startDate.toISOString()}`); + // Otherwise proceed from the start of the subgraph + console.log(`getFetchStartDate: Using subgraph start date: ${getISO8601DateString(subgraphEarliestDate)}`); + return subgraphEarliestDate; + }; - // // Get and write records - // const fetchedUpTo: Date = await getRecords( - // client, - // storagePrefix, - // bucketName, - // startDate, - // subgraphFinalDate, - // shouldTerminate, - // ); + const startDate = await getFetchStartDate(); + console.log(`Transactions will be fetched from ${startDate.toISOString()}`); + + // Get and write records + const fetchedUpTo: Date = await getRecords( + client, + storagePrefix, + bucketName, + startDate, + subgraphFinalDate, + shouldTerminate, + ); - // /** - // * Publish the start and finish dates for what was fetched. - // */ - // await sendPubSubMessage(pubSubTopic, startDate, fetchedUpTo); + /** + * Publish the start and finish dates for what was fetched. + */ + await sendPubSubMessage(pubSubTopic, startDate, fetchedUpTo); }; // Run locally using `yarn execute`. Inputs may need to be changed if re-deployments occur.