Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neon database client migration #251

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
deb66ff
fixing types
pellicceama Feb 4, 2025
59adc8e
migrating neon, tests WIP
pellicceama Feb 4, 2025
7474c94
Merge branch 'main' of github.com:openintegrations/openint into neon-…
pellicceama Feb 4, 2025
f5db69e
adding neon proxy
pellicceama Feb 4, 2025
c50281c
updating upsert tests
pellicceama Feb 4, 2025
0b5d73c
removing transaction support
pellicceama Feb 4, 2025
3bce91c
moving localhost to using local proxy
pellicceama Feb 4, 2025
971953a
removing pool init
pellicceama Feb 4, 2025
efa7b8b
removing dup
pellicceama Feb 4, 2025
ace6a4b
readding
pellicceama Feb 4, 2025
f9ba48a
migrating action
pellicceama Feb 4, 2025
0b8b850
fixing stably workflows
pellicceama Feb 4, 2025
319e7b9
moving to docker db
pellicceama Feb 4, 2025
7ca1464
update
pellicceama Feb 4, 2025
d444b41
fixing e2e
pellicceama Feb 4, 2025
13c3daa
adding missing config
pellicceama Feb 5, 2025
1aa3ea7
expanding tests for cases where db is not created from scratch in dev…
pellicceama Feb 5, 2025
e0c5f7f
tweaks
pellicceama Feb 5, 2025
f867960
adding debug statement
pellicceama Feb 5, 2025
85c5491
adding backup files
pellicceama Feb 5, 2025
9ea7634
adding priv/public key variables
pellicceama Feb 26, 2025
253cab9
adding support for private key signing of jwts
pellicceama Feb 26, 2025
220bd21
removing unecessary WIP previous approaches files
pellicceama Feb 26, 2025
087f97d
migrating package db and postgres meta service to new neon serverless
pellicceama Feb 26, 2025
712073d
merging main
pellicceama Feb 26, 2025
a5d40b3
adding migration script and spec, not tested
pellicceama Feb 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions apps/web/__tests__/end-to-end.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import plaidSdkDef, {initPlaidSDK} from '@opensdks/sdk-plaid'
import {drizzle, eq, schema, sql} from '@openint/db'
import {drizzle, eq, neon, schema, sql} from '@openint/db'
import {createAppTrpcClient} from '@openint/engine-frontend/lib/trpcClient'
import {env, testEnv, testEnvRequired} from '@openint/env'
import {initOpenIntSDK} from '@openint/sdk'
Expand All @@ -13,15 +13,15 @@ let sdk: ReturnType<typeof initOpenIntSDK>

let trpc: ReturnType<typeof createAppTrpcClient>

const db = drizzle(env.DATABASE_URL, {logger: true, schema})
const db = drizzle(neon(env.DATABASE_URL), {logger: true, schema})
async function setupTestDb(dbName: string) {
await db.execute(`DROP DATABASE IF EXISTS ${dbName}`)
await db.execute(`DROP DATABASE IF EXISTS ${dbName} WITH (FORCE)`)
await db.execute(`CREATE DATABASE ${dbName}`)
const url = new URL(env.DATABASE_URL)
url.pathname = `/${dbName}`
console.log('setupTestDb url:', url.toString())
console.log(url.toString())
return {url, db: drizzle(url.toString(), {logger: true})}
return {url, db: drizzle(neon(url.toString()), {logger: true})}
}

