Skip to content

Commit

Permalink
fix: emit stream
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiao committed Feb 13, 2025
1 parent bd77535 commit 23ceaa0
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 48 deletions.
26 changes: 12 additions & 14 deletions src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ export async function getResponse(question: string,
let allowRead = true;
let allowReflect = true;
let prompt = '';
let thisStep: StepAction = {action: 'answer', answer: '', references: [], think: ''};
let isAnswered = false;
let thisStep: StepAction = {action: 'answer', answer: '', references: [], think: '', isFinal: false};

const allURLs: Record<string, string> = {};
const visitedURLs: string[] = [];
Expand Down Expand Up @@ -388,7 +387,7 @@ export async function getResponse(question: string,
if (thisStep.action === 'answer') {
if (step === 1) {
// LLM is so confident and answer immediately, skip all evaluations
isAnswered = true;
thisStep.isFinal = true;
break
}

Expand Down Expand Up @@ -417,11 +416,11 @@ ${evaluation.think}
Your journey ends here. You have successfully answered the original question. Congratulations! 🎉
`);
isAnswered = true;
thisStep.isFinal = true;
break
} else {
if (badAttempts >= maxBadAttempts) {
isAnswered = false;
thisStep.isFinal = false;
break
} else {
diaryContext.push(`
Expand Down Expand Up @@ -676,9 +675,7 @@ You decided to think out of the box or cut from a completely different angle.`);
}

await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep);
if (isAnswered) {
return {result: thisStep, context};
} else {
if (!(thisStep as AnswerAction).isFinal) {
console.log('Enter Beast mode!!!')
// any answer is better than no answer, humanity last resort
step++;
Expand All @@ -705,14 +702,15 @@ You decided to think out of the box or cut from a completely different angle.`);
schema,
prompt,
});

await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep);
thisStep = result.object as StepAction;
thisStep = result.object as AnswerAction;
(thisStep as AnswerAction).isFinal = true;
context.actionTracker.trackAction({totalStep, thisStep, gaps, badAttempts});

console.log(thisStep)
return {result: thisStep, context};
}
console.log(thisStep)

await storeContext(prompt, schema, [allContext, allKeywords, allQuestions, allKnowledge], totalStep);
return {result: thisStep, context};

}

async function storeContext(prompt: string, schema: any, memory: any[][], step: number) {
Expand Down
65 changes: 33 additions & 32 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
ChatCompletionResponse,
ChatCompletionChunk,
AnswerAction,
Model
Model, StepAction
} from './types';
import {TokenTracker} from "./utils/token-tracker";
import {ActionTracker} from "./utils/action-tracker";
Expand All @@ -26,25 +26,26 @@ app.get('/health', (req, res) => {
});

function buildMdFromAnswer(answer: AnswerAction) {
let refStr = '';
if (answer.references?.length > 0) {
refStr = `
if (!answer.references?.length || !answer.references.some(ref => ref.url.startsWith('http'))) {
return answer.answer;
}

const references = answer.references.map((ref, i) => {
const escapedQuote = ref.exactQuote
.replace(/([[\]_*`])/g, '\\$1')
.replace(/\n/g, ' ')
.trim();

return `[^${i + 1}]: [${escapedQuote}](${ref.url})`;
}).join('\n\n');

return `${answer.answer.replace(/\(REF_(\d+)\)/g, (_, num) => `[^${num}]`)}
<references>
${answer.references.map((ref, i) => {
// Escape special markdown characters in the quote
const escapedQuote = ref.exactQuote
.replace(/([[\]_*`])/g, '\\$1') // Escape markdown syntax chars
.replace(/\n/g, ' ') // Replace line breaks with spaces
.trim(); // Remove excess whitespace
return `[^${i + 1}]: [${escapedQuote}](${ref.url})\n\n`;
}).join()}
</references>
`.trim();
}
return `${answer.answer.replace(/\(REF_(\d+)\)/g, (_, num) => `[^${num}]`)}\n\n${refStr}`;
${references}
</references>`;
}

async function* streamTextNaturally(text: string, streamingState: StreamingState) {
Expand Down Expand Up @@ -465,16 +466,16 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
res.write(`data: ${JSON.stringify(initialChunk)}\n\n`);

// Set up progress listener with cleanup
const actionListener = async (action: any) => {
if (action.thisStep.think) {
// Create a promise that resolves when this content is done streaming
const actionListener = async (step: StepAction) => {
// Add content to queue for both thinking steps and final answer
if (step.think) {
const content = step.think;
await new Promise<void>(resolve => {
streamingState.queue.push({
content: action.thisStep.think,
content,
resolve
});

// Start processing queue if not already processing
// Single call to process queue is sufficient
processQueue(streamingState, res, requestId, created, body.model);
});
}
Expand All @@ -491,13 +492,13 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
}

try {
const {result} = await getResponse(lastMessage.content as string, tokenBudget, maxBadAttempts, context, body.messages)
const {result: finalStep} = await getResponse(lastMessage.content as string, tokenBudget, maxBadAttempts, context, body.messages)

const usage = context.tokenTracker.getTotalUsageSnakeCase();
if (body.stream) {
// Complete any ongoing streaming before sending final answer
await completeCurrentStreaming(streamingState, res, requestId, created, body.model);

const finalAnswer = buildMdFromAnswer(finalStep as AnswerAction);
// Send closing think tag
const closeThinkChunk: ChatCompletionChunk = {
id: requestId,
Expand All @@ -507,29 +508,29 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: `</think>\n\n`},
delta: {content: `</think>\n\n${finalAnswer}`},
logprobs: null,
finish_reason: null
}]
};
res.write(`data: ${JSON.stringify(closeThinkChunk)}\n\n`);

// Send final answer as separate chunk
const answerChunk: ChatCompletionChunk = {
// After the content is fully streamed, send the final chunk with finish_reason and usage
const finalChunk: ChatCompletionChunk = {
id: requestId,
object: 'chat.completion.chunk',
created,
model: body.model,
system_fingerprint: 'fp_' + requestId,
choices: [{
index: 0,
delta: {content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think},
delta: {content: ''},
logprobs: null,
finish_reason: 'stop'
}],
usage
};
res.write(`data: ${JSON.stringify(answerChunk)}\n\n`);
res.write(`data: ${JSON.stringify(finalChunk)}\n\n`);
res.end();
} else {

Expand All @@ -543,7 +544,7 @@ app.post('/v1/chat/completions', (async (req: Request, res: Response) => {
index: 0,
message: {
role: 'assistant',
content: result.action === 'answer' ? buildMdFromAnswer(result) : result.think
content: finalStep.action === 'answer' ? buildMdFromAnswer(finalStep) : finalStep.think
},
logprobs: null,
finish_reason: 'stop'
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type AnswerAction = BaseAction & {
exactQuote: string;
url: string;
}>;
isFinal?: boolean;
};

export type ReflectAction = BaseAction & {
Expand Down
4 changes: 2 additions & 2 deletions src/utils/action-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ export class ActionTracker extends EventEmitter {

trackAction(newState: Partial<ActionState>) {
this.state = { ...this.state, ...newState };
this.emit('action', this.state);
this.emit('action', this.state.thisStep);
}

trackThink(think: string) {
// only update the think field of the current state
this.state = { ...this.state, thisStep: { ...this.state.thisStep, think } };
this.emit('action', this.state);
this.emit('action', this.state.thisStep);
}

getState(): ActionState {
Expand Down

0 comments on commit 23ceaa0

Please sign in to comment.