From 7aee72526ca566f38d108d557dee6970eaeb36b4 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Wed, 5 Mar 2025 16:02:24 +0100 Subject: [PATCH] Allow predefined progress window by projects --- lib/lightning/projects/project.ex | 3 +-- lib/lightning/runs/query.ex | 39 ++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/lib/lightning/projects/project.ex b/lib/lightning/projects/project.ex index f5f7c25080..c4dc9d8812 100644 --- a/lib/lightning/projects/project.ex +++ b/lib/lightning/projects/project.ex @@ -75,7 +75,6 @@ defmodule Lightning.Projects.Project do |> validate_required([:name]) |> validate_format(:name, ~r/^[a-z\-\d]+$/) |> validate_dataclip_retention_period() - |> validate_inclusion(:concurrency, [1, nil]) |> validate_inclusion(:history_retention_period, data_retention_options()) |> validate_inclusion(:dataclip_retention_period, data_retention_options()) end @@ -124,7 +123,7 @@ defmodule Lightning.Projects.Project do def project_with_users_changeset(project, attrs) do project - |> cast(attrs, [:id, :name, :description]) + |> cast(attrs, [:id, :name, :description, :concurrency]) |> cast_assoc(:project_users, required: true, sort_param: :users_sort diff --git a/lib/lightning/runs/query.ex b/lib/lightning/runs/query.ex index e34df1c892..a6c4040390 100644 --- a/lib/lightning/runs/query.ex +++ b/lib/lightning/runs/query.ex @@ -69,8 +69,8 @@ defmodule Lightning.Runs.Query do - `row_number`, the number of the row in the window, per workflow - `concurrency`, the maximum number of runs that can be claimed for the workflow """ - @spec in_progress_window() :: Ecto.Queryable.t() - def in_progress_window do + @spec in_progress_window(:dynamic | :by_project) :: Ecto.Queryable.t() + def in_progress_window(:dynamic) do from(r in Run, where: r.state in [:available, :claimed, :started], join: wo in assoc(r, :work_order), @@ -81,13 +81,8 @@ defmodule Lightning.Runs.Query do row_number: [ partition_by: fragment( - """ - CASE - WHEN ? IS NULL THEN ? - ELSE ? - END - """, - p.concurrency, + "CASE WHEN ? IS NOT NULL THEN ? ELSE ? END", + w.concurrency, w.id, p.id ), @@ -102,7 +97,25 @@ defmodule Lightning.Runs.Query do # calculated here? row_number: row_number() |> over(:row_number), project_id: w.project_id, - concurrency: coalesce(p.concurrency, w.concurrency), + concurrency: coalesce(w.concurrency, p.concurrency), + inserted_at: r.inserted_at + }) + end + + def in_progress_window(:by_project) do + from(r in Run, + where: r.state in [:available, :claimed, :started], + join: wo in assoc(r, :work_order), + join: w in assoc(wo, :workflow), + join: p in assoc(w, :project) + ) + |> select([r, _wo, _w, p], %{ + id: r.id, + state: r.state, + row_number: + row_number() |> over(partition_by: p.id, order_by: [asc: r.inserted_at]), + project_id: p.id, + concurrency: p.concurrency, inserted_at: r.inserted_at }) end @@ -122,10 +135,10 @@ defmodule Lightning.Runs.Query do > eligible_for_claim() |> prepend_order_by([:priority]) > ``` """ - @spec eligible_for_claim() :: Ecto.Queryable.t() - def eligible_for_claim do + @spec eligible_for_claim(atom()) :: Ecto.Queryable.t() + def eligible_for_claim(window_partition \\ :dynamic) do Run - |> with_cte("in_progress_window", as: ^in_progress_window()) + |> with_cte("in_progress_window", as: ^in_progress_window(window_partition)) |> join(:inner, [r], ipw in fragment(~s("in_progress_window")), on: r.id == ipw.id, as: :in_progress_window