Skip to content

Commit

Permalink
Allow predefined progress window by projects
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed Mar 6, 2025
1 parent 100e5af commit 612a54c
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions lib/lightning/runs/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ defmodule Lightning.Runs.Query do
- `row_number`, the number of the row in the window, per workflow
- `concurrency`, the maximum number of runs that can be claimed for the workflow
"""
@spec in_progress_window() :: Ecto.Queryable.t()
def in_progress_window do
@spec in_progress_window(:dynamic | :by_project | :by_workflow) ::
Ecto.Queryable.t()
def in_progress_window(:dynamic) do
from(r in Run,
where: r.state in [:available, :claimed, :started],
join: wo in assoc(r, :work_order),
Expand All @@ -81,13 +82,8 @@ defmodule Lightning.Runs.Query do
row_number: [
partition_by:
fragment(
"""
CASE
WHEN ? IS NULL THEN ?
ELSE ?
END
""",
p.concurrency,
"CASE WHEN ? IS NOT NULL THEN ? ELSE ? END",
w.concurrency,
w.id,
p.id
),
Expand All @@ -102,7 +98,24 @@ defmodule Lightning.Runs.Query do
# calculated here?
row_number: row_number() |> over(:row_number),
project_id: w.project_id,
concurrency: coalesce(p.concurrency, w.concurrency),
concurrency: coalesce(w.concurrency, p.concurrency),
inserted_at: r.inserted_at
})
end

def in_progress_window(:by_project) do
from(r in Run,
where: r.state in [:available, :claimed, :started],
join: wo in assoc(r, :work_order),
join: w in assoc(wo, :workflow),
join: p in assoc(w, :project)
)
|> select([r, _wo, _w, p], %{
id: r.id,
state: r.state,
row_number: row_number() |> over(partition_by: p.id, order_by: [asc: r.inserted_at]),
project_id: p.id,
concurrency: p.concurrency,
inserted_at: r.inserted_at
})
end
Expand All @@ -122,10 +135,10 @@ defmodule Lightning.Runs.Query do
> eligible_for_claim() |> prepend_order_by([:priority])
> ```
"""
@spec eligible_for_claim() :: Ecto.Queryable.t()
def eligible_for_claim do
@spec eligible_for_claim(atom()) :: Ecto.Queryable.t()
def eligible_for_claim(window_partition \\ :dynamic) do
Run
|> with_cte("in_progress_window", as: ^in_progress_window())
|> with_cte("in_progress_window", as: ^in_progress_window(window_partition))
|> join(:inner, [r], ipw in fragment(~s("in_progress_window")),
on: r.id == ipw.id,
as: :in_progress_window
Expand Down

0 comments on commit 612a54c

Please sign in to comment.