Skip to content

Commit

Permalink
wip: use workqueue to offload progress events
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Hale <[email protected]>
  • Loading branch information
njhale committed Sep 6, 2024
1 parent 58e30f1 commit 16b1087
Showing 1 changed file with 47 additions and 24 deletions.
71 changes: 47 additions & 24 deletions components/chat/useChatSocket.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const useChatSocket = (isEmpty?: boolean) => {
'github.com/gptscript-ai/search-website',
'github.com/gptscript-ai/tools/apis/hubspot/crm/read',
];

// State
const [socket, setSocket] = useState<Socket | null>(null);
const [connected, setConnected] = useState(false);
Expand All @@ -52,6 +53,9 @@ const useChatSocket = (isEmpty?: boolean) => {
// trustedOpenAPIRef contains a mapping of OpenAPI run tools to OpenAPI operation names that have been trusted.
const trustedOpenAPIRef = useRef<Record<string, Record<string, boolean>>>({});

// Workqueue for storing progress events
const workQueue = useRef<Array<{ frame: Frame; name?: string }>>([]);

// update the refs as the state changes
messagesRef.current = messages;
socketRef.current = socket;
Expand All @@ -63,7 +67,6 @@ const useChatSocket = (isEmpty?: boolean) => {
setError(error);
setMessages((prevMessages) => {
if (!latestAgentMessageRef.current.type) {
// If there are no previous messages, create a new error message
return [
...prevMessages,
{
Expand All @@ -85,6 +88,23 @@ const useChatSocket = (isEmpty?: boolean) => {
// handles progress being received from the server (callProgress style frames).
const handleProgress = useCallback(
({ frame, name }: { frame: Frame; name?: string }) => {
workQueue.current.push({ frame, name });
},
[]
);

// Function to process only the number of messages that were in the queue when processing started
const processWorkQueue = useCallback(() => {
const initialQueueLength = workQueue.current.length;
if (initialQueueLength === 0) return;

console.log(`processing ${initialQueueLength} items from the work queue`);

for (let i = 0; i < initialQueueLength; i++) {
if (workQueue.current.length === 0) break; // Stop if the queue is empty

const { frame, name } = workQueue.current.shift()!; // Remove and process the first message in the queue

if (!latestAgentMessageRef.current.type)
latestAgentMessageRef.current.type = MessageType.Agent;
if (!latestAgentMessageRef.current.name)
Expand All @@ -98,36 +118,35 @@ const useChatSocket = (isEmpty?: boolean) => {
'Waiting for model response...';
setLatestAgentMessage({ ...latestAgentMessageRef.current });
}
return;
continue;
}

// At this point, we know that we are dealing with a call frame
frame = frame as CallFrame;
const callFrame = frame as CallFrame;
if (!latestAgentMessageRef.current.calls) {
latestAgentMessageRef.current.calls = {};
}
latestAgentMessageRef.current.calls[frame.id] = frame;
latestAgentMessageRef.current.calls[callFrame.id] = callFrame;

if (!frame.error && frame.toolCategory === 'provider') {
return;
if (!callFrame.error && callFrame.toolCategory === 'provider') {
continue;
}

const isMainContent =
frame?.output &&
frame.output.length > 0 &&
(!frame.parentID || frame.tool?.chat) &&
!frame.output[frame.output.length - 1].subCalls;
callFrame?.output &&
callFrame.output.length > 0 &&
(!callFrame.parentID || callFrame.tool?.chat) &&
!callFrame.output[callFrame.output.length - 1].subCalls;

let content = isMainContent
? frame.output[frame.output.length - 1].content || ''
? callFrame.output[callFrame.output.length - 1].content || ''
: '';
if (!content) return;
if (!content) continue;
setGenerating(true);
if (
content === 'Waiting for model response...' &&
latestAgentMessageRef.current.message
)
return;
continue;

if (content.startsWith('<tool call>')) {
const parsedToolCall = parseToolCall(content);
Expand All @@ -136,7 +155,7 @@ const useChatSocket = (isEmpty?: boolean) => {

latestAgentMessageRef.current.message = content;

if (isMainContent && frame.type == 'callFinish') {
if (isMainContent && callFrame.type === 'callFinish') {
setMessages([
...messagesRef.current,
{ ...latestAgentMessageRef.current },
Expand All @@ -146,9 +165,17 @@ const useChatSocket = (isEmpty?: boolean) => {
} else {
setLatestAgentMessage({ ...latestAgentMessageRef.current });
}
},
[]
);
}
}, []);

// Set up the interval to process the work queue every 100ms
useEffect(() => {
const intervalId = setInterval(() => {
processWorkQueue();
}, 100);

return () => clearInterval(intervalId); // Clear interval on component unmount
}, [processWorkQueue]);

const handlePromptRequest = useCallback(
({ frame, name }: { frame: PromptFrame; name?: string }) => {
Expand Down Expand Up @@ -227,7 +254,6 @@ const useChatSocket = (isEmpty?: boolean) => {
) => {
setTools(tools);
if (scriptContent) {
// Ensure the knowledge tool isn't set.
const tool = await rootTool(scriptContent);
tool.tools = (tool.tools || []).filter((t) => t !== gatewayTool());
setScriptContent(scriptContent);
Expand Down Expand Up @@ -290,7 +316,6 @@ const useChatSocket = (isEmpty?: boolean) => {
const repo = frame.tool?.source.repo.Root;
const trimmedRepo = trimRepo(repo);

// If it is a read-only tool we've authored, auto-allow it.
if (
trimmedRepo.startsWith('github.com/gptscript-ai') &&
(frame.tool.name?.startsWith('list') ||
Expand All @@ -310,10 +335,8 @@ const useChatSocket = (isEmpty?: boolean) => {
}

// If the tool is a system tool and wasn't already trusted, return false.
if (frame.tool?.name?.startsWith('sys.')) return false;

// Automatically allow all other tools
return true;
// Automatically allow all other tools.
return !frame.tool?.name?.startsWith('sys.');
};

const addTrustedFor = (frame: CallFrame) => {
Expand Down

0 comments on commit 16b1087

Please sign in to comment.