From 4f51a13fcbfd61f67231d0764117cb7c050c4109 Mon Sep 17 00:00:00 2001 From: Akshaya Acharya Date: Thu, 16 Jan 2025 20:30:34 +0530 Subject: [PATCH 1/3] Multitenant data connector sdk --- ndc-duckduckapi/src/duckduckapi.ts | 155 +++++++++++++++---------- ndc-duckduckapi/src/generate-config.ts | 2 +- ndc-duckduckapi/src/handlers/query.ts | 46 +++++--- ndc-duckduckapi/src/sdk.ts | 6 + 4 files changed, 129 insertions(+), 80 deletions(-) diff --git a/ndc-duckduckapi/src/duckduckapi.ts b/ndc-duckduckapi/src/duckduckapi.ts index 41ab5d2..a97d0e1 100644 --- a/ndc-duckduckapi/src/duckduckapi.ts +++ b/ndc-duckduckapi/src/duckduckapi.ts @@ -26,22 +26,46 @@ import { Connection, Database } from "duckdb-async"; import fs from "fs-extra"; import path from "path"; -// Make a connection manager +let DATABASE_SCHEMA = ""; + +// Single tenant const DUCKDB_URL = "duck.db"; let db: Database; export async function getDB() { if (!db) { - const duckDBUrl = (process.env["DUCKDB_URL"] as string) ?? DUCKDB_URL; + const dbUrl = (process.env["DUCKDB_URL"] as string) ?? DUCKDB_URL; + db = await openDatabaseFile(dbUrl); + } + return db; +} - const dirPath = path.dirname(duckDBUrl); - if (dirPath !== ".") { - await fs.ensureDir(dirPath); - } +// Multi tenant +export interface Tenant { + tenantId: string; + tenantToken: string | null; + db: Database; + syncState: string; +} + +export type TenantToken = string; - db = await Database.create(duckDBUrl); - console.log("Database file at", duckDBUrl); +export const tenants = new Map(); + +export function getTenantById(tenantId: string): Tenant | null { + for (let [_, tenant] of tenants.entries()) { + if (tenant.tenantId === tenantId) { + return tenant; + } } - return db; + return null; +} + +export async function getTenantDB(tenantId: string) { + const tenantDb = getTenantById(tenantId)?.db; + if (tenantDb) return tenantDb; + + const dbUrl = `duck-${tenantId}.db`; + return await openDatabaseFile(dbUrl); } export async function transaction( @@ -66,28 +90,6 @@ process.on("SIGINT", async () => { process.exit(0); }); -// // Example usage: -// const connectionManager = new DuckDBConnectionManager('mydb.db', 3); -// -// // Async usage -// async function example() { -// // Each operation gets its own connection -// const result1 = await connectionManager.withConnection(async (conn) => { -// return await conn.all('SELECT * FROM mytable'); -// }); -// -// const result2 = await connectionManager.withConnection(async (conn) => { -// return await conn.run('INSERT INTO mytable VALUES (?)'); -// }); -// } -// -// // Sync usage -// function exampleSync() { -// const result = connectionManager.withConnectionSync((conn) => { -// return conn.prepare('SELECT * FROM mytable').all(); -// }); -// } - export type DuckDBConfigurationSchema = { collection_names: string[]; collection_aliases: { [k: string]: string }; @@ -96,48 +98,30 @@ export type DuckDBConfigurationSchema = { procedures: ProcedureInfo[]; }; -type CredentialSchema = { - url: string; -}; - export type Configuration = lambdaSdk.Configuration & { duckdbConfig: DuckDBConfigurationSchema; }; -export type State = lambdaSdk.State & { - client: Database; -}; - -async function createDuckDBFile(schema: string): Promise { - try { - const db = await getDB(); - await db.run(schema); - console.log("Schema created successfully"); - } catch (err) { - console.error("Error creating schema:", err); - throw err; - } -} +export type State = lambdaSdk.State; export interface duckduckapi { dbSchema: string; functionsFilePath: string; + multitenantMode?: boolean; + oauthProviderName?: string; } export async function makeConnector( dda: duckduckapi ): Promise> { + DATABASE_SCHEMA = dda.dbSchema; + db = await getDB(); const lambdaSdkConnector = lambdaSdk.createConnector({ functionsFilePath: dda.functionsFilePath, }); - /** - * Create the db and load the DB path as a global variable - */ - await createDuckDBFile(dda.dbSchema); - const connector: Connector = { /** * Validate the configuration files provided by the user, returning a validated 'Configuration', @@ -151,8 +135,6 @@ export async function makeConnector( // Load DuckDB configuration by instrospecting DuckDB const duckdbConfig = await generateConfig(db); - console.log("#####", dda.functionsFilePath); - const config = await lambdaSdkConnector.parseConfiguration( configurationDir ); @@ -178,11 +160,7 @@ export async function makeConnector( configuration: Configuration, metrics: Registry ): Promise { - const state = await lambdaSdkConnector.tryInitState( - configuration, - metrics - ); - return Promise.resolve({ ...state, client: db }); + return lambdaSdkConnector.tryInitState(configuration, metrics); }, /** @@ -258,8 +236,10 @@ export async function makeConnector( if (configuration.functionsSchema.functions[request.collection]) { return lambdaSdkConnector.query(configuration, state, request); } else { + const db = selectTenantDatabase(dda, request?.arguments?.headers); + let query_plans = await plan_queries(configuration, request); - return await perform_query(state, query_plans); + return await perform_query(db, query_plans); } }, @@ -315,8 +295,13 @@ export async function makeConnector( export function getOAuthCredentialsFromHeader( headers: JSONValue ): Record { + if (!headers) { + console.log("Engine header forwarding is disabled"); + throw new Error("Engine header forwarding is disabled"); + } + const oauthServices = headers.value as any; - console.log(oauthServices); + try { const decodedServices = Buffer.from( oauthServices["x-hasura-oauth-services"] as string, @@ -332,3 +317,47 @@ export function getOAuthCredentialsFromHeader( throw error; } } + +function selectTenantDatabase(dda: duckduckapi, headers: any): Database { + if (!dda.multitenantMode) { + return db; + } + + const token = + getOAuthCredentialsFromHeader(headers)?.[dda.oauthProviderName!] + ?.access_token; + + const tenantDb = tenants.get(token)?.db; + + if (!tenantDb) { + throw new Forbidden("Tenant not found", {}); + } + + return tenantDb; +} + +async function openDatabaseFile(dbUrl: string): Promise { + const { dbPath, dirPath } = getDatabaseFileParts(dbUrl); + + if (dirPath !== ".") { + await fs.ensureDir(dirPath); + } + + const db = await Database.create(dbPath); + await db.run(DATABASE_SCHEMA); + + console.log("Created database file at", dbPath); + + return db; +} + +function getDatabaseFileParts(dbUrl: string) { + const dbPath = path.resolve( + (process.env["DUCKDB_PATH"] as string) ?? ".", + dbUrl + ); + + const dirPath = path.dirname(dbPath); + + return { dbPath, dirPath }; +} diff --git a/ndc-duckduckapi/src/generate-config.ts b/ndc-duckduckapi/src/generate-config.ts index a78766c..0878a88 100644 --- a/ndc-duckduckapi/src/generate-config.ts +++ b/ndc-duckduckapi/src/generate-config.ts @@ -119,7 +119,7 @@ export async function generateConfig( for (let table of tables) { const tableName = table.name; - const aliasName = `${table.database}.${table.schema}.${table.name}`; + const aliasName = `${table.schema}.${table.name}`; tableNames.push(tableName); tableAliases[tableName] = aliasName; diff --git a/ndc-duckduckapi/src/handlers/query.ts b/ndc-duckduckapi/src/handlers/query.ts index 8746a22..573112a 100644 --- a/ndc-duckduckapi/src/handlers/query.ts +++ b/ndc-duckduckapi/src/handlers/query.ts @@ -8,9 +8,10 @@ import { Conflict, Relationship, } from "@hasura/ndc-sdk-typescript"; -import { Configuration, State } from "../duckduckapi"; +import { Configuration } from "../duckduckapi"; const SqlString = require("sqlstring-sqlite"); import { MAX_32_INT } from "../constants"; +import { Database } from "duckdb-async"; const escape_single = (s: any) => SqlString.escape(s); const escape_double = (s: any) => `"${SqlString.escape(s).slice(1, -1)}"`; @@ -89,36 +90,41 @@ function wrap_rows(s: string): string { function isTimestampType(field_def: any): boolean { if (!field_def) return false; - + function checkType(type: any): boolean { if (type.type === "nullable") { return checkType(type.underlying_type); } return type.type === "named" && type.name === "Timestamp"; } - + return checkType(field_def.type); } function getIntegerType(field_def: any): string | null { if (!field_def) return null; - + function checkType(type: any): string | null { if (type.type === "nullable") { return checkType(type.underlying_type); } if (type.type === "named") { switch (type.name) { - case "BigInt": return "BIGINT"; - case "UBigInt": return "UBIGINT"; - case "HugeInt": return "HUGEINT"; - case "UHugeInt": return "UHUGEINT"; - default: return null; + case "BigInt": + return "BIGINT"; + case "UBigInt": + return "UBIGINT"; + case "HugeInt": + return "HUGEINT"; + case "UHugeInt": + return "UHUGEINT"; + default: + return null; } } return null; } - + return checkType(field_def.type); } @@ -153,7 +159,8 @@ function build_where( } break; case "binary_comparison_operator": - const object_type = config.duckdbConfig?.object_types[query_request.collection]; + const object_type = + config.duckdbConfig?.object_types[query_request.collection]; const field_def = object_type?.fields[expression.column.name]; const isTimestamp = isTimestampType(field_def); const integerType = getIntegerType(field_def); @@ -309,11 +316,17 @@ function build_where( return sql; } -function getColumnExpression(field_def: any, collection_alias: string, column: string): string { +function getColumnExpression( + field_def: any, + collection_alias: string, + column: string +): string { // Helper function to handle the actual type function handleNamedType(type: any): string { if (type.name === "BigInt") { - return `CAST(${escape_double(collection_alias)}.${escape_double(column)} AS TEXT)`; + return `CAST(${escape_double(collection_alias)}.${escape_double( + column + )} AS TEXT)`; } return `${escape_double(collection_alias)}.${escape_double(column)}`; } @@ -450,7 +463,8 @@ function build_query( collect_rows.push(escape_single(field_name)); switch (field_value.type) { case "column": - const object_type = config.duckdbConfig.object_types[query_request.collection]; + const object_type = + config.duckdbConfig.object_types[query_request.collection]; let field_def = object_type.fields[field_value.column]; collect_rows.push( getColumnExpression(field_def, collection_alias, field_value.column) @@ -679,13 +693,13 @@ async function do_all(con: any, query: SQLQuery): Promise { } export async function perform_query( - state: State, + db: Database, query_plans: SQLQuery[] ): Promise { const response: RowSet[] = []; for (let query_plan of query_plans) { try { - const connection = await state.client.connect(); + const connection = await db.connect(); let row_set: RowSet = { rows: [] }; // Handle aggregate query if present diff --git a/ndc-duckduckapi/src/sdk.ts b/ndc-duckduckapi/src/sdk.ts index 92ccfe5..8335e35 100644 --- a/ndc-duckduckapi/src/sdk.ts +++ b/ndc-duckduckapi/src/sdk.ts @@ -1,8 +1,14 @@ export { start } from "@hasura/ndc-sdk-typescript"; +export { Connection, Database } from "duckdb-async"; export { makeConnector, duckduckapi, getDB, transaction, getOAuthCredentialsFromHeader, + getTenantDB, + tenants, + getTenantById, + Tenant, + TenantToken, } from "./duckduckapi"; From 5ef7a87d0bf4b6bef676a407c1f716fdafc8c399 Mon Sep 17 00:00:00 2001 From: Akshaya Acharya Date: Thu, 30 Jan 2025 10:00:09 +0530 Subject: [PATCH 2/3] Update multitenant sdk --- ndc-duckduckapi/src/duckduckapi.ts | 6 +++++- ndc-duckduckapi/src/sdk.ts | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ndc-duckduckapi/src/duckduckapi.ts b/ndc-duckduckapi/src/duckduckapi.ts index a97d0e1..69ef146 100644 --- a/ndc-duckduckapi/src/duckduckapi.ts +++ b/ndc-duckduckapi/src/duckduckapi.ts @@ -49,7 +49,11 @@ export interface Tenant { export type TenantToken = string; -export const tenants = new Map(); +const tenants = new Map(); + +export function getTenants() { + return tenants; +} export function getTenantById(tenantId: string): Tenant | null { for (let [_, tenant] of tenants.entries()) { diff --git a/ndc-duckduckapi/src/sdk.ts b/ndc-duckduckapi/src/sdk.ts index 8335e35..0327b96 100644 --- a/ndc-duckduckapi/src/sdk.ts +++ b/ndc-duckduckapi/src/sdk.ts @@ -6,9 +6,9 @@ export { getDB, transaction, getOAuthCredentialsFromHeader, - getTenantDB, - tenants, + getTenants, getTenantById, + getTenantDB, Tenant, TenantToken, } from "./duckduckapi"; From e584af3dbb0db496becb848f8a9edd1a394199e2 Mon Sep 17 00:00:00 2001 From: Akshaya Acharya Date: Thu, 30 Jan 2025 10:17:02 +0530 Subject: [PATCH 3/3] Release version 0.6.0 --- README.md | 28 ++++++++++++++++++++++++++++ ndc-duckduckapi/package.json | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 59c3f15..0382c0c 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,34 @@ To test, run the ts connector and refresh the supergraph project (step 3 onwards _TODO:_ +## Single-tenant support + +```typescript +const DATABASE_SCHEMA = "create table if not exists foo( ... )"; + +const connectorConfig: duckduckapi = { + dbSchema: DATABASE_SCHEMA, + functionsFilePath: path.resolve(__dirname, "./functions.ts"), +}; +``` + +## Multi-tenant support + +```typescript +const connectorConfig: duckduckapi = { + dbSchema: DATABASE_SCHEMA, + functionsFilePath: path.resolve(__dirname, "./functions.ts"), + multitenantMode: true, + oauthProviderName: "zendesk", +}; +``` + +A `Tenant` is identified by the tenantToken in the `oauthProviderName` key of the `x-hasura-oauth-services` header forwarded by the engine. + +A `Tenant` has a unique `tenantId` and an isolated duckdb database. Multiple tenantTokens can map to the same `Tenant` over multiple logins. + +The [Zendesk data connector](https://github.com/hasura/zendesk-data-connector) is an example of a multi-tenant data connector. + ## Duck DB Features Below, you'll find a matrix of all supported features for the DuckDB connector: diff --git a/ndc-duckduckapi/package.json b/ndc-duckduckapi/package.json index 725f537..903807f 100644 --- a/ndc-duckduckapi/package.json +++ b/ndc-duckduckapi/package.json @@ -1,6 +1,6 @@ { "name": "@hasura/ndc-duckduckapi", - "version": "0.5.3", + "version": "0.6.0", "description": "SDK for the Hasura DDN DuckDuckAPI connector. Easily build a data API from any existing API by ETLing data into DuckDB.", "author": "Hasura", "license": "Apache-2.0",