feat: add data warehouse settings (#548)
hughcrt authored Sep 11, 2024
1 parent be54057 commit 8ed495a
Showing 17 changed files with 1,170 additions and 19 deletions.
593 changes: 592 additions & 1 deletion package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/backend/package.json
@@ -13,6 +13,8 @@
"dependencies": {
"@authenio/samlify-node-xmllint": "^2.0.0",
"@aws-sdk/client-comprehend": "^3.645.0",
"@google-cloud/bigquery": "^7.9.0",
"@google-cloud/datastream": "^3.2.0",
"@json2csv/plainjs": "^7.0.6",
"@koa/cors": "^5.0.0",
"@sentry/node": "^7.99.0",
36 changes: 36 additions & 0 deletions packages/backend/src/api/v1/data-warehouse/index.ts
@@ -0,0 +1,36 @@
import Context from "@/src/utils/koa";
import Router from "koa-router";
import { z } from "zod";
import { createNewDatastream } from "./utils";
import sql from "@/src/utils/db";

const dataWarehouse = new Router({
prefix: "/data-warehouse",
});

dataWarehouse.get("/bigquery", async (ctx: Context) => {
const { projectId } = ctx.state;
const [connector] =
await sql`select * from _data_warehouse_connector where project_id = ${projectId}`;

ctx.body = connector;
});

dataWarehouse.post("/bigquery", async (ctx: Context) => {
const bodySchema = z.object({
apiKey: z.string().transform((apiKey) => JSON.parse(apiKey)),
});

// TODO: validate apiKey first with Zod

const { apiKey } = bodySchema.parse(ctx.request.body);

await createNewDatastream(apiKey, process.env.DATABASE_URL!, ctx);

ctx.body = {};
});

dataWarehouse.patch("/bigquery", async (ctx: Context) => {});

export default dataWarehouse;
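
A minimal sketch of how a client might call the new POST route, assuming the /v1 prefix wired up in packages/backend/src/api/v1/index.ts below; the base URL, bearer-token auth, and variable names are assumptions for illustration, not part of this commit:

// Hypothetical client call: the BigQuery service-account key is sent as a
// JSON string, matching the zod transform in the POST handler above.
declare const token: string; // assumed auth token
declare const serviceAccountKey: Record<string, string>; // parsed key file

const res = await fetch("https://api.example.com/v1/data-warehouse/bigquery", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${token}`,
  },
  body: JSON.stringify({ apiKey: JSON.stringify(serviceAccountKey) }),
});
if (!res.ok) throw new Error(`Connector setup failed: ${res.status}`);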
247 changes: 247 additions & 0 deletions packages/backend/src/api/v1/data-warehouse/utils.ts
@@ -0,0 +1,247 @@
import { protos, DatastreamClient } from "@google-cloud/datastream";
import { parse as parseUrl } from "url";
import fs from "fs";
import Context from "@/src/utils/koa";
import sql from "@/src/utils/db";
import { BigQuery } from "@google-cloud/bigquery";

type ConnectionProfile = protos.google.cloud.datastream.v1.IConnectionProfile;

interface ParsedPostgresUri {
hostname: string;
port: number;
username: string;
password: string;
database: string;
}

function parsePostgresUri(uri: string): ParsedPostgresUri {
const parsed = parseUrl(uri);

if (!parsed.hostname || !parsed.auth || !parsed.pathname) {
throw new Error("Invalid PostgreSQL connection URI");
}

const [username, password] = parsed.auth.split(":");

if (!username || !password) {
throw new Error("Username or password is missing from the URI");
}

return {
hostname: parsed.hostname,
port: parsed.port ? parseInt(parsed.port, 10) : 5432,
username,
password,
database: parsed.pathname.slice(1),
};
}
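// Example (illustrative values):
// parsePostgresUri("postgres://lunary:s3cret@db.example.com:5432/lunary")
// → { hostname: "db.example.com", port: 5432, username: "lunary",
//     password: "s3cret", database: "lunary" }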

function createBigQueryConnectionProfile(
displayName: string,
): ConnectionProfile {
const connectionProfile: ConnectionProfile = {
displayName: displayName,
bigqueryProfile: {},
staticServiceIpConnectivity: {},
};

return connectionProfile;
}

type PostgresqlSourceConfig =
protos.google.cloud.datastream.v1.IPostgresqlSourceConfig;

type BigQueryDestinationConfig =
protos.google.cloud.datastream.v1.IBigQueryDestinationConfig;

type Stream = protos.google.cloud.datastream.v1.IStream;

const location = "us-east1";

export async function createNewDatastream(
apiKey: Record<string, any>, // parsed service-account key JSON
postgresURI: string,
ctx: Context,
) {
const projectId = apiKey.project_id as string;

const datastreamClient = new DatastreamClient({
credentials: apiKey,
projectId,
});

const parsedUri = parsePostgresUri(postgresURI);

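// Datastream reaches the database from a fixed set of Google-managed public
// IPs (staticServiceIpConnectivity), which must be allowed through the
// database firewall.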
const postgresConnectionProfile: ConnectionProfile = {
displayName: "Lunary Data Warehouse Source",
postgresqlProfile: {
hostname: parsedUri.hostname,
port: parsedUri.port || 5432,
username: parsedUri.username,
password: parsedUri.password,
database: parsedUri.database,
},
staticServiceIpConnectivity: {},
};

const request1 = {
parent: datastreamClient.locationPath(projectId, location),
connectionProfileId: "lunary-data-warehouse-source",
connectionProfile: postgresConnectionProfile,
};

try {
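// Tear down artifacts from any previous run, then recreate the publication,
// logical replication slot, and read-only role that Datastream reads from.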
await sql`revoke all privileges on all tables in schema public from lunary_bigquery_connector`;
await sql`revoke all privileges on schema public from lunary_bigquery_connector`;
await sql`revoke all privileges on all sequences in schema public from lunary_bigquery_connector`;
await sql`revoke all privileges on all functions in schema public from lunary_bigquery_connector`;
await sql`alter default privileges in schema public revoke all on tables from lunary_bigquery_connector`;
await sql`alter default privileges in schema public revoke all on sequences from lunary_bigquery_connector`;
await sql`alter default privileges in schema public revoke all on functions from lunary_bigquery_connector`;
await sql`drop publication if exists lunary_bigquery_connector`;
await sql`select pg_drop_replication_slot('lunary_bigquery_connector')`.catch(
console.error,
);
await sql`drop user if exists lunary_bigquery_connector`;

await sql`create publication lunary_bigquery_connector for all tables`;
await sql`select pg_create_logical_replication_slot('lunary_bigquery_connector', 'pgoutput')`;
await sql.unsafe(
`create user lunary_bigquery_connector with encrypted password '${process.env.JWT_SECRET}'`,
);
await sql`grant select on all tables in schema public to lunary_bigquery_connector`;
await sql`grant usage on schema public to lunary_bigquery_connector`;
await sql`alter default privileges in schema public grant select on tables to lunary_bigquery_connector`;
await sql`grant rds_replication to lunary_bigquery_connector`.catch(
() => {},
);
} catch (error) {
console.error(error);
ctx.throw(
500,
"Could not configure the PostgreSQL source database. Have you read the tutorial at https://lunary.ai/docs/enterprise/bigquery#setup-postgtresl-source",
);
}


// TODO: delete connections and stream if they already exist

try {
const [operation1] =
await datastreamClient.createConnectionProfile(request1);
await operation1.promise();
} catch (error) {
if (error.code === 6) {
console.info("Source Connection already exists. Skipping.");
} else {
throw error;
}
}

const bigQueryConnectionProfile = createBigQueryConnectionProfile(
"Lunary Data Warehouse Destination",
);

const request2 = {
parent: datastreamClient.locationPath(apiKey.project_id, location),
connectionProfileId: "lunary-data-warehouse-destination",
connectionProfile: bigQueryConnectionProfile,
};

try {
const [operation2] =
await datastreamClient.createConnectionProfile(request2);
await operation2.promise();
} catch (error) {
if (error.code === 6) {
console.info("Destination Connection already exists. Skipping.");
} else {
throw error;
}
}

const postgresSourceConfig: PostgresqlSourceConfig = {
includeObjects: {
postgresqlSchemas: [{ schema: "public" }],
},
replicationSlot: "lunary_bigquery_connector",
publication: "lunary_bigquery_connector",
};

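// dataFreshness is the maximum staleness Datastream allows: BigQuery may
// serve data up to five minutes behind the source.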
const bigqueryDestinationConfig: BigQueryDestinationConfig = {
dataFreshness: { seconds: 300 },
singleTargetDataset: {
datasetId: `${projectId}:lunary`,
},
};

const bigquery = new BigQuery({
credentials: apiKey,
projectId,
});

try {
await bigquery.createDataset("lunary", {
location: "US",
});
} catch (error) {
if (error.code === 403) {
ctx.throw(500, "You do not have permission to create a BigQuery dataset in this project.");
}
console.error(error);
console.info("Dataset already exists. Skipping.");
}

const streamConfig: Stream = {
sourceConfig: {
sourceConnectionProfile: `projects/${projectId}/locations/${location}/connectionProfiles/lunary-data-warehouse-source`,
postgresqlSourceConfig: postgresSourceConfig,
},
destinationConfig: {
destinationConnectionProfile: `projects/${projectId}/locations/${location}/connectionProfiles/lunary-data-warehouse-destination`,
bigqueryDestinationConfig: bigqueryDestinationConfig,
},
displayName: `PostgreSQL to BigQuery Stream`,
backfillAll: {},
};

const request = {
parent: `projects/${projectId}/locations/${location}`,
streamId: "lunary-data-warehouse-stream",
stream: streamConfig,
};

const [operation] = await datastreamClient.createStream(request);
console.log(`Stream creation initiated. Operation name: ${operation.name}`);

const [response] = await operation.promise();
console.log("Stream created successfully:", response);

if (typeof operation.result?.name !== "string") {
throw new Error(`Stream creation failed: ${JSON.stringify(response)}`);
}

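// Streams are created in the NOT_STARTED state; flipping state to RUNNING
// starts the backfill and ongoing CDC replication.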
const updateStreamRequest = {
stream: {
name: operation.result.name,
state: protos.google.cloud.datastream.v1.Stream.State.RUNNING,
},
updateMask: {
paths: ["state"],
},
};

const [updateOperation] =
await datastreamClient.updateStream(updateStreamRequest);
console.log(
`Stream update initiated. Operation name: ${updateOperation.name}`,
);
await sql`
insert into _data_warehouse_connector (project_id, type, status)
values (${ctx.state.projectId}, 'BigQuery', 'created')
on conflict (project_id)
do update set type = 'BigQuery', status = 'created', updated_at = now()
`;
}
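
A small sketch of how the stream's state could be verified after createNewDatastream returns, reusing the same parsed service-account credentials; the helper name is illustrative and not part of this commit:

import { DatastreamClient, protos } from "@google-cloud/datastream";

// Hypothetical helper: fetch the stream and report whether it reached RUNNING.
async function isStreamRunning(
  credentials: Record<string, any>,
  projectId: string,
): Promise<boolean> {
  const client = new DatastreamClient({ credentials, projectId });
  const name = `projects/${projectId}/locations/us-east1/streams/lunary-data-warehouse-stream`;
  const [stream] = await client.getStream({ name });
  const running = protos.google.cloud.datastream.v1.Stream.State.RUNNING;
  // gRPC returns the enum as a number; the REST fallback may return a string.
  return stream.state === running || stream.state === "RUNNING";
}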
2 changes: 2 additions & 0 deletions packages/backend/src/api/v1/index.ts
@@ -17,6 +17,7 @@ import evaluators from "./evaluator";
import analytics from "./analytics";
import views from "./views";
import models from "./models";
import dataWarehouse from "./data-warehouse";

const v1 = new Router({
prefix: "/v1",
@@ -50,5 +51,6 @@ v1.use(checklists.routes());
v1.use(analytics.routes());
v1.use(views.routes());
v1.use(models.routes());
v1.use(dataWarehouse.routes());

export default v1;
10 changes: 10 additions & 0 deletions packages/db/0045.sql
@@ -0,0 +1,10 @@
create table _data_warehouse_connector(
id uuid default uuid_generate_v4() primary key,
created_at timestamp with time zone default now(),
updated_at timestamp with time zone default now(),
project_id uuid not null,
type text,
status text,
constraint fk_data_warehouse_connector_project_id foreign key (project_id) references project(id) on delete cascade
);
create unique index on _data_warehouse_connector(project_id);
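
The unique index on project_id is what the on conflict (project_id) upsert in createNewDatastream above relies on.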
1 change: 1 addition & 0 deletions packages/db/0046.sql
@@ -0,0 +1 @@
alter table api_key add primary key (id);
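
Presumably the primary key is added because Datastream's BigQuery destination merges change events by primary key, so every replicated table needs one.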