Skip to content

Commit

Permalink
chore: add producer link on txn pool
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Aug 18, 2024
1 parent a4b4e26 commit 9a61930
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- SASL

- Producer System
- Revisit producer epoch on transactions
- Implement test helper functions (assert_produced)
- Improve input errors handling
- Accept more versions of the protocol
Expand Down
3 changes: 2 additions & 1 deletion guides/examples/client_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ This client will connect to brokers using ssl connection, `connect_opts` and `so
]
```

This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`, messages produced to `my_topic_0` and `my_topic_1` will use `my_linger_ms_producer` and `my_custom_client_id_producer` respectively if no producer is set on opts. All other topics keep using the default producer.
This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`.

Messages produced to `my_topic_0` and `my_topic_1` will use `my_linger_ms_producer` and `my_custom_client_id_producer` respectively if no producer is set on opts. All other topics keep using the default producer.

## Defining and using custom partitioner

Expand Down
10 changes: 7 additions & 3 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ defmodule Klife.Client do
"""
@callback produce_batch(list_of_records, opts :: Keyword.t()) :: list({:ok | :error, record})


@doc group: "Producer API"
@doc """
Produce a batch of records asynchronoulsy.
Expand Down Expand Up @@ -503,8 +502,13 @@ defmodule Klife.Client do

def produce(%Record{} = rec, opts \\ []), do: Klife.produce(rec, __MODULE__, opts)
def produce_batch(recs, opts \\ []), do: Klife.produce_batch(recs, __MODULE__, opts)
def produce_async(%Record{} = rec, opts \\ []), do: Klife.produce_async(rec, __MODULE__, opts)
def produce_batch_async(recs, opts \\ []), do: Klife.produce_batch_async(recs, __MODULE__, opts)

def produce_async(%Record{} = rec, opts \\ []),
do: Klife.produce_async(rec, __MODULE__, opts)

def produce_batch_async(recs, opts \\ []),
do: Klife.produce_batch_async(recs, __MODULE__, opts)

def produce_batch_txn(recs, opts \\ []), do: Klife.produce_batch_txn(recs, __MODULE__, opts)
def transaction(fun, opts \\ []), do: Klife.transaction(fun, __MODULE__, opts)
end
Expand Down
4 changes: 3 additions & 1 deletion lib/klife/producer/txn_producer_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ defmodule Klife.TxnProducerPool do
# if we get here we should probally just restart the pool
{:error, {:unkown_producer, client_name, producer_name}}

_ ->
{pid, _} ->
worker = %__MODULE__.WorkerState{
client_name: client_name,
producer_name: producer_name,
worker_id: worker_id
}

Process.link(pid)

{:ok, worker, %{pool_state | worker_counter: worker_id}}
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/klife/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ defmodule Klife.Utils do

defp do_create_topics(init_time) do
case create_topics_call() do
:ok ->
:ok
{:ok, res} ->
{:ok, res}

:error ->
now = System.monotonic_time(:millisecond)
Expand Down Expand Up @@ -118,7 +118,7 @@ defmodule Klife.Utils do
{:ok, %{content: content}} ->
case Enum.filter(content.topics, fn e -> e.error_code not in [0, 36] end) do
[] ->
:ok
{:ok, Enum.map(content.topics, fn %{name: topic} -> topic end)}

err ->
{:error, err}
Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ if Mix.env() in [:dev] do
def run(args) do
Application.ensure_all_started(:klife)

:ok = Klife.Utils.create_topics()
{:ok, _topics} = Klife.Utils.create_topics()
opts = [strategy: :one_for_one, name: Benchmark.Supervisor]
{:ok, _} = Supervisor.start_link([MyClient], opts)

Expand Down
14 changes: 13 additions & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
# ExUnit.configure(exclude: [cluster_change: true])

:ok = Klife.Utils.create_topics()
{:ok, topics} = Klife.Utils.create_topics()

Check warning on line 3 in test/test_helper.exs

View workflow job for this annotation

GitHub Actions / test (1.17.0, 27)

variable "topics" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 3 in test/test_helper.exs

View workflow job for this annotation

GitHub Actions / test (1.16.3, 26.1.2)

variable "topics" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 3 in test/test_helper.exs

View workflow job for this annotation

GitHub Actions / test (1.15.6, 24.3.4.13)

variable "topics" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 3 in test/test_helper.exs

View workflow job for this annotation

GitHub Actions / test (1.11.4, 21.3.8.24)

variable "topics" is unused (if the variable is not meant to be used, prefix it with an underscore)

opts = [strategy: :one_for_one, name: Test.Supervisor]
{:ok, _} = Supervisor.start_link([MyClient], opts)

:ok = Klife.TestUtils.wait_producer(MyClient)

# Enum.each(topics, fn topic ->
# warmup_rec = %Klife.Record{
# value: "warmup",
# partition: 1,
# topic: topic
# }

# {:ok, %Klife.Record{}} = MyClient.produce(warmup_rec)
# end)

# Process.sleep(10_000)

ExUnit.start()

0 comments on commit 9a61930

Please sign in to comment.