From c96ea2f109545ee35f0d0ed4178626425018f4d8 Mon Sep 17 00:00:00 2001 From: "Elias W. BA" Date: Wed, 18 Dec 2024 09:45:37 +0000 Subject: [PATCH] AI Assistant: Retry or Cancel Messages with Errors (#2773) * Add migration and update model to introduce a status field on the ai assistant's chat messages * Add functions for updating a message status * Add buttons for retrying and cancelling messages with errors * Style retry and cancel icons * Organize aliases * Fix failing tests and refactor code for readability * Filter cancelled messages * Update tests for ai_assistant.ex * Test: users can retry failed messages * Test: cancel buttons are available until only one message remainsusers can retry failed messages * Remove unnecessary check * Remove unnecessary check * Remove unreachable code * Update CL * No Multi needed, add ? to func --- CHANGELOG.md | 5 + lib/lightning/ai_assistant/ai_assistant.ex | 89 ++++++-- lib/lightning/ai_assistant/chat_message.ex | 9 + .../workflow_live/ai_assistant_component.ex | 95 +++++++- ...1210004733_add_status_to_chat_messages.exs | 9 + .../ai_assistant/ai_assistant_test.exs | 168 ++++++++++++++ .../live/workflow_live/edit_test.exs | 208 ++++++++++++++++++ 7 files changed, 549 insertions(+), 34 deletions(-) create mode 100644 priv/repo/migrations/20241210004733_add_status_to_chat_messages.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 329f70a02d..2e02668d8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ and this project adheres to ### Added +- Add ability to retry or cancel AI Assistant error responses for user messages + [#2704](https://github.com/OpenFn/lightning/issues/2704) + ### Changed ### Fixed @@ -29,6 +32,8 @@ and this project adheres to [#2781](https://github.com/OpenFn/lightning/issues/2781) - Allow different rules and action for delete user. [#2500](https://github.com/OpenFn/lightning/issues/2500) +- Handle errors from the AI Assistant more gracefully + [#2741](https://github.com/OpenFn/lightning/issues/2741) ### Changed diff --git a/lib/lightning/ai_assistant/ai_assistant.ex b/lib/lightning/ai_assistant/ai_assistant.ex index 29dcb553b0..091a708b27 100644 --- a/lib/lightning/ai_assistant/ai_assistant.ex +++ b/lib/lightning/ai_assistant/ai_assistant.ex @@ -5,9 +5,11 @@ defmodule Lightning.AiAssistant do import Ecto.Query + alias Ecto.Changeset alias Ecto.Multi alias Lightning.Accounts alias Lightning.Accounts.User + alias Lightning.AiAssistant.ChatMessage alias Lightning.AiAssistant.ChatSession alias Lightning.ApolloClient alias Lightning.Repo @@ -43,7 +45,15 @@ defmodule Lightning.AiAssistant do @spec get_session!(Ecto.UUID.t()) :: ChatSession.t() def get_session!(id) do - ChatSession |> Repo.get!(id) |> Repo.preload(messages: :user) + message_query = + from(m in ChatMessage, + where: m.status != :cancelled, + order_by: [asc: :inserted_at] + ) + + ChatSession + |> Repo.get!(id) + |> Repo.preload(messages: {message_query, :user}) end @spec create_session(Job.t(), User.t(), String.t()) :: @@ -108,7 +118,7 @@ defmodule Lightning.AiAssistant do @doc """ Queries the AI assistant with the given content. - Returns `{:ok, session}` if the query was successful, otherwise `:error`. + Returns `{:ok, session}` if the query was successful, otherwise `{:error, reason}`. **Example** @@ -119,34 +129,53 @@ defmodule Lightning.AiAssistant do {:ok, ChatSession.t()} | {:error, String.t() | Ecto.Changeset.t()} def query(session, content) do - apollo_resp = - ApolloClient.query( - content, - %{expression: session.expression, adaptor: session.adaptor}, - build_history(session) - ) + ApolloClient.query( + content, + %{expression: session.expression, adaptor: session.adaptor}, + build_history(session) + ) + |> handle_apollo_resp(session) + end - case apollo_resp do - {:ok, %Tesla.Env{status: status, body: body}} when status in 200..299 -> - message = body["history"] |> Enum.reverse() |> hd() + defp handle_apollo_resp( + {:ok, %Tesla.Env{status: status, body: %{"history" => history}}}, + session + ) + when status in 200..299 do + case List.last(history) do + nil -> + {:error, "No message history received"} + + message -> save_message(session, message) + end + end - {:ok, %Tesla.Env{body: %{"message" => message}}} -> - {:error, message} + defp handle_apollo_resp( + {:ok, %Tesla.Env{status: status, body: %{"message" => error_message}}}, + session + ) + when status not in 200..299 do + Logger.error("AI query failed for session #{session.id}: #{error_message}") + {:error, error_message} + end - {:error, :timeout} -> - {:error, "Request timed out. Please try again."} + defp handle_apollo_resp({:error, :timeout}, session) do + Logger.error("AI query timed out for session #{session.id}") + {:error, "Request timed out. Please try again."} + end - {:error, :econnrefused} -> - {:error, "Unable to reach the AI server. Please try again later."} + defp handle_apollo_resp({:error, :econnrefused}, session) do + Logger.error("Connection to AI server refused for session #{session.id}") + {:error, "Unable to reach the AI server. Please try again later."} + end - unexpected_error -> - Logger.warning( - "Received an unexpected error: #{inspect(unexpected_error)}" - ) + defp handle_apollo_resp(unexpected_error, session) do + Logger.error( + "Received an unexpected error for session #{session.id}: #{inspect(unexpected_error)}" + ) - {:error, "Oops! Something went wrong. Please try again."} - end + {:error, "Oops! Something went wrong. Please try again."} end defp build_history(session) do @@ -236,4 +265,18 @@ defmodule Lightning.AiAssistant do do: UsageLimiter.increment_ai_queries(session) defp maybe_increment_msgs_counter(_user_role), do: Multi.new() + + @doc """ + Updates the status of a specific message within a chat session. + + Returns `{:ok, session}` if the update is successful, otherwise `{:error, changeset}`. + """ + @spec update_message_status(ChatSession.t(), ChatMessage.t(), atom()) :: + {:ok, ChatSession.t()} | {:error, Changeset.t()} + def update_message_status(session, message, status) do + case Repo.update(ChatMessage.changeset(message, %{status: status})) do + {:ok, _updated_message} -> {:ok, get_session!(session.id)} + {:error, changeset} -> {:error, changeset} + end + end end diff --git a/lib/lightning/ai_assistant/chat_message.ex b/lib/lightning/ai_assistant/chat_message.ex index a9e371d6e6..2db9082a0a 100644 --- a/lib/lightning/ai_assistant/chat_message.ex +++ b/lib/lightning/ai_assistant/chat_message.ex @@ -6,10 +6,13 @@ defmodule Lightning.AiAssistant.ChatMessage do import Lightning.Validators, only: [validate_required_assoc: 2] @type role() :: :user | :assistant + @type status() :: :success | :error | :cancelled + @type t() :: %__MODULE__{ id: Ecto.UUID.t(), content: String.t() | nil, role: role(), + status: status(), is_deleted: boolean(), is_public: boolean() } @@ -17,6 +20,11 @@ defmodule Lightning.AiAssistant.ChatMessage do schema "ai_chat_messages" do field :content, :string field :role, Ecto.Enum, values: [:user, :assistant] + + field :status, Ecto.Enum, + values: [:success, :error, :cancelled], + default: :success + field :is_deleted, :boolean, default: false field :is_public, :boolean, default: true @@ -31,6 +39,7 @@ defmodule Lightning.AiAssistant.ChatMessage do |> cast(attrs, [ :content, :role, + :status, :is_deleted, :is_public, :chat_session_id diff --git a/lib/lightning_web/live/workflow_live/ai_assistant_component.ex b/lib/lightning_web/live/workflow_live/ai_assistant_component.ex index 4b82a3bd76..e320bf69ef 100644 --- a/lib/lightning_web/live/workflow_live/ai_assistant_component.ex +++ b/lib/lightning_web/live/workflow_live/ai_assistant_component.ex @@ -6,6 +6,8 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do alias Phoenix.LiveView.AsyncResult alias Phoenix.LiveView.JS + require Logger + @dialyzer {:nowarn_function, process_ast: 2} def mount(socket) do @@ -128,6 +130,38 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do |> then(fn socket -> {:noreply, socket} end) end + def handle_event("cancel_message", %{"message-id" => message_id}, socket) do + message = Enum.find(socket.assigns.session.messages, &(&1.id == message_id)) + + {:ok, session} = + AiAssistant.update_message_status( + socket.assigns.session, + message, + :cancelled + ) + + {:noreply, assign(socket, :session, session)} + end + + def handle_event("retry_message", %{"message-id" => message_id}, socket) do + message = Enum.find(socket.assigns.session.messages, &(&1.id == message_id)) + + {:ok, session} = + AiAssistant.update_message_status( + socket.assigns.session, + message, + :success + ) + + {:noreply, + socket + |> assign(:session, session) + |> assign(:pending_message, AsyncResult.loading()) + |> start_async(:process_message, fn -> + AiAssistant.query(session, message.content) + end)} + end + defp save_message(%{assigns: assigns} = socket, :new, content) do case AiAssistant.create_session( assigns.selected_job, @@ -206,19 +240,23 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do |> assign(:pending_message, AsyncResult.ok(nil))} end - def handle_async(:process_message, {:ok, error}, socket) do - {:noreply, - socket - |> update(:pending_message, fn async_result -> - AsyncResult.failed(async_result, error) - end)} - end + def handle_async(:process_message, {:ok, {:error, error}}, socket), + do: handle_failed_async({:error, error}, socket) + + def handle_async(:process_message, {:exit, error}, socket), + do: handle_failed_async({:exit, error}, socket) + + defp handle_failed_async(error, socket) do + message = List.last(socket.assigns.session.messages) + + {:ok, updated_session} = + AiAssistant.update_message_status(socket.assigns.session, message, :error) - def handle_async(:process_message, {:exit, error}, socket) do {:noreply, socket + |> assign(:session, updated_session) |> update(:pending_message, fn async_result -> - AsyncResult.failed(async_result, {:exit, error}) + AsyncResult.failed(async_result, error) end)} end @@ -315,7 +353,7 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do enabled: true }, %{ - quote: "Be skeptical, but don’t be cynical", + quote: "Be skeptical, but don't be cynical", author: "OpenFn Responsible AI Policy", source_link: "https://www.openfn.org/ai", enabled: true @@ -511,6 +549,7 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do pending_message={@pending_message} query_params={@query_params} base_url={@base_url} + target={@myself} /> <% end %> @@ -700,6 +739,7 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do attr :pending_message, AsyncResult, required: true attr :query_params, :map, required: true attr :base_url, :string, required: true + attr :target, :any, required: true defp render_individual_session(assigns) do ~H""" @@ -727,9 +767,37 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do class="flex flex-row-reverse items-end gap-x-3 mr-3" > <.user_avatar user={message.user} size_class="min-w-10 h-10 w-10" /> -
+
<%= message.content %>
+
+ + +
1 + end end diff --git a/priv/repo/migrations/20241210004733_add_status_to_chat_messages.exs b/priv/repo/migrations/20241210004733_add_status_to_chat_messages.exs new file mode 100644 index 0000000000..a03d29b288 --- /dev/null +++ b/priv/repo/migrations/20241210004733_add_status_to_chat_messages.exs @@ -0,0 +1,9 @@ +defmodule Lightning.Repo.Migrations.AddStatusToChatMessages do + use Ecto.Migration + + def change do + alter table(:ai_chat_messages) do + add :status, :string, default: "success", null: false + end + end +end diff --git a/test/lightning/ai_assistant/ai_assistant_test.exs b/test/lightning/ai_assistant/ai_assistant_test.exs index 5cc6f1058d..984f401d16 100644 --- a/test/lightning/ai_assistant/ai_assistant_test.exs +++ b/test/lightning/ai_assistant/ai_assistant_test.exs @@ -110,6 +110,39 @@ defmodule Lightning.AiAssistantTest do assert Lightning.Repo.reload!(saved_message) end + + test "handles empty history in response", %{ + user: user, + workflow: %{jobs: [job_1 | _]} + } do + session = + insert(:chat_session, + user: user, + job: job_1, + messages: [%{role: :user, content: "what?", user: user}] + ) + + Mox.stub(Lightning.MockConfig, :apollo, fn key -> + case key do + :endpoint -> "http://localhost:3000" + :openai_api_key -> "api_key" + end + end) + + empty_history_reply = + Jason.encode!(%{ + "response" => "Some response", + "history" => [] + }) + + expect(Lightning.Tesla.Mock, :call, fn %{method: :post, url: url}, _opts -> + assert url =~ "/services/job_chat" + {:ok, %Tesla.Env{status: 200, body: Jason.decode!(empty_history_reply)}} + end) + + assert {:error, "No message history received"} = + AiAssistant.query(session, "foo") + end end describe "list_sessions_for_job/1" do @@ -254,4 +287,139 @@ defmodule Lightning.AiAssistantTest do }) end end + + describe "update_message_status/3" do + test "successfully updates message status to success", %{ + user: user, + workflow: %{jobs: [job_1 | _]} + } do + message = + insert(:chat_message, + content: "test", + role: :user, + user: user, + status: :error + ) + + session = + insert(:chat_session, user: user, job: job_1, messages: [message]) + + assert {:ok, updated_session} = + AiAssistant.update_message_status(session, message, :success) + + updated_message = List.first(updated_session.messages) + assert updated_message.status == :success + end + + test "successfully updates message status to error", %{ + user: user, + workflow: %{jobs: [job_1 | _]} + } do + message = + insert(:chat_message, + content: "test", + role: :user, + user: user, + status: :success + ) + + session = + insert(:chat_session, user: user, job: job_1, messages: [message]) + + assert {:ok, updated_session} = + AiAssistant.update_message_status(session, message, :error) + + updated_message = List.first(updated_session.messages) + assert updated_message.status == :error + end + + test "successfully updates message status to cancelled", %{ + user: user, + workflow: %{jobs: [job_1 | _]} + } do + message = + insert(:chat_message, + content: "test", + role: :user, + user: user, + status: :success + ) + + session = + insert(:chat_session, user: user, job: job_1, messages: [message]) + + assert {:ok, updated_session} = + AiAssistant.update_message_status(session, message, :cancelled) + + # Note: Since get_session! filters out cancelled messages, we should not see it in updated_session + assert updated_session.messages == [] + end + + test "returns error changeset when status update fails", %{ + user: user, + workflow: %{jobs: [job_1 | _]} + } do + message = insert(:chat_message, content: "test", role: :user, user: user) + + session = + insert(:chat_session, user: user, job: job_1, messages: [message]) + + assert {:error, changeset} = + AiAssistant.update_message_status( + session, + message, + :invalid_status + ) + + assert %Ecto.Changeset{} = changeset + assert "is invalid" in errors_on(changeset).status + end + + test "handles non-existent session", %{ + user: user + } do + message = insert(:chat_message, content: "test", role: :user, user: user) + # Session doesn't exist in DB + session = build(:chat_session, id: Ecto.UUID.generate()) + + assert_raise Ecto.NoResultsError, fn -> + AiAssistant.update_message_status(session, message, :success) + end + end + + test "updates message status for session with multiple messages", %{ + user: user, + workflow: %{jobs: [job_1 | _]} + } do + message1 = + insert(:chat_message, + content: "test1", + role: :user, + user: user, + status: :error + ) + + message2 = + insert(:chat_message, + content: "test2", + role: :assistant, + user: user, + status: :error + ) + + session = + insert(:chat_session, + user: user, + job: job_1, + messages: [message1, message2] + ) + + assert {:ok, updated_session} = + AiAssistant.update_message_status(session, message1, :success) + + [updated_message2, updated_message1] = updated_session.messages + assert updated_message1.status == :success + assert updated_message2.status == :error + end + end end diff --git a/test/lightning_web/live/workflow_live/edit_test.exs b/test/lightning_web/live/workflow_live/edit_test.exs index 113509ce00..5c28be77a0 100644 --- a/test/lightning_web/live/workflow_live/edit_test.exs +++ b/test/lightning_web/live/workflow_live/edit_test.exs @@ -2825,6 +2825,214 @@ defmodule LightningWeb.WorkflowLive.EditTest do refute render(input_element) =~ message end + + @tag email: "user@openfn.org" + test "users can retry failed messages", %{ + conn: conn, + project: project, + user: user, + workflow: %{jobs: [job_1 | _]} = workflow + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + Mox.stub(Lightning.Tesla.Mock, :call, fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + {:ok, %Tesla.Env{status: 500}} + end) + + session = + insert(:chat_session, + user: user, + job: job_1, + messages: [ + %{role: :user, content: "Hello", status: :error, user: user} + ] + ) + + timestamp = DateTime.utc_now() |> DateTime.to_unix() + + Ecto.Changeset.change(user, %{ + preferences: %{"ai_assistant.disclaimer_read_at" => timestamp} + }) + |> Lightning.Repo.update!() + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand", chat: session.id]}" + ) + + render_async(view) + + assert has_element?( + view, + "#retry-message-#{List.first(session.messages).id}" + ) + + # can't cancel first message of session + refute has_element?( + view, + "#cancel-message-#{List.first(session.messages).id}" + ) + + Mox.stub(Lightning.Tesla.Mock, :call, fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [ + %{"role" => "user", "content" => "Hello"}, + %{"role" => "assistant", "content" => "Hi there!"} + ] + } + }} + end) + + view + |> element("#retry-message-#{List.first(session.messages).id}") + |> render_click() + + html = render_async(view) + + assert html =~ "Hi there!" + + refute has_element?(view, "#assistant-failed-message") + end + + @tag email: "user@openfn.org" + test "cancel buttons are available until only one message remains", %{ + conn: conn, + project: project, + user: user, + workflow: %{jobs: [job_1 | _]} = workflow + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + Mox.stub(Lightning.Tesla.Mock, :call, fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + end) + + session = + insert(:chat_session, + user: user, + job: job_1, + messages: [ + %{role: :user, content: "First message", status: :error, user: user}, + %{role: :assistant, content: "First response"}, + %{ + role: :user, + content: "Second message", + status: :error, + user: user + }, + %{role: :assistant, content: "Second response"}, + %{role: :user, content: "Third message", status: :error, user: user} + ] + ) + + timestamp = DateTime.utc_now() |> DateTime.to_unix() + + Ecto.Changeset.change(user, %{ + preferences: %{"ai_assistant.disclaimer_read_at" => timestamp} + }) + |> Lightning.Repo.update!() + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand", chat: session.id]}" + ) + + render_async(view) + + failed_messages = Enum.filter(session.messages, &(&1.status == :error)) + + # Initially, all failed messages should have both retry and cancel buttons + Enum.each(failed_messages, fn message -> + assert has_element?(view, "#retry-message-#{message.id}") + assert has_element?(view, "#cancel-message-#{message.id}") + end) + + # Cancel messages one by one until only one remains + Enum.take(failed_messages, length(failed_messages) - 1) + |> Enum.each(fn message -> + view + |> element("#cancel-message-#{message.id}") + |> render_click() + + refute has_element?(view, "#retry-message-#{message.id}") + refute has_element?(view, "#cancel-message-#{message.id}") + + updated_session = Lightning.AiAssistant.get_session!(session.id) + + refute Enum.any?(updated_session.messages, &(&1.id == message.id)) + end) + + # After cancelling all messages except one, get the current session state + updated_session = Lightning.AiAssistant.get_session!(session.id) + + # Verify total remaining messages + user_messages = Enum.filter(updated_session.messages, &(&1.role == :user)) + assert length(user_messages) == 1 + + # Get the remaining failed message + current_failed_messages = + Enum.filter(updated_session.messages, &(&1.status == :error)) + + assert length(current_failed_messages) == 1 + + last_remaining_message = List.first(user_messages) + + assert has_element?(view, "#retry-message-#{last_remaining_message.id}") + refute has_element?(view, "#cancel-message-#{last_remaining_message.id}") + + # Compare with single message session behavior + single_message_session = + insert(:chat_session, + user: user, + job: job_1, + messages: [ + %{role: :user, content: "Hello", status: :error, user: user} + ] + ) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand", chat: single_message_session.id]}" + ) + + render_async(view) + + # Verify single message session only shows retry button + assert has_element?( + view, + "#retry-message-#{List.first(single_message_session.messages).id}" + ) + + refute has_element?( + view, + "#cancel-message-#{List.first(single_message_session.messages).id}" + ) + end end describe "Allow low priority access users to retry steps and create workorders" do