From 52ce27059e1c9f6a2ec0eac25dd7af2a990be7cd Mon Sep 17 00:00:00 2001 From: Rory McKinley Date: Thu, 2 Nov 2023 11:02:51 +0200 Subject: [PATCH] Partition work_orders --- CHANGELOG.md | 2 + config/runtime.exs | 6 +- lib/lightning/application.ex | 10 + lib/lightning/maintenance/admin_tools.ex | 17 + .../maintenance/partition_table_service.ex | 182 +++++++ mix.exs | 5 +- ...1114452_create_partitioned_work_orders.exs | 201 +++++++ .../maintenance/admin_tools_test.exs | 63 +++ .../partition_table_service_test.exs | 493 ++++++++++++++++++ 9 files changed, 977 insertions(+), 2 deletions(-) create mode 100644 lib/lightning/maintenance/admin_tools.ex create mode 100644 lib/lightning/maintenance/partition_table_service.ex create mode 100644 priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs create mode 100644 test/lightning/maintenance/admin_tools_test.exs create mode 100644 test/lightning/maintenance/partition_table_service_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ac5998a8d..45331b7d74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,6 +109,8 @@ bearing with us as we move towards our first stable Lightning release.) [#1327](https://github.com/OpenFn/Lightning/issues/1327) - Have user create workflow name before moving to the canvas [#1103](https://github.com/OpenFn/Lightning/issues/1103) +- Partition `work_orders` by week and year and add functionality to maintain + the partitions. [#1254](https://github.com/OpenFn/Lightning/issues/1254) ### Changed diff --git a/config/runtime.exs b/config/runtime.exs index f39fe9e418..48489c0dc2 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -97,7 +97,11 @@ base_oban_cron = [ {"0 10 * * 1", Lightning.DigestEmailWorker, args: %{"type" => "weekly_project_digest"}}, {"0 10 1 * *", Lightning.DigestEmailWorker, - args: %{"type" => "monthly_project_digest"}} + args: %{"type" => "monthly_project_digest"}}, + {"0 1 * * *", Lightning.PartitionTableService, + args: %{"add_headroom" => %{"weeks" => 2}}}, + {"0 0 * * *", Lightning.PartitionTableService, + args: %{"drop_older_than" => %{"weeks" => -6}}} ] conditional_cron = diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 8d196a89a2..75cac71438 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -131,4 +131,14 @@ defmodule Lightning.Application do def oban_opts() do Application.get_env(:lightning, Oban) end + + @impl true + @doc """ + Perform any idempotent database setup that must be done after the repo is started. + """ + def start_phase(:ensure_db_config, :normal, _opts) do + Lightning.PartitionTableService.add_headroom(:all, 2) + Lightning.PartitionTableService.add_headroom(:all, -5) + :ok + end end diff --git a/lib/lightning/maintenance/admin_tools.ex b/lib/lightning/maintenance/admin_tools.ex new file mode 100644 index 0000000000..305ad176ec --- /dev/null +++ b/lib/lightning/maintenance/admin_tools.ex @@ -0,0 +1,17 @@ +defmodule Lightning.AdminTools do + def generate_iso_weeks(start_date, end_date) do + Date.range(start_date, end_date) + |> Enum.map(&Timex.beginning_of_week(&1, :mon)) + |> Enum.uniq() + |> Enum.map(fn date -> + {year, week} = Timex.iso_week(date) + + { + year |> Integer.to_string(), + week |> Integer.to_string() |> String.pad_leading(2, "0"), + date |> Date.to_string(), + date |> Timex.shift(weeks: 1) |> Date.to_string() + } + end) + end +end diff --git a/lib/lightning/maintenance/partition_table_service.ex b/lib/lightning/maintenance/partition_table_service.ex new file mode 100644 index 0000000000..baf7b4f859 --- /dev/null +++ b/lib/lightning/maintenance/partition_table_service.ex @@ -0,0 +1,182 @@ +defmodule Lightning.PartitionTableService do + @moduledoc """ + Service to keep the partition tables up to date. + """ + + use Oban.Worker, + queue: :background, + max_attempts: 1 + + import Ecto.Query + + alias Lightning.Repo + + require Logger + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"add_headroom" => %{"weeks" => weeks}}}) + when is_integer(weeks) do + add_headroom(:all, weeks) + end + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"drop_older_than" => %{"weeks" => weeks}}}) + when is_integer(weeks) do + upper_bound = Timex.shift(DateTime.utc_now(), weeks: weeks) + + remove_empty("work_orders", upper_bound) + end + + def add_headroom(:all, weeks) when is_integer(weeks) do + add_headroom(:work_orders, weeks) |> log_partition_creation() + end + + def add_headroom(:work_orders, weeks) when is_integer(weeks) do + proposed_tables = tables_to_add("work_orders", weeks) + + :ok = + Enum.each(proposed_tables, fn {partition_name, from, to} -> + { + Repo.query(create_query(partition_name, "work_orders", from, to)) + } + end) + + proposed_tables + end + + def tables_to_add(table, weeks) do + today = Date.utc_today() + + existing_tables = get_partitions(table) + + Lightning.AdminTools.generate_iso_weeks(today, today |> Date.add(weeks * 7)) + |> Enum.map(&to_partition_details(table, &1)) + |> Enum.reject(fn {name, _from, _to} -> + Enum.find(existing_tables, &String.equivalent?(name, &1)) + end) + end + + def get_partitions(parent) do + %Postgrex.Result{rows: rows} = + Repo.query!( + ~S[ + SELECT CAST(inhrelid::regclass AS text) AS child + FROM pg_catalog.pg_inherits + WHERE inhparent = $1::text::regclass; + ], + [parent] + ) + + rows |> List.flatten() + end + + @doc """ + Drops empty partition tables that have an upper partition bound less than the + date given. + + This bound is the `TO` part of the partition: + + ``` + FOR VALUES FROM ('2020-12-28 00:00:00') TO ('2021-01-04 00:00:00') + ``` + """ + def remove_empty(parent, upper_bound) do + parent + |> find_range_partitions + |> partitions_older_than(upper_bound) + |> Enum.each(&drop_empty_partition(parent, &1)) + end + + def find_range_partitions(parent) do + Repo.query!( + ~S[ + SELECT + pt.relname AS partition_name, + pg_get_expr(pt.relpartbound, + pt.oid, + TRUE) AS partition_expression + FROM + pg_class base_tb + JOIN pg_inherits i ON + i.inhparent = base_tb.oid + JOIN pg_class pt ON + pt.oid = i.inhrelid + WHERE + base_tb.oid = $1::text::regclass + AND pg_get_expr( + pt.relpartbound, + pt.oid, + TRUE + ) != 'DEFAULT' + ], + [parent] + ).rows + end + + def partitions_older_than(partitions, bound) do + partitions + |> Enum.map(fn [table, range_expression] -> + [_, to_as_string] = + ~r/TO \('(.+)'\)/ + |> Regex.run(range_expression) + + {:ok, to_as_dt, _} = DateTime.from_iso8601(to_as_string <> "Z") + + [table, to_as_dt] + end) + |> Enum.filter(fn [_table, to] -> DateTime.compare(to, bound) == :lt end) + |> Enum.map(fn [table, _to] -> table end) + end + + def drop_empty_partition(parent, partition) do + unless valid_chars?(parent) && valid_chars?(partition) do + raise ArgumentError, message: "Table name contains invalid characters" + end + + partition + |> partition_empty? + |> handle_drop(parent, partition) + end + + defp valid_chars?(table_name) do + table_name =~ ~r/\A\w+\z/ + end + + defp partition_empty?(partition) do + from(r in partition, select: count()) |> Repo.one!() == 0 + end + + defp handle_drop(true, parent, partition) do + Logger.info("Detaching #{partition} from #{parent}") + Repo.query!("ALTER TABLE #{parent} DETACH PARTITION #{partition};") + Logger.info("Dropping #{partition}") + Repo.query!("DROP TABLE #{partition};") + end + + defp handle_drop(false, _parent, _partition) do + end + + defp create_query(partition, parent, from, to) do + """ + CREATE TABLE #{partition} + PARTITION OF #{parent} + FOR VALUES FROM ('#{from}') TO ('#{to}'); + """ + end + + defp to_partition_details(table, {year, week, from, to}) do + {"#{table}_#{year}_#{week}", from, to} + end + + defp log_partition_creation(partitions) when length(partitions) > 0 do + partitions + |> Enum.map_join("\n", fn {partition_name, from, to} -> + "Created #{partition_name} for #{from} -> #{to}" + end) + |> Logger.info() + end + + defp log_partition_creation(partitions) when partitions == [] do + Logger.info("No extra partitions were needed.") + end +end diff --git a/mix.exs b/mix.exs index 52a9966cba..7c2a355315 100644 --- a/mix.exs +++ b/mix.exs @@ -39,7 +39,10 @@ defmodule Lightning.MixProject do def application do [ mod: {Lightning.Application, [:timex]}, - extra_applications: [:logger, :runtime_tools, :os_mon] + extra_applications: [:logger, :runtime_tools, :os_mon], + start_phases: [ + ensure_db_config: [] + ] ] end diff --git a/priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs b/priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs new file mode 100644 index 0000000000..0f536b40ad --- /dev/null +++ b/priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs @@ -0,0 +1,201 @@ +defmodule Lightning.Repo.Migrations.CreatePartitionedWorkOrders do + use Ecto.Migration + + def up do + execute(""" + ALTER TABLE work_orders + RENAME TO work_orders_monolith + """) + + execute(""" + ALTER TABLE work_orders_monolith + RENAME CONSTRAINT work_orders_pkey TO work_orders_monolith_pkey + """) + + execute(""" + ALTER INDEX work_orders_reason_id_index RENAME TO work_orders_monolith_reason_id_index + """) + + execute(""" + ALTER INDEX work_orders_state_index RENAME TO work_orders_monolith_state_index + """) + + execute(""" + ALTER INDEX work_orders_workflow_id_index RENAME TO work_orders_monolith_workflow_id_index + """) + + execute(""" + ALTER TABLE work_orders_monolith + RENAME CONSTRAINT work_orders_dataclip_id_fkey TO work_orders_monolith_dataclip_id_fkey + """) + + execute(""" + ALTER TABLE work_orders_monolith + RENAME CONSTRAINT work_orders_reason_id_fkey TO work_orders_monolith_reason_id_fkey + """) + + execute(""" + ALTER TABLE work_orders_monolith + RENAME CONSTRAINT work_orders_trigger_id_fkey TO work_orders_monolith_trigger_id_fkey + """) + + execute(""" + ALTER TABLE work_orders_monolith + RENAME CONSTRAINT work_orders_workflow_id_fkey TO work_orders_monolith_workflow_id_fkey + """) + + execute(""" + CREATE TABLE work_orders ( + id uuid NOT NULL, + workflow_id uuid NOT NULL, + reason_id uuid, + inserted_at timestamp without time zone NOT NULL, + updated_at timestamp without time zone NOT NULL, + trigger_id uuid, + dataclip_id uuid, + state character varying(255) DEFAULT 'pending'::character varying NOT NULL, + last_activity timestamp without time zone, + CONSTRAINT work_orders_pkey PRIMARY KEY (inserted_at, id) + ) PARTITION BY RANGE (inserted_at) + """) + + Lightning.AdminTools.generate_iso_weeks(~D[2023-01-02], ~D[2024-01-29]) + |> Enum.each(fn {year, wnum, from, to} -> + execute(""" + CREATE TABLE work_orders_#{year}_#{wnum} + PARTITION OF work_orders + FOR VALUES FROM ('#{from}') TO ('#{to}') + """) + end) + + execute(""" + CREATE TABLE work_orders_default + PARTITION OF work_orders + DEFAULT + """) + + execute(""" + INSERT INTO work_orders + SELECT * + FROM work_orders_monolith + """) + + execute(""" + CREATE INDEX work_orders_id_index + ON work_orders USING hash (id) + """) + + execute(""" + CREATE INDEX work_orders_reason_id_index + ON work_orders USING btree (reason_id) + """) + + execute(""" + CREATE INDEX work_orders_state_index + ON work_orders USING btree (state); + """) + + execute(""" + CREATE INDEX work_orders_workflow_id_index + ON work_orders USING btree (workflow_id) + """) + + execute(""" + ALTER TABLE work_orders + ADD CONSTRAINT work_orders_dataclip_id_fkey + FOREIGN KEY (dataclip_id) + REFERENCES dataclips(id) + ON DELETE SET NULL + """) + + execute(""" + ALTER TABLE work_orders + ADD CONSTRAINT work_orders_reason_id_fkey + FOREIGN KEY (reason_id) + REFERENCES invocation_reasons(id) + """) + + execute(""" + ALTER TABLE work_orders + ADD CONSTRAINT work_orders_trigger_id_fkey + FOREIGN KEY (trigger_id) + REFERENCES triggers(id) + ON DELETE SET NULL; + """) + + execute(""" + ALTER TABLE work_orders + ADD CONSTRAINT work_orders_workflow_id_fkey + FOREIGN KEY (workflow_id) + REFERENCES workflows(id) + ON DELETE CASCADE; + """) + end + + def down do + Lightning.AdminTools.generate_iso_weeks(~D[2023-01-02], ~D[2024-01-29]) + |> Enum.each(fn {year, wnum, _from, _to} -> + execute(""" + ALTER TABLE work_orders DETACH PARTITION work_orders_#{year}_#{wnum} + """) + + execute(""" + DROP TABLE work_orders_#{year}_#{wnum} + """) + end) + + execute(""" + ALTER TABLE work_orders DETACH PARTITION work_orders_default + """) + + execute(""" + DROP TABLE work_orders_default + """) + + execute(""" + DROP TABLE IF EXISTS work_orders + """) + + execute(""" + ALTER TABLE work_orders_monolith + RENAME TO work_orders + """) + + execute(""" + ALTER TABLE work_orders + RENAME CONSTRAINT work_orders_monolith_pkey TO work_orders_pkey + """) + + execute(""" + ALTER INDEX work_orders_monolith_reason_id_index RENAME TO work_orders_reason_id_index + """) + + execute(""" + ALTER INDEX work_orders_monolith_state_index RENAME TO work_orders_state_index + """) + + execute(""" + ALTER INDEX work_orders_monolith_workflow_id_index RENAME TO work_orders_workflow_id_index + """) + + execute(""" + ALTER TABLE work_orders + RENAME CONSTRAINT work_orders_monolith_dataclip_id_fkey TO work_orders_dataclip_id_fkey + """) + + execute(""" + ALTER TABLE work_orders + RENAME CONSTRAINT work_orders_monolith_reason_id_fkey TO work_orders_reason_id_fkey + """) + + execute(""" + ALTER TABLE work_orders + RENAME CONSTRAINT work_orders_monolith_trigger_id_fkey TO work_orders_trigger_id_fkey + """) + + execute(""" + ALTER TABLE work_orders + RENAME CONSTRAINT work_orders_monolith_workflow_id_fkey TO work_orders_workflow_id_fkey + """) + end +end diff --git a/test/lightning/maintenance/admin_tools_test.exs b/test/lightning/maintenance/admin_tools_test.exs new file mode 100644 index 0000000000..705c60189e --- /dev/null +++ b/test/lightning/maintenance/admin_tools_test.exs @@ -0,0 +1,63 @@ +defmodule Lightning.AdminToolstest do + use ExUnit.Case, async: true + + alias Lightning.AdminTools + + describe "generate_iso_weeks" do + test "returns list of weeks when both dates are Mondays" do + expected_weeks = [ + {"2023", "08", "2023-02-20", "2023-02-27"}, + {"2023", "09", "2023-02-27", "2023-03-06"}, + {"2023", "10", "2023-03-06", "2023-03-13"}, + {"2023", "11", "2023-03-13", "2023-03-20"}, + {"2023", "12", "2023-03-20", "2023-03-27"} + ] + + weeks = AdminTools.generate_iso_weeks(~D[2023-02-20], ~D[2023-03-20]) + + assert weeks == expected_weeks + end + + test "returns list of weeks when start date is a Monday" do + expected_weeks = [ + {"2023", "08", "2023-02-20", "2023-02-27"}, + {"2023", "09", "2023-02-27", "2023-03-06"}, + {"2023", "10", "2023-03-06", "2023-03-13"}, + {"2023", "11", "2023-03-13", "2023-03-20"}, + {"2023", "12", "2023-03-20", "2023-03-27"} + ] + + weeks = AdminTools.generate_iso_weeks(~D[2023-02-20], ~D[2023-03-23]) + + assert weeks == expected_weeks + end + + test "returns list of weeks when end date is a Monday" do + expected_weeks = [ + {"2023", "08", "2023-02-20", "2023-02-27"}, + {"2023", "09", "2023-02-27", "2023-03-06"}, + {"2023", "10", "2023-03-06", "2023-03-13"}, + {"2023", "11", "2023-03-13", "2023-03-20"}, + {"2023", "12", "2023-03-20", "2023-03-27"} + ] + + weeks = AdminTools.generate_iso_weeks(~D[2023-02-22], ~D[2023-03-20]) + + assert weeks == expected_weeks + end + + test "returns list of weeks when neither day is a Monday" do + expected_weeks = [ + {"2023", "08", "2023-02-20", "2023-02-27"}, + {"2023", "09", "2023-02-27", "2023-03-06"}, + {"2023", "10", "2023-03-06", "2023-03-13"}, + {"2023", "11", "2023-03-13", "2023-03-20"}, + {"2023", "12", "2023-03-20", "2023-03-27"} + ] + + weeks = AdminTools.generate_iso_weeks(~D[2023-02-22], ~D[2023-03-23]) + + assert weeks == expected_weeks + end + end +end diff --git a/test/lightning/maintenance/partition_table_service_test.exs b/test/lightning/maintenance/partition_table_service_test.exs new file mode 100644 index 0000000000..ddc2aba584 --- /dev/null +++ b/test/lightning/maintenance/partition_table_service_test.exs @@ -0,0 +1,493 @@ +defmodule Lightning.PartitionTableServiceTest do + use Lightning.DataCase, async: false + + import Lightning.Factories + + alias Lightning.PartitionTableService, as: Service + + describe "perform" do + test "adds additional partitions" do + parent = "work_orders" + + drop_range_partitions(parent) + + add_partitions(parent) + + now = DateTime.utc_now() + + new_partitions = + [ + now |> build_partition_name(parent), + now |> date_with_offset(1) |> build_partition_name(parent), + now |> date_with_offset(2) |> build_partition_name(parent) + ] + + expected = modified_relations(new_partitions, all_relations()) + + Service.perform(%Oban.Job{ + args: %{"add_headroom" => %{"weeks" => 2}} + }) + + assert all_relations() == expected + end + + test "removes obsolete partitions" do + parent = "work_orders" + + now = DateTime.now!("Etc/UTC") + + drop_range_partitions(parent) + + new_range_partitions = + -2..3 + |> generate_partition_properties(now) + |> Enum.map(&partition_name(&1, parent)) + + new_partitions = ["#{parent}_default" | new_range_partitions] + + expected = modified_relations(new_partitions, all_relations()) + + generate_partitions(-6..3, now, parent) + + Service.perform(%Oban.Job{ + args: %{"drop_older_than" => %{"weeks" => -2}} + }) + + assert all_relations() == expected + end + end + + test "gets a list of partitions for a given parent" do + parent = "work_orders" + + drop_range_partitions(parent) + + add_partitions(parent) + + expected = [ + "#{parent}_2023_01", + "#{parent}_2023_02", + "#{parent}_2023_03", + "#{parent}_default" + ] + + assert Service.get_partitions("work_orders") |> Enum.sort() == expected + end + + test "tables_to_add returns tables that do not already exist" do + now = DateTime.now!("Etc/UTC") + + parent = "work_orders" + + drop_range_partitions(parent) + + existing_partition_properties = + 0..3 + |> Enum.map(&date_with_offset(now, &1)) + |> generate_partition_properties() + + generate_partitions(existing_partition_properties, parent) + + expected_additional_partitions = + 4..6 + |> Enum.map(&date_with_offset(now, &1)) + |> generate_partition_properties() + |> Enum.map(fn properties -> + {_, _, from, to} = properties + + { + partition_name(properties, parent), + from |> DateTime.to_date() |> Date.to_string(), + to |> DateTime.to_date() |> Date.to_string() + } + end) + |> Enum.sort_by(fn {a, _, _} -> a end, :asc) + + proposed_partitions = Service.tables_to_add(parent, 6) |> Enum.sort() + + assert proposed_partitions == expected_additional_partitions + end + + test "add_headroom - all" do + now = DateTime.now!("Etc/UTC") + + parent = "work_orders" + + drop_range_partitions(parent) + + expected_partitions = + 0..3 + |> Enum.map(&date_with_offset(now, &1)) + |> generate_partition_properties() + |> Enum.map(&partition_name(&1, parent)) + + expected = modified_relations(expected_partitions, all_relations()) + + Service.add_headroom(:all, 3) + + assert all_relations() == expected + end + + test "add_headroom - parent specified" do + now = DateTime.now!("Etc/UTC") + + parent = "work_orders" + + drop_range_partitions(parent) + + new_partitions = + 0..3 + |> Enum.map(&date_with_offset(now, &1)) + |> generate_partition_properties() + |> Enum.map(fn {_, _, from, _} -> from end) + |> Enum.map(&build_partition_name(&1, parent)) + + expected = modified_relations(new_partitions, all_relations()) + + Service.add_headroom(:work_orders, 3) + + assert all_relations() == expected + end + + test "remove_empty" do + parent = "work_orders" + + now = DateTime.now!("Etc/UTC") + + drop_range_partitions(parent) + + expected_range_partitions = + -2..3 + |> generate_partition_properties(now) + |> Enum.map(&partition_name(&1, parent)) + + new_partitions = ["#{parent}_default" | expected_range_partitions] + + expected = modified_relations(new_partitions, all_relations()) + + generate_partitions(-6..3, now, parent) + + weeks_ago = Timex.shift(DateTime.utc_now(), weeks: -2) + + Service.remove_empty(parent, weeks_ago) + + assert all_relations() == expected + end + + describe "list partitions" do + test "returns partitions of the specified table" do + drop_range_partitions("work_orders") + + add_partitions("work_orders") + + sort_fn = fn [name, _expression] -> name end + + expected = [ + [ + "work_orders_2023_01", + "FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2023-01-31 00:00:00')" + ], + [ + "work_orders_2023_02", + "FOR VALUES FROM ('2023-02-01 00:00:00') TO ('2023-02-28 00:00:00')" + ], + [ + "work_orders_2023_03", + "FOR VALUES FROM ('2023-03-01 00:00:00') TO ('2023-03-31 00:00:00')" + ] + ] |> Enum.sort_by(sort_fn, :asc) + + partitions = + Service.find_range_partitions("work_orders") + |> Enum.sort_by(sort_fn, :asc) + + assert partitions == expected + end + end + + describe "partitions_older_than" do + test "it returns partition tables that end before the given datetime" do + parent = "work_orders" + bound = ~U[2023-04-29 23:59:59Z] + partitions = input_partitions(parent) + expected = ["#{parent}_2023_01", "#{parent}_2023_02", "#{parent}_2023_03"] + + assert Service.partitions_older_than(partitions, bound) == expected + end + + defp input_partitions(parent) do + [ + [ + "#{parent}_2023_01", + "FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2023-01-31 00:00:00')" + ], + [ + "#{parent}_2023_02", + "FOR VALUES FROM ('2023-02-01 00:00:00') TO ('2023-02-28 00:00:00')" + ], + [ + "#{parent}_2023_03", + "FOR VALUES FROM ('2023-03-01 00:00:00') TO ('2023-03-31 00:00:00')" + ], + [ + "#{parent}_2023_04", + "FOR VALUES FROM ('2023-04-01 00:00:00') TO ('2023-04-30 00:00:00')" + ], + [ + "#{parent}_2023_05", + "FOR VALUES FROM ('2023-05-01 00:00:00') TO ('2023-05-31 00:00:00')" + ] + ] + end + end + + describe "drop_empty_partition" do + test "drops the named partition" do + parent = "work_orders" + + drop_range_partitions(parent) + + add_partitions(parent) + + expected = + [ + "#{parent}", + "#{parent}_2023_01", + "#{parent}_2023_03", + "#{parent}_default", + "#{parent}_monolith" + ] + |> Enum.sort() + + Service.drop_empty_partition(parent, "#{parent}_2023_02") + + assert associated_relations(all_relations(), parent) == expected + end + + test "does nothing if the table is not empty" do + parent = "work_orders" + + drop_range_partitions(parent) + + add_partitions(parent) + + expected = + [ + "#{parent}", + "#{parent}_2023_01", + "#{parent}_2023_02", + "#{parent}_2023_03", + "#{parent}_default", + "#{parent}_monolith" + ] + |> Enum.sort() + + insert(:workorder, inserted_at: ~U[2023-02-15 10:00:00Z]) + + Service.drop_empty_partition(parent, "#{parent}_2023_02") + + assert associated_relations(all_relations(), parent) == expected + end + + test "errors out if the parent contains unexpected chars" do + parent = "work_orders" + + drop_range_partitions(parent) + + add_partitions(parent) + + assert_raise( + ArgumentError, + fn -> + Service.drop_empty_partition("#{parent} --", "#{parent}_2023_02") + end + ) + + expected = + [ + "#{parent}", + "#{parent}_2023_01", + "#{parent}_2023_02", + "#{parent}_2023_03", + "#{parent}_default", + "#{parent}_monolith" + ] + |> Enum.sort() + + assert associated_relations(all_relations(), parent) == expected + end + + test "errors out if the partition contains unexpected chars" do + parent = "work_orders" + + drop_range_partitions(parent) + + add_partitions(parent) + + assert_raise( + ArgumentError, + fn -> + Service.drop_empty_partition(parent, "#{parent}_2023_2 --") + end + ) + + expected = + [ + "#{parent}", + "#{parent}_2023_01", + "#{parent}_2023_02", + "#{parent}_2023_03", + "#{parent}_default", + "#{parent}_monolith" + ] + |> Enum.sort() + + assert associated_relations(all_relations(), parent) == expected + end + end + + defp all_relations() do + Repo.query!(~S[ + SELECT c.relname + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN pg_catalog.pg_am am ON am.oid = c.relam + WHERE c.relkind IN ('r','p','') + AND n.nspname <> 'pg_catalog' + AND n.nspname !~ '^pg_toast' + AND n.nspname <> 'information_schema' + AND pg_catalog.pg_table_is_visible(c.oid); + ]).rows + |> List.flatten() + |> Enum.sort() + end + + defp week(date) do + {_year, week} = Timex.iso_week(date) + + week + end + + defp build_partition_name(date, parent) do + {year, week} = Timex.iso_week(date) + + padded_week = week |> Integer.to_string() |> String.pad_leading(2, "0") + + "#{parent}_#{year}_#{padded_week}" + end + + defp drop_range_partitions(parent) do + all_relations() + |> Enum.filter(&(&1 =~ ~r/\A#{parent}_[2d]/)) + |> Enum.each(fn partition -> + Repo.query!("ALTER TABLE #{parent} DETACH PARTITION #{partition};") + Repo.query!("DROP TABLE #{partition};") + end) + end + + defp add_partitions(parent) do + [ + {"2023", "1", "2023-01-01", "2023-01-31"}, + {"2023", "2", "2023-02-01", "2023-02-28"}, + {"2023", "3", "2023-03-01", "2023-03-31"} + ] + |> Enum.each(&create_range_partition(&1, parent)) + + create_default_partition(parent) + end + + defp create_range_partition(partition_properties, parent) do + {_year, _num, from, to} = partition_properties + + Repo.query!(""" + CREATE TABLE #{partition_name(partition_properties, parent)} + PARTITION OF #{parent} + FOR VALUES FROM ('#{from}') TO ('#{to}') + """) + end + + defp partition_name({year, num, _, _}, parent) when is_binary(num) do + padded_num = num |> String.pad_leading(2, "0") + + "#{parent}_#{year}_#{padded_num}" + end + + defp partition_name({year, num, _, _}, parent) when is_integer(num) do + padded_num = num |> Integer.to_string() |> String.pad_leading(2, "0") + + "#{parent}_#{year}_#{padded_num}" + end + + defp create_default_partition(parent) do + Repo.query!(""" + CREATE TABLE #{parent}_default + PARTITION OF #{parent} + DEFAULT + """) + end + + defp associated_relations(relations, parent) do + relations + |> Enum.filter(&(&1 =~ ~r/\A#{parent}/)) + |> Enum.sort() + end + + defp generate_partition_properties(dates) do + dates + |> Enum.map(fn from -> + shifted_from = from |> shift_to_monday() + to = range_end(shifted_from) + + {from.year, week(from), shifted_from, to} + end) + end + + defp generate_partition_properties(range, now) do + range + |> Enum.map(fn week_offset -> + from = date_with_offset(now, week_offset) |> shift_to_monday() + to = range_end(from) + + {from.year, normalise_counter(week_offset), from, to} + end) + end + + defp normalise_counter(counter) when counter < 0 do + "minus_#{abs(counter)}" + end + + defp normalise_counter(counter) when counter >= 0 do + "#{counter}" + end + + defp generate_partitions(properties, parent) do + properties + |> Enum.map(&create_range_partition(&1, parent)) + + create_default_partition(parent) + end + + defp generate_partitions(range, now, parent) do + range + |> generate_partition_properties(now) + |> Enum.map(&create_range_partition(&1, parent)) + + create_default_partition(parent) + end + + defp date_with_offset(now, offset) do + DateTime.add(now, 7 * offset, :day) + end + + defp range_end(range_start) do + DateTime.add(range_start, 7, :day) + end + + defp shift_to_monday(date) do + date |> Timex.beginning_of_week(:mon) + end + + defp modified_relations(new_relations, existing_relations) do + [new_relations | existing_relations] |> List.flatten() |> Enum.sort() + end +end