Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Updated example to switch pipelines per the original request #1320

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 205 additions & 75 deletions examples/phone-chatbot/bot_daily_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
EndFrame,
EndTaskFrame,
InputAudioRawFrame,
StopTaskFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
Expand All @@ -25,10 +27,15 @@
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import LLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.services.google.google import GoogleLLMContext
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
from pipecat.transports.services.daily import (
DailyDialinSettings,
DailyParams,
DailyTransport,
)

load_dotenv(override=True)

Expand All @@ -39,6 +46,8 @@
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")

system_message = None


class UserAudioCollector(FrameProcessor):
"""This FrameProcessor collects audio frames in a buffer, then adds them to the
Expand Down Expand Up @@ -112,7 +121,13 @@ def __init__(self, context_switcher):
self.context_switcher = context_switcher

async def voicemail_response(
self, function_name, tool_call_id, args, llm, context, result_callback
self,
function_name,
tool_call_id,
args,
llm: LLMService,
context,
result_callback,
):
"""Function the bot can call to leave a voicemail message."""
message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
Expand All @@ -122,38 +137,34 @@ async def voicemail_response(
After saying this message, call the terminate_call function."""

await self.context_switcher.switch_context(system_instruction=message)

await result_callback("Leaving a voicemail message")

async def human_conversation(
self, function_name, tool_call_id, args, llm, context, result_callback
self,
function_name,
tool_call_id,
args,
llm: LLMService,
context,
result_callback,
):
"""Function the bot can when it detects it's talking to a human."""
message = """You are Chatbot talking to a human. Be friendly and helpful.

Start with: "Hello! I'm a friendly chatbot. How can I help you today?"

Keep your responses brief and to the point. Listen to what the person says.

When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"

THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""

await self.context_switcher.switch_context(system_instruction=message)

await result_callback("Talking to the customer")
await llm.push_frame(StopTaskFrame(), FrameDirection.UPSTREAM)


async def terminate_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
function_name,
tool_call_id,
args,
llm: LLMService,
context,
result_callback,
call_state=None,
):
"""Function the bot can call to terminate the call upon completion of the call."""

await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
if call_state:
call_state.bot_terminated_call = True
await llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)


async def main(
Expand All @@ -164,20 +175,10 @@ async def main(
detect_voicemail: bool,
dialout_number: Optional[str],
):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.

# We don't want to specify dial-in settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
if callId != "None" and callDomain != "None":
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)

transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
transport_params = DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=dialin_settings,
Expand All @@ -187,15 +188,41 @@ async def main(
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
# transcription_enabled=True,
),
)
else:
transport_params = DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)

class CallState:
participant_left_early = False
bot_terminated_call = False

call_state = CallState()

transport = DailyTransport(
room_url,
token,
"Chatbot",
transport_params,
)

tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)

stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

### VOICEMAIL PIPELINE

tools = [
{
"function_declarations": [
Expand All @@ -217,55 +244,67 @@ async def main(

system_instruction = """You are Chatbot trying to determine if this is a voicemail system or a human.

If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"
If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"

Then call the function switch_to_voicemail_response.
Then call the function switch_to_voicemail_response.

If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.

DO NOT say anything until you've determined if this is a voicemail or human."""
DO NOT say anything until you've determined if this is a voicemail or human."""

llm = GoogleLLMService(
voicemail_detection_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)

context = GoogleLLMContext()
context_aggregator = llm.create_context_aggregator(context)
audio_collector = UserAudioCollector(context, context_aggregator.user())

context_switcher = ContextSwitcher(llm, context_aggregator.user())
voicemail_detection_context = GoogleLLMContext()
voicemail_detection_context_aggregator = voicemail_detection_llm.create_context_aggregator(
voicemail_detection_context
)
context_switcher = ContextSwitcher(
voicemail_detection_llm, voicemail_detection_context_aggregator.user()
)
handlers = FunctionHandlers(context_switcher)

llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
llm.register_function("switch_to_human_conversation", handlers.human_conversation)
llm.register_function("terminate_call", terminate_call)
voicemail_detection_llm.register_function(
"switch_to_voicemail_response", handlers.voicemail_response
)
voicemail_detection_llm.register_function(
"switch_to_human_conversation", handlers.human_conversation
)
voicemail_detection_llm.register_function(
"terminate_call",
lambda *args, **kwargs: terminate_call(*args, **kwargs, call_state=call_state),
)

