diff --git a/README.md b/README.md index 8633bcb..2604262 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ - SASL - Producer System + - Revisit producer epoch on transactions - Implement test helper functions (assert_produced) - Improve input errors handling - Accept more versions of the protocol diff --git a/guides/examples/client_configuration.md b/guides/examples/client_configuration.md index 8441d6f..157a971 100644 --- a/guides/examples/client_configuration.md +++ b/guides/examples/client_configuration.md @@ -61,8 +61,9 @@ This client will connect to brokers using ssl connection, `connect_opts` and `so ] ``` -This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`, messages produced to `my_topic_0` and `my_topic_1` will use `my_linger_ms_producer` and `my_custom_client_id_producer` respectively if no producer is set on opts. All other topics keep using the default producer. +This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`. +Messages produced to `my_topic_0` and `my_topic_1` will use `my_linger_ms_producer` and `my_custom_client_id_producer` respectively if no producer is set on opts. All other topics keep using the default producer. ## Defining and using custom partitioner diff --git a/lib/klife/client.ex b/lib/klife/client.ex index 75c9df2..5295293 100644 --- a/lib/klife/client.ex +++ b/lib/klife/client.ex @@ -295,7 +295,6 @@ defmodule Klife.Client do """ @callback produce_batch(list_of_records, opts :: Keyword.t()) :: list({:ok | :error, record}) - @doc group: "Producer API" @doc """ Produce a batch of records asynchronoulsy. @@ -503,8 +502,13 @@ defmodule Klife.Client do def produce(%Record{} = rec, opts \\ []), do: Klife.produce(rec, __MODULE__, opts) def produce_batch(recs, opts \\ []), do: Klife.produce_batch(recs, __MODULE__, opts) - def produce_async(%Record{} = rec, opts \\ []), do: Klife.produce_async(rec, __MODULE__, opts) - def produce_batch_async(recs, opts \\ []), do: Klife.produce_batch_async(recs, __MODULE__, opts) + + def produce_async(%Record{} = rec, opts \\ []), + do: Klife.produce_async(rec, __MODULE__, opts) + + def produce_batch_async(recs, opts \\ []), + do: Klife.produce_batch_async(recs, __MODULE__, opts) + def produce_batch_txn(recs, opts \\ []), do: Klife.produce_batch_txn(recs, __MODULE__, opts) def transaction(fun, opts \\ []), do: Klife.transaction(fun, __MODULE__, opts) end diff --git a/lib/klife/producer/txn_producer_pool.ex b/lib/klife/producer/txn_producer_pool.ex index 562cadc..76a0639 100644 --- a/lib/klife/producer/txn_producer_pool.ex +++ b/lib/klife/producer/txn_producer_pool.ex @@ -92,13 +92,15 @@ defmodule Klife.TxnProducerPool do # if we get here we should probally just restart the pool {:error, {:unkown_producer, client_name, producer_name}} - _ -> + {pid, _} -> worker = %__MODULE__.WorkerState{ client_name: client_name, producer_name: producer_name, worker_id: worker_id } + Process.link(pid) + {:ok, worker, %{pool_state | worker_counter: worker_id}} end end diff --git a/lib/klife/utils.ex b/lib/klife/utils.ex index 8fb437a..82e24f0 100644 --- a/lib/klife/utils.ex +++ b/lib/klife/utils.ex @@ -31,8 +31,8 @@ defmodule Klife.Utils do defp do_create_topics(init_time) do case create_topics_call() do - :ok -> - :ok + {:ok, res} -> + {:ok, res} :error -> now = System.monotonic_time(:millisecond) @@ -118,7 +118,7 @@ defmodule Klife.Utils do {:ok, %{content: content}} -> case Enum.filter(content.topics, fn e -> e.error_code not in [0, 36] end) do [] -> - :ok + {:ok, Enum.map(content.topics, fn %{name: topic} -> topic end)} err -> {:error, err} diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index 73da7f5..7a8fb69 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -5,7 +5,7 @@ if Mix.env() in [:dev] do def run(args) do Application.ensure_all_started(:klife) - :ok = Klife.Utils.create_topics() + {:ok, _topics} = Klife.Utils.create_topics() opts = [strategy: :one_for_one, name: Benchmark.Supervisor] {:ok, _} = Supervisor.start_link([MyClient], opts) diff --git a/test/test_helper.exs b/test/test_helper.exs index 420a0bb..94b86b9 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,10 +1,22 @@ # ExUnit.configure(exclude: [cluster_change: true]) -:ok = Klife.Utils.create_topics() +{:ok, topics} = Klife.Utils.create_topics() opts = [strategy: :one_for_one, name: Test.Supervisor] {:ok, _} = Supervisor.start_link([MyClient], opts) :ok = Klife.TestUtils.wait_producer(MyClient) +# Enum.each(topics, fn topic -> +# warmup_rec = %Klife.Record{ +# value: "warmup", +# partition: 1, +# topic: topic +# } + +# {:ok, %Klife.Record{}} = MyClient.produce(warmup_rec) +# end) + +# Process.sleep(10_000) + ExUnit.start()