Skip to content

Commit

Permalink
chore: add some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 14, 2024
1 parent 5fc9a4e commit 5b99863
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 14 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
- SASL

- Producer System
- Improve input errors handling
- Standardize options handling
- Rename cluster to client
- Add default producer and partition as client option
- Implement test helper functions (assert_produced)
- Improve input errors handling
- Accept more versions of the protocol
- OTEL
- Improve test coverage

- Consumer System (TBD)
Expand Down
87 changes: 86 additions & 1 deletion guides/examples/cluster_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,89 @@ This cluster will connect to brokers using non ssl connection and produce messag

This cluster will connect to brokers using ssl connection, `connect_opts` and `socket_opts` are forwarded to erlang module `:ssl` in order to proper configure the socket. See the documentation for more details.

## TODO: DO MORE EXAMPLES
## Defining multiple producers

```elixir
config :my_app, MyApp.Cluster,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
producers: [
[
name: :my_linger_ms_producer,
linger_ms: 1_000
],
[
name: :my_custom_client_id_producer,
client_id: "my_custom_client_id",
]
],
topics: [
[
name: "my_topic_0",
default_producer: :my_linger_ms_producer
],
[
name: "my_topic_1",
default_producer: :my_custom_client_id_producer
]
]
```

This cluster will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`.


## Defining custom partitioner

First you need to implement a module following the `Klife.Behaviours.Partitioner` behaviour.

```elixir
defmodule MyApp.MyCustomPartitioner do
@behaviour Klife.Behaviours.Partitioner

alias Klife.Record

@impl true
def get_partition(%Record{} = record, max_partition) do
# Some logic to find the partition here!
end
end