voicemail_detection_audio_collector = UserAudioCollector(
voicemail_detection_context, voicemail_detection_context_aggregator.user()
)

pipeline = Pipeline(
voicemail_detection_pipeline = Pipeline(
[
transport.input(), # Transport user input
audio_collector, # Collect audio frames
context_aggregator.user(), # User responses
llm, # LLM
voicemail_detection_audio_collector, # Collect audio frames
voicemail_detection_context_aggregator.user(), # User responses
voicemail_detection_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
voicemail_detection_context_aggregator.assistant(), # Assistant spoken responses
]
)

task = PipelineTask(
pipeline,
voicemail_detection_pipeline_task = PipelineTask(
voicemail_detection_pipeline,
params=PipelineParams(allow_interruptions=True),
)

Expand Down Expand Up @@ -300,25 +339,116 @@ async def on_first_participant_joined(transport, participant):
# machine to say something like 'Leave a message after the beep', or for the user to say 'Hello?'.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("Detect voicemail; capturing participant transcription")
await transport.capture_participant_transcription(participant["id"])
else:
logger.debug("no dialout number; assuming dialin")
logger.debug("+++++ No dialout number; assuming dialin")

# Different handlers for dialin
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# This event is not firing for some reason
await transport.capture_participant_transcription(participant["id"])
# For the dialin case, we want the bot to answer the phone and greet the user. We
# can prompt the bot to speak by putting the context into the pipeline.
await task.queue_frames([context_aggregator.user().get_context_frame()])
dialin_instructions = """Always call the function switch_to_human_conversation"""
messages = [
{
"role": "system",
"content": dialin_instructions,
}
]
voicemail_detection_context_aggregator.user().set_messages(messages)
await voicemail_detection_pipeline_task.queue_frames(
[voicemail_detection_context_aggregator.user().get_context_frame()]
)

runner = PipelineRunner()

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
call_state.participant_left_early = True
await voicemail_detection_pipeline_task.queue_frame(EndFrame())

runner = PipelineRunner()
print("!!! starting voicemail detection pipeline")
await runner.run(voicemail_detection_pipeline_task)
print("!!! Done with voicemail detection pipeline")

if call_state.participant_left_early or call_state.bot_terminated_call:
if call_state.participant_left_early:
print("!!! Participant left early; terminating call")
elif call_state.bot_terminated_call:
print("!!! Bot terminated call; not proceeding to human conversation")
return

### HUMAN CONVERSATION PIPELINE

human_conversation_system_instruction = """You are Chatbot talking to a human. Be friendly and helpful.

Start with: "Hello! I'm a friendly chatbot. How can I help you today?"

Keep your responses brief and to the point. Listen to what the person says.

When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"

THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""

human_conversation_llm = GoogleLLMService(
model="models/gemini-2.0-flash-001",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=human_conversation_system_instruction,
tools=tools,
)
human_conversation_context = GoogleLLMContext()

human_conversation_context_aggregator = human_conversation_llm.create_context_aggregator(
human_conversation_context
)

human_conversation_llm.register_function(
"terminate_call",
lambda *args, **kwargs: terminate_call(*args, **kwargs, call_state=call_state),
)

human_conversation_pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
human_conversation_context_aggregator.user(), # User responses
human_conversation_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
human_conversation_context_aggregator.assistant(), # Assistant spoken responses
]
)

human_conversation_pipeline_task = PipelineTask(
human_conversation_pipeline,
params=PipelineParams(allow_interruptions=True),
)

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await voicemail_detection_pipeline_task.queue_frame(EndFrame())
await human_conversation_pipeline_task.queue_frame(EndFrame())

print("!!! starting human conversation pipeline")
human_conversation_context_aggregator.user().set_messages(
[
{
"role": "system",
"content": human_conversation_system_instruction,
}
]
)
await human_conversation_pipeline_task.queue_frames(
[human_conversation_context_aggregator.user().get_context_frame()]
)
await runner.run(human_conversation_pipeline_task)

await runner.run(task)
print("!!! Done with human conversation pipeline")


if __name__ == "__main__":
Expand Down