Skip to content

Commit

Permalink
worker debug logging (tldraw#5219)
Browse files Browse the repository at this point in the history
This PR begins to address some pains we've felt with regard to logging
on cloudflare workers:

1. A console.log invocation is only flushed to the console (or the
Cloudflare dashboard) once a request completes, so if a request hangs
for some reason you never see the log output.
2. Logs are very eagerly sampled, which makes it hard to rely on them
for debugging because you never know whether a log statement wasn't
reached or whether it was just dropped.

so this PR

- adds a Logger durable object that you can connect to via a websocket
to get an instant stream of every log statement, no waiting for requests
to finish.
- wraps the Logger durable object with a Logger class to consolidate
code.

The logger durable object is on in dev and preview envs, but is not
available in staging or production yet because that would require auth
and filtering user DO events by user.

You can try it on this PR by going to
https://pr-5219-preview-deploy.tldraw.com/__debug-tail and then opening
the app in another tab

This also goes some way towards addressing
https://linear.app/tldraw/issue/INT-648/investigate-flaky-preview-deploys

### Change type

- [x] `other`
  • Loading branch information
ds300 authored Jan 17, 2025
1 parent bc46284 commit 7f52c14
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 79 deletions.
1 change: 1 addition & 0 deletions apps/dotcom/client/src/routes.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export const router = createRoutesFromElements(
}}
>
{isClerkCookieSet || isOverrideFlagSet ? tlaRoutes : legacyRoutes}
<Route path="/__debug-tail" lazy={() => import('./tla/pages/worker-debug-tail')} />
<Route path="*" lazy={() => import('./pages/not-found')} />
</Route>
)
Expand Down
40 changes: 40 additions & 0 deletions apps/dotcom/client/src/tla/pages/worker-debug-tail.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { useEffect, useRef } from 'react'
import { MULTIPLAYER_SERVER } from '../../utils/config'

/**
 * Debug page that tails the worker log stream.
 *
 * Opens a websocket to `MULTIPLAYER_SERVER + '/app/__debug-tail'` and appends every incoming
 * message to a full-height scrollable container as its own `<pre>` element. The view follows
 * the newest output as long as the user is scrolled to within 100px of the bottom.
 *
 * When the server closes the connection the page reloads itself after 500ms in order to
 * reconnect. Closing the socket from the effect cleanup (unmount / route change) must NOT
 * trigger that reload, so the cleanup detaches the close handler and cancels any pending timer.
 */
export function Component() {
	const ref = useRef<HTMLDivElement>(null)
	const isAutoScroll = useRef(true)
	useEffect(() => {
		const elem = ref.current
		if (!elem) return

		// handle of the pending "reload to reconnect" timer, if one has been scheduled
		let reloadTimer: ReturnType<typeof setTimeout> | undefined

		const socket = new WebSocket(MULTIPLAYER_SERVER + '/app/__debug-tail')
		socket.onmessage = (msg) => {
			const line = document.createElement('pre')
			line.textContent = msg.data
			elem.appendChild(line)
			if (isAutoScroll.current) {
				elem.scrollTo({ top: elem.scrollHeight })
			}
		}
		socket.onerror = (err) => {
			console.error(err)
		}
		socket.onclose = () => {
			// connection dropped: reload the page shortly to establish a fresh one
			reloadTimer = setTimeout(() => {
				window.location.reload()
			}, 500)
		}

		const onScroll = () => {
			// keep following the tail only while the user is near the bottom of the log
			isAutoScroll.current = elem.scrollTop + elem.clientHeight > elem.scrollHeight - 100
		}
		elem.addEventListener('scroll', onScroll)
		return () => {
			// socket.close() fires the close event; without detaching the handler first,
			// simply leaving this page would schedule a full page reload.
			socket.onclose = null
			clearTimeout(reloadTimer)
			socket.close()
			elem.removeEventListener('scroll', onScroll)
		}
	}, [])
	return (
		<div ref={ref} style={{ fontFamily: 'monospace', overflow: 'scroll', height: '100vh' }}></div>
	)
}
44 changes: 44 additions & 0 deletions apps/dotcom/sync-worker/src/Logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { createSentry } from '@tldraw/worker-shared'
import { Environment, isDebugLogging } from './types'
import { getLogger } from './utils/durableObjects'

