Skip to content

Commit

Permalink
AI Assistant: Retry or Cancel Messages with Errors (#2773)
Browse files Browse the repository at this point in the history
* Add migration and update model to introduce a status field on the ai assistant's chat messages

* Add functions for updating a message status

* Add buttons for retrying and cancelling messages with errors

* Style retry and cancel icons

* Organize aliases

* Fix failing tests and refactor code for readability

* Filter cancelled messages

* Update tests for ai_assistant.ex

* Test: users can retry failed messages

* Test: cancel buttons are available until only one message remainsusers can retry failed messages

* Remove unnecessary check

* Remove unnecessary check

* Remove unreachable code

* Update CL

* No Multi needed, add ? to func
  • Loading branch information
elias-ba authored Dec 18, 2024
1 parent 4e24553 commit c96ea2f
Show file tree
Hide file tree
Showing 7 changed files with 549 additions and 34 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ and this project adheres to

### Added

- Add ability to retry or cancel AI Assistant error responses for user messages
[#2704](https://github.com/OpenFn/lightning/issues/2704)

### Changed

### Fixed
Expand All @@ -29,6 +32,8 @@ and this project adheres to
[#2781](https://github.com/OpenFn/lightning/issues/2781)
- Allow different rules and action for delete user.
[#2500](https://github.com/OpenFn/lightning/issues/2500)
- Handle errors from the AI Assistant more gracefully
[#2741](https://github.com/OpenFn/lightning/issues/2741)

### Changed

Expand Down
89 changes: 66 additions & 23 deletions lib/lightning/ai_assistant/ai_assistant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ defmodule Lightning.AiAssistant do

import Ecto.Query

alias Ecto.Changeset
alias Ecto.Multi
alias Lightning.Accounts
alias Lightning.Accounts.User
alias Lightning.AiAssistant.ChatMessage
alias Lightning.AiAssistant.ChatSession
alias Lightning.ApolloClient
alias Lightning.Repo
Expand Down Expand Up @@ -43,7 +45,15 @@ defmodule Lightning.AiAssistant do

@spec get_session!(Ecto.UUID.t()) :: ChatSession.t()
def get_session!(id) do
ChatSession |> Repo.get!(id) |> Repo.preload(messages: :user)
message_query =
from(m in ChatMessage,
where: m.status != :cancelled,
order_by: [asc: :inserted_at]
)

ChatSession
|> Repo.get!(id)
|> Repo.preload(messages: {message_query, :user})
end

@spec create_session(Job.t(), User.t(), String.t()) ::
Expand Down Expand Up @@ -108,7 +118,7 @@ defmodule Lightning.AiAssistant do
@doc """
Queries the AI assistant with the given content.
Returns `{:ok, session}` if the query was successful, otherwise `:error`.
Returns `{:ok, session}` if the query was successful, otherwise `{:error, reason}`.
**Example**
Expand All @@ -119,34 +129,53 @@ defmodule Lightning.AiAssistant do
{:ok, ChatSession.t()}
| {:error, String.t() | Ecto.Changeset.t()}
def query(session, content) do
apollo_resp =
ApolloClient.query(
content,
%{expression: session.expression, adaptor: session.adaptor},
build_history(session)
)
ApolloClient.query(
content,
%{expression: session.expression, adaptor: session.adaptor},
build_history(session)
)
|> handle_apollo_resp(session)
end

case apollo_resp do
{:ok, %Tesla.Env{status: status, body: body}} when status in 200..299 ->
message = body["history"] |> Enum.reverse() |> hd()
defp handle_apollo_resp(
{:ok, %Tesla.Env{status: status, body: %{"history" => history}}},
session
)
when status in 200..299 do
case List.last(history) do
nil ->
{:error, "No message history received"}

message ->
save_message(session, message)
end
end

{:ok, %Tesla.Env{body: %{"message" => message}}} ->
{:error, message}
defp handle_apollo_resp(
{:ok, %Tesla.Env{status: status, body: %{"message" => error_message}}},
session
)
when status not in 200..299 do
Logger.error("AI query failed for session #{session.id}: #{error_message}")
{:error, error_message}
end

{:error, :timeout} ->
{:error, "Request timed out. Please try again."}
defp handle_apollo_resp({:error, :timeout}, session) do
Logger.error("AI query timed out for session #{session.id}")
{:error, "Request timed out. Please try again."}
end

{:error, :econnrefused} ->
{:error, "Unable to reach the AI server. Please try again later."}
defp handle_apollo_resp({:error, :econnrefused}, session) do
Logger.error("Connection to AI server refused for session #{session.id}")
{:error, "Unable to reach the AI server. Please try again later."}
end

unexpected_error ->
Logger.warning(
"Received an unexpected error: #{inspect(unexpected_error)}"
)
defp handle_apollo_resp(unexpected_error, session) do
Logger.error(
"Received an unexpected error for session #{session.id}: #{inspect(unexpected_error)}"
)

{:error, "Oops! Something went wrong. Please try again."}
end
{:error, "Oops! Something went wrong. Please try again."}
end

defp build_history(session) do
Expand Down Expand Up @@ -236,4 +265,18 @@ defmodule Lightning.AiAssistant do
do: UsageLimiter.increment_ai_queries(session)

defp maybe_increment_msgs_counter(_user_role), do: Multi.new()

@doc """
Updates the status of a specific message within a chat session.
Returns `{:ok, session}` if the update is successful, otherwise `{:error, changeset}`.
"""
@spec update_message_status(ChatSession.t(), ChatMessage.t(), atom()) ::
{:ok, ChatSession.t()} | {:error, Changeset.t()}
def update_message_status(session, message, status) do
case Repo.update(ChatMessage.changeset(message, %{status: status})) do
{:ok, _updated_message} -> {:ok, get_session!(session.id)}
{:error, changeset} -> {:error, changeset}
end
end
end
9 changes: 9 additions & 0 deletions lib/lightning/ai_assistant/chat_message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@ defmodule Lightning.AiAssistant.ChatMessage do
import Lightning.Validators, only: [validate_required_assoc: 2]

@type role() :: :user | :assistant
@type status() :: :success | :error | :cancelled

@type t() :: %__MODULE__{
id: Ecto.UUID.t(),
content: String.t() | nil,
role: role(),
status: status(),
is_deleted: boolean(),
is_public: boolean()
}

schema "ai_chat_messages" do
field :content, :string
field :role, Ecto.Enum, values: [:user, :assistant]

field :status, Ecto.Enum,
values: [:success, :error, :cancelled],
default: :success

field :is_deleted, :boolean, default: false
field :is_public, :boolean, default: true

Expand All @@ -31,6 +39,7 @@ defmodule Lightning.AiAssistant.ChatMessage do
|> cast(attrs, [
:content,
:role,
:status,
:is_deleted,
:is_public,
:chat_session_id
Expand Down
95 changes: 84 additions & 11 deletions lib/lightning_web/live/workflow_live/ai_assistant_component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
alias Phoenix.LiveView.AsyncResult
alias Phoenix.LiveView.JS

require Logger

@dialyzer {:nowarn_function, process_ast: 2}

def mount(socket) do
Expand Down Expand Up @@ -128,6 +130,38 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
|> then(fn socket -> {:noreply, socket} end)
end

def handle_event("cancel_message", %{"message-id" => message_id}, socket) do
message = Enum.find(socket.assigns.session.messages, &(&1.id == message_id))

{:ok, session} =
AiAssistant.update_message_status(
socket.assigns.session,
message,
:cancelled
)

{:noreply, assign(socket, :session, session)}
end

def handle_event("retry_message", %{"message-id" => message_id}, socket) do
message = Enum.find(socket.assigns.session.messages, &(&1.id == message_id))

{:ok, session} =
AiAssistant.update_message_status(
socket.assigns.session,
message,
:success
)

{:noreply,
socket
|> assign(:session, session)
|> assign(:pending_message, AsyncResult.loading())
|> start_async(:process_message, fn ->
AiAssistant.query(session, message.content)
end)}
end

defp save_message(%{assigns: assigns} = socket, :new, content) do
case AiAssistant.create_session(
assigns.selected_job,
Expand Down Expand Up @@ -206,19 +240,23 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
|> assign(:pending_message, AsyncResult.ok(nil))}
end

def handle_async(:process_message, {:ok, error}, socket) do
{:noreply,
socket
|> update(:pending_message, fn async_result ->
AsyncResult.failed(async_result, error)
end)}
end
def handle_async(:process_message, {:ok, {:error, error}}, socket),
do: handle_failed_async({:error, error}, socket)

def handle_async(:process_message, {:exit, error}, socket),
do: handle_failed_async({:exit, error}, socket)

defp handle_failed_async(error, socket) do
message = List.last(socket.assigns.session.messages)

{:ok, updated_session} =
AiAssistant.update_message_status(socket.assigns.session, message, :error)

def handle_async(:process_message, {:exit, error}, socket) do
{:noreply,
socket
|> assign(:session, updated_session)
|> update(:pending_message, fn async_result ->
AsyncResult.failed(async_result, {:exit, error})
AsyncResult.failed(async_result, error)
end)}
end

Expand Down Expand Up @@ -315,7 +353,7 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
enabled: true
},
%{
quote: "Be skeptical, but dont be cynical",
quote: "Be skeptical, but don't be cynical",
author: "OpenFn Responsible AI Policy",
source_link: "https://www.openfn.org/ai",
enabled: true
Expand Down Expand Up @@ -511,6 +549,7 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
pending_message={@pending_message}
query_params={@query_params}
base_url={@base_url}
target={@myself}
/>
<% end %>
Expand Down Expand Up @@ -700,6 +739,7 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
attr :pending_message, AsyncResult, required: true
attr :query_params, :map, required: true
attr :base_url, :string, required: true
attr :target, :any, required: true

defp render_individual_session(assigns) do
~H"""
Expand Down Expand Up @@ -727,9 +767,37 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
class="flex flex-row-reverse items-end gap-x-3 mr-3"
>
<.user_avatar user={message.user} size_class="min-w-10 h-10 w-10" />
<div class="bg-blue-300 bg-opacity-50 p-2 mb-0.5 rounded-lg break-words max-w-[80%]">
<div class="bg-blue-300 bg-opacity-50 p-2 mb-0.5 rounded-lg break-words max-w-[70%]">
<%= message.content %>
</div>
<div
:if={message.role == :user and message.status == :error}
class="flex items-center gap-2 mt-1"
>
<button
id={"retry-message-#{message.id}"}
phx-click="retry_message"
phx-value-message-id={message.id}
phx-target={@target}
class="flex items-center justify-center w-5 h-5 rounded-full bg-gray-100 text-gray-400 hover:bg-gray-200 hover:text-gray-600 transition duration-200"
phx-hook="Tooltip"
aria-label="Retry this message"
>
<.icon name="hero-arrow-path-mini" class="h-4 w-4" />
</button>
<button
:if={display_cancel_message_btn?(@session)}
id={"cancel-message-#{message.id}"}
phx-click="cancel_message"
phx-value-message-id={message.id}
phx-target={@target}
class="flex items-center justify-center w-5 h-5 rounded-full bg-gray-100 text-gray-400 hover:bg-gray-200 hover:text-gray-600 transition duration-200"
phx-hook="Tooltip"
aria-label="Cancel this message"
>
<.icon name="hero-x-mark" class="h-4 w-4" />
</button>
</div>
</div>
<div
:if={message.role == :assistant}
Expand Down Expand Up @@ -887,4 +955,9 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
title
end
end

defp display_cancel_message_btn?(session) do
user_messages = Enum.filter(session.messages, &(&1.role == :user))
length(user_messages) > 1
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Lightning.Repo.Migrations.AddStatusToChatMessages do
use Ecto.Migration

def change do
alter table(:ai_chat_messages) do
add :status, :string, default: "success", null: false
end
end
end
Loading

0 comments on commit c96ea2f

Please sign in to comment.