Skip to content

Commit

Permalink
Add workflows api to create, get and list
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed Dec 17, 2024
1 parent ad06c03 commit 3a78e2c
Show file tree
Hide file tree
Showing 15 changed files with 451 additions and 19 deletions.
28 changes: 28 additions & 0 deletions lib/lightning/policies/workflows.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Lightning.Policies.Workflows do
@moduledoc """
The Bodyguard Policy module for Workflows API.
"""
@behaviour Bodyguard.Policy

alias Lightning.Accounts.User
alias Lightning.Projects.Project
alias Lightning.Run

@type actions :: :access_write | :access_read
@spec authorize(actions(), User.t() | Runt.t(), Project.t()) ::
:ok | {:error, :unauthorized}
def authorize(access, %User{} = user, project)
when access in [:access_write, :access_read] do
Lightning.Policies.Permissions.can(
Lightning.Policies.ProjectUsers,
:access_project,
user,
project
)
end

def authorize(access, %Run{} = run, project)
when access in [:access_write, :access_read] do
Lightning.Runs.get_project_id_for_run(run) == project.id
end
end
6 changes: 5 additions & 1 deletion lib/lightning/projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,11 @@ defmodule Lightning.Projects do
|> Repo.one()
end

def member_of?(%Project{id: project_id}, %User{id: user_id}) do
def member_of?(%Project{id: project_id}, user) do
member_of?(project_id, user)
end

def member_of?(project_id, %User{id: user_id}) do
from(p in Project,
join: pu in assoc(p, :project_users),
where: pu.user_id == ^user_id and p.id == ^project_id,
Expand Down
20 changes: 20 additions & 0 deletions lib/lightning/workflows.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ defmodule Lightning.Workflows do
Repo.all(Workflow)
end

@doc """
Returns the list of workflows for a project.
## Examples
iex> list_project_workflows(project_id)
[%Workflow{}, ...]
"""
def list_project_workflows(project_id, opts \\ []) do
include = Keyword.get(opts, :include, [])

from(w in Workflow,
where: w.project_id == ^project_id,
preload: ^include,
order_by: :name
)
|> Repo.all()
end

@doc """
Gets a single workflow with optional preloads.
Expand Down
12 changes: 12 additions & 0 deletions lib/lightning/workflows/edge.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ defmodule Lightning.Workflows.Edge do
}

@conditions [:on_job_success, :on_job_failure, :always, :js_expression]

@derive {Jason.Encoder,
only: [
:id,
:condition_type,
:enabled,
:source_job_id,
:source_trigger_id,
:target_job_id,
:inserted_at,
:updated_at
]}
schema "workflow_edges" do
belongs_to :workflow, Workflow
belongs_to :source_job, Job
Expand Down
9 changes: 9 additions & 0 deletions lib/lightning/workflows/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ defmodule Lightning.Workflows.Job do
workflow: nil | Workflow.t() | Ecto.Association.NotLoaded.t()
}

@derive {Jason.Encoder,
only: [
:id,
:body,
:name,
:adaptor,
:inserted_at,
:updated_at
]}
schema "jobs" do
field :body, :string

Expand Down
57 changes: 48 additions & 9 deletions lib/lightning/workflows/presence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ defmodule Lightning.Workflows.Presence do
pubsub_server: Lightning.PubSub

alias LightningWeb.Endpoint
alias Lightning.Accounts.User
alias Lightning.Workflows.Workflow

defstruct user: nil, joined_at: nil, active_sessions: 0

Expand Down Expand Up @@ -41,22 +43,23 @@ defmodule Lightning.Workflows.Presence do
end

@doc """
Tracks the presence of a user on a given topic.
Tracks the presence of a user on editing a workflow.
## Parameters
- `user`: The user to be tracked.
- `topic`: The topic to track the user on.
- `workflow`: The workflow to track the user on.
- `pid`: The process identifier for the user's session.
## Examples
iex> Lightning.Workflows.Presence.track_user_presence(%User{id: 1}, "room:lobby", self())
iex> Lightning.Workflows.Presence.track_user_presence(%User{id: user_id}, %Workflow{id: workflow_id}, self())
:ok
"""
def track_user_presence(user, topic, pid) do
def track_user_presence(user, %Workflow{id: workflow_id}, pid) do
joined_at = System.system_time(:microsecond)
topic = workflow_topic(workflow_id)

track(pid, topic, user.id, %{
user: user,
Expand All @@ -67,25 +70,59 @@ defmodule Lightning.Workflows.Presence do
end

@doc """
Lists all presences for a given topic.
Untracks the presence of a user on editing a workflow.
## Parameters
- `topic`: The topic to list the presences for.
- `user`: The user to be tracked.
- `workflow`: The workflow to track the user on.
- `pid`: The process identifier for the user's session.
## Examples
iex> Lightning.Workflows.Presence.list_presences("workflow:canvas")
iex> Lightning.Workflows.Presence.untrack_user_presence(%User{id: user_id}, %Workflow{id: workflow_id}, self())
:ok
"""
def untrack_user_presence(%User{id: user_id}, %Workflow{id: workflow_id}, pid) do
untrack(
pid,
workflow_topic(workflow_id),
user_id
)
end

@doc """
Lists all presences for a given workflow.
## Parameters
- `workflow`: The workflow to list the presences for.
## Examples
iex> Lightning.Workflows.Presence.list_presences_for(%Workflow{id: xpto})
[%Lightning.Workflows.Presence{user: %User{id: 1}, ...}, ...]
"""
def list_presences(topic) do
topic
def list_presences_for(%Workflow{id: workflow_id}) do
workflow_id
|> workflow_topic()
|> list_presences_by_topic()
|> group_presences_by_user()
|> extract_presences()
end

@doc """
Informs if there is someone editing a workflow.
"""
def has_any_presence?(%Workflow{id: workflow_id}) do
workflow_id
|> workflow_topic()
|> list()
|> Enum.any?()
end

@doc """
Builds a summary of presences with details about the current user's presence, promotable presences,
and edit priority.
Expand Down Expand Up @@ -191,4 +228,6 @@ defmodule Lightning.Workflows.Presence do
)
end)
end