/**
 * Debug logger for the sync worker.
 *
 * Each `debug()` call is formatted into a single prefixed, timestamped line. The line is
 * recorded as a Sentry breadcrumb (when a sentry instance was provided) and, when
 * `isDebugLogging(env)` is true, forwarded to the logger obtained from `getLogger(env)`.
 *
 * Forwarding is asynchronous and batched: lines are queued and a single drain loop sends
 * whatever has accumulated, so lines reach the logger in the order they were produced.
 * Logging is best-effort and never throws into the caller.
 */
export class Logger {
	/** Remote logger target; only set when debug logging is enabled for this environment. */
	readonly logger
	/**
	 * @param env - Worker environment, used to decide whether debug logging is enabled.
	 * @param prefix - Label placed at the start of every log line.
	 * @param sentry - Optional sentry instance that receives each line as a breadcrumb.
	 */
	constructor(
		env: Environment,
		private prefix: string,
		private sentry?: ReturnType<typeof createSentry>
	) {
		if (isDebugLogging(env)) {
			this.logger = getLogger(env)
		}
	}

	/** Lines waiting to be forwarded to the remote logger. */
	private outgoing: string[] = []

	/** True while a drain loop is active, so that only one runs at a time. */
	private isRunning = false

	/**
	 * Turns one log argument into something that can be safely joined into a line.
	 * Primitives are passed through, objects are JSON-encoded, and values that cannot be
	 * JSON-encoded (circular structures, objects containing bigints) fall back to their
	 * `Object.prototype.toString` tag instead of throwing.
	 */
	private static format(arg: unknown): unknown {
		// Array.prototype.join throws on symbols, so stringify them explicitly
		if (typeof arg === 'symbol') return arg.toString()
		if (typeof arg !== 'object') return arg
		// JSON.stringify(new Error()) is just "{}", which hides the useful part
		if (arg instanceof Error) return arg.stack ?? `${arg.name}: ${arg.message}`
		try {
			return JSON.stringify(arg)
		} catch {
			return Object.prototype.toString.call(arg)
		}
	}

	/**
	 * Logs a debug line built from `args` (joined with spaces). Does nothing when neither a
	 * remote logger nor sentry is available.
	 */
	debug(...args: any[]) {
		if (!this.logger && !this.sentry) return
		const msg = `[${this.prefix} ${new Date().toISOString()}]: ${args.map(Logger.format).join(' ')}`
		// Record the breadcrumb right away so that an exception captured immediately after this
		// call still carries it, rather than waiting on the async round trip to the logger.
		// eslint-disable-next-line @typescript-eslint/no-deprecated
		this.sentry?.addBreadcrumb({ message: msg })
		if (!this.logger) return
		this.outgoing.push(msg)
		void this.processQueue()
	}

	/**
	 * Drains the outgoing queue, sending everything that has accumulated as one batch per
	 * iteration. A failed send is reported and that batch is dropped; it never rejects, so
	 * the fire-and-forget call in `debug()` cannot produce an unhandled rejection.
	 */
	private async processQueue() {
		const target = this.logger
		if (this.isRunning || !target) return
		this.isRunning = true
		try {
			while (this.outgoing.length) {
				const batch = this.outgoing
				this.outgoing = []
				try {
					await target.debug(batch)
				} catch (err) {
					console.error(`[${this.prefix}] failed to forward ${batch.length} debug log line(s)`, err)
				}
			}
		} finally {
			this.isRunning = false
		}
	}
}
65 changes: 65 additions & 0 deletions apps/dotcom/sync-worker/src/TLLoggerDurableObject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { DurableObject } from 'cloudflare:workers'
import { IRequest } from 'itty-router'
import { Environment, isDebugLogging } from './types'

/**
 * Durable object that stores debug log lines and streams them to connected websockets.
 *
 * Lines are persisted in a `logs` table in the object's SQL storage; an insert trigger trims
 * the table to the newest 20000 rows. A websocket client receives the stored history when it
 * connects and then every new line as it arrives. When `isDebugLogging(env)` is false,
 * `debug()` does nothing and `fetch()` answers 404.
 */
export class TLLoggerDurableObject extends DurableObject<Environment> {
	private readonly isDebugEnv
	private readonly db
	constructor(ctx: DurableObjectState, env: Environment) {
		super(ctx, env)
		this.isDebugEnv = isDebugLogging(env)
		this.db = this.ctx.storage.sql
		// create the log table, plus a trigger that caps it at the most recent 20000 rows
		this.db.exec(
			`CREATE TABLE IF NOT EXISTS logs (
				message TEXT NOT NULL
			);
			CREATE TRIGGER IF NOT EXISTS limit_logs
			AFTER INSERT ON logs
			BEGIN
				DELETE FROM logs WHERE rowid NOT IN (
					SELECT rowid FROM logs ORDER BY rowid DESC LIMIT 20000
				);
			END;`
		)
	}

