Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partitioning: Initial suggestion #485

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion lib/data_layer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,31 @@ defmodule AshPostgres.DataLayer do
]
}

@partitioning %Spark.Dsl.Section{
name: :partitioning,
describe: """
A section for configuring the initial partitioning of the table
""",
examples: [
"""
partitioning do
method :list
attribute :post
end
"""
],
schema: [
method: [
type: {:one_of, [:range, :list, :hash]},
doc: "Specifying what partitioning method to use"
],
attribute: [
type: :atom,
doc: "The attribute to partition on"
]
]
}

@postgres %Spark.Dsl.Section{
name: :postgres,
describe: """
Expand All @@ -262,7 +287,8 @@ defmodule AshPostgres.DataLayer do
@custom_statements,
@manage_tenant,
@references,
@check_constraints
@check_constraints,
@partitioning
],
modules: [
:repo
Expand Down
10 changes: 10 additions & 0 deletions lib/data_layer/info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,14 @@ defmodule AshPostgres.DataLayer.Info do
def manage_tenant_update?(resource) do
Extension.get_opt(resource, [:postgres, :manage_tenant], :update?, false)
end

@doc "Partitioning method"
def partitioning_method(resource) do
Extension.get_opt(resource, [:postgres, :partitioning], :method, nil)
end

@doc "Partitioning attribute"
def partitioning_attribute(resource) do
Extension.get_opt(resource, [:postgres, :partitioning], :attribute, nil)
end
end
35 changes: 31 additions & 4 deletions lib/migration_generator/migration_generator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1192,14 +1192,25 @@ defmodule AshPostgres.MigrationGenerator do

defp group_into_phases(
[
%Operation.CreateTable{table: table, schema: schema, multitenancy: multitenancy} | rest
%Operation.CreateTable{
table: table,
schema: schema,
multitenancy: multitenancy,
partitioning: partitioning
}
| rest
],
nil,
acc
) do
group_into_phases(
rest,
%Phase.Create{table: table, schema: schema, multitenancy: multitenancy},
%Phase.Create{
table: table,
schema: schema,
multitenancy: multitenancy,
partitioning: partitioning
},
acc
)
end
Expand Down Expand Up @@ -1801,7 +1812,8 @@ defmodule AshPostgres.MigrationGenerator do
table: snapshot.table,
schema: snapshot.schema,
multitenancy: snapshot.multitenancy,
old_multitenancy: empty_snapshot.multitenancy
old_multitenancy: empty_snapshot.multitenancy,
partitioning: snapshot.partitioning
}
| acc
])
Expand Down Expand Up @@ -2836,7 +2848,8 @@ defmodule AshPostgres.MigrationGenerator do
repo: AshPostgres.DataLayer.Info.repo(resource, :mutate),
multitenancy: multitenancy(resource),
base_filter: AshPostgres.DataLayer.Info.base_filter_sql(resource),
has_create_action: has_create_action?(resource)
has_create_action: has_create_action?(resource),
partitioning: partitioning(resource)
}

hash =
Expand Down Expand Up @@ -2911,6 +2924,20 @@ defmodule AshPostgres.MigrationGenerator do
end)
end

defp partitioning(resource) do
method = AshPostgres.DataLayer.Info.partitioning_method(resource)
attribute = AshPostgres.DataLayer.Info.partitioning_attribute(resource)

if not is_nil(method) and not is_nil(attribute) do
%{
method: method,
attribute: attribute
}
else
nil
end
end

defp multitenancy(resource) do
strategy = Ash.Resource.Info.multitenancy_strategy(resource)
attribute = Ash.Resource.Info.multitenancy_attribute(resource)
Expand Down
2 changes: 1 addition & 1 deletion lib/migration_generator/operation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ defmodule AshPostgres.MigrationGenerator.Operation do

defmodule CreateTable do
@moduledoc false
defstruct [:table, :schema, :multitenancy, :old_multitenancy]
defstruct [:table, :schema, :multitenancy, :old_multitenancy, :partitioning]
end

defmodule AddAttribute do
Expand Down
38 changes: 28 additions & 10 deletions lib/migration_generator/phase.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,21 @@ defmodule AshPostgres.MigrationGenerator.Phase do

defmodule Create do
@moduledoc false
defstruct [:table, :schema, :multitenancy, operations: [], commented?: false]
defstruct [:table, :schema, :multitenancy, partitioning: nil, operations: [], commented?: false]

import AshPostgres.MigrationGenerator.Operation.Helper, only: [as_atom: 1]

def up(%{schema: schema, table: table, operations: operations, multitenancy: multitenancy}) do
def up(%{schema: schema, table: table, operations: operations, multitenancy: multitenancy, partitioning: partitioning}) do
if multitenancy.strategy == :context do
"create table(:#{as_atom(table)}, primary_key: false, prefix: prefix()) do\n" <>
arguments = arguments([prefix("prefix()"), options(partitioning: partitioning)])

"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
"\nend"
else
opts =
if schema do
", prefix: \"#{schema}\""
else
""
end
arguments = arguments([prefix(schema), options(partitioning: partitioning)])

"create table(:#{as_atom(table)}, primary_key: false#{opts}) do\n" <>
"create table(:#{as_atom(table)}, primary_key: false#{arguments}) do\n" <>
Enum.map_join(operations, "\n", fn operation -> operation.__struct__.up(operation) end) <>
"\nend"
end
Expand All @@ -40,6 +37,27 @@ defmodule AshPostgres.MigrationGenerator.Phase do
"drop table(:#{as_atom(table)}#{opts})"
end
end

def arguments(["",""]), do: ""
def arguments(arguments), do: ", " <> Enum.join(Enum.reject(arguments, &is_nil(&1)), ",")

def prefix(nil), do: nil
def prefix(schema), do: "prefix: #{schema}"

def options(_options, _acc \\ [])
def options([], []), do: ""
def options([], acc), do: "options: \"#{Enum.join(acc, " ")}\""

def options([{:partitioning, %{method: method, attribute: attribute}} | rest], acc) do
option = "PARTITION BY #{String.upcase(Atom.to_string(method))} (#{attribute})"

rest
|> options(acc ++ [option])
end

def options([_| rest], acc) do
options(rest, acc)
end
end

defmodule Alter do
Expand Down
79 changes: 79 additions & 0 deletions lib/partitioning.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule AshPostgres.Partitioning do
@moduledoc false

@doc """
Create a new partition for a resource
"""
def create_partition(resource, opts) do
repo = AshPostgres.DataLayer.Info.repo(resource)

resource
|> AshPostgres.DataLayer.Info.partitioning_method()
|> case do
:range ->
create_range_partition(repo, resource, opts)

:list ->
create_list_partition(repo, resource, opts)

:hash ->
create_hash_partition(repo, resource, opts)

unsupported_method ->
raise "Invalid partition method, got: #{unsupported_method}"
end
end

@doc """
Check if partition exists
"""
def exists?(resource, opts) do
repo = AshPostgres.DataLayer.Info.repo(resource)
key = Keyword.fetch!(opts, :key)
table = AshPostgres.DataLayer.Info.table(resource)
partition_name = table <> "_" <> "#{key}"

partition_exists?(repo, resource, partition_name)
end

# TBI
defp create_range_partition(repo, resource, opts) do
end

defp create_list_partition(repo, resource, opts) do
key = Keyword.fetch!(opts, :key)
table = AshPostgres.DataLayer.Info.table(resource)
partition_name = table <> "_" <> "#{key}"

if partition_exists?(repo, resource, partition_name) do
{:error, :allready_exists}
else
Ecto.Adapters.SQL.query(
repo,
"CREATE TABLE #{partition_name} PARTITION OF public.#{table} FOR VALUES IN (#{key})"
)

if partition_exists?(repo, resource, partition_name) do
:ok
else
{:error, "Unable to create partition"}
end
end
end

# TBI
defp create_hash_partition(repo, resource, opts) do
end

defp partition_exists?(repo, resource, parition_name) do
%Postgrex.Result{} =
result =
repo
|> Ecto.Adapters.SQL.query!(
"select table_name from information_schema.tables t where t.table_schema = 'public' and t.table_name = $1",
[parition_name]
)

result.num_rows > 0
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"attributes": [
{
"allow_nil?": false,
"default": "fragment(\"gen_random_uuid()\")",
"generated?": false,
"primary_key?": true,
"references": null,
"size": null,
"source": "id",
"type": "uuid"
},
{
"allow_nil?": false,
"default": "1",
"generated?": false,
"primary_key?": true,
"references": null,
"size": null,
"source": "key",
"type": "bigint"
}
],
"base_filter": null,
"check_constraints": [],
"custom_indexes": [],
"custom_statements": [],
"has_create_action": false,
"hash": "7FE5D9659135887A47FAE2729CEB0281FA8FF392EDB3B43426EAFD89A1518FEB",
"identities": [],
"multitenancy": {
"attribute": null,
"global": null,
"strategy": null
},
"partitioning": {
"attribute": "key",
"method": "list"
},
"repo": "Elixir.AshPostgres.TestRepo",
"schema": null,
"table": "partitioned_posts"
}
20 changes: 20 additions & 0 deletions priv/test_repo/migrations/20250214114101_partitioned_post.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule AshPostgres.TestRepo.Migrations.PartitionedPost do
@moduledoc """
Updates resources based on their most recent snapshots.

This file was autogenerated with `mix ash_postgres.generate_migrations`
"""

use Ecto.Migration

def up do
create table(:partitioned_posts, primary_key: false, options: "PARTITION BY LIST (key)") do
add(:id, :uuid, null: false, default: fragment("gen_random_uuid()"), primary_key: true)
add(:key, :bigint, null: false, default: 1, primary_key: true)
end
end

def down do
drop(table(:partitioned_posts))
end
end
50 changes: 50 additions & 0 deletions test/migration_generator_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,56 @@ defmodule AshPostgres.MigrationGeneratorTest do
end
end

describe "creating initial snapshots for resources with partitioning" do
setup do
on_exit(fn ->
File.rm_rf!("test_snapshots_path")
File.rm_rf!("test_migration_path")
end)

defposts do
postgres do
partitioning do
method(:list)
attribute(:title)
end
end

attributes do
uuid_primary_key(:id)
attribute(:title, :string, public?: true)
end
end

defdomain([Post])

AshPostgres.MigrationGenerator.generate(Domain,
snapshot_path: "test_snapshots_path",
migration_path: "test_migration_path",
quiet: false,
format: false
)

:ok
end

test "the migration sets up resources correctly" do
# the snapshot exists and contains valid json
assert File.read!(Path.wildcard("test_snapshots_path/test_repo/posts/*.json"))
|> Jason.decode!(keys: :atoms!)

assert [file] =
Path.wildcard("test_migration_path/**/*_migrate_resources*.exs")
|> Enum.reject(&String.contains?(&1, "extensions"))

file_contents = File.read!(file)

# the migration creates the table with options specifing how to partition the table
assert file_contents =~
~S{create table(:posts, primary_key: false, options: "PARTITION BY LIST (title)") do}
end
end

describe "custom_indexes with `concurrently: true`" do
setup do
on_exit(fn ->
Expand Down
Loading