Commit 45bb105

fix: streaming msg

hanxiao committed Feb 12, 2025
1 parent bdf03ef
Showing 3 changed files with 25 additions and 67 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -128,7 +128,7 @@ The server will start on http://localhost:3000 with the following endpoint:
 curl http://localhost:3000/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
-    "model": "gpt-4o-mini",
+    "model": "jina-deepsearch-v1",
     "messages": [
       {
         "role": "user",
@@ -142,7 +142,7 @@ curl http://localhost:3000/v1/chat/completions \
   -H "Content-Type: application/json" \
   -H "Authorization: Bearer your_secret_token" \
   -d '{
-    "model": "gpt-4o-mini",
+    "model": "jina-deepsearch-v1",
     "messages": [
       {
         "role": "user",
@@ -159,7 +159,7 @@ Response format:
   "id": "chatcmpl-123",
   "object": "chat.completion",
   "created": 1677652288,
-  "model": "gpt-4o-mini",
+  "model": "jina-deepsearch-v1",
   "system_fingerprint": "fp_44709d6fcb",
   "choices": [{
     "index": 0,
@@ -189,7 +189,7 @@ For streaming responses (stream: true), the server sends chunks in this format:
   "id": "chatcmpl-123",
   "object": "chat.completion.chunk",
   "created": 1694268190,
-  "model": "gpt-4o-mini",
+  "model": "jina-deepsearch-v1",
   "system_fingerprint": "fp_44709d6fcb",
   "choices": [{
     "index": 0,
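The README change above is cosmetic: the advertised model id becomes jina-deepsearch-v1, while the request and response shapes stay OpenAI-compatible. For reference, a streaming client for this endpoint could look like the following sketch. It is not part of the commit; it assumes Node 18+ (global fetch), a server started without an auth secret, and it treats a trailing [DONE] sentinel defensively even though this diff does not show one being sent.

// Minimal sketch of a streaming client for the endpoint documented above.
// Assumptions: Node 18+ (global fetch/TextDecoder), no auth secret configured.
async function streamChat(question: string): Promise<void> {
  const res = await fetch('http://localhost:3000/v1/chat/completions', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({
      model: 'jina-deepsearch-v1',
      stream: true,
      messages: [{role: 'user', content: question}]
    })
  });
  if (!res.ok || !res.body) throw new Error(`HTTP ${res.status}`);

  // The server writes server-sent-event frames of the form `data: {json}\n\n`.
  const decoder = new TextDecoder();
  let buffer = '';
  let end: number;
  for await (const part of res.body as unknown as AsyncIterable<Uint8Array>) {
    buffer += decoder.decode(part, {stream: true});
    while ((end = buffer.indexOf('\n\n')) !== -1) {
      const frame = buffer.slice(0, end).trim();
      buffer = buffer.slice(end + 2);
      if (!frame.startsWith('data: ')) continue;
      const payload = frame.slice('data: '.length);
      if (payload === '[DONE]') return; // defensive; not shown in this diff
      const chunk = JSON.parse(payload);
      process.stdout.write(chunk.choices?.[0]?.delta?.content ?? '');
    }
  }
}

streamChat('what is the capital of France?').catch(console.error);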
83 changes: 20 additions & 63 deletions src/app.ts
@@ -10,7 +10,6 @@ import {
   ChatCompletionResponse,
   ChatCompletionChunk,
   AnswerAction,
-  TOKEN_CATEGORIES,
   Model
 } from './types';
 import fs from 'fs/promises';
@@ -76,6 +75,7 @@ async function* streamTextWordByWord(text: string, streamingState: StreamingState
 async function emitRemainingContent(
   res: Response,
   requestId: string,
+  created: number,
   model: string,
   content: string
 ) {
@@ -84,7 +84,7 @@ async function emitRemainingContent(
   const chunk: ChatCompletionChunk = {
     id: requestId,
     object: 'chat.completion.chunk',
-    created: Math.floor(Date.now() / 1000),
+    created,
     model: model,
     system_fingerprint: 'fp_' + requestId,
     choices: [{
@@ -107,40 +107,19 @@ interface StreamingState {
 }
 
 
-async function emitContentImmediately(
-  res: Response,
-  requestId: string,
-  model: string,
-  content: string
-) {
-  const chunk: ChatCompletionChunk = {
-    id: requestId,
-    object: 'chat.completion.chunk',
-    created: Math.floor(Date.now() / 1000),
-    model: model,
-    system_fingerprint: 'fp_' + requestId,
-    choices: [{
-      index: 0,
-      delta: {content},
-      logprobs: null,
-      finish_reason: null
-    }]
-  };
-  res.write(`data: ${JSON.stringify(chunk)}\n\n`);
-}
-
-
 async function completeCurrentStreaming(
   streamingState: StreamingState,
   res: Response,
   requestId: string,
+  created: number,
   model: string
 ) {
   if (streamingState.currentlyStreaming && streamingState.remainingContent) {
     // Force completion of current streaming
     await emitRemainingContent(
       res,
       requestId,
+      created,
       model,
       streamingState.remainingContent
     );
@@ -203,7 +182,7 @@ if (secret) {
   });
 }
 
-async function processQueue(streamingState: StreamingState, res: Response, requestId: string, model: string) {
+async function processQueue(streamingState: StreamingState, res: Response, requestId: string, created: number, model: string) {
   if (streamingState.processingQueue) return;
 
   streamingState.processingQueue = true;
streamingState.processingQueue = true;
Expand All @@ -221,8 +200,8 @@ async function processQueue(streamingState: StreamingState, res: Response, reque
const chunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created: Math.floor(Date.now() / 1000),
model: model,
created,
model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
@@ -301,16 +280,12 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   }
 
   const requestId = Date.now().toString();
+  const created = Math.floor(Date.now() / 1000);
   const context: TrackerContext = {
     tokenTracker: new TokenTracker(),
     actionTracker: new ActionTracker()
   };
 
-  // Track prompt tokens for the initial message
-  // Use Vercel's token counting convention - 1 token per message
-  const messageTokens = body.messages.length;
-  context.tokenTracker.trackUsage('agent', messageTokens, TOKEN_CATEGORIES.PROMPT);
-
   // Add this inside the chat completions endpoint, before setting up the action listener
   const streamingState: StreamingState = {
     currentlyStreaming: false,
@@ -331,7 +306,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const initialChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
choices: [{
Expand All @@ -354,7 +329,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
});

// Start processing queue if not already processing
processQueue(streamingState, res, requestId, body.model);
processQueue(streamingState, res, requestId, created, body.model);
});
}
};
@@ -370,10 +345,6 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
   }
 
   try {
-    // Track initial query tokens - already tracked above
-    // const queryTokens = Buffer.byteLength(lastMessage.content, 'utf-8');
-    // context.tokenTracker.trackUsage('agent', queryTokens, 'prompt');
-
     let result;
     try {
       ({result} = await getResponse(lastMessage.content, undefined, undefined, context));
@@ -387,26 +358,15 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       }
     }
 
-    // Track tokens based on action type
-    if (result.action === 'answer') {
-      // Track accepted prediction tokens for the final answer using Vercel's convention
-      const answerTokens = 1; // Default to 1 token per answer
-      context.tokenTracker.trackUsage('evaluator', answerTokens, TOKEN_CATEGORIES.ACCEPTED);
-    } else {
-      // Track rejected prediction tokens for non-answer responses
-      const rejectedTokens = 1; // Default to 1 token per rejected response
-      context.tokenTracker.trackUsage('evaluator', rejectedTokens, TOKEN_CATEGORIES.REJECTED);
-    }
-
     if (body.stream) {
       // Complete any ongoing streaming before sending final answer
-      await completeCurrentStreaming(streamingState, res, requestId, body.model);
+      await completeCurrentStreaming(streamingState, res, requestId, created, body.model);
 
       // Send closing think tag
       const closeThinkChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -422,7 +382,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const answerChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -439,7 +399,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const response: ChatCompletionResponse = {
         id: requestId,
         object: 'chat.completion',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -475,9 +435,6 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
 
     // Track error as rejected tokens with Vercel token counting
    const errorMessage = error?.message || 'An error occurred';
-    // Default to 1 token for errors as per Vercel AI SDK convention
-    const errorTokens = 1;
-    context.tokenTracker.trackUsage('evaluator', errorTokens, TOKEN_CATEGORIES.REJECTED);
 
     // Clean up event listeners
     context.actionTracker.removeAllListeners('action');
@@ -491,24 +448,24 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const closeThinkChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
           index: 0,
           delta: {content: '</think>'},
           logprobs: null,
           finish_reason: null
-        }]
+        }],
+        usage
       };
       res.write(`data: ${JSON.stringify(closeThinkChunk)}\n\n`);
 
-      // Track error token and send error message
-      context.tokenTracker.trackUsage('evaluator', 1, TOKEN_CATEGORIES.REJECTED);
 
       const errorChunk: ChatCompletionChunk = {
         id: requestId,
         object: 'chat.completion.chunk',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
@@ -525,7 +482,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
       const response: ChatCompletionResponse = {
         id: requestId,
         object: 'chat.completion',
-        created: Math.floor(Date.now() / 1000),
+        created,
         model: body.model,
         system_fingerprint: 'fp_' + requestId,
         choices: [{
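Taken together, the src/app.ts changes do two things. First, every per-chunk created: Math.floor(Date.now() / 1000) is replaced by a single created value computed once per request and threaded through emitRemainingContent, completeCurrentStreaming, and processQueue, so all chunks of one completion carry the same timestamp, as OpenAI's chunk format expects. Second, the placeholder Vercel-style token bookkeeping (TOKEN_CATEGORIES, one token per message, answer, or error) is dropped, and the closing chunk instead carries a usage object. A minimal sketch of the first pattern, not verbatim code from the commit:

// Sketch of the pattern (assumed distillation, not the commit's code): fix
// id/created/model once per request and close over them, so every chunk
// emitted for that request agrees on all identifying fields.
type Chunk = {
  id: string;
  object: 'chat.completion.chunk';
  created: number;
  model: string;
  system_fingerprint: string;
  choices: Array<{index: number; delta: {content: string}; logprobs: null; finish_reason: null | 'stop'}>;
};

function chunkFactory(model: string): (content: string) => Chunk {
  const id = Date.now().toString();              // requestId in app.ts
  const created = Math.floor(Date.now() / 1000); // computed once, reused
  return (content) => ({
    id,
    object: 'chat.completion.chunk',
    created, // identical for every chunk of this request
    model,
    system_fingerprint: 'fp_' + id,
    choices: [{index: 0, delta: {content}, logprobs: null, finish_reason: null}]
  });
}

// Usage: const makeChunk = chunkFactory(body.model);
// res.write(`data: ${JSON.stringify(makeChunk('hello'))}\n\n`);

Closing over the triple makes it impossible for two chunks of one request to disagree, which is exactly the inconsistency the per-chunk Date.now() calls allowed.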
1 change: 1 addition & 0 deletions src/types.ts
@@ -217,6 +217,7 @@ export interface ChatCompletionChunk {
     logprobs: null;
     finish_reason: null | 'stop';
   }>;
+  usage?: any;
 }
 
 // Tracker Types
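The new usage?: any field is what lets the closing chunk in src/app.ts attach usage without a type error. A stricter shape, assuming the tracker reports OpenAI-style counts (an assumption; this diff keeps the field as any), could look like:

// Hypothetical stricter typing for the field added above; the commit keeps
// `any`, which stays flexible but gives callers no shape to rely on.
export interface ChatCompletionUsage {
  prompt_tokens: number;
  completion_tokens: number;
  total_tokens: number;
}

// ChatCompletionChunk would then declare: usage?: ChatCompletionUsage;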