	/** Server-side ends of the currently connected tail websockets. */
	private sockets = new Set<WebSocket>()

	/**
	 * Persists `messages` and forwards each one (newline-terminated) to every connected socket.
	 *
	 * A socket whose `send` throws is dropped and closed, so that one stale connection can
	 * neither stop delivery to the remaining sockets nor make this call reject.
	 *
	 * @param messages - Already-formatted log lines, in the order they should appear.
	 */
	async debug(messages: string[]) {
		if (!this.isDebugEnv) return
		for (const message of messages) {
			this.db.exec(`INSERT INTO logs (message) VALUES (?)`, message)
		}

		if (this.sockets.size === 0) return
		// iterate over a snapshot because failed sockets are removed from the set as we go
		for (const socket of Array.from(this.sockets)) {
			try {
				for (const message of messages) {
					socket.send(message + '\n')
				}
			} catch {
				this.sockets.delete(socket)
				try {
					socket.close()
				} catch {
					// the socket is already closed, nothing left to do
				}
			}
		}
	}

	/** Returns every stored log message, oldest first. */
	getFullHistory() {
		return this.db
			.exec('SELECT message FROM logs ORDER BY rowid ASC')
			.toArray()
			.map((row) => row.message)
	}

	/**
	 * Accepts a websocket connection for tailing the log. The new socket is first sent a
	 * greeting followed by the full stored history, and then receives live lines via `debug()`.
	 *
	 * @returns A 101 response carrying the client end of the socket pair, or a 404 when
	 * debug logging is not enabled.
	 */
	override async fetch(_req: IRequest) {
		if (!this.isDebugEnv) return new Response('Not Found', { status: 404 })
		const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
		serverWebSocket.accept()

		this.sockets.add(serverWebSocket)
		const cleanup = () => {
			this.sockets.delete(serverWebSocket)
			try {
				serverWebSocket.close()
			} catch {
				// cleanup runs for both 'close' and 'error'; the socket may already be closed
			}
		}
		serverWebSocket.addEventListener('close', cleanup)
		serverWebSocket.addEventListener('error', cleanup)
		serverWebSocket.send('Connected to logger\n' + this.getFullHistory().join('\n'))
		return new Response(null, { status: 101, webSocket: clientWebSocket })
	}
}
68 changes: 45 additions & 23 deletions apps/dotcom/sync-worker/src/TLPostgresReplicator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ROOM_PREFIX, TlaFile, ZTable } from '@tldraw/dotcom-shared'
import { DB, ROOM_PREFIX, TlaFile, ZTable } from '@tldraw/dotcom-shared'
import {
ExecutionQueue,
assert,
Expand All @@ -9,10 +9,12 @@ import {
} from '@tldraw/utils'
import { createSentry } from '@tldraw/worker-shared'
import { DurableObject } from 'cloudflare:workers'
import { Kysely, sql } from 'kysely'
import postgres from 'postgres'
import { Logger } from './Logger'
import type { TLDrawDurableObject } from './TLDrawDurableObject'
import { ZReplicationEventWithoutSequenceInfo } from './UserDataSyncer'
import { createPostgresConnection } from './postgres'
import { createPostgresConnection, createPostgresConnectionPool } from './postgres'
import { Analytics, Environment, TLPostgresReplicatorEvent } from './types'
import { EventData, writeDataPoint } from './utils/analytics'
import { getUserDurableObject } from './utils/durableObjects'
Expand Down Expand Up @@ -82,6 +84,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
}
private measure: Analytics | undefined
private postgresUpdates = 0
private lastPostgresMessageTime = Date.now()
private lastRpmLogTime = Date.now()

// we need to guarantee in-order delivery of messages to users
Expand All @@ -103,6 +106,9 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
}
}

private log

private readonly db: Kysely<DB>
constructor(ctx: DurableObjectState, env: Environment) {
super(ctx, env)
this.sentry = createSentry(ctx, env)
Expand All @@ -115,8 +121,15 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
})
)

this.reboot(false)
// debug logging in preview envs by default
this.log = new Logger(env, 'TLPostgresReplicator', this.sentry)
this.db = createPostgresConnectionPool(env, 'TLPostgresReplicator')

this.alarm()
this.reboot(false).catch((e) => {
this.captureException(e)
this.__test__panic()
})
this.measure = env.MEASURE
}

Expand Down Expand Up @@ -161,19 +174,20 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
this.ctx.abort()
}

