From 83443d20e88524ff919ccad31204aa4334205545 Mon Sep 17 00:00:00 2001
From: Rogerio Pontual <44991200+jyeshe@users.noreply.github.com>
Date: Thu, 27 Feb 2025 13:44:19 +0100
Subject: [PATCH] Disable jobs with non retriable steps (#2968)
* Disable jobs with non retriable steps
* Add test case for disabled non retriable jobs
* Changelog
* Pass a disabled set to UI
* Fix the displayed count when some selected work orders are not retriable
---
CHANGELOG.md | 3 +
lib/lightning/accounts/user_notifier.ex | 4 +-
lib/lightning/work_orders.ex | 72 ++-
lib/lightning_web/live/run_live/index.ex | 4 -
.../live/run_live/index.html.heex | 5 +-
.../live/run_live/rerun_job_component.ex | 81 ++-
lib/lightning_web/utils.ex | 3 +
.../live/run_live/index_test.exs | 474 ++++++++++++++++++
.../live/work_order_live_test.exs | 388 --------------
9 files changed, 590 insertions(+), 444 deletions(-)
create mode 100644 test/lightning_web/live/run_live/index_test.exs
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d1ad50b379..a8cb87122f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,9 @@ and this project adheres to
### Added
+- Disable jobs with non retriable steps
+ [#2925](https://github.com/OpenFn/lightning/issues/2925)
+
### Changed
### Fixed
diff --git a/lib/lightning/accounts/user_notifier.ex b/lib/lightning/accounts/user_notifier.ex
index 26145ec078..7a134d5cd7 100644
--- a/lib/lightning/accounts/user_notifier.ex
+++ b/lib/lightning/accounts/user_notifier.ex
@@ -10,6 +10,7 @@ defmodule Lightning.Accounts.UserNotifier do
max_attempts: 1
import Swoosh.Email
+ import LightningWeb.Utils, only: [pluralize_with_s: 2]
alias Lightning.Accounts.User
alias Lightning.Helpers
@@ -413,9 +414,6 @@ defmodule Lightning.Accounts.UserNotifier do
"THIS LIGHTNING INSTANCE DOES NOT HAVE ALTERNATE STORAGE ENABLED, SO THESE FAILED MESSAGES CANNOT BE RECOVERED WITHOUT MAKING THEM AVAILABLE ON THE KAFKA CLUSTER AGAIN."
end
- defp pluralize_with_s(1, string), do: string
- defp pluralize_with_s(_integer, string), do: "#{string}s"
-
@doc """
Deliver instructions to confirm a credential transfer.
"""
diff --git a/lib/lightning/work_orders.ex b/lib/lightning/work_orders.ex
index 4687984ba6..d7011d28d1 100644
--- a/lib/lightning/work_orders.ex
+++ b/lib/lightning/work_orders.ex
@@ -402,32 +402,9 @@ defmodule Lightning.WorkOrders do
| UsageLimiting.error()
| {:error, :enqueue_error}
def retry_many([%WorkOrder{} | _rest] = workorders, job_id, opts) do
- orders_ids = Enum.map(workorders, & &1.id)
-
- last_runs_query =
- from(r in Run,
- where: r.work_order_id in ^orders_ids,
- group_by: [r.work_order_id],
- select: %{
- work_order_id: r.work_order_id,
- last_inserted_at: max(r.inserted_at)
- }
- )
-
- run_steps_query =
- from(as in RunStep,
- join: att in assoc(as, :run),
- join: wo in assoc(att, :work_order),
- join: last in subquery(last_runs_query),
- on:
- last.work_order_id == att.work_order_id and
- att.inserted_at == last.last_inserted_at,
- join: s in assoc(as, :step),
- on: s.job_id == ^job_id,
- order_by: [asc: wo.inserted_at]
- )
-
- run_steps_query
+ workorders
+ |> Enum.map(& &1.id)
+ |> last_run_steps_query([job_id])
|> Repo.all()
|> retry_many(opts)
end
@@ -506,6 +483,49 @@ defmodule Lightning.WorkOrders do
{:ok, 0, 0}
end
+ def get_last_runs_steps_with_dataclips(workorders, jobs) do
+ job_ids = Enum.map(jobs, & &1.id)
+
+ workorders
+ |> Enum.map(& &1.id)
+ |> last_run_steps_query(job_ids, non_wiped_dataclip?: true)
+ |> Repo.all()
+ end
+
+ defp last_run_steps_query(workorder_ids, job_ids, opts \\ []) do
+ last_runs_query =
+ from(r in Run,
+ where: r.work_order_id in ^workorder_ids,
+ group_by: [r.work_order_id],
+ select: %{
+ work_order_id: r.work_order_id,
+ last_inserted_at: max(r.inserted_at)
+ }
+ )
+
+ from(rs in RunStep,
+ join: r in assoc(rs, :run),
+ join: s in assoc(rs, :step),
+ join: wo in assoc(r, :work_order),
+ join: lr in subquery(last_runs_query),
+ on:
+ r.work_order_id == lr.work_order_id and
+ r.inserted_at == lr.last_inserted_at,
+ where: s.job_id in ^job_ids,
+ order_by: [asc: wo.inserted_at],
+ select: %{rs | step: s, run: r}
+ )
+ |> then(fn query ->
+ if Keyword.get(opts, :non_wiped_dataclip?) do
+ query
+ |> join(:inner, [rs, _r, s, _wo, _lr], d in assoc(s, :input_dataclip))
+ |> where([_rs, _r, _s, _wo, _lr, d], is_nil(d.wiped_at))
+ else
+ query
+ end
+ end)
+ end
+
@doc """
Enqueue multiple runs for retry in the same transaction.
"""
diff --git a/lib/lightning_web/live/run_live/index.ex b/lib/lightning_web/live/run_live/index.ex
index ee76be2801..1d69f187d4 100644
--- a/lib/lightning_web/live/run_live/index.ex
+++ b/lib/lightning_web/live/run_live/index.ex
@@ -575,10 +575,6 @@ defmodule LightningWeb.RunLive.Index do
|> Enum.count()
end
- defp selected_workorder_count(selected_orders) do
- Enum.count(selected_orders)
- end
-
defp maybe_humanize_date(date) do
date && Timex.format!(date, "{D}/{M}/{YY}")
end
diff --git a/lib/lightning_web/live/run_live/index.html.heex b/lib/lightning_web/live/run_live/index.html.heex
index 43daf6a59d..0692995df3 100644
--- a/lib/lightning_web/live/run_live/index.html.heex
+++ b/lib/lightning_web/live/run_live/index.html.heex
@@ -770,7 +770,7 @@
pages={@page.total_pages}
total_entries={@page.total_entries}
all_selected?={all_selected?(@selected_work_orders, @page.entries)}
- selected_count={selected_workorder_count(@selected_work_orders)}
+ selected_count={Enum.count(@selected_work_orders)}
filters={SearchParams.new(@filters)}
workflows={@workflows}
/>
@@ -780,9 +780,8 @@
module={LightningWeb.RunLive.RerunJobComponent}
id="bulk-rerun-from-job-modal"
total_entries={@page.total_entries}
- selected_count={selected_workorder_count(@selected_work_orders)}
all_selected?={all_selected?(@selected_work_orders, @page.entries)}
- workflow_id={@selected_work_orders |> hd() |> Map.get(:workflow_id)}
+ selected_workorders={@selected_work_orders}
pages={@page.total_pages}
/>
diff --git a/lib/lightning_web/live/run_live/rerun_job_component.ex b/lib/lightning_web/live/run_live/rerun_job_component.ex
index 529829550e..e804693322 100644
--- a/lib/lightning_web/live/run_live/rerun_job_component.ex
+++ b/lib/lightning_web/live/run_live/rerun_job_component.ex
@@ -2,31 +2,55 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
@moduledoc """
Rerun job component
"""
-
use LightningWeb, :live_component
+
+ import LightningWeb.Utils, only: [pluralize_with_s: 2]
+
alias Lightning.Jobs
alias Lightning.Workflows
+ alias Lightning.WorkOrders
@impl true
def update(
%{
total_entries: _count,
- selected_count: _selected_count,
- workflow_id: workflow_id
+ selected_workorders:
+ [%{workflow_id: workflow_id} | _] = selected_workorders
} = assigns,
socket
) do
workflow = Workflows.get_workflow!(workflow_id)
- jobs = Jobs.list_jobs_for_workflow(workflow)
+ workflow_jobs = Jobs.list_jobs_for_workflow(workflow)
+
+ {retriable_jobs_ids, retriable_count_per_job} =
+ selected_workorders
+ |> WorkOrders.get_last_runs_steps_with_dataclips(workflow_jobs)
+ |> then(fn run_steps ->
+ retriable_jobs_ids = MapSet.new(run_steps, & &1.step.job_id)
+
+ retriable_count_per_job =
+ run_steps
+ |> Enum.group_by(& &1.step.job_id, & &1.run.work_order_id)
+ |> Map.new(fn {job_id, workorder_ids} ->
+ {job_id, Enum.count(workorder_ids)}
+ end)
+
+ {retriable_jobs_ids, retriable_count_per_job}
+ end)
+
+ disabled_jobs_ids =
+ MapSet.difference(MapSet.new(workflow_jobs, & &1.id), retriable_jobs_ids)
{:ok,
socket
|> assign(
show: false,
workflow: workflow,
- workflow_jobs: jobs,
- selected_job: hd(jobs)
+ workflow_jobs: workflow_jobs,
+ disabled_jobs_ids: disabled_jobs_ids,
+ retriable_count_per_job: retriable_count_per_job
)
+ |> update_selected_job(hd(workflow_jobs).id)
|> assign(assigns)}
end
@@ -34,12 +58,9 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
def handle_event(
"select_job",
%{"job" => job_id},
- %{assigns: assigns} = socket
+ socket
) do
- selected_job =
- Enum.find(assigns.workflow_jobs, fn job -> job.id == job_id end)
-
- {:noreply, assign(socket, selected_job: selected_job)}
+ {:noreply, update_selected_job(socket, job_id)}
end
@impl true
@@ -105,10 +126,15 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
do: "checked",
else: false
}
+ disabled={MapSet.member?(@disabled_jobs_ids, job.id)}
/>
@@ -132,10 +158,10 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
phx-disable-with="Running..."
class="inline-flex w-full justify-center rounded-md bg-indigo-600 px-3 py-2 text-sm font-semibold text-white shadow-sm hover:bg-indigo-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-indigo-600 sm:col-start-1"
>
- Rerun <%= @selected_count %> selected work order<%= if @selected_count >
- 1,
- do: "s",
- else: "" %> from selected job
+ Rerun <%= @retriable_count %> selected work <%= pluralize_with_s(
+ @retriable_count,
+ "order"
+ ) %> from selected job