Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Jan 14, 2024
1 parent 8fed874 commit 5220276
Show file tree
Hide file tree
Showing 35 changed files with 934 additions and 745 deletions.
1 change: 1 addition & 0 deletions packages/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ export {
dockerMachineStatusCommandRecipeSchema,
machineStatusCommandSchema,
} from './src/machine-status-command.js'
export * from './src/async.js'
export { ProcessOutputBuffers, orderedOutput, OrderedOutput } from './src/process-output-buffers.js'
export { generateSchemaErrorMessage } from './src/schema.js'
14 changes: 14 additions & 0 deletions packages/common/src/async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export type AsyncObjectIterator<TObject, TResult> = (
value: TObject[keyof TObject],
key: string,
collection: TObject,
) => Promise<TResult>

export const asyncMapValues = async <T extends object, TResult>(
obj: T,
callback: AsyncObjectIterator<T, TResult>,
): Promise<{ [P in keyof T]: TResult }> => Object.fromEntries(
await Promise.all(
Object.entries(obj).map(async ([key, value]) => [key, await callback(value, key, obj)])
)
)
22 changes: 13 additions & 9 deletions packages/common/src/compose-tunnel-agent/script-injection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import z from 'zod'
import { camelCase, mapKeys, partition, snakeCase, chain } from 'lodash-es'
import { camelCase, mapKeys, partition, snakeCase, map, groupBy } from 'lodash-es'
import { inspect } from 'util'
import { COMPOSE_TUNNEL_AGENT_SERVICE_LABELS } from './labels.js'