beforeAll(async () => {
Expand All @@ -41,7 +41,7 @@ beforeAll(async () => {
afterAll(async () => {
if (!testEnv.DEBUG) {
await tearDownTestOrg(fixture)
await testDb.db.$client.end()
// No need to close connections with neon
// Cannot drop because database connection is still kept open by connector-postgres/server
// await db.execute(`DROP DATABASE IF EXISTS test_${fixture.testId}`)
}
Expand Down Expand Up @@ -125,7 +125,7 @@ test('create and sync plaid connection', async () => {
const rows = await testDb.db.execute(
sql`SELECT * FROM openint.banking_transaction`,
)
expect(rows[0]).toMatchObject({
expect(rows.rows[0]).toMatchObject({
source_id: connId,
id: expect.any(String),
customer_id: null,
Expand Down Expand Up @@ -178,7 +178,7 @@ test('create and sync greenhouse connection', async () => {
})

const rows = await testDb.db.execute(sql`SELECT * FROM openint.job`)
expect(rows[0]).toMatchObject({
expect(rows.rows[0]).toMatchObject({
source_id: connId,
id: expect.any(String),
customer_id: null,
Expand Down
2 changes: 1 addition & 1 deletion apps/web/app/api/connections/[connectionId]/sql/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function GET(
format === 'csv'
? new NextResponse(
Papa.unparse([
...rows.map((r) =>
...rows.rows.map((r) =>
R.mapValues(r, (v) =>
v instanceof Date
? v.toISOString()
Expand Down
40 changes: 26 additions & 14 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,40 @@
version: '3'
services:
postgres:
image: postgres
image: postgres:17
ports:
- 5432:5432
# command: postgres -c log_statement=all
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres # Only database named `postgres` works with pg_cron by default
POSTGRES_PASSWORD: password
command: ["postgres", "-c", "log_statement=all", "-c", "max_connections=500"]
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U postgres']
interval: 10s
timeout: 5s
retries: 5
command: '-d 1'
volumes:
- db_data:/var/lib/postgresql/data

supabase:
image: supabase/postgres:15.8.1.017 # For some reason "lastest tag" is having issues... so we pin to a specific version latest as of 2024-12-20_0055
ports:
- 5433:5432 # Change the port to avoid conflict with the default postgres
command: postgres -c config_file=/etc/postgresql/postgresql.conf -c log_statement=all -c max_connections=500
restart: always
neon-proxy:
image: ghcr.io/timowilhelm/local-neon-http-proxy:main
environment:
POSTGRES_DB: supabase # Only database named `postgres` works with pg_cron by default
POSTGRES_PASSWORD: password
## Ensure that this matches the .env.dev DATABASE_URL
- PG_CONNECTION_STRING=postgres://postgres:password@postgres:5432/postgres
ports:
- '4444:4444'
depends_on:
postgres:
condition: service_healthy

inngest:
image: inngest/inngest
command: 'inngest dev'
ports:
- '8288:8288'
image: inngest/inngest
command: 'inngest dev'
ports:
- '8288:8288'

volumes:
db_data:
7 changes: 6 additions & 1 deletion kits/cdk/internal/oauthConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ export function makeOauthConnectorServer({
},
},
})
.then((r) => r.data as OauthBaseTypes['connectionSettings']['oauth'])
.then(
(r) =>
r.data as OauthBaseTypes['connectionSettings']['oauth'] & {
error: z.infer<typeof zOauthConnectionError>
},
)

const parsed = zOauthCredentials.safeParse(res.credentials)
if (!parsed.success) {
Expand Down
16 changes: 8 additions & 8 deletions packages/db/index.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import {neon} from '@neondatabase/serverless'

Check failure on line 1 in packages/db/index.ts

View workflow job for this annotation

GitHub Actions / Run type checks, lint, and tests

Cannot find module '@neondatabase/serverless' or its corresponding type declarations.
import type {DrizzleConfig, SQL} from 'drizzle-orm'
import {sql} from 'drizzle-orm'
import {drizzle} from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
import {drizzle} from 'drizzle-orm/neon-http'
import {env} from '@openint/env'
import * as schema from './schema'

export * from 'drizzle-orm'
export * from './stripeNullByte'
export * from './upsert'
export {schema, drizzle, postgres}
export {schema, drizzle, neon}

export function getDb<
TSchema extends Record<string, unknown> = Record<string, never>,
>(urlString: string, config?: DrizzleConfig<TSchema>) {
const pg = postgres(urlString)
const db = drizzle(pg, {logger: !!env['DEBUG'], ...config})
const sql = neon(urlString)
const db = drizzle(sql, {logger: !!env['DEBUG'], ...config})

const url = new URL(urlString)
if (env.DEBUG) {
console.log('[db] host', url.host)
}
return {db, pg}
return {db, sql}
}

export const {pg, db} = getDb(env.DATABASE_URL, {schema})
export const {sql: pg, db} = getDb(env.DATABASE_URL, {schema})

export async function ensureSchema(
thisDb: ReturnType<typeof getDb>['db'],
Expand All @@ -34,7 +34,7 @@
.execute(
sql`SELECT true as exists FROM information_schema.schemata WHERE schema_name = ${schema}`,
)
.then((r) => r[0]?.['exists'] === true)
.then((r) => r.rows[0]?.['exists'] === true)
if (exists) {
return
}
Expand Down
50 changes: 32 additions & 18 deletions packages/db/upsert.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {neon, neonConfig} from '@neondatabase/serverless'

Check failure on line 1 in packages/db/upsert.spec.ts

View workflow job for this annotation

GitHub Actions / Run type checks, lint, and tests

Cannot find module '@neondatabase/serverless' or its corresponding type declarations.
import {generateDrizzleJson, generateMigration} from 'drizzle-kit/api'
import {sql} from 'drizzle-orm'
import {
Expand All @@ -9,11 +10,16 @@
serial,
varchar,
} from 'drizzle-orm/pg-core'
import postgres from 'postgres'
import {env} from '@openint/env'
import {drizzle} from './'
import {dbUpsert, dbUpsertOne, inferTableForUpsert} from './upsert'

neonConfig.fetchEndpoint = (host) => {

Check failure on line 17 in packages/db/upsert.spec.ts

View workflow job for this annotation

GitHub Actions / Run type checks, lint, and tests

Parameter 'host' implicitly has an 'any' type.
const [protocol, port] =
host === 'db.localtest.me' ? ['http', 4444] : ['https', 443]
return `${protocol}://${host}:${port}/sql`
}

async function formatSql(sqlString: string) {
const prettier = await import('prettier')
const prettierSql = await import('prettier-plugin-sql')
Expand All @@ -25,7 +31,12 @@
})
}

const noopDb = drizzle('postgres://noop', {logger: true})
const noopDb = drizzle(
neon('postgres://noop:noop@localhost:5432/noop?schema=public'),
{
logger: true,
},
)

test('upsert query', async () => {
const engagement_sequence = pgTable(
Expand Down Expand Up @@ -237,7 +248,9 @@
"
`)

const query = dbUpsertOne(drizzle(''), table, row, {keyColumns: ['id']})
const query = dbUpsertOne(noopDb, table, row, {
keyColumns: ['id'],
})
expect(query.toSQL().params).toEqual([
'123',
'abc',
Expand Down Expand Up @@ -272,18 +285,18 @@
})

describe('with db', () => {
// console.log('filename', __filename)
jest.setTimeout(30000) // 30 seconds

const dbName = 'upsert_db'

const dbUrl = new URL(env.DATABASE_URL)
dbUrl.pathname = `/${dbName}`
const db = drizzle(dbUrl.toString(), {logger: true})
const db = drizzle(neon(dbUrl.toString()), {logger: true})

beforeAll(async () => {
const masterDb = drizzle(env.DATABASE_URL, {logger: true})
await masterDb.execute(`DROP DATABASE IF EXISTS ${dbName}`)
const masterDb = drizzle(neon(env.DATABASE_URL), {logger: true})
await masterDb.execute(`DROP DATABASE IF EXISTS ${dbName} WITH (FORCE)`)
await masterDb.execute(`CREATE DATABASE ${dbName}`)
await masterDb.$client.end()

await db.execute(sql`
CREATE TABLE IF NOT EXISTS "test_user" (
Expand Down Expand Up @@ -314,7 +327,7 @@

test('upsert with inferred table', async () => {
const ret = await db.execute(sql`SELECT * FROM "test_user"`)
expect(ret[0]).toEqual(row)
expect(ret.rows[0]).toEqual(row)
})

test('null means null', async () => {
Expand All @@ -325,7 +338,7 @@
{keyColumns: ['id']},
)
const ret2 = await db.execute(sql`SELECT * FROM "test_user"`)
expect(ret2[0]).toEqual({...row, name: null})
expect(ret2.rows[0]).toEqual({...row, name: null})
})

test('ignore undefined values by default', async () => {
Expand All @@ -336,7 +349,7 @@
{keyColumns: ['id']},
)
const ret2 = await db.execute(sql`SELECT * FROM "test_user"`)
expect(ret2[0]).toEqual(row)
expect(ret2.rows[0]).toEqual(row)
})

test('treat undefined as sql DEFAULT keyword', async () => {
Expand All @@ -347,7 +360,7 @@
{keyColumns: ['id'], undefinedAsDefault: true},
)
const ret2 = await db.execute(sql`SELECT * FROM "test_user"`)
expect(ret2[0]).toEqual({...row, name: 'unnamed'})
expect(ret2.rows[0]).toEqual({...row, name: 'unnamed'})
})

test('only use the firstRow for inferring schema', async () => {
Expand All @@ -362,7 +375,7 @@
)

expect(
await db.execute(sql`SELECT * FROM "test_user"`).then((r) => r[0]),
await db.execute(sql`SELECT * FROM "test_user"`).then((r) => r.rows[0]),
).toEqual({...row, name: 'original'})
})

Expand Down Expand Up @@ -391,13 +404,13 @@
;(db as any).dialect.casing.clearCache()
await dbUpsertOne(db, 'pipeline', {id: 2, str: 'my'}, {keyColumns: ['id']})

// recreating the db each time works better
const pg = postgres(dbUrl.toString())
const d = () => drizzle(pg, {logger: true})
// recreating the db connection each time works better
const neonClient = neon(dbUrl.toString())
const d = () => drizzle(neonClient, {logger: true})
await dbUpsertOne(d(), 'pipeline', {id: 3, num: 223}, {keyColumns: ['id']})
await dbUpsertOne(d(), 'pipeline', {id: 4, str: 'my'}, {keyColumns: ['id']})
const res = await d().execute('SELECT * FROM "pipeline"')
expect(res).toEqual([
expect(res.rows).toEqual([
{id: '1', num: 123, str: null},
{id: '2', num: null, str: 'my'},
{id: '3', num: 223, str: null},
Expand All @@ -406,6 +419,7 @@
})

afterAll(async () => {
await db.$client.end()
// neon does not support end
// await db.$client.end()
})
})
16 changes: 10 additions & 6 deletions packages/engine-backend/inngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@ export const persistEventsMiddleware = new InngestMiddleware({
)

const rows = connectionIds.length
? await db.execute<{
id: string
org_id: string
customer_id: string
}>(sql`
? await db
.execute<{
id: string
org_id: string
customer_id: string
}>(
sql`
SELECT c.id, cc.org_id, c.customer_id as cus_id
FROM ${schema.connection} c
JOIN ${
schema.connector_config
} cc ON c.connector_config_id = cc.id
WHERE c.id = ANY(${sql.param(connectionIds)})
`)
`,
)
.then((r) => r.rows)
: []

const infoByConnId = Object.fromEntries(
Expand Down
Loading
Loading