private debug(...args: any[]) {
// uncomment for dev time debugging
// console.log('[TLPostgresReplicator]:', ...args)
if (this.sentry) {
// eslint-disable-next-line @typescript-eslint/no-deprecated
this.sentry.addBreadcrumb({
message: `[TLPostgresReplicator]: ${args.join(' ')}`,
})
}
}
override async alarm() {
this.ctx.storage.setAlarm(Date.now() + 1000)
this.maybeLogRpm()
// If we haven't heard anything from postgres for 5 seconds, do a little transaction
// to update a random string as a kind of 'ping' to keep the connection alive
// If we haven't heard anything for 10 seconds, reboot
if (Date.now() - this.lastPostgresMessageTime > 10000) {
this.log.debug('rebooting due to inactivity')
this.reboot()
} else if (Date.now() - this.lastPostgresMessageTime > 5000) {
sql`insert into replicator_boot_id ("replicatorId", "bootId") values (${this.ctx.id.toString()}, ${uniqueId()}) on conflict ("replicatorId") do update set "bootId" = excluded."bootId"`.execute(
this.db
)
}
}

private maybeLogRpm() {
Expand All @@ -192,13 +206,13 @@ export class TLPostgresReplicator extends DurableObject<Environment> {

private async reboot(delay = true) {
this.logEvent({ type: 'reboot' })
this.debug('reboot push')
this.log.debug('reboot push')
await this.queue.push(async () => {
if (delay) {
await sleep(1000)
}
const start = Date.now()
this.debug('rebooting')
this.log.debug('rebooting')
const res = await Promise.race([
this.boot().then(() => 'ok'),
sleep(3000).then(() => 'timeout'),
Expand All @@ -207,7 +221,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
this.captureException(e)
return 'error'
})
this.debug('rebooted', res)
this.log.debug('rebooted', res)
if (res === 'ok') {
this.logEvent({ type: 'reboot_duration', duration: Date.now() - start })
} else {
Expand All @@ -217,7 +231,8 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
}

private async boot() {
this.debug('booting')
this.log.debug('booting')
this.lastPostgresMessageTime = Date.now()
// clean up old resources if necessary
if (this.state.type === 'connected') {
this.state.subscription.unsubscribe()
Expand Down Expand Up @@ -275,8 +290,13 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
}

private handleEvent(row: postgres.Row | null, event: postgres.ReplicationEvent) {
this.lastPostgresMessageTime = Date.now()
if (event.relation.table === 'replicator_boot_id') {
// ping, ignore
return
}
this.postgresUpdates++
this.debug('handleEvent', event)
this.log.debug('handleEvent', event)
assert(this.state.type === 'connected', 'state should be connected in handleEvent')
try {
switch (event.relation.table) {
Expand Down Expand Up @@ -380,7 +400,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
.toArray()[0]
if (!sub) {
// the file was deleted before the file state
this.debug('file state deleted before file', row)
this.log.debug('file state deleted before file', row)
} else {
this.sql.exec(
`DELETE FROM user_file_subscriptions WHERE userId = ? AND fileId = ?`,
Expand Down Expand Up @@ -441,7 +461,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {

private handleUserEvent(row: postgres.Row | null, event: postgres.ReplicationEvent) {
assert(row?.id, 'user id is required')
this.debug('USER EVENT', event.command, row.id)
this.log.debug('USER EVENT', event.command, row.id)
this.messageUser(row.id, {
type: 'row_update',
row: row as any,
Expand All @@ -456,7 +476,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
}

async ping() {
this.debug('ping')
this.log.debug('ping')
return { sequenceId: this.state.sequenceId }
}

Expand Down Expand Up @@ -505,7 +525,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
sequenceId: this.state.sequenceId + ':' + sequenceIdSuffix,
})
if (res === 'unregister') {
this.debug('unregistering user', userId, event)
this.log.debug('unregistering user', userId, event)
this.unregisterUser(userId)
}
})
Expand All @@ -520,9 +540,11 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
}

async registerUser(userId: string) {
this.debug('registering user', userId)
this.log.debug('registering user', userId)
this.logEvent({ type: 'register_user' })
this.log.debug('reg user wait')
await this.waitUntilConnected()
this.log.debug('reg user connect')
assert(this.state.type === 'connected', 'state should be connected in registerUser')
const guestFiles = await this.state
.db`SELECT "fileId" as id FROM file_state where "userId" = ${userId}`
Expand Down
Loading

0 comments on commit 7f52c14

Please sign in to comment.