```

Then, you need to use it on your configuration.

```elixir
config :my_app, MyApp.Cluster,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
topics: [
[
name: "my_topic_0",
default_partitioner: MyApp.MyCustomPartitioner
]
]
```

On this cluster, the records produced without a specific partition will have a partition assigned using the `MyApp.MyCustomPartitioner` module.

## Defining multiple txn pools

```elixir
config :my_app, MyApp.Cluster,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
default_txn_pool: :my_txn_pool,
txn_pools: [
[name: :my_txn_pool, base_txn_id: "my_custom_base_txn_id"],
[name: :my_txn_pool_2, txn_timeout_ms: :timer.seconds(120)]
]
topics: [[name: "my_topic_0"]]
```

This cluster will have a total of 3 txn pools, the default one plus the other two defined in the configuration. You can see all the configuration options for the producers in `Klife.TxnProducerPool`.
11 changes: 9 additions & 2 deletions lib/klife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@ defmodule Klife do
end

def transaction(fun, cluster, opts \\ []) do
TxnProducerPool.run_txn(cluster, get_txn_pool(opts), fun)
TxnProducerPool.run_txn(cluster, get_txn_pool(cluster, opts), fun)
end

defp get_txn_pool(opts), do: Keyword.get(opts, :txn_pool, Klife.Cluster.default_txn_pool_name())
def in_txn?(cluster), do: TxnProducerPool.in_txn?(cluster)

defp get_txn_pool(cluster, opts) do
case Keyword.get(opts, :pool_name) do
nil -> apply(cluster, :get_default_txn_pool, [])
val -> val
end
end

defp maybe_add_partition(%Record{} = record, cluster, opts) do
case record do
Expand Down
30 changes: 25 additions & 5 deletions lib/klife/cluster.ex
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
defmodule Klife.Cluster do
# TODO: Rethink cluster name. Maybe client?
alias Klife
alias Klife.Record

@type record :: Klife.Record.t()
@type list_of_records :: list(record)

@default_producer_name :klife_default_producer
@default_txn_pool :klife_default_txn_pool

@doc false
def default_producer_name(), do: :klife_default_producer
def default_producer_name(), do: @default_producer_name

@doc false
def default_txn_pool_name(), do: :klife_default_txn_pool
def default_txn_pool_name(), do: @default_txn_pool

@input_options [
connection: [
type: :non_empty_keyword_list,
required: true,
keys: Klife.Connection.Controller.get_opts()
],
default_txn_pool: [
type: :atom,
required: false,
default: @default_txn_pool,
doc:
"Name of the txn pool to be used on transactions when a `:pool_name` is not provided as an option."
],
txn_pools: [
type: {:list, {:keyword_list, Klife.TxnProducerPool.get_opts()}},
type_doc: "List of `Klife.TxnProducerPool` configurations",
Expand Down Expand Up @@ -161,13 +172,16 @@ defmodule Klife.Cluster do
@doc group: "Producer API"
@callback produce_batch(list_of_records, opts :: Keyword.t()) :: list({:ok | :error, record})

@doc group: "Producer API"
@doc group: "Transaction API"
@callback produce_batch_txn(list_of_records, opts :: Keyword.t()) ::
{:ok, list_of_records} | {:error, list_of_records}

@doc group: "Producer API"
@doc group: "Transaction API"
@callback transaction(fun :: function(), opts :: Keyword.t()) :: any()

@doc group: "Transaction API"
@callback in_txn?() :: true | false

defmacro __using__(opts) do
input_opts = @input_options

Expand All @@ -184,8 +198,11 @@ defmodule Klife.Cluster do
Supervisor.start_link(__MODULE__, args, name: __MODULE__)
end

defp default_txn_pool_key(), do: {__MODULE__, :default_txn_pool}
def get_default_txn_pool(), do: :persistent_term.get(default_txn_pool_key())

@doc false
@impl true
@impl Supervisor
def init(_args) do
config = Application.get_env(@otp_app, __MODULE__)

Expand Down Expand Up @@ -223,13 +240,16 @@ defmodule Klife.Cluster do
{Klife.Producer.Supervisor, producer_opts}
]

:persistent_term.put(default_txn_pool_key(), validated_opts[:default_txn_pool])

Supervisor.init(children, strategy: :one_for_one)
end

def produce(%Record{} = rec, opts \\ []), do: Klife.produce(rec, __MODULE__, opts)
def produce_batch(recs, opts \\ []), do: Klife.produce_batch(recs, __MODULE__, opts)
def produce_batch_txn(recs, opts \\ []), do: Klife.produce_batch_txn(recs, __MODULE__, opts)
def transaction(fun, opts \\ []), do: Klife.transaction(fun, __MODULE__, opts)
def in_txn?(), do: Klife.in_txn?(__MODULE__)
end
end
end
2 changes: 1 addition & 1 deletion lib/klife/producer/default_partitioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Klife.Producer.DefaultPartitioner do

@impl Klife.Behaviours.Partitioner
def get_partition(%Record{key: nil}, max_partition),
do: Enum.random(0..max_partition)
do: :rand.uniform(max_partition + 1) - 1

def get_partition(%Record{key: key}, max_partition),
do: :erlang.phash2(key, max_partition + 1)
Expand Down
1 change: 1 addition & 0 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Klife.Producer do
@doc false
use GenServer

import Klife.ProcessRegistry, only: [via_tuple: 1, registry_lookup: 1]
Expand Down
2 changes: 2 additions & 0 deletions lib/klife/producer/txn_producer_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ defmodule Klife.TxnProducerPool do
new_state
end

@doc false
def start_link(args) do
NimblePool.start_link(
worker: {__MODULE__, args},
Expand All @@ -364,6 +365,7 @@ defmodule Klife.TxnProducerPool do
)
end

@doc false
def child_spec(args) do
%{
id: pool_name(args.cluster_name, args.name),
Expand Down
6 changes: 3 additions & 3 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ defmodule Klife.ProducerTest do

{:ok, resp}
end,
txn_pool: :my_test_pool_1
pool_name: :my_test_pool_1
)

assert_offset(MyCluster, rec1, offset1, txn_status: :committed)
Expand Down Expand Up @@ -998,7 +998,7 @@ defmodule Klife.ProducerTest do
Process.put(:raised_offsets, {offset3, offset4, offset5})
raise "crazy error"
end,
txn_pool: :my_test_pool_1
pool_name: :my_test_pool_1
)

{offset3, offset4, offset5} = Process.get(:raised_offsets)
Expand All @@ -1020,7 +1020,7 @@ defmodule Klife.ProducerTest do

{:ok, resp}
end,
txn_pool: :my_test_pool_1
pool_name: :my_test_pool_1
)

assert_offset(MyCluster, rec6, offset6, txn_status: :committed)
Expand Down

0 comments on commit 5b99863

Please sign in to comment.