From 3a78e2c80ecf9c156023bf28a32fdc8407833bc0 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Tue, 17 Dec 2024 17:46:29 +0100 Subject: [PATCH] Add workflows api to create, get and list --- lib/lightning/policies/workflows.ex | 28 +++ lib/lightning/projects.ex | 6 +- lib/lightning/workflows.ex | 20 ++ lib/lightning/workflows/edge.ex | 12 ++ lib/lightning/workflows/job.ex | 9 + lib/lightning/workflows/presence.ex | 57 +++++- lib/lightning/workflows/trigger.ex | 11 +- lib/lightning/workflows/workflow.ex | 11 ++ .../controllers/fallback_controller.ex | 7 + .../controllers/workflows_controller.ex | 85 ++++++++ lib/lightning_web/live/workflow_live/edit.ex | 14 +- lib/lightning_web/router.ex | 10 + .../workflows_controller_test.exs | 185 ++++++++++++++++++ test/support/conn_helpers.ex | 11 ++ test/support/factories.ex | 4 +- 15 files changed, 451 insertions(+), 19 deletions(-) create mode 100644 lib/lightning/policies/workflows.ex create mode 100644 lib/lightning_web/controllers/workflows_controller.ex create mode 100644 test/lightning_web/workflows_controller_test.exs diff --git a/lib/lightning/policies/workflows.ex b/lib/lightning/policies/workflows.ex new file mode 100644 index 0000000000..614f708fff --- /dev/null +++ b/lib/lightning/policies/workflows.ex @@ -0,0 +1,28 @@ +defmodule Lightning.Policies.Workflows do + @moduledoc """ + The Bodyguard Policy module for Workflows API. + """ + @behaviour Bodyguard.Policy + + alias Lightning.Accounts.User + alias Lightning.Projects.Project + alias Lightning.Run + + @type actions :: :access_write | :access_read + @spec authorize(actions(), User.t() | Runt.t(), Project.t()) :: + :ok | {:error, :unauthorized} + def authorize(access, %User{} = user, project) + when access in [:access_write, :access_read] do + Lightning.Policies.Permissions.can( + Lightning.Policies.ProjectUsers, + :access_project, + user, + project + ) + end + + def authorize(access, %Run{} = run, project) + when access in [:access_write, :access_read] do + Lightning.Runs.get_project_id_for_run(run) == project.id + end +end diff --git a/lib/lightning/projects.ex b/lib/lightning/projects.ex index 925f515576..45db1c2f4f 100644 --- a/lib/lightning/projects.ex +++ b/lib/lightning/projects.ex @@ -649,7 +649,11 @@ defmodule Lightning.Projects do |> Repo.one() end - def member_of?(%Project{id: project_id}, %User{id: user_id}) do + def member_of?(%Project{id: project_id}, user) do + member_of?(project_id, user) + end + + def member_of?(project_id, %User{id: user_id}) do from(p in Project, join: pu in assoc(p, :project_users), where: pu.user_id == ^user_id and p.id == ^project_id, diff --git a/lib/lightning/workflows.ex b/lib/lightning/workflows.ex index b84ee46f3d..7769ab3e16 100644 --- a/lib/lightning/workflows.ex +++ b/lib/lightning/workflows.ex @@ -36,6 +36,26 @@ defmodule Lightning.Workflows do Repo.all(Workflow) end + @doc """ + Returns the list of workflows for a project. + + ## Examples + + iex> list_project_workflows(project_id) + [%Workflow{}, ...] + + """ + def list_project_workflows(project_id, opts \\ []) do + include = Keyword.get(opts, :include, []) + + from(w in Workflow, + where: w.project_id == ^project_id, + preload: ^include, + order_by: :name + ) + |> Repo.all() + end + @doc """ Gets a single workflow with optional preloads. diff --git a/lib/lightning/workflows/edge.ex b/lib/lightning/workflows/edge.ex index bf02c72397..fc5f5012f3 100644 --- a/lib/lightning/workflows/edge.ex +++ b/lib/lightning/workflows/edge.ex @@ -30,6 +30,18 @@ defmodule Lightning.Workflows.Edge do } @conditions [:on_job_success, :on_job_failure, :always, :js_expression] + + @derive {Jason.Encoder, + only: [ + :id, + :condition_type, + :enabled, + :source_job_id, + :source_trigger_id, + :target_job_id, + :inserted_at, + :updated_at + ]} schema "workflow_edges" do belongs_to :workflow, Workflow belongs_to :source_job, Job diff --git a/lib/lightning/workflows/job.ex b/lib/lightning/workflows/job.ex index 97916f4ffc..7e3bf366aa 100644 --- a/lib/lightning/workflows/job.ex +++ b/lib/lightning/workflows/job.ex @@ -31,6 +31,15 @@ defmodule Lightning.Workflows.Job do workflow: nil | Workflow.t() | Ecto.Association.NotLoaded.t() } + @derive {Jason.Encoder, + only: [ + :id, + :body, + :name, + :adaptor, + :inserted_at, + :updated_at + ]} schema "jobs" do field :body, :string diff --git a/lib/lightning/workflows/presence.ex b/lib/lightning/workflows/presence.ex index f01660cfff..5f59c7f6f4 100644 --- a/lib/lightning/workflows/presence.ex +++ b/lib/lightning/workflows/presence.ex @@ -10,6 +10,8 @@ defmodule Lightning.Workflows.Presence do pubsub_server: Lightning.PubSub alias LightningWeb.Endpoint + alias Lightning.Accounts.User + alias Lightning.Workflows.Workflow defstruct user: nil, joined_at: nil, active_sessions: 0 @@ -41,22 +43,23 @@ defmodule Lightning.Workflows.Presence do end @doc """ - Tracks the presence of a user on a given topic. + Tracks the presence of a user on editing a workflow. ## Parameters - `user`: The user to be tracked. - - `topic`: The topic to track the user on. + - `workflow`: The workflow to track the user on. - `pid`: The process identifier for the user's session. ## Examples - iex> Lightning.Workflows.Presence.track_user_presence(%User{id: 1}, "room:lobby", self()) + iex> Lightning.Workflows.Presence.track_user_presence(%User{id: user_id}, %Workflow{id: workflow_id}, self()) :ok """ - def track_user_presence(user, topic, pid) do + def track_user_presence(user, %Workflow{id: workflow_id}, pid) do joined_at = System.system_time(:microsecond) + topic = workflow_topic(workflow_id) track(pid, topic, user.id, %{ user: user, @@ -67,25 +70,59 @@ defmodule Lightning.Workflows.Presence do end @doc """ - Lists all presences for a given topic. + Untracks the presence of a user on editing a workflow. ## Parameters - - `topic`: The topic to list the presences for. + - `user`: The user to be tracked. + - `workflow`: The workflow to track the user on. + - `pid`: The process identifier for the user's session. ## Examples - iex> Lightning.Workflows.Presence.list_presences("workflow:canvas") + iex> Lightning.Workflows.Presence.untrack_user_presence(%User{id: user_id}, %Workflow{id: workflow_id}, self()) + :ok + + """ + def untrack_user_presence(%User{id: user_id}, %Workflow{id: workflow_id}, pid) do + untrack( + pid, + workflow_topic(workflow_id), + user_id + ) + end + + @doc """ + Lists all presences for a given workflow. + + ## Parameters + + - `workflow`: The workflow to list the presences for. + + ## Examples + + iex> Lightning.Workflows.Presence.list_presences_for(%Workflow{id: xpto}) [%Lightning.Workflows.Presence{user: %User{id: 1}, ...}, ...] """ - def list_presences(topic) do - topic + def list_presences_for(%Workflow{id: workflow_id}) do + workflow_id + |> workflow_topic() |> list_presences_by_topic() |> group_presences_by_user() |> extract_presences() end + @doc """ + Informs if there is someone editing a workflow. + """ + def has_any_presence?(%Workflow{id: workflow_id}) do + workflow_id + |> workflow_topic() + |> list() + |> Enum.any?() + end + @doc """ Builds a summary of presences with details about the current user's presence, promotable presences, and edit priority. @@ -191,4 +228,6 @@ defmodule Lightning.Workflows.Presence do ) end) end + + defp workflow_topic(workflow_id), do: "workflow-#{workflow_id}:presence" end diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex index 52a5034f07..3d3d3dc28d 100644 --- a/lib/lightning/workflows/trigger.ex +++ b/lib/lightning/workflows/trigger.ex @@ -21,10 +21,19 @@ defmodule Lightning.Workflows.Trigger do __meta__: Ecto.Schema.Metadata.t(), id: Ecto.UUID.t() | nil } + @type trigger_type :: :webhook | :cron @trigger_types [:webhook, :cron, :kafka] - @type trigger_type :: :webhook | :cron + @derive {Jason.Encoder, + only: [ + :id, + :comment, + :custom_path, + :cron_expression, + :type, + :enabled + ]} schema "triggers" do field :comment, :string field :custom_path, :string diff --git a/lib/lightning/workflows/workflow.ex b/lib/lightning/workflows/workflow.ex index af91c480b6..7889d1b52b 100644 --- a/lib/lightning/workflows/workflow.ex +++ b/lib/lightning/workflows/workflow.ex @@ -23,6 +23,17 @@ defmodule Lightning.Workflows.Workflow do project: nil | Project.t() | Ecto.Association.NotLoaded.t() } + @derive {Jason.Encoder, + only: [ + :id, + :name, + :project_id, + :edges, + :jobs, + :triggers, + :inserted_at, + :updated_at + ]} schema "workflows" do field :name, :string field :concurrency, :integer, default: nil diff --git a/lib/lightning_web/controllers/fallback_controller.ex b/lib/lightning_web/controllers/fallback_controller.ex index 467130d4d2..1f10d33a92 100644 --- a/lib/lightning_web/controllers/fallback_controller.ex +++ b/lib/lightning_web/controllers/fallback_controller.ex @@ -35,6 +35,13 @@ defmodule LightningWeb.FallbackController do |> render(:"403") end + def call(conn, {:error, :conflict}) do + conn + |> put_status(:conflict) + |> put_view(LightningWeb.ErrorView) + |> render(:"409") + end + def call(conn, {:error, %Ecto.Changeset{} = changeset}) do conn |> put_status(:unprocessable_entity) diff --git a/lib/lightning_web/controllers/workflows_controller.ex b/lib/lightning_web/controllers/workflows_controller.ex new file mode 100644 index 0000000000..b6c7528f55 --- /dev/null +++ b/lib/lightning_web/controllers/workflows_controller.ex @@ -0,0 +1,85 @@ +defmodule LightningWeb.WorkflowsController do + use LightningWeb, :controller + + alias Lightning.Projects.Project + alias Lightning.Repo + alias Lightning.Workflows + alias Lightning.Workflows.Presence + alias Lightning.Workflows.Workflow + alias Lightning.Policies.Permissions + + action_fallback LightningWeb.FallbackController + + require Logger + + def post(conn, %{"project_id" => project_id} = params) do + with :ok <- authorize_write(conn, project_id), + {:ok, %{id: workflow_id}} <- save_workflow(params, conn.assigns.subject) do + json(conn, %{id: workflow_id, error: nil}) + end + end + + def get(conn, %{"project_id" => project_id, "id" => workflow_id}) do + with :ok <- authorize_read(conn, project_id), + {:ok, workflow} <- get_workflow(workflow_id, project_id) do + json(conn, %{workflow: workflow, error: nil}) + end + end + + def get(conn, %{"project_id" => project_id}) do + with :ok <- authorize_read(conn, project_id) do + list = + Workflows.list_project_workflows(project_id, + include: [:edges, :jobs, :triggers] + ) + + json(conn, %{workflows: list, error: nil}) + end + end + + def put(conn, %{"project_id" => project_id, "id" => workflow_id} = params) do + with :ok <- authorize_write(conn, project_id), + {:ok, workflow} <- get_workflow(workflow_id, project_id), + :ok <- authorize_write(conn, workflow), + {:ok, %{id: workflow_id}} <- save_workflow(params, conn.assigns.subject) do + json(conn, %{id: workflow_id}) + end + end + + defp save_workflow(params, user), do: Workflows.save_workflow(params, user) + + defp get_workflow(workflow_id, project_id) do + case Workflows.get_workflow(workflow_id, include: [:edges, :jobs, :triggers]) do + nil -> {:error, :not_found} + %{project_id: ^project_id} = workflow -> {:ok, workflow} + _project_mismatch -> {:error, :bad_request} + end + end + + defp authorize_write(_conn, %Workflow{} = workflow) do + if Presence.has_any_presence?(workflow) do + {:error, :conflict} + else + :ok + end + end + + defp authorize_write(conn, project_id) do + authorize_for_project(conn, project_id, :access_write) + end + + defp authorize_read(conn, project_id) do + authorize_for_project(conn, project_id, :access_read) + end + + defp authorize_for_project(conn, project_id, access) do + project = Repo.get(Project, project_id) + + Permissions.can( + Lightning.Policies.Workflows, + access, + conn.assigns.subject, + project + ) + end +end diff --git a/lib/lightning_web/live/workflow_live/edit.ex b/lib/lightning_web/live/workflow_live/edit.ex index 9a3b75e6c6..adb7075277 100644 --- a/lib/lightning_web/live/workflow_live/edit.ex +++ b/lib/lightning_web/live/workflow_live/edit.ex @@ -1198,7 +1198,7 @@ defmodule LightningWeb.WorkflowLive.Edit do if connected?(socket) && socket.assigns.snapshot_version_tag == "latest" do Presence.track_user_presence( socket.assigns.current_user, - "workflow-#{socket.assigns.workflow.id}:presence", + socket.assigns.workflow, self() ) end @@ -1258,10 +1258,10 @@ defmodule LightningWeb.WorkflowLive.Edit do url = ~p"/projects/#{project.id}/w/#{workflow.id}?#{query_params}" if version != "latest" do - Presence.untrack( - self(), - "workflow-#{socket.assigns.workflow.id}:presence", - socket.assigns.current_user.id + Presence.untrack_user_presence( + socket.assigns.current_user, + socket.assigns.workflow, + self() ) end @@ -1805,8 +1805,8 @@ defmodule LightningWeb.WorkflowLive.Edit do def handle_info(%{event: "presence_diff", payload: _diff}, socket) do summary = - "workflow-#{socket.assigns.workflow.id}:presence" - |> Presence.list_presences() + socket.assigns.workflow + |> Presence.list_presences_for() |> Presence.build_presences_summary(socket.assigns) {:noreply, diff --git a/lib/lightning_web/router.ex b/lib/lightning_web/router.ex index 3842cc2fcf..dfc672c3e9 100644 --- a/lib/lightning_web/router.ex +++ b/lib/lightning_web/router.ex @@ -101,6 +101,16 @@ defmodule LightningWeb.Router do delete "/:name", CollectionsController, :delete_all end + ## Workflows + scope "/workflows", LightningWeb do + pipe_through [:authenticated_api] + + get "/:project_id", WorkflowsController, :get + get "/:project_id/:id", WorkflowsController, :get + post "/:project_id/", WorkflowsController, :post + put "/:project_id/:id", WorkflowsController, :put + end + ## Authentication routes scope "/", LightningWeb do pipe_through [:browser, :redirect_if_user_is_authenticated] diff --git a/test/lightning_web/workflows_controller_test.exs b/test/lightning_web/workflows_controller_test.exs new file mode 100644 index 0000000000..c7ec37d06f --- /dev/null +++ b/test/lightning_web/workflows_controller_test.exs @@ -0,0 +1,185 @@ +defmodule LightningWeb.API.WorkflowsControllerTest do + use LightningWeb.ConnCase, async: true + + import Lightning.Factories + + alias Lightning.Workflows.Workflow + + setup %{conn: conn} do + conn = + conn + |> put_req_header("accept", "application/json") + |> put_req_header("content-type", "application/json") + + {:ok, conn: conn} + end + + describe "GET /workflows/:project_id" do + test "returns a list of workflows", %{conn: conn} do + user = insert(:user) + + project = + insert(:project, project_users: [%{user: user}]) + + workflow1 = insert(:simple_workflow, name: "workf-A", project: project) + workflow2 = insert(:simple_workflow, name: "workf-B", project: project) + _workflow = insert(:simple_workflow) + + conn = + conn + |> assign_bearer(user) + |> get(~p"/workflows/#{project.id}") + + assert json_response(conn, 200) == %{ + "error" => nil, + "workflows" => [ + encode_decode(workflow1), + encode_decode(workflow2) + ] + } + end + + test "returns 401 without a token", %{conn: conn} do + %{id: workflow_id, project_id: project_id} = insert(:simple_workflow) + + conn = get(conn, ~p"/workflows/#{project_id}/#{workflow_id}") + + assert %{"error" => "Unauthorized"} == json_response(conn, 401) + end + + test "returns 401 when a token is invalid", %{conn: conn} do + %{id: workflow_id, project_id: project_id} = + workflow = insert(:simple_workflow) + + workorder = insert(:workorder, dataclip: insert(:dataclip)) + + run = + insert(:run, + work_order: workorder, + dataclip: workorder.dataclip, + starting_trigger: workflow.triggers |> hd() + ) + + token = Lightning.Workers.generate_run_token(run) + + conn = + conn + |> assign_bearer(token) + |> get(~p"/workflows/#{project_id}/#{workflow_id}") + + assert json_response(conn, 401) == %{"error" => "Unauthorized"} + end + + test "returns 401 on a project the user don't have access to", %{conn: conn} do + user = insert(:user) + + %{id: workflow_id, project_id: project_id} = insert(:simple_workflow) + + conn = + conn + |> assign_bearer(user) + |> get(~p"/workflows/#{project_id}/#{workflow_id}") + + assert json_response(conn, 401) == %{"error" => "Unauthorized"} + end + end + + describe "GET /workflows/:project_id/:workflow_id" do + test "returns a workflow", %{conn: conn} do + user = insert(:user) + + project = + insert(:project, project_users: [%{user: user}]) + + %{project_id: project_id} = + workflow = insert(:simple_workflow, project: project) + + conn = + conn + |> assign_bearer(user) + |> get(~p"/workflows/#{project_id}/#{workflow.id}") + + assert json_response(conn, 200) == %{ + "error" => nil, + "workflow" => encode_decode(workflow) + } + end + + test "returns 401 without a token", %{conn: conn} do + %{id: workflow_id, project_id: project_id} = insert(:simple_workflow) + + conn = get(conn, ~p"/workflows/#{project_id}/#{workflow_id}") + + assert %{"error" => "Unauthorized"} == json_response(conn, 401) + end + end + + describe "POST /workflows/:project_id" do + test "inserts a workflow", %{conn: conn} do + user = insert(:user) + + project = + insert(:project, project_users: [%{user: user}]) + + workflow = + build(:simple_workflow, name: "work1", project_id: project.id) + + conn = + conn + |> assign_bearer(user) + |> post(~p"/workflows/#{project.id}", Jason.encode!(workflow)) + + assert %{"id" => workflow_id, "error" => nil} = json_response(conn, 200) + assert Ecto.UUID.dump(workflow_id) + + saved_workflow = + Repo.get(Workflow, workflow_id) + |> Repo.preload([:edges, :jobs, :triggers]) + |> encode_decode() + |> remove_timestamps() + + assert workflow + |> Map.put(:id, workflow_id) + |> encode_decode() + |> remove_timestamps() == saved_workflow + end + + test "returns 401 without a token", %{conn: conn} do + user = insert(:user) + + project = + insert(:project, project_users: [%{user: user}]) + + build(:simple_workflow, name: "work1", project: project) + + conn = post(conn, ~p"/workflows/#{project.id}") + + assert %{"error" => "Unauthorized"} == json_response(conn, 401) + end + end + + defp encode_decode(item) do + item + |> Jason.encode!() + |> Jason.decode!() + end + + defp remove_timestamps([%{"edges" => _el} | _workflows] = list) + when is_list(list) do + Enum.map(list, &Map.drop(&1, ["inserted_at", "updated_at"])) + end + + defp remove_timestamps(list) when is_list(list) do + Enum.map(list, &Map.drop(&1, ["inserted_at", "updated_at"])) + end + + defp remove_timestamps(workflow) do + Map.merge(workflow, %{ + "inserted_at" => nil, + "updated_at" => nil, + "edges" => remove_timestamps(workflow["edges"]), + "jobs" => remove_timestamps(workflow["jobs"]), + "triggers" => remove_timestamps(workflow["triggers"]) + }) + end +end diff --git a/test/support/conn_helpers.ex b/test/support/conn_helpers.ex index e4cb38e605..67ee5bb728 100644 --- a/test/support/conn_helpers.ex +++ b/test/support/conn_helpers.ex @@ -1,4 +1,15 @@ defmodule LightningWeb.ConnHelpers do + @moduledoc false + + alias Lightning.Accounts + alias Lightning.Accounts.User + + def assign_bearer(conn, %User{} = user) do + token = Accounts.generate_api_token(user) + + conn |> Plug.Conn.put_req_header("authorization", "Bearer #{token}") + end + def assign_bearer(conn, token) do conn |> Plug.Conn.put_req_header("authorization", "Bearer #{token}") end diff --git a/test/support/factories.ex b/test/support/factories.ex index cf0ec20179..9460828d9c 100644 --- a/test/support/factories.ex +++ b/test/support/factories.ex @@ -480,8 +480,10 @@ defmodule Lightning.Factories do Enum.into(extra, %{ id: Ecto.UUID.generate(), source_trigger_id: trigger.id, + source_job_id: nil, target_job_id: job.id, - condition_type: :always + condition_type: :always, + enabled: true }) ) }