Queue incoming events #28

Merged: 2 commits, Jan 24, 2025
2 changes: 1 addition & 1 deletion packages/client/package.json
@@ -1,6 +1,6 @@
{
"name": "@11labs/client",
"version": "0.0.6-beta.2",
"version": "0.0.6-beta.3",
"description": "ElevenLabs JavaScript Client Library",
"main": "./dist/lib.umd.js",
"module": "./dist/lib.module.js",
Expand Down
242 changes: 114 additions & 128 deletions packages/client/src/index.ts
Expand Up @@ -7,11 +7,7 @@ import {
OnDisconnectCallback,
SessionConfig,
} from "./utils/connection";
import {
ClientToolCallEvent,
isValidSocketEvent,
PingEvent,
} from "./utils/events";
import { ClientToolCallEvent, IncomingSocketEvent } from "./utils/events";

export type { IncomingSocketEvent } from "./utils/events";
export type { SessionConfig, DisconnectionDetails } from "./utils/connection";
Expand Down Expand Up @@ -84,6 +80,13 @@ export class Conversation {
let output: Output | null = null;

try {
// some browsers won't allow calling getSupportedConstraints or enumerateDevices
// before getting approval for microphone access
const preliminaryInputStream = await navigator.mediaDevices.getUserMedia({
audio: true,
});
preliminaryInputStream?.getTracks().forEach(track => track.stop());

Comment on lines +83 to +89
Member

I think this was moved here to initialize the microphone before everything else, but we close it right after. I'm guessing it solves the problem you saw in testing, but it's a bit odd that we close the stream here and then recreate it shortly after without awaiting. It seems like some issues could remain.

Would it make sense to create Input first here instead? It already had this check, and we can await Input creation before moving on to anything else. This bit of code would then stay where it was, and the input-related code (and its comment) would stay with the rest of the input code.

Contributor Author

Yes, this is how it used to work. But the input can now have a different sampling rate depending on the agent configuration, so we need to establish a connection first, before we know how to set up the streams.

Member

Gotcha! Perhaps it could live in an `Input.prepare` method or something similar, which would be called here? The code (at least with the current comment about `getSupportedConstraints` or `enumerateDevices`) doesn't make much sense in this spot.
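
One way to read this suggestion, as a minimal sketch rather than the library's actual API: a hypothetical `prepareMicrophone` helper (standing in for the proposed `Input.prepare`) requests microphone access and releases the stream immediately, so that the permission prompt is resolved before the connection and streams are set up. The `MediaDevicesLike`, `StreamLike`, and `TrackLike` interfaces are assumptions introduced here so the helper can be exercised outside a browser; in a browser one would pass `navigator.mediaDevices`.

```typescript
// Hypothetical types: the minimal surface of MediaDevices this sketch needs.
interface TrackLike {
  stop(): void;
}
interface StreamLike {
  getTracks(): TrackLike[];
}
interface MediaDevicesLike {
  getUserMedia(constraints: { audio: boolean }): Promise<StreamLike>;
}

// Hypothetical helper, not part of @11labs/client.
// Asks for microphone permission, then stops every track so the device is
// released before the real input stream is created later.
async function prepareMicrophone(devices: MediaDevicesLike): Promise<number> {
  const stream = await devices.getUserMedia({ audio: true });
  const tracks = stream.getTracks();
  tracks.forEach(track => track.stop());
  return tracks.length; // number of tracks that were released
}
```

Under this sketch, the call would be awaited before `Connection.create(options)`, which keeps the permission logic and its comment alongside the rest of the input code while still letting the connection be established before the streams are configured.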

connection = await Connection.create(options);
[input, output] = await Promise.all([
Input.create({
Expand Down Expand Up @@ -122,9 +125,7 @@ export class Conversation {
this.options.onConnect({ conversationId: connection.conversationId });

this.connection.onDisconnect(this.endSessionWithDetails);
this.connection.socket.addEventListener("message", event => {
this.onEvent(event);
});
this.connection.onMessage(this.onMessage);

this.input.worklet.port.onmessage = this.onInputWorkletMessage;
this.output.worklet.port.onmessage = this.onOutputWorkletMessage;
Expand Down Expand Up @@ -167,142 +168,127 @@ export class Conversation {
}
};

private onEvent = async (event: MessageEvent) => {
try {
const parsedEvent = JSON.parse(event.data);
private onMessage = async (parsedEvent: IncomingSocketEvent) => {
switch (parsedEvent.type) {
case "interruption": {
if (parsedEvent.interruption_event) {
this.lastInterruptTimestamp = parsedEvent.interruption_event.event_id;
}
this.fadeOutAudio();
break;
}

if (!isValidSocketEvent(parsedEvent)) {
return;
case "agent_response": {
this.options.onMessage({
source: "ai",
message: parsedEvent.agent_response_event.agent_response,
});
break;
}

switch (parsedEvent.type) {
case "interruption": {
if (parsedEvent.interruption_event) {
this.lastInterruptTimestamp =
parsedEvent.interruption_event.event_id;
}
this.fadeOutAudio();
break;
}
case "user_transcript": {
this.options.onMessage({
source: "user",
message: parsedEvent.user_transcription_event.user_transcript,
});
break;
}

case "agent_response": {
this.options.onMessage({
source: "ai",
message: parsedEvent.agent_response_event.agent_response,
});
break;
}
case "internal_tentative_agent_response": {
this.options.onDebug({
type: "tentative_agent_response",
response:
parsedEvent.tentative_agent_response_internal_event
.tentative_agent_response,
});
break;
}

case "user_transcript": {
this.options.onMessage({
source: "user",
message: parsedEvent.user_transcription_event.user_transcript,
});
break;
}
case "client_tool_call": {
if (
this.options.clientTools.hasOwnProperty(
parsedEvent.client_tool_call.tool_name
)
) {
try {
const result =
(await this.options.clientTools[
parsedEvent.client_tool_call.tool_name
](parsedEvent.client_tool_call.parameters)) ??
"Client tool execution successful."; // default client-tool call response

this.connection.sendMessage({
type: "client_tool_result",
tool_call_id: parsedEvent.client_tool_call.tool_call_id,
result: result,
is_error: false,
});
} catch (e) {
this.onError(
"Client tool execution failed with following error: " +
(e as Error)?.message,
{
clientToolName: parsedEvent.client_tool_call.tool_name,
}
);
this.connection.sendMessage({
type: "client_tool_result",
tool_call_id: parsedEvent.client_tool_call.tool_call_id,
result: "Client tool execution failed: " + (e as Error)?.message,
is_error: true,
});
}

case "internal_tentative_agent_response": {
this.options.onDebug({
type: "tentative_agent_response",
response:
parsedEvent.tentative_agent_response_internal_event
.tentative_agent_response,
});
break;
}

case "client_tool_call": {
if (
this.options.clientTools.hasOwnProperty(
parsedEvent.client_tool_call.tool_name
)
) {
try {
const result =
(await this.options.clientTools[
parsedEvent.client_tool_call.tool_name
](parsedEvent.client_tool_call.parameters)) ??
"Client tool execution successful."; // default client-tool call response

this.connection.sendMessage({
type: "client_tool_result",
tool_call_id: parsedEvent.client_tool_call.tool_call_id,
result: result,
is_error: false,
});
} catch (e) {
this.onError(
"Client tool execution failed with following error: " +
(e as Error)?.message,
{
clientToolName: parsedEvent.client_tool_call.tool_name,
}
);
this.connection.sendMessage({
type: "client_tool_result",
tool_call_id: parsedEvent.client_tool_call.tool_call_id,
result:
"Client tool execution failed: " + (e as Error)?.message,
is_error: true,
});
}

break;
}

if (this.options.onUnhandledClientToolCall) {
this.options.onUnhandledClientToolCall(
parsedEvent.client_tool_call
);

break;
}

this.onError(
`Client tool with name ${parsedEvent.client_tool_call.tool_name} is not defined on client`,
{
clientToolName: parsedEvent.client_tool_call.tool_name,
}
);
this.connection.sendMessage({
type: "client_tool_result",
tool_call_id: parsedEvent.client_tool_call.tool_call_id,
result: `Client tool with name ${parsedEvent.client_tool_call.tool_name} is not defined on client`,
is_error: true,
});
if (this.options.onUnhandledClientToolCall) {
this.options.onUnhandledClientToolCall(parsedEvent.client_tool_call);

break;
}

case "audio": {
if (this.lastInterruptTimestamp <= parsedEvent.audio_event.event_id) {
this.addAudioBase64Chunk(parsedEvent.audio_event.audio_base_64);
this.currentEventId = parsedEvent.audio_event.event_id;
this.updateCanSendFeedback();
this.updateMode("speaking");
this.onError(
`Client tool with name ${parsedEvent.client_tool_call.tool_name} is not defined on client`,
{
clientToolName: parsedEvent.client_tool_call.tool_name,
}
break;
}
);
this.connection.sendMessage({
type: "client_tool_result",
tool_call_id: parsedEvent.client_tool_call.tool_call_id,
result: `Client tool with name ${parsedEvent.client_tool_call.tool_name} is not defined on client`,
is_error: true,
});

break;
}

case "ping": {
this.connection.sendMessage({
type: "pong",
event_id: (parsedEvent as PingEvent).ping_event.event_id,
});
// parsedEvent.ping_event.ping_ms can be used on client side, for example
// to warn if ping is too high that experience might be degraded.
break;
case "audio": {
if (this.lastInterruptTimestamp <= parsedEvent.audio_event.event_id) {
this.addAudioBase64Chunk(parsedEvent.audio_event.audio_base_64);
this.currentEventId = parsedEvent.audio_event.event_id;
this.updateCanSendFeedback();
this.updateMode("speaking");
}
break;
}

// unhandled events are expected to be internal events
default: {
this.options.onDebug(parsedEvent);
break;
}
case "ping": {
this.connection.sendMessage({
type: "pong",
event_id: parsedEvent.ping_event.event_id,
});
// parsedEvent.ping_event.ping_ms can be used on client side, for example
// to warn if ping is too high that experience might be degraded.
break;
}

// unhandled events are expected to be internal events
default: {
this.options.onDebug(parsedEvent);
break;
}
} catch {
this.onError("Failed to parse event data", { event });
return;
}
};

Expand All @@ -328,7 +314,7 @@ export class Conversation {
}
};

private addAudioBase64Chunk = async (chunk: string) => {
private addAudioBase64Chunk = (chunk: string) => {
this.output.gain.gain.value = this.volume;
this.output.worklet.port.postMessage({ type: "clearInterrupted" });
this.output.worklet.port.postMessage({
Expand All @@ -337,7 +323,7 @@ export class Conversation {
});
};

private fadeOutAudio = async () => {
private fadeOutAudio = () => {
// mute agent
this.updateMode("listening");
this.output.worklet.port.postMessage({ type: "interrupt" });
Expand Down
24 changes: 24 additions & 0 deletions packages/client/src/utils/connection.ts
Expand Up @@ -3,6 +3,7 @@ import {
ConfigEvent,
isValidSocketEvent,
OutgoingSocketEvent,
IncomingSocketEvent,
} from "./events";

const MAIN_PROTOCOL = "convai";
Expand Down Expand Up @@ -76,6 +77,7 @@ export type DisconnectionDetails =
reason: "user";
};
export type OnDisconnectCallback = (details: DisconnectionDetails) => void;
export type OnMessageCallback = (event: IncomingSocketEvent) => void;

const WSS_API_ORIGIN = "wss://api.elevenlabs.io";
const WSS_API_PATHNAME = "/v1/convai/conversation?agent_id=";
Expand Down Expand Up @@ -174,8 +176,10 @@ export class Connection {
}
}

private queue: IncomingSocketEvent[] = [];
private disconnectionDetails: DisconnectionDetails | null = null;
private onDisconnectCallback: OnDisconnectCallback | null = null;
private onMessageCallback: OnMessageCallback | null = null;

private constructor(
public readonly socket: WebSocket,
Expand Down Expand Up @@ -212,6 +216,20 @@ export class Connection {
}
);
});
this.socket.addEventListener("message", event => {
try {
const parsedEvent = JSON.parse(event.data);
if (!isValidSocketEvent(parsedEvent)) {
return;
Member

Maybe we should actually report this in the console? It's not the point of this PR, so we can skip it.
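
A minimal sketch of what such reporting could look like, assuming a simplified validity check. `parseIncoming`, `looksLikeSocketEvent`, and the `warn` parameter are hypothetical names introduced for illustration; the real `isValidSocketEvent` in `./utils/events` may check more than a `type` field.

```typescript
// Hypothetical, simplified event shape: only the discriminating `type` field.
type SocketEventLike = { type: string };

// Stand-in for isValidSocketEvent; assumes a valid event is an object
// with a string `type` property.
function looksLikeSocketEvent(value: unknown): value is SocketEventLike {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { type?: unknown }).type === "string"
  );
}

// Parses a raw socket payload. Returns the event, or null after reporting
// why the message was dropped instead of discarding it silently.
function parseIncoming(
  data: string,
  warn: (message: string) => void = message => console.warn(message)
): SocketEventLike | null {
  try {
    const parsed: unknown = JSON.parse(data);
    if (!looksLikeSocketEvent(parsed)) {
      warn("Ignoring socket message with unexpected shape");
      return null;
    }
    return parsed;
  } catch {
    warn("Ignoring socket message that is not valid JSON");
    return null;
  }
}
```

Passing the reporter as a parameter is a design choice made for this sketch: it defaults to `console.warn`, but a caller could route the message to a debug callback instead.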

}

if (this.onMessageCallback) {
this.onMessageCallback(parsedEvent);
} else {
this.queue.push(parsedEvent);
}
} catch (_) {}
});
}

public close() {
Expand All @@ -222,6 +240,12 @@ export class Connection {
this.socket.send(JSON.stringify(message));
}

public onMessage(callback: OnMessageCallback) {
this.onMessageCallback = callback;
this.queue.forEach(callback);
this.queue = [];
}

public onDisconnect(callback: OnDisconnectCallback) {
this.onDisconnectCallback = callback;
if (this.disconnectionDetails) {
Expand Down
7 changes: 0 additions & 7 deletions packages/client/src/utils/input.ts
Expand Up @@ -39,13 +39,6 @@ export class Input {
noiseSuppression: { ideal: true },
};

// some browsers won't allow calling getSupportedConstraints or enumerateDevices
// before getting approval for microphone access
const preliminaryInputStream = await navigator.mediaDevices.getUserMedia({
audio: true,
});
preliminaryInputStream?.getTracks().forEach(track => track.stop());

if (isIosDevice() && preferHeadphonesForIosDevices) {
const availableDevices =
await window.navigator.mediaDevices.enumerateDevices();
Expand Down
2 changes: 1 addition & 1 deletion packages/react/package.json
@@ -1,6 +1,6 @@
{
"name": "@11labs/react",
"version": "0.0.6-beta.2",
"version": "0.0.6-beta.3",
"description": "ElevenLabs React Library",
"main": "./dist/lib.umd.js",
"module": "./dist/lib.module.js",
Expand Down