defp workflow_topic(workflow_id), do: "workflow-#{workflow_id}:presence"
end
11 changes: 10 additions & 1 deletion lib/lightning/workflows/trigger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@ defmodule Lightning.Workflows.Trigger do
__meta__: Ecto.Schema.Metadata.t(),
id: Ecto.UUID.t() | nil
}
@type trigger_type :: :webhook | :cron

@trigger_types [:webhook, :cron, :kafka]

@type trigger_type :: :webhook | :cron
@derive {Jason.Encoder,
only: [
:id,
:comment,
:custom_path,
:cron_expression,
:type,
:enabled
]}
schema "triggers" do
field :comment, :string
field :custom_path, :string
Expand Down
11 changes: 11 additions & 0 deletions lib/lightning/workflows/workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ defmodule Lightning.Workflows.Workflow do
project: nil | Project.t() | Ecto.Association.NotLoaded.t()
}

@derive {Jason.Encoder,
only: [
:id,
:name,
:project_id,
:edges,
:jobs,
:triggers,
:inserted_at,
:updated_at
]}
schema "workflows" do
field :name, :string
field :concurrency, :integer, default: nil
Expand Down
7 changes: 7 additions & 0 deletions lib/lightning_web/controllers/fallback_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ defmodule LightningWeb.FallbackController do
|> render(:"403")
end

def call(conn, {:error, :conflict}) do
conn
|> put_status(:conflict)
|> put_view(LightningWeb.ErrorView)
|> render(:"409")
end

def call(conn, {:error, %Ecto.Changeset{} = changeset}) do
conn
|> put_status(:unprocessable_entity)
Expand Down
85 changes: 85 additions & 0 deletions lib/lightning_web/controllers/workflows_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule LightningWeb.WorkflowsController do
use LightningWeb, :controller

