
Realtime #524

Open
alexrudall opened this issue Oct 10, 2024 · 20 comments · May be fixed by #545

Comments

@alexrudall
Owner

alexrudall commented Oct 10, 2024

OpenAI added a realtime API!

How do we add this to ruby-openai?

  • As simple and easy to use as possible
  • Adding as few dependencies as possible
  • As reliable as possible

Options

@danielfriis

danielfriis commented Oct 16, 2024

Did a small PoC a few days ago, connecting Twilio and the Realtime API:

require "faye/websocket"
require "net/http"
require "eventmachine"

class AnswerCallController < ApplicationController
  skip_before_action :verify_authenticity_token

  def incoming_call
    response = Twilio::TwiML::VoiceResponse.new do |r|
      r.say(message: "Connecting to the AI voice assistant...")
      r.connect do |c|
        c.stream(url: "wss://#{request.host_with_port}/media-stream")
      end
    end
    render xml: response.to_s
  end

  def media_stream
    if Faye::WebSocket.websocket?(request.env)
      ws = Faye::WebSocket.new(request.env)
      stream_sid = nil

      ws.on :open do |event|
        puts "Twilio client connected"
        # Connect to OpenAI WebSocket
        openai_ws = Faye::WebSocket::Client.new("wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01", nil, headers: {
          "Authorization" => "Bearer #{Rails.application.credentials.dig(:openai, :api_key)}",
          "OpenAI-Beta" => "realtime=v1"
        })

        openai_ws.on :open do |event|
          puts "Connected to OpenAI Realtime API"
          # Send session update
          session_update = {
            type: "session.update",
            session: {
              turn_detection: { type: "server_vad" },
              input_audio_format: "g711_ulaw",
              output_audio_format: "g711_ulaw",
              voice: "alloy",
              instructions: "You are a helpful and bubbly AI assistant. You are brief and to the point.",
              modalities: [ "text", "audio" ],
              temperature: 0.8
            }
          }
          openai_ws.send(session_update.to_json)
        end

        openai_ws.on :message do |event|
          # Handle incoming messages from OpenAI
          begin
            data = JSON.parse(event.data)
            case data["type"]
            when "response.audio.delta"
              if data["delta"]
                begin
                  # Process audio delta
                  audio_delta = {
                    event: "media",
                    streamSid: stream_sid,
                    media: {
                      payload: data["delta"]
                    }
                  }
                  # Send audio delta to Twilio
                  ws.send(audio_delta.to_json)
                rescue => e
                  puts "Error processing audio delta: #{e.message}"
                end
              end
            when "session.updated"
              puts "Session updated successfully: #{data}"
            when "input_audio_buffer.speech_started"
              puts "Speech Start: #{data['type']}"
              handle_speech_started_event(ws, openai_ws, stream_sid)
            end
          rescue => e
            puts "Error processing OpenAI message: #{e.message}, Raw message: #{event.data}"
          end
        end

        openai_ws.on :close do |event|
          puts "Disconnected from OpenAI Realtime API"
        end

        openai_ws.on :error do |event|
          puts "WebSocket error: #{event.message}"
        end

        # Handle incoming messages from Twilio
        ws.on :message do |event|
          data = JSON.parse(event.data)
          if data["event"] == "media"
            begin
              # Forward media to OpenAI
              audio_append = {
                type: "input_audio_buffer.append",
                audio: data["media"]["payload"]
              }
              openai_ws.send(audio_append.to_json) if openai_ws.ready_state == Faye::WebSocket::OPEN
            rescue => e
              puts "Error processing Twilio audio: #{e.message}"
            end
          elsif data["event"] == "start"
            stream_sid = data["start"]["streamSid"]
            puts "Incoming stream has started: #{stream_sid}"
          end
        end

        ws.on :close do |event|
          puts "Twilio client disconnected"
          openai_ws.close if openai_ws.ready_state == Faye::WebSocket::OPEN
        end
      end

      # Return async Rack response
      ws.rack_response
    else
      # Handle non-WebSocket requests
      render plain: "This endpoint is for WebSocket connections only."
    end
  end

  private

  def handle_speech_started_event(ws, openai_ws, stream_sid)
    if ws.ready_state == Faye::WebSocket::OPEN
      # Send a clear event to Twilio to clear the media buffer
      ws.send({ streamSid: stream_sid, event: "clear" }.to_json)
      puts "Cancelling AI speech from the server"
    end

    if openai_ws.ready_state == Faye::WebSocket::OPEN
      # Send a cancel message to OpenAI to interrupt the AI response
      interrupt_message = { type: "response.cancel" }
      openai_ws.send(interrupt_message.to_json)
    end
  end
end
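
To wire this up, both actions need routes. A minimal sketch, assuming the controller above and default Rails routing (the WebSocket upgrade arrives as a GET request):

# config/routes.rb (route names chosen to match the TwiML above)
post "/incoming-call", to: "answer_call#incoming_call"
get "/media-stream", to: "answer_call#media_stream"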

@alexrudall
Owner Author

Thanks so much for sharing @danielfriis! Super helpful.

@michelson

A simple example with the async-websocket gem:

 def ws_client
    require "async"
    require "async/http"
    require "async/websocket"

    url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"

    # Creating headers for the request
    headers = {
      "Authorization" => "Bearer #{ENV.fetch('OPENAI_API_KEY', nil)}",
      "OpenAI-Beta" => "realtime=v1"
    }

    Async do |task|
      endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)

      Async::WebSocket::Client.connect(endpoint, headers: headers) do |connection|
        input_task = task.async do
          # Each line typed on stdin triggers a canned response.create event
          while $stdin.gets
            text = {
              type: "response.create",
              response: {
                modalities: ["text"],
                instructions: "Please assist the user."
              }
            }
            message = Protocol::WebSocket::TextMessage.generate(text)
            message.send(connection)
            connection.flush
          end
        end

        puts "Connected..."
        while message = connection.read
          puts "> #{message.to_h}"
        end
      ensure
        input_task&.stop
      end
    end
  end
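
Audio input would go over the same connection as input_audio_buffer.append events. A minimal sketch, assuming connection is the object yielded above and base64_chunk holds audio that is already Base64-encoded:

# Append a chunk of input audio to the server-side buffer
audio_event = { type: "input_audio_buffer.append", audio: base64_chunk }
Protocol::WebSocket::TextMessage.generate(audio_event).send(connection)
connection.flush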

@franpb14

franpb14 commented Nov 1, 2024

Hello! I'm interested in opening a PR. I have been working with the API using Ruby on Rails. What I have done (briefly):

  1. I created a simple service that, using faye/websocket and eventmachine, connects to OpenAI's WebSocket and keeps it alive.
  2. I connected this WebSocket to the front end using Action Cable.
  3. With JS, I sent the audio and handled the messages from OpenAI.

My proposal:

Maybe the solution in this gem should be something like the first point, right? More or less, it should have these functions:
client.real_time.connect
client.real_time.onmessage(block) => where the developer will pass how to handle the responses
client.real_time.session_update({ hash_with_options })
client.real_time.append_input_audio_buffer(Base64EncodedAudioData)
client.real_time.commit_input_audio_buffer
client.real_time.clear_input_audio_buffer
client.real_time.create_conversation({ hash_with_options })
client.real_time.truncate_conversation({ hash_with_options })
client.real_time.delete_conversation(item_id)
client.real_time.create_response({ hash_with_options })

Maybe we could have a client.realtime.event instead of a function for each event. Let me know what you think. I'm really looking forward to being able to contribute to this project.
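
A minimal sketch of what that first point could look like inside the gem, assuming faye-websocket and eventmachine as the dependencies; every class and method name here is hypothetical, lifted from the list above:

require "faye/websocket"
require "eventmachine"
require "json"

module OpenAI
  # Hypothetical wrapper; not the gem's actual API
  class RealTime
    def initialize(access_token:, model: "gpt-4o-realtime-preview-2024-10-01")
      @access_token = access_token
      @model = model
      @handlers = []
    end

    # The developer passes a block that receives every parsed server event
    def onmessage(&block)
      @handlers << block
    end

    def connect
      EM.run do
        @ws = Faye::WebSocket::Client.new(
          "wss://api.openai.com/v1/realtime?model=#{@model}", nil,
          headers: {
            "Authorization" => "Bearer #{@access_token}",
            "OpenAI-Beta" => "realtime=v1"
          }
        )
        @ws.on(:message) do |event|
          data = JSON.parse(event.data)
          @handlers.each { |handler| handler.call(data) }
        end
      end
    end

    def session_update(options)
      send_event(type: "session.update", session: options)
    end

    def append_input_audio_buffer(base64_audio)
      send_event(type: "input_audio_buffer.append", audio: base64_audio)
    end

    def commit_input_audio_buffer
      send_event(type: "input_audio_buffer.commit")
    end

    private

    # Serialize an event hash and push it onto the open socket
    def send_event(payload)
      @ws.send(payload.to_json)
    end
  end
end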

@franpb14 franpb14 linked a pull request Nov 4, 2024 that will close this issue
@franpb14

franpb14 commented Nov 4, 2024

Hello! I wanted to try a bit, so I opened a small PR; perhaps we can iterate on it from there 😄

@tbcooney

Likewise, thanks for sharing this example @danielfriis! Have you had any success using this approach to invoke an outbound call and stream from a worker?

@danielfriis

danielfriis commented Nov 29, 2024

@tbcooney Yes!

See below.

I renamed the incoming_call method in my previous example to connect_call and reused that for both outgoing and incoming calls.

Then I initiate outgoing calls with the initiate_outgoing_call method below.

def initiate_outgoing_call
  account_sid = Rails.application.credentials.dig(:twilio, :account_sid)
  auth_token = Rails.application.credentials.dig(:twilio, :auth_token)
  client = Twilio::REST::Client.new(account_sid, auth_token)

  call = client.calls.create(
    method: "POST",
    url: "http://#{request.host_with_port}/connect-call",
    to: <NUMBER>,
    from: <TWILIO NUMBER>
  )

  render json: { message: "Call initiated", sid: call.sid }
rescue Twilio::REST::TwilioError => e
  render json: { error: e.message }, status: :unprocessable_entity
end

def connect_call
  response = Twilio::TwiML::VoiceResponse.new do |r|
    r.say(message: "Connecting to the AI voice assistant...")
    r.connect do |c|
      c.stream(url: "wss://#{request.host_with_port}/media-stream")
    end
  end
  render xml: response.to_s
end

@tbcooney

tbcooney commented Dec 1, 2024

[image attachment]

@drnic
Contributor

drnic commented Dec 5, 2024

@jon-sully

I've been mulling this over pretty intensely since Realtime was released, and @tbcooney and I actually put our heads together a couple of times to think through how to integrate Rails with Realtime.

First, the code samples here are fantastic and such a big help. Thank you all so much for sharing!

Second, given my business and domain, I'm primarily interested in the two-way connection between Twilio and Realtime to facilitate phone calls with AI and/or humans+AI (e.g. 3 way calls), but one thing at a time.

Third, I'm highly infrastructure and scalability conscious. I like to push long-running things into background jobs as much as possible.

The main hold-up I've been mulling over the last month or so is sort of about system coordination and scaling. If a Rails application simply wants to connect to the Realtime API and stream audio back and forth, doing this from a background job is fine. OpenAI wants its customers to set up a WebSocket consumer to connect to its WSS endpoint. That's a simple task to do from a background job and, as long as you're running some kind of event machine to wait until the connection is dead (and handle all the events in the middle, like "new audio arrived" and "sending audio back"), no problem.

The challenge arises on the Twilio side of things. Twilio requires that customers run the WSS endpoint, and Twilio sets up a consumer to connect to that endpoint. This is the exact opposite of OpenAI's setup. It poses a challenge to the idea of "doing everything in a (long-running) background job". Ideally, I'd have wanted each phone call to essentially be represented by a single background job that runs until the phone call completes. Since a background job, on Heroku at least, cannot be network-addressable (and even if it were, Sidekiq doesn't include any facilities for this sort of thing, and I'd need to spin up my own little WSS server on each job), this fundamentally will not work.

After beating my head against that reality for a while, I think I've come to what I believe is the best choice. Twilio essentially requires us to run web-endpoints (@danielfriis' example above (def media_stream...)) for receiving the Twilio audio stream via web sockets, wherein, as part of that endpoint's processing, we setup the consumer WSS client over to OpenAI. This is all fine and does work.

Where this raised an eyebrow for me was around capacity and scaling — we work with finite resources! Each call we coordinate from Twilio this way will essentially set up a new background thread on our web server, independent of Puma's typical request threads, which will do work (pushing audio packets back and forth between the streams, perhaps writing logs along the way, perhaps tracking Rails model objects, and maybe even needing to transcode audio packets?) totally outside of the actual web requests that are still coming in. Now imagine you have fifty phone calls going at once. That could spell a real issue for your web containers / dynos!

In addition to saturating resources, I'm not sure how autoscaling would even work with this setup. In general, the best way to autoscale Rails is by queue time (with a tool like Judoscale), but what happens when your web dyno is bogging down with saturated CPU… and no requests to get a request queue time from?! 😂 That example might be a little dramatic — I don't expect that a web dyno that's coordinating phone calls would get no web requests… but I'm just not sure what to expect with this architecture. The Puma web threads may not be busy and may be able to take on new requests (which means 0 queue time), but I guess the CPU would constantly be context switching to those background call threads, meaning our Puma threads might just ultimately take longer to process web requests. So maybe we see our overall response time rise while our queue time actually stays at zero? I'm not sure. I don't want to find out 😆

My working idea is to spin up a second 'prod' app on a subdomain (or otherwise) which exists solely for these phone call threads. I rarely reach for the "run a second prod app instance" lever, but it feels like it might be the best option in this particular situation. Especially since Twilio's HTTP POST for incoming_call can hit the main app and the wss:// address we pass back can actually hit the clone app. How the clone app would scale, I'm still not sure.

@michelson

@jon-sully I think putting the calls in a job queue is not the best approach, as you will hit the queue concurrency limit very quickly; for instance, if you have concurrency set to five, then you can only do 5 concurrent calls.

I like the approach of using WebSocket connections and maybe offloading time-consuming work (like requests to RAG) with the async gem. Also, I wouldn't use threads, as threads have a higher memory cost; we could implement this with fibers (with the async gem).

In terms of scalability, there is a nice article from Evil Martians that implements this with AnyCable (a Go-based middleware for Action Cable), and the example repo:

https://github.com/anycable/twilio-ai-demo

@jon-sully

Agreed, calls will absolutely saturate background job concurrency for a single instance, but I tend to think of background worker instances / containers as more easily scalable than web containers. If nothing else, wrapping calls in jobs gives us a stable metric for when to scale up: job queue time. I don't think we have a parallel when a web request to a web dyno passes a call off to a background thread (outside of Puma) — all we'd be able to observe there is CPU saturation (I think?)

Personally I'd like to avoid running AnyCable. I've read up on it previously and it's a neat tool but it's a much larger endeavor and I don't want to run / maintain a whole second application (in Go, no less) just to coordinate my Realtime stuff going through my Rails apps. Additionally I think AnyCable may be susceptible to the same challenges at a different layer.

@jon-sully

I don't think threads vs. fibers matters much for the sake of the discussion around how to scale — both will inevitably free the Puma thread back to Puma, meaning we've got a headless background thread (or fiber) that consumes system resources but isn't represented in any kind of system health metric (other than, again, maybe, CPU saturation)

@moladukes

moladukes commented Jan 15, 2025

As a comment here: I have found just using WebRTC easier when interacting with this API (not ideal for all use cases), but perhaps the ruby-openai gem could simplify getting the ephemeral key and abstracting the various settings?

Or I guess my point is don't forget about the WebRTC API :)

My service, for example, which just makes an API call:

require "net/http"
require "json"

class RealtimeSessionService
  # Raised when the session-create call fails
  class RealtimeSessionError < StandardError; end

  def initialize(interview)
    @interview = interview
  end

  def create_realtime_session
    response = http_client.request(request)
    unless response.is_a?(Net::HTTPSuccess)
      Rails.logger.error("Failed to create realtime session: #{response.body}")
      raise RealtimeSessionError, "OpenAI Realtime session creation failed"
    end

    JSON.parse(response.body)
  end

  private

  def http_client
    uri = URI("https://api.openai.com/v1/realtime/sessions")
    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = true
    http
  end

  def request
    req = Net::HTTP::Post.new("/v1/realtime/sessions")
    req["Authorization"] = "Bearer #{Rails.application.credentials.openai[:api_key]}"
    req["Content-Type"] = "application/json"
    req.body = payload.to_json
    req
  end

  def payload
    {
      model: "gpt-4o-mini-realtime-preview",
      modalities: [ "audio", "text" ],
      instructions: "Your instructions here... ",
      voice: "alloy",
      input_audio_format: "pcm16",
      output_audio_format: "pcm16",
      turn_detection: {
        type: "server_vad",
        threshold: 0.5,
        prefix_padding_ms: 300,
        silence_duration_ms: 500,
        create_response: true
      },
      max_response_output_tokens: 1000
    }
  end
end
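
If it helps anyone: the JSON returned by /v1/realtime/sessions includes a client_secret object whose value is the short-lived ephemeral key the browser needs for the WebRTC handshake. A minimal usage sketch, assuming a controller action around the service above:

# Hand the ephemeral key to the browser, which then opens the
# WebRTC connection to OpenAI directly
session = RealtimeSessionService.new(@interview).create_realtime_session
render json: { ephemeral_key: session.dig("client_secret", "value") }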

@jon-sully

I don't think the WebRTC endpoint/version of Realtime is really doable with Rails in the way I understand this issue to describe it — I think folks in this issue are after a server-to-server implementation of Realtime such that audio is ported back and forth from the Rails app to/from the Realtime API (and myself, along with a few others, also adding to/from Twilio into that mix). The WebRTC Realtime endpoint is for browsers to connect to Realtime directly, such that a user of your app could talk to AI directly from their browser, bypassing your server entirely. I think that's a fully separate architecture / issue.

That said, it's probably a viable issue to raise for this library to support too — the initial handshake and token hand-off for the WebRTC setup could definitely be part of this library.

@jon-sully

jon-sully commented Jan 16, 2025

Haven't given an update in a bit for my end of things, but I've had a tremendously successful week or two with my proof-of-concept implementation in my app. The code sample @danielfriis shared has been invaluable and provided a tremendous foundation to dive in from! Excited to share where I've landed on things, particularly as it pertains to handing the context off to a background thread as a "job" (lower-case j) and, more importantly for production apps IMO, passing context along to the call thread so that it knows what User record (etc.) it's talking to, so it can pull the right levers and access the right functions!

This stuff is very cool.
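
One way to pass that per-call context, for anyone curious: Twilio's <Stream> TwiML accepts <Parameter> elements, which come back in the stream's "start" event. A minimal sketch with twilio-ruby, assuming a current_user helper; the user_id parameter name is made up:

response = Twilio::TwiML::VoiceResponse.new do |r|
  r.connect do |c|
    c.stream(url: "wss://#{request.host_with_port}/media-stream") do |s|
      # Arrives as data["start"]["customParameters"] in the media stream
      s.parameter(name: "user_id", value: current_user.id.to_s)
    end
  end
end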

That said, this thread is ultimately an Issue in the repo of the ruby-openai library, and the goal here should be to investigate whether any of the repeated steps for setting up the OpenAI side of the equation should be baked into / abstracted into this library. That's where it gets challenging. I don't actually see any great places where wrapping functionality into the library would really gain us anything yet. We'd essentially be wrapping the Faye WebSocket API to a tee and still having the user set up the event callbacks in the same style...

At best maybe we could wrap this into a library method so we can inject the auth credentials from the same place as elsewhere in this library:

        openai_ws = Faye::WebSocket::Client.new("wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01", nil, headers: {
          "Authorization" => "Bearer #{Rails.application.credentials.dig(:openai, :api_key)}",
          "OpenAI-Beta" => "realtime=v1"
        })

But the API to users of the library would still be like:

client = OpenAI::Client.new
realtime_ws = client.real_time(model: "uses-default-otherwise")

#=> `realtime_ws` is a Faye::Websocket object, not a home-baked OpenAI::XYZ object

realtime_ws.on :close do |event|
  # etc..
end

But to me, that feels like a fairly small thing to encapsulate? Maybe it's worth it so auth tokens for OpenAI aren't spread out and remain exclusively in the initializer for OpenAI, but I'm not sure what else we can add here from a library standpoint.
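
For the record, that wrapper could be as small as this. A minimal sketch, assuming the client exposes its configured access token; the real_time method name is hypothetical:

module OpenAI
  class Client
    # Returns a raw Faye::WebSocket::Client with the gem's stored
    # credentials injected, so the token stays in one place
    def real_time(model: "gpt-4o-realtime-preview-2024-10-01")
      Faye::WebSocket::Client.new(
        "wss://api.openai.com/v1/realtime?model=#{model}", nil,
        headers: {
          "Authorization" => "Bearer #{access_token}",
          "OpenAI-Beta" => "realtime=v1"
        }
      )
    end
  end
end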

@schappim

schappim commented Jan 16, 2025 via email

@moladukes

I don't think the WebRTC endpoint/version of Realtime is really doable with Rails in the way I understand this issue to describe it — I think folks in this issue are after a server-to-server implementation of Realtime such that audio is ported back and forth from the Rails app to/from the Realtime API (and myself, along with a few others, also adding to/from Twilio into that mix). The WebRTC Realtime endpoint is for browsers to connect to Realtime directly, such that a user of your app could talk to AI directly from their browser, bypassing your server entirely. I think that's a fully separate architecture / issue.

That said, it's probably a viable issue to raise for this library to support too — the initial handshake and token hand-off for the WebRTC setup could definitely be part of this library.

Ya, I think that's my point. Agree with the above.

@jon-sully

Ya, I think that's my point. Agree with the above.

Ah I see now, apologies!

@jon-sully

I got some time to draft a quick writeup on my findings and successful integration for those who are in the same camp (wanting OpenAI Realtime to feed into Twilio for phone call uses). Feel free to read here:

"Guide to Twilio + OpenAI Realtime on Rails (without Anycable)"

Otherwise I think all that's needed for this actual library is a tiny little wrapper around the client websocket instantiation to inject auth 🤔