Expand Down Expand Up @@ -70,15 +70,19 @@ const parseGroupedLabelKey = (
const parseLabelsWithPrefixAndId = (
labels: Record<string, string>,
prefix: string,
): Record<string, string>[] => chain(labels)
.entries()
.map(([k, v]) => [parseGroupedLabelKey(k), v])
.filter(
(kvp): kvp is [ParsedGroupedLabelKey, string] => (kvp[0] as ParsedGroupedLabelKey | undefined)?.prefix === prefix
): Record<string, string>[] => {
const filtered = map(Object.entries(labels), ([k, v]) => [parseGroupedLabelKey(k), v])
.filter(
(kvp): kvp is [ParsedGroupedLabelKey, string] => (
kvp[0] as ParsedGroupedLabelKey | undefined
)?.prefix === prefix
)
const grouped = groupBy(
filtered,
([{ id }]) => id,
)
.groupBy(([{ id }]) => id)
.map(group => Object.fromEntries(group.map(([{ key }, value]) => [key, value])))
.value()
return Object.values(grouped).map(group => Object.fromEntries(group.map(([{ key }, value]) => [key, value])))
}

const notAnError = <T>(val: T | Error): val is T => !(val instanceof Error)

Expand Down
15 changes: 9 additions & 6 deletions packages/common/src/ssh/base-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,17 @@ export const baseSshClient = async (
return await tryParseJsonChunks<T>(channel)
}

const end = async () => {
if (!ended) {
ssh.end()
await events.once(ssh, 'end')
}
}

const result = {
ssh,
end: async () => {
if (!ended) {
ssh.end()
await events.once(ssh, 'end')
}
},
end,
[Symbol.asyncDispose]: end,
execHello: () => exec<HelloResponse>('hello'),
execTunnelUrl: <T extends string>(tunnels: T[]) => exec<Record<T, string>>(`tunnel-url ${tunnels.join(' ')}`),
}
Expand Down
2 changes: 1 addition & 1 deletion packages/compose-tunnel-agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ WORKDIR /app

FROM base as production
COPY out /app/
CMD [ "node", "/app/index.mjs" ]
CMD [ "node", "--enable-source-maps", "/app/index.mjs" ]
1 change: 1 addition & 0 deletions packages/compose-tunnel-agent/build.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as esbuild from 'esbuild'

const ESM_REQUIRE_SHIM = `
await import('disposablestack/auto')
await (async () => {
const { dirname } = await import("path");
const { fileURLToPath } = await import("url");
Expand Down
227 changes: 100 additions & 127 deletions packages/compose-tunnel-agent/index.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,23 @@
import fs from 'fs'
import path from 'path'
import Docker from 'dockerode'
import { rimraf } from 'rimraf'
import { pino } from 'pino'
import pinoPrettyModule from 'pino-pretty'
import {
requiredEnv,
formatPublicKey,
MachineStatusCommand,
parseSshUrl,
SshConnectionConfig,
tunnelNameResolver,
MachineStatusCommand,
COMPOSE_TUNNEL_AGENT_PORT,
ScriptInjection,
} from '@preevy/common'
import { inspect } from 'util'
import { omit } from 'lodash-es'
import { createApp } from './src/api-server/index.js'
import { sshClient as createSshClient } from './src/ssh/index.js'
import { runMachineStatusCommand } from './src/machine-status.js'
import { envMetadata } from './src/metadata.js'
import { readAllFiles } from './src/files.js'
import {
forwardsEmitter as createDockerForwardsEmitter,
filteredClient as createDockerFilteredClient,
} from './src/plugins/docker/forwards-emitter/index.js'
import { anyComposeProjectFilters, composeProjectFilters } from './src/plugins/docker/filters.js'

const PinoPretty = pinoPrettyModule.default

const homeDir = process.env.HOME || '/root'
const dockerSocket = '/var/run/docker.sock'
const composeModelPath = '/preevy/docker-compose.yaml'
const envId = requiredEnv('PREEVY_ENV_ID')

const targetComposeProject = process.env.COMPOSE_PROJECT
const defaultAccess = process.env.DEFAULT_ACCESS_LEVEL === 'private' ? 'private' : 'public'

const sshConnectionConfigFromEnv = async (): Promise<{ connectionConfig: SshConnectionConfig; sshUrl: string }> => {
const sshUrl = requiredEnv('SSH_URL')
const parsed = parseSshUrl(sshUrl)

const clientPrivateKey = process.env.SSH_PRIVATE_KEY || fs.readFileSync(
path.join(homeDir, '.ssh', 'id_rsa'),
{ encoding: 'utf8' },
)

const knownServerPublicKeys = await readAllFiles(path.join(homeDir, 'known_server_keys'))

return {
sshUrl,
connectionConfig: {
...parsed,
clientPrivateKey,
username: envId,
knownServerPublicKeys,
insecureSkipVerify: Boolean(process.env.INSECURE_SKIP_VERIFY),
tlsServerName: process.env.TLS_SERVERNAME || undefined,
},
}
}

const fastifyListenArgsFromEnv = async () => {
const portOrPath = process.env.PORT ?? COMPOSE_TUNNEL_AGENT_PORT
import { readConfig, Config, Plugin } from './src/configuration/index.js'
import { createLog } from './src/log.js'
import { Forward } from './src/forwards.js'
import { aggregator } from './src/aggregator.js'
import { loadPlugins } from './src/plugins.js'

const fastifyListenArgsFromConfig = async (config: Pick<Config, 'listen'>) => {
const portOrPath = config.listen
const portNumber = Number(portOrPath)
if (typeof portOrPath === 'string' && Number.isNaN(portNumber)) {
await rimraf(portOrPath)
Expand All @@ -70,107 +26,124 @@ const fastifyListenArgsFromEnv = async () => {
return { port: portNumber, host: '0.0.0.0' }
}

const machineStatusCommand = process.env.MACHINE_STATUS_COMMAND
? JSON.parse(process.env.MACHINE_STATUS_COMMAND) as MachineStatusCommand
: undefined

const globalInjects = process.env.GLOBAL_INJECT_SCRIPTS
? JSON.parse(process.env.GLOBAL_INJECT_SCRIPTS) as ScriptInjection[]
: []
const findMachineStatusCommandRunner = (
plugins: Plugin[],
spec: MachineStatusCommand | undefined,
) => {
if (!spec) {
return undefined
}
const runner = plugins.map(p => p.machineStatusCommands?.[spec.recipe.type]).find(x => x)
if (!runner) {
throw new Error(`no handler found in plugins for machine status command with type "${spec.recipe.type}"`)
}
return async () => ({
data: await runner(spec.recipe),
contentType: spec.contentType,
})
}

const log = pino({
level: process.env.DEBUG || process.env.DOCKER_PROXY_DEBUG ? 'debug' : 'info',
}, PinoPretty({ destination: pino.destination(process.stderr) }))
let log = createLog(process)
const SHUTDOWN_TIMEOUT = 5000
const exitSignals = ['SIGTERM', 'SIGINT', 'uncaughtException'] as const

const main = async () => {
const config = await readConfig(process)

log = createLog(process, config)

let endRequested = false
const { connectionConfig, sshUrl } = await sshConnectionConfigFromEnv()
const disposables = new AsyncDisposableStack()
const end = async () => {
endRequested = true
await disposables.disposeAsync()
}

const {
server: serverUrl,
envId,
machineStatusCommand: machineStatusCommandSpec,
} = config

const connectionConfig: SshConnectionConfig = {
...parseSshUrl(serverUrl),
clientPrivateKey: await config.privateKey,
username: envId,
knownServerPublicKeys: config.serverKey,
insecureSkipVerify: Boolean(config.insecureSkipVerify),
tlsServerName: config.tlsServerName,
}

log.debug('ssh config: %j', {
...connectionConfig,
clientPrivateKey: '*** REDACTED ***',
...omit<SshConnectionConfig, 'clientPrivateKey'>(connectionConfig, 'clientPrivateKey'),
clientPublicKey: formatPublicKey(connectionConfig.clientPrivateKey),
})

const dockerFilters = targetComposeProject
? composeProjectFilters({ composeProject: targetComposeProject })
: anyComposeProjectFilters

const docker = new Docker({ socketPath: dockerSocket })
const dockerForwardsEmitter = createDockerForwardsEmitter({
log: log.child({ name: 'docker' }),
docker,
debounceWait: 500,
filters: dockerFilters,
tunnelNameResolver: tunnelNameResolver({ envId }),
})

const sshLog = log.child({ name: 'ssh' })
const sshClient = await createSshClient({
sshLog.info('ssh client connecting to %j', serverUrl)
const sshClient = disposables.use(await createSshClient({
log: sshLog,
connectionConfig,
defaultAccess,
globalInjects,
})
defaultAccess: config.defaultAccess,
globalInjects: config.globalInjects,
}))

sshClient.ssh.on('close', () => {
sshLog.info('ssh client connected to %j', serverUrl)
sshClient.ssh.on('close', async () => {
if (!endRequested) {
log.error('ssh client closed unexpectedly')
await end()
process.exit(1)
}
log.info('ssh client closed')
})

sshLog.info('ssh client connected to %j', sshUrl)

void dockerForwardsEmitter.then(client => client.on('forwards', forwards => sshClient.updateForwards(forwards)))
const plugins = await loadPlugins(config, p => ({ log: log.child({ name: `plugin-${p}` }) }))

const app = await createApp({
const app = disposables.use(await createApp({
log: log.child({ name: 'api' }),
currentSshState: () => sshClient.state(),
machineStatus: machineStatusCommand
? async () => await runMachineStatusCommand({ log, docker })(machineStatusCommand)
: undefined,
envMetadata: await envMetadata({ env: process.env, log }),
composeModelPath: fs.existsSync(composeModelPath) ? composeModelPath : undefined,
dockerModem: docker.modem,
dockerFilter: createDockerFilteredClient({ docker, filters: dockerFilters }),
})
machineStatus: findMachineStatusCommandRunner(Object.values(plugins), machineStatusCommandSpec),
envMetadata: config.envMetadata,
}))

const forwardsAggregator = aggregator<Forward>(f => f.externalName)
await sshClient.updateForwards(forwardsAggregator(Symbol('staticConfig'), config.forwards))

await Promise.all(Object.entries(plugins).map(async ([pluginName, plugin]) => {
if (plugin.forwardsEmitter) {
disposables.use(await plugin.forwardsEmitter({
tunnelNameResolver: tunnelNameResolver({ envId }),
})).on('forwards', async forwards => {
await sshClient.updateForwards(forwardsAggregator(pluginName, forwards))
})
}

void app.listen({ ...await fastifyListenArgsFromEnv() })
app.server.unref()
if (plugin.fastifyPlugin) {
await app.register(plugin.fastifyPlugin)
}
}))

const end = async () => {
endRequested = true
await Promise.all([
app.close(),
sshClient.end(),
dockerForwardsEmitter.then(client => client[Symbol.asyncDispose]()),
])
}
void app.listen({ ...await fastifyListenArgsFromConfig(config) })
app.server.unref()

return { end }
exitSignals.forEach(signal => {
process.once(signal, async (...args) => {
const argsStr = args ? args.map(arg => inspect(arg)).join(', ') : undefined
const logLevel = signal === 'uncaughtException' ? 'error' : 'warn'
log[logLevel](`shutting down on ${[signal, argsStr].filter(Boolean).join(': ')}`)
if (!await Promise.race([
end().then(() => true),
new Promise<void>(resolve => { setTimeout(resolve, SHUTDOWN_TIMEOUT) }),
])) {
log.error(`timed out while waiting ${SHUTDOWN_TIMEOUT}ms for server to close, exiting`)
}
process.exit(1)
})
})
}

const SHUTDOWN_TIMEOUT = 5000

void main().then(
({ end }) => {
['SIGTERM', 'SIGINT', 'uncaughtException'].forEach(signal => {
process.once(signal, async (...args) => {
const argsStr = args ? args.map(arg => inspect(arg)).join(', ') : undefined
log.warn(`shutting down on ${[signal, argsStr].filter(Boolean).join(': ')}`)
const endResult = await Promise.race([
end().then(() => true),
new Promise<void>(resolve => { setTimeout(resolve, SHUTDOWN_TIMEOUT) }),
])
if (!endResult) {
log.error(`timed out while waiting ${SHUTDOWN_TIMEOUT}ms for server to close, exiting`)
}
process.exit(1)
})
})
},
void main().catch(
err => {
log.error(err)
process.exit(1)
Expand Down
23 changes: 23 additions & 0 deletions packages/compose-tunnel-agent/src/aggregator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { describe, test, expect } from '@jest/globals'
import { aggregator } from './aggregator.js'

describe('aggregator', () => {
type Obj = { key: string; value: number }

test('distinct objects', () => {
const agg = aggregator<Obj>(o => o.key)
const o1 = { key: 'a', value: 1 }
const o2 = { key: 'b', value: 2 }
expect(agg('s1', [o1])).toEqual([o1])
expect(agg('s2', [o2])).toEqual([o1, o2])
})

test('duplicate objects', () => {
const agg = aggregator<Obj>(o => o.key)
const o1 = { key: 'a', value: 1 }
const o2 = { key: 'a', value: 2 }
expect(agg('s1', [o1])).toEqual([o1])
expect(agg('s2', [o2])).toEqual([o2])
expect(agg('s2', [])).toEqual([o1])
})
})
Loading

0 comments on commit 5220276

Please sign in to comment.