+ +
+
+

1. Let's make a test request

+
The gateway supports 250+ models across 36 AI providers. Choose your provider and API key below.
+
+
🐍 Python
+
📦 Node.js
+
🌀 cURL
+
+
+ +
+
+
+
+
+
+
+
+
+
+ + + +
+
+ +
+

2. Create a routing config

+
Gateway configs allow you to route requests to different providers and models. You can load balance, set fallbacks, and configure automatic retries & timeouts. Learn more
+
+
Simple Config
+
Load Balancing
+
Fallbacks
+
Retries & Timeouts
+
+
+
+ +
+ +
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + + +
+ + +
+
+ + + + +

Setup a Call

+

Get personalized support and learn how Portkey can be tailored to your needs.

Schedule Consultation
+
+ + + + + +

Enterprise Features

+

Explore advanced features and see how Portkey can scale with your business.

View Enterprise Plan
+
+ + + + +

Join Our Community

+

Connect with other developers, share ideas, and get help from the Portkey team.

Join Discord
+
+
+
+ +
+
+

Real-time Logs

+
+ + +
+
+ + + + + + + + + + + + + + + + + + +
Time | Method | Endpoint | Status | Duration | Actions
+
Listening for logs...
+
+
+
+ + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/services/realtimeLlmEventParser.ts b/src/services/realtimeLlmEventParser.ts new file mode 100644 index 000000000..88415cc87 --- /dev/null +++ b/src/services/realtimeLlmEventParser.ts @@ -0,0 +1,160 @@ +import { Context } from 'hono'; + +export class RealtimeLlmEventParser { + private sessionState: any; + + constructor() { + this.sessionState = { + sessionDetails: null, + conversation: { + items: new Map(), + }, + responses: new Map(), + }; + } + + // Main entry point for processing events + handleEvent(c: Context, event: any, sessionOptions: any): void { + switch (event.type) { + case 'session.created': + this.handleSessionCreated(c, event, sessionOptions); + break; + case 'session.updated': + this.handleSessionUpdated(c, event, sessionOptions); + break; + case 'conversation.item.created': + this.handleConversationItemCreated(c, event); + break; + case 'conversation.item.deleted': + this.handleConversationItemDeleted(c, event); + break; + case 'response.done': + this.handleResponseDone(c, event, sessionOptions); + break; + case 'error': + this.handleError(c, event, sessionOptions); + break; + default: + break; + } + } + + // Handle `session.created` event + private handleSessionCreated( + c: Context, + data: any, + sessionOptions: any + ): void { + this.sessionState.sessionDetails = { ...data.session }; + const realtimeEventParser = c.get('realtimeEventParser'); + if (realtimeEventParser) { + c.executionCtx.waitUntil( + realtimeEventParser( + c, + sessionOptions, + {}, + { ...data.session }, + data.type + ) + ); + } + } + + // Handle `session.updated` event + private handleSessionUpdated( + c: Context, + data: any, + sessionOptions: any + ): void { + this.sessionState.sessionDetails = { ...data.session }; + const realtimeEventParser = c.get('realtimeEventParser'); + if (realtimeEventParser) { + c.executionCtx.waitUntil( + realtimeEventParser( + c, + sessionOptions, + {}, + { ...data.session }, 
+ data.type + ) + ); + } + } + + // Conversation-specific handlers + private handleConversationItemCreated(c: Context, data: any): void { + const { item } = data; + this.sessionState.conversation.items.set(item.id, data); + } + + private handleConversationItemDeleted(c: Context, data: any): void { + this.sessionState.conversation.items.delete(data.item_id); + } + + private handleResponseDone(c: Context, data: any, sessionOptions: any): void { + const { response } = data; + this.sessionState.responses.set(response.id, response); + for (const item of response.output) { + const inProgressItem = this.sessionState.conversation.items.get(item.id); + this.sessionState.conversation.items.set(item.id, { + ...inProgressItem, + item, + }); + } + const realtimeEventParser = c.get('realtimeEventParser'); + if (realtimeEventParser) { + const itemSequence = this.rebuildConversationSequence( + this.sessionState.conversation.items + ); + c.executionCtx.waitUntil( + realtimeEventParser( + c, + sessionOptions, + { + conversation: { + items: this.getOrderedConversationItems(itemSequence).slice( + 0, + -1 + ), + }, + }, + data, + data.type + ) + ); + } + } + + private handleError(c: Context, data: any, sessionOptions: any): void { + const realtimeEventParser = c.get('realtimeEventParser'); + if (realtimeEventParser) { + c.executionCtx.waitUntil( + realtimeEventParser(c, sessionOptions, {}, data, data.type) + ); + } + } + + private rebuildConversationSequence(items: Map): string[] { + const orderedItemIds: string[] = []; + + // Find the first item (no previous_item_id) + let currentId: string | undefined = Array.from(items.values()).find( + (data) => data.previous_item_id === null + )?.item?.id; + + // Traverse through the chain using previous_item_id + while (currentId) { + orderedItemIds.push(currentId); + const nextItem = Array.from(items.values()).find( + (data) => data.previous_item_id === currentId + ); + currentId = nextItem?.item?.id; + } + + return orderedItemIds; + } + + 
private getOrderedConversationItems(sequence: string[]): any { + return sequence.map((id) => this.sessionState.conversation.items.get(id)!); + } +} diff --git a/src/services/transformToProviderRequest.ts b/src/services/transformToProviderRequest.ts index 08da34cf8..af4d56876 100644 --- a/src/services/transformToProviderRequest.ts +++ b/src/services/transformToProviderRequest.ts @@ -1,8 +1,7 @@ import { GatewayError } from '../errors/GatewayError'; -import { MULTIPART_FORM_DATA_ENDPOINTS } from '../globals'; import ProviderConfigs from '../providers'; import { endpointStrings } from '../providers/types'; -import { Options, Params, Targets } from '../types/requestBody'; +import { Params } from '../types/requestBody'; /** * Helper function to set a nested property in an object. @@ -184,10 +183,16 @@ const transformToProviderRequestFormData = ( export const transformToProviderRequest = ( provider: string, params: Params, - inputParams: Params | FormData, + inputParams: Params | FormData | ArrayBuffer, fn: endpointStrings ) => { - if (MULTIPART_FORM_DATA_ENDPOINTS.includes(fn)) return inputParams; + if (inputParams instanceof FormData || inputParams instanceof ArrayBuffer) + return inputParams; + + if (fn === 'proxy') { + return params; + } + const providerAPIConfig = ProviderConfigs[provider].api; if ( providerAPIConfig.transformToFormData && diff --git a/src/start-server.ts b/src/start-server.ts index 8b0934765..f58da4231 100644 --- a/src/start-server.ts +++ b/src/start-server.ts @@ -3,6 +3,11 @@ import { serve } from '@hono/node-server'; import app from './index'; +import { streamSSE } from 'hono/streaming'; +import { Context } from 'hono'; +import { createNodeWebSocket } from '@hono/node-ws'; +import { realTimeHandlerNode } from './handlers/realtimeHandlerNode'; +import { requestValidator } from './middlewares/requestValidator'; // Extract the port number from the command line arguments const defaultPort = 8787; @@ -10,9 +15,176 @@ const args = 
process.argv.slice(2); const portArg = args.find((arg) => arg.startsWith('--port=')); const port = portArg ? parseInt(portArg.split('=')[1]) : defaultPort; -serve({ +const isHeadless = args.includes('--headless'); + +// Setup static file serving only if not in headless mode +if ( + !isHeadless && + !( + process.env.NODE_ENV === 'production' || + process.env.ENVIRONMENT === 'production' + ) +) { + const setupStaticServing = async () => { + const { join, dirname } = await import('path'); + const { fileURLToPath } = await import('url'); + const { readFileSync } = await import('fs'); + + const scriptDir = dirname(fileURLToPath(import.meta.url)); + + // Serve the index.html content directly for both routes + const indexPath = join(scriptDir, 'public/index.html'); + const indexContent = readFileSync(indexPath, 'utf-8'); + + const serveIndex = (c: Context) => { + return c.html(indexContent); + }; + + // Set up routes + app.get('/public/logs', serveIndex); + app.get('/public/', serveIndex); + + // Redirect `/public` to `/public/` + app.get('/public', (c: Context) => { + return c.redirect('/public/'); + }); + }; + + // Initialize static file serving + await setupStaticServing(); + + /** + * A helper function to enforce a timeout on SSE sends. + * @param fn A function that returns a Promise (e.g. 
stream.writeSSE()) + * @param timeoutMs The timeout in milliseconds (default: 2000) + */ + async function sendWithTimeout(fn: () => Promise, timeoutMs = 200) { + const timeoutPromise = new Promise((_, reject) => { + const id = setTimeout(() => { + clearTimeout(id); + reject(new Error('Write timeout')); + }, timeoutMs); + }); + + return Promise.race([fn(), timeoutPromise]); + } + + app.get('/log/stream', (c: Context) => { + const clientId = Date.now().toString(); + + // Set headers to prevent caching + c.header('Cache-Control', 'no-cache'); + c.header('X-Accel-Buffering', 'no'); + + return streamSSE(c, async (stream) => { + const addLogClient: any = c.get('addLogClient'); + const removeLogClient: any = c.get('removeLogClient'); + + const client = { + sendLog: (message: any) => + sendWithTimeout(() => stream.writeSSE(message)), + }; + // Add this client to the set of log clients + addLogClient(clientId, client); + + // If the client disconnects (closes the tab, etc.), this signal will be aborted + const onAbort = () => { + removeLogClient(clientId); + }; + c.req.raw.signal.addEventListener('abort', onAbort); + + try { + // Send an initial connection event + await sendWithTimeout(() => + stream.writeSSE({ event: 'connected', data: clientId }) + ); + + // Use an interval instead of a while loop + const heartbeatInterval = setInterval(async () => { + if (c.req.raw.signal.aborted) { + clearInterval(heartbeatInterval); + return; + } + + try { + await sendWithTimeout(() => + stream.writeSSE({ event: 'heartbeat', data: 'pulse' }) + ); + } catch (error) { + // console.error(`Heartbeat failed for client ${clientId}:`, error); + clearInterval(heartbeatInterval); + removeLogClient(clientId); + } + }, 10000); + + // Wait for abort signal + await new Promise((resolve) => { + c.req.raw.signal.addEventListener('abort', () => { + clearInterval(heartbeatInterval); + resolve(undefined); + }); + }); + } catch (error) { + // console.error(`Error in log stream for client ${clientId}:`, 
error); + } finally { + // Remove this client when the connection is closed + removeLogClient(clientId); + c.req.raw.signal.removeEventListener('abort', onAbort); + } + }); + }); +} + +const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }); + +app.get( + '/v1/realtime', + requestValidator, + upgradeWebSocket(realTimeHandlerNode) +); + +const server = serve({ fetch: app.fetch, port: port, }); -console.log(`Your AI Gateway is now running on http://localhost:${port} 🚀`); +const url = `http://localhost:${port}`; + +injectWebSocket(server); + +// Loading animation function +async function showLoadingAnimation() { + const frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']; + let i = 0; + + return new Promise((resolve) => { + const interval = setInterval(() => { + process.stdout.write(`\r${frames[i]} Starting AI Gateway...`); + i = (i + 1) % frames.length; + }, 80); + + // Stop after 1 second + setTimeout(() => { + clearInterval(interval); + process.stdout.write('\r'); + resolve(undefined); + }, 1000); + }); +} + +// Clear the console and show animation before main output +console.clear(); +await showLoadingAnimation(); + +// Main server information with minimal spacing +console.log('\x1b[1m%s\x1b[0m', '🚀 Your AI Gateway is running at:'); +console.log(' ' + '\x1b[1;4;32m%s\x1b[0m', `${url}`); + +// Secondary information on single lines +if (!isHeadless) { + console.log('\n\x1b[90m📱 UI:\x1b[0m \x1b[36m%s\x1b[0m', `${url}/public/`); +} +// console.log('\x1b[90m📚 Docs:\x1b[0m \x1b[36m%s\x1b[0m', 'https://portkey.ai/docs'); + +// Single-line ready message +console.log('\n\x1b[32m✨ Ready for connections!\x1b[0m'); diff --git a/src/types/requestBody.ts b/src/types/requestBody.ts index 87d765c51..4f7833dd1 100644 --- a/src/types/requestBody.ts +++ b/src/types/requestBody.ts @@ -78,7 +78,7 @@ export interface Options { requestTimeout?: number; /** This is used to determine if the request should be transformed to formData Example: Stability V2 */ 
transformToFormData?: boolean; - /** AWS Bedrock specific */ + /** AWS specific (used for Bedrock and Sagemaker) */ awsSecretAccessKey?: string; awsAccessKeyId?: string; awsSessionToken?: string; @@ -87,6 +87,16 @@ export interface Options { awsRoleArn?: string; awsExternalId?: string; + /** Sagemaker specific */ + amznSagemakerCustomAttributes?: string; + amznSagemakerTargetModel?: string; + amznSagemakerTargetVariant?: string; + amznSagemakerTargetContainerHostname?: string; + amznSagemakerInferenceId?: string; + amznSagemakerEnableExplanations?: string; + amznSagemakerInferenceComponent?: string; + amznSagemakerSessionId?: string; + /** Stability AI specific */ stabilityClientId?: string; stabilityClientUserId?: string; @@ -105,6 +115,7 @@ export interface Options { /** OpenAI specific */ openaiProject?: string; openaiOrganization?: string; + openaiBeta?: string; /** Azure Inference Specific */ azureRegion?: string; @@ -206,15 +217,18 @@ export enum MESSAGE_ROLES { ASSISTANT = 'assistant', FUNCTION = 'function', TOOL = 'tool', + DEVELOPER = 'developer', } +export const SYSTEM_MESSAGE_ROLES = ['system', 'developer']; + export type OpenAIMessageRole = | 'system' | 'user' | 'assistant' | 'function' - | 'tool'; - + | 'tool' + | 'developer'; /** * A message in the conversation. * @interface @@ -290,7 +304,7 @@ export interface Tool extends AnthropicPromptCache { /** The name of the function. */ type: string; /** A description of the function. */ - function?: Function; + function: Function; } /** diff --git a/start-test.js b/start-test.js index 369be1668..b4ba7beb9 100644 --- a/start-test.js +++ b/start-test.js @@ -2,7 +2,7 @@ import { spawn } from 'node:child_process'; console.log('Starting the application...'); -const app = spawn('node', ['build/start-server.js'], { +const app = spawn('node', ['build/start-server.js', '--headless'], { stdio: 'inherit', });