alias Lightning.Projects.Project
alias Lightning.Repo
alias Lightning.Workflows
alias Lightning.Workflows.Presence
alias Lightning.Workflows.Workflow
alias Lightning.Policies.Permissions

action_fallback LightningWeb.FallbackController

require Logger

def post(conn, %{"project_id" => project_id} = params) do
with :ok <- authorize_write(conn, project_id),
{:ok, %{id: workflow_id}} <- save_workflow(params, conn.assigns.subject) do
json(conn, %{id: workflow_id, error: nil})
end
end

def get(conn, %{"project_id" => project_id, "id" => workflow_id}) do
with :ok <- authorize_read(conn, project_id),
{:ok, workflow} <- get_workflow(workflow_id, project_id) do
json(conn, %{workflow: workflow, error: nil})
end
end

def get(conn, %{"project_id" => project_id}) do
with :ok <- authorize_read(conn, project_id) do
list =
Workflows.list_project_workflows(project_id,
include: [:edges, :jobs, :triggers]
)

json(conn, %{workflows: list, error: nil})
end
end

def put(conn, %{"project_id" => project_id, "id" => workflow_id} = params) do
with :ok <- authorize_write(conn, project_id),
{:ok, workflow} <- get_workflow(workflow_id, project_id),
:ok <- authorize_write(conn, workflow),
{:ok, %{id: workflow_id}} <- save_workflow(params, conn.assigns.subject) do
json(conn, %{id: workflow_id})
end
end

defp save_workflow(params, user), do: Workflows.save_workflow(params, user)

defp get_workflow(workflow_id, project_id) do
case Workflows.get_workflow(workflow_id, include: [:edges, :jobs, :triggers]) do
nil -> {:error, :not_found}
%{project_id: ^project_id} = workflow -> {:ok, workflow}
_project_mismatch -> {:error, :bad_request}
end
end

defp authorize_write(_conn, %Workflow{} = workflow) do
if Presence.has_any_presence?(workflow) do
{:error, :conflict}
else
:ok
end
end

defp authorize_write(conn, project_id) do
authorize_for_project(conn, project_id, :access_write)
end

defp authorize_read(conn, project_id) do
authorize_for_project(conn, project_id, :access_read)
end

defp authorize_for_project(conn, project_id, access) do
project = Repo.get(Project, project_id)

Permissions.can(
Lightning.Policies.Workflows,
access,
conn.assigns.subject,
project
)
end
end
14 changes: 7 additions & 7 deletions lib/lightning_web/live/workflow_live/edit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,7 @@ defmodule LightningWeb.WorkflowLive.Edit do
if connected?(socket) && socket.assigns.snapshot_version_tag == "latest" do
Presence.track_user_presence(
socket.assigns.current_user,
"workflow-#{socket.assigns.workflow.id}:presence",
socket.assigns.workflow,
self()
)
end
Expand Down Expand Up @@ -1258,10 +1258,10 @@ defmodule LightningWeb.WorkflowLive.Edit do
url = ~p"/projects/#{project.id}/w/#{workflow.id}?#{query_params}"

if version != "latest" do
Presence.untrack(
self(),
"workflow-#{socket.assigns.workflow.id}:presence",
socket.assigns.current_user.id
Presence.untrack_user_presence(
socket.assigns.current_user,
socket.assigns.workflow,
self()
)
end

Expand Down Expand Up @@ -1805,8 +1805,8 @@ defmodule LightningWeb.WorkflowLive.Edit do

def handle_info(%{event: "presence_diff", payload: _diff}, socket) do
summary =
"workflow-#{socket.assigns.workflow.id}:presence"
|> Presence.list_presences()
socket.assigns.workflow
|> Presence.list_presences_for()
|> Presence.build_presences_summary(socket.assigns)

{:noreply,
Expand Down
Loading

0 comments on commit 3a78e2c

Please sign in to comment.