Skip to content

Commit

Permalink
Disable jobs with non retriable steps (#2968)
Browse files Browse the repository at this point in the history
* Disable jobs with non retriable steps

* Add test case for disabled non retriable jobs

* Changelog

* Pass a disabled set to UI

* Fix the displayed count when some selected work orders are not retriable
  • Loading branch information
jyeshe authored Feb 27, 2025
1 parent 1badc31 commit 83443d2
Show file tree
Hide file tree
Showing 9 changed files with 590 additions and 444 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ and this project adheres to

### Added

- Disable jobs with non retriable steps
[#2925](https://github.com/OpenFn/lightning/issues/2925)

### Changed

### Fixed
Expand Down
4 changes: 1 addition & 3 deletions lib/lightning/accounts/user_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Lightning.Accounts.UserNotifier do
max_attempts: 1

import Swoosh.Email
import LightningWeb.Utils, only: [pluralize_with_s: 2]

alias Lightning.Accounts.User
alias Lightning.Helpers
Expand Down Expand Up @@ -413,9 +414,6 @@ defmodule Lightning.Accounts.UserNotifier do
"THIS LIGHTNING INSTANCE DOES NOT HAVE ALTERNATE STORAGE ENABLED, SO THESE FAILED MESSAGES CANNOT BE RECOVERED WITHOUT MAKING THEM AVAILABLE ON THE KAFKA CLUSTER AGAIN."
end

defp pluralize_with_s(1, string), do: string
defp pluralize_with_s(_integer, string), do: "#{string}s"

@doc """
Deliver instructions to confirm a credential transfer.
"""
Expand Down
72 changes: 46 additions & 26 deletions lib/lightning/work_orders.ex
Original file line number Diff line number Diff line change
Expand Up @@ -402,32 +402,9 @@ defmodule Lightning.WorkOrders do
| UsageLimiting.error()
| {:error, :enqueue_error}
def retry_many([%WorkOrder{} | _rest] = workorders, job_id, opts) do
orders_ids = Enum.map(workorders, & &1.id)

last_runs_query =
from(r in Run,
where: r.work_order_id in ^orders_ids,
group_by: [r.work_order_id],
select: %{
work_order_id: r.work_order_id,
last_inserted_at: max(r.inserted_at)
}
)

run_steps_query =
from(as in RunStep,
join: att in assoc(as, :run),
join: wo in assoc(att, :work_order),
join: last in subquery(last_runs_query),
on:
last.work_order_id == att.work_order_id and
att.inserted_at == last.last_inserted_at,
join: s in assoc(as, :step),
on: s.job_id == ^job_id,
order_by: [asc: wo.inserted_at]
)

run_steps_query
workorders
|> Enum.map(& &1.id)
|> last_run_steps_query([job_id])
|> Repo.all()
|> retry_many(opts)
end
Expand Down Expand Up @@ -506,6 +483,49 @@ defmodule Lightning.WorkOrders do
{:ok, 0, 0}
end

def get_last_runs_steps_with_dataclips(workorders, jobs) do
job_ids = Enum.map(jobs, & &1.id)

workorders
|> Enum.map(& &1.id)
|> last_run_steps_query(job_ids, non_wiped_dataclip?: true)
|> Repo.all()
end

defp last_run_steps_query(workorder_ids, job_ids, opts \\ []) do
last_runs_query =
from(r in Run,
where: r.work_order_id in ^workorder_ids,
group_by: [r.work_order_id],
select: %{
work_order_id: r.work_order_id,
last_inserted_at: max(r.inserted_at)
}
)

from(rs in RunStep,
join: r in assoc(rs, :run),
join: s in assoc(rs, :step),
join: wo in assoc(r, :work_order),
join: lr in subquery(last_runs_query),
on:
r.work_order_id == lr.work_order_id and
r.inserted_at == lr.last_inserted_at,
where: s.job_id in ^job_ids,
order_by: [asc: wo.inserted_at],
select: %{rs | step: s, run: r}
)
|> then(fn query ->
if Keyword.get(opts, :non_wiped_dataclip?) do
query
|> join(:inner, [rs, _r, s, _wo, _lr], d in assoc(s, :input_dataclip))
|> where([_rs, _r, _s, _wo, _lr, d], is_nil(d.wiped_at))
else
query
end
end)
end

@doc """
Enqueue multiple runs for retry in the same transaction.
"""
Expand Down
4 changes: 0 additions & 4 deletions lib/lightning_web/live/run_live/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,6 @@ defmodule LightningWeb.RunLive.Index do
|> Enum.count()
end

defp selected_workorder_count(selected_orders) do
Enum.count(selected_orders)
end

defp maybe_humanize_date(date) do
date && Timex.format!(date, "{D}/{M}/{YY}")
end
Expand Down
5 changes: 2 additions & 3 deletions lib/lightning_web/live/run_live/index.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@
pages={@page.total_pages}
total_entries={@page.total_entries}
all_selected?={all_selected?(@selected_work_orders, @page.entries)}
selected_count={selected_workorder_count(@selected_work_orders)}
selected_count={Enum.count(@selected_work_orders)}
filters={SearchParams.new(@filters)}
workflows={@workflows}
/>
Expand All @@ -780,9 +780,8 @@
module={LightningWeb.RunLive.RerunJobComponent}
id="bulk-rerun-from-job-modal"
total_entries={@page.total_entries}
selected_count={selected_workorder_count(@selected_work_orders)}
all_selected?={all_selected?(@selected_work_orders, @page.entries)}
workflow_id={@selected_work_orders |> hd() |> Map.get(:workflow_id)}
selected_workorders={@selected_work_orders}
pages={@page.total_pages}
/>
</div>
Expand Down
81 changes: 61 additions & 20 deletions lib/lightning_web/live/run_live/rerun_job_component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,65 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
@moduledoc """
Rerun job component
"""

use LightningWeb, :live_component

import LightningWeb.Utils, only: [pluralize_with_s: 2]

alias Lightning.Jobs
alias Lightning.Workflows
alias Lightning.WorkOrders

@impl true
def update(
%{
total_entries: _count,
selected_count: _selected_count,
workflow_id: workflow_id
selected_workorders:
[%{workflow_id: workflow_id} | _] = selected_workorders
} = assigns,
socket
) do
workflow = Workflows.get_workflow!(workflow_id)
jobs = Jobs.list_jobs_for_workflow(workflow)
workflow_jobs = Jobs.list_jobs_for_workflow(workflow)

{retriable_jobs_ids, retriable_count_per_job} =
selected_workorders
|> WorkOrders.get_last_runs_steps_with_dataclips(workflow_jobs)
|> then(fn run_steps ->
retriable_jobs_ids = MapSet.new(run_steps, & &1.step.job_id)

retriable_count_per_job =
run_steps
|> Enum.group_by(& &1.step.job_id, & &1.run.work_order_id)
|> Map.new(fn {job_id, workorder_ids} ->
{job_id, Enum.count(workorder_ids)}
end)

{retriable_jobs_ids, retriable_count_per_job}
end)

disabled_jobs_ids =
MapSet.difference(MapSet.new(workflow_jobs, & &1.id), retriable_jobs_ids)

{:ok,
socket
|> assign(
show: false,
workflow: workflow,
workflow_jobs: jobs,
selected_job: hd(jobs)
workflow_jobs: workflow_jobs,
disabled_jobs_ids: disabled_jobs_ids,
retriable_count_per_job: retriable_count_per_job
)
|> update_selected_job(hd(workflow_jobs).id)
|> assign(assigns)}
end

@impl true
def handle_event(
"select_job",
%{"job" => job_id},
%{assigns: assigns} = socket
socket
) do
selected_job =
Enum.find(assigns.workflow_jobs, fn job -> job.id == job_id end)

{:noreply, assign(socket, selected_job: selected_job)}
{:noreply, update_selected_job(socket, job_id)}
end

@impl true
Expand Down Expand Up @@ -105,10 +126,15 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
do: "checked",
else: false
}
disabled={MapSet.member?(@disabled_jobs_ids, job.id)}
/>
<label
id={"jobl_#{job.id}"}
for={"job_#{job.id}"}
class="ml-3 block text-sm font-medium leading-6 text-gray-900"
class={[
"ml-3 block text-sm leading-6 font-medium",
"#{if MapSet.member?(@disabled_jobs_ids, job.id), do: "text-slate-500", else: "text-gray-900"}"
]}
>
<%= job.name %>
</label>
Expand All @@ -132,10 +158,10 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
phx-disable-with="Running..."
class="inline-flex w-full justify-center rounded-md bg-indigo-600 px-3 py-2 text-sm font-semibold text-white shadow-sm hover:bg-indigo-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-indigo-600 sm:col-start-1"
>
Rerun <%= @selected_count %> selected work order<%= if @selected_count >
1,
do: "s",
else: "" %> from selected job
Rerun <%= @retriable_count %> selected work <%= pluralize_with_s(
@retriable_count,
"order"
) %> from selected job
</button>
<button
id="rerun-all-from-job-trigger"
Expand Down Expand Up @@ -179,10 +205,10 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
phx-disable-with="Running..."
class="inline-flex w-full justify-center rounded-md bg-indigo-600 px-3 py-2 text-sm font-semibold text-white shadow-sm hover:bg-indigo-500 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-indigo-600 sm:col-start-2"
>
Rerun <%= @selected_count %> selected work order<%= if @selected_count >
1,
do: "s",
else: "" %> from selected job
Rerun <%= @retriable_count %> selected work <%= pluralize_with_s(
@retriable_count,
"order"
) %> from selected job
</button>
<button
type="button"
Expand All @@ -198,4 +224,19 @@ defmodule LightningWeb.RunLive.RerunJobComponent do
</div>
"""
end

defp update_selected_job(socket, job_id) do
%{
retriable_count_per_job: retriable_count_per_job,
workflow_jobs: workflow_jobs
} = socket.assigns

selected_job =
Enum.find(workflow_jobs, fn job -> job.id == job_id end)

assign(socket,
selected_job: selected_job,
retriable_count: Map.get(retriable_count_per_job, selected_job.id)
)
end
end
3 changes: 3 additions & 0 deletions lib/lightning_web/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ defmodule LightningWeb.Utils do
alias Phoenix.HTML.Form
alias Plug.Conn.Query

def pluralize_with_s(n, string) when n <= 1, do: string
def pluralize_with_s(_integer, string), do: "#{string}s"

@doc """
Builds nested parameters for the given `form` field with the specified `value`.
Expand Down
Loading

0 comments on commit 83443d2

Please sign in to comment.