diff --git a/config/config.exs b/config/config.exs index 8450518..ef5e6af 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,7 +30,12 @@ config :klife, MyClient, producers: [ [ name: :benchmark_producer, - client_id: "my_custom_client_id" + client_id: "my_custom_client_id", + ], + [ + name: :async_benchmark_producer, + client_id: "my_custom_client_id", + batchers_count: 4 ], [ name: :benchmark_producer_in_flight, @@ -68,6 +73,18 @@ config :klife, MyClient, name: "benchmark_topic_2", default_producer: :benchmark_producer ], + [ + name: "async_benchmark_topic_0", + default_producer: :async_benchmark_producer + ], + [ + name: "async_benchmark_topic_1", + default_producer: :async_benchmark_producer + ], + [ + name: "async_benchmark_topic_2", + default_producer: :async_benchmark_producer + ], [ name: "benchmark_topic_in_flight", default_producer: :benchmark_producer_in_flight diff --git a/lib/klife/testing.ex b/lib/klife/testing.ex index 461bc6f..1a58649 100644 --- a/lib/klife/testing.ex +++ b/lib/klife/testing.ex @@ -201,7 +201,7 @@ defmodule Klife.Testing do end) end - defp get_latest_offsets(leader_id, metas, client_name) do + def get_latest_offsets(leader_id, metas, client_name) do content = %{ replica_id: -1, isolation_level: 1, diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index e977be2..fb3876b 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -2,6 +2,9 @@ if Mix.env() in [:dev] do defmodule Mix.Tasks.Benchmark do use Mix.Task + alias Klife.Producer.Controller, as: PController + alias Klife.TestUtils.AsyncProducerBenchmark + def run(args) do Application.ensure_all_started(:klife) @@ -78,6 +81,10 @@ if Mix.env() in [:dev] do ) end + def do_run_bench("producer_async", parallel) do + AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"]) + end + def do_run_bench("producer_sync", parallel) do %{ records_0: records_0, diff --git a/mix.exs b/mix.exs index ccd5c86..d3fdf14 100644 --- a/mix.exs +++ b/mix.exs @@ -73,6 +73,7 @@ defmodule Klife.MixProject do {:benchee, "~> 1.0", only: :dev, runtime: false}, {:kafka_ex, "~> 0.13", only: :dev}, {:brod, "~> 3.16", only: :dev}, + {:erlkaf, "~> 2.1.6", only: :dev}, {:observer_cli, "~> 1.7", only: :dev} ] end diff --git a/mix.lock b/mix.lock index 54e30d3..ac54637 100644 --- a/mix.lock +++ b/mix.lock @@ -3,9 +3,13 @@ "brod": {:hex, :brod, "3.17.0", "437daa5204a2175a3f6d01ee31152ca881539ca90acdf123d69835577f6133b1", [:rebar3], [{:kafka_protocol, "4.1.3", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.9", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "1bf5eb9d1bad1140f97b9d0c5a819ceb30414231cb7f5ad5d5e18201cfaf09f4"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"}, + "datum": {:hex, :datum, "4.6.1", "93b131203a60cfea9ffff6435a50dc24239f689dfebb76e6aecf6ce689efe8f4", [:rebar3], [], "hexpm", "e14340f8280fedb1731d5cd6e9f5aeaa14b880c51f0b3dc16c42c6671c167e4d"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "erlkaf": {:hex, :erlkaf, "2.1.6", "fb9aed863f09249dc549135391f5d173d1a1064bf222dc14a74c92fe3408cd60", [:rebar3], [{:esq, "2.0.6", [hex: :esq, repo: "hexpm", optional: false]}, {:jsone, "1.8.1", [hex: :jsone, repo: "hexpm", optional: false]}], "hexpm", "22ab3e870e78b6d16ae1ffc3fee0c155c9179fcd6e2f2a703398d6fb677ddff9"}, + "esq": {:hex, :esq, "2.0.6", "9917e1a731c609b42624a4bb8594a25d537ea30e7b55d46cd46fa1b95e6db675", [:rebar3], [{:datum, "~> 4.6.1", [hex: :datum, repo: "hexpm", optional: false]}, {:pipe, "~> 2.0.1", [hex: :pipes, repo: "hexpm", optional: false]}, {:uid, "~> 1.3.4", [hex: :uid, repo: "hexpm", optional: false]}], "hexpm", "3b798da50c508fe93248dbbd64d3d2cb618cab5387e66515ab83cadf2b1abac1"}, "ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"}, + "jsone": {:hex, :jsone, "1.8.1", "6bc74d3863d55d420077346da97c601711017a057f2fd1df65d6d65dd562fbab", [:rebar3], [], "hexpm", "c78918124148c51a7a84c678e39bbc6281f8cb582f1d88584628a98468e99738"}, "kafka_ex": {:hex, :kafka_ex, "0.13.0", "2bfaf3c81d4ee01ed2088cb09e46c070c245f60f5752ec7043f29e807f6679ec", [:mix], [{:kayrock, "~> 0.1.12", [hex: :kayrock, repo: "hexpm", optional: false]}], "hexpm", "8a806eee5cd8191f45870b2ef4b3f4f52c57d798039f2d3fc602ce47053db7b9"}, "kafka_protocol": {:hex, :kafka_protocol, "4.1.3", "362d85a898d4148a43dbabb10a30bb2d6ff32ba0097eb06981d11b34e2e0a9cd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "28cf73001270d972524dd0fad4a59074f4441219f9cf237ad808a2ac1ec97487"}, "kayrock": {:hex, :kayrock, "0.1.15", "61ce03b65dd2236479357ca4162f43fe3a42923b39fbb6551a16d57cf2b93072", [:mix], [{:connection, "~> 1.1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~> 0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~> 1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "61d7b3579db68e61c26f316b9246e0231b878148bb1887adc59fecedcbc46c12"}, @@ -17,8 +21,10 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "observer_cli": {:hex, :observer_cli, "1.7.4", "3c1bfb6d91bf68f6a3d15f46ae20da0f7740d363ee5bc041191ce8722a6c4fae", [:mix, :rebar3], [{:recon, "~> 2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "50de6d95d814f447458bd5d72666a74624eddb0ef98bdcee61a0153aae0865ff"}, + "pipe": {:hex, :pipes, "2.0.1", "a2b56796c63690ed0e78bb77bb389af250bd70afa15a6869369dbdc11087d68f", [:rebar3], [], "hexpm", "623357a158e4c33ee589d4c735ddbab9c77a04e85159192e4d42f1dc97c60bd9"}, "recon": {:hex, :recon, "2.5.5", "c108a4c406fa301a529151a3bb53158cadc4064ec0c5f99b03ddb8c0e4281bdf", [:mix, :rebar3], [], "hexpm", "632a6f447df7ccc1a4a10bdcfce71514412b16660fe59deca0fcf0aa3c054404"}, "snappyer": {:hex, :snappyer, "1.2.9", "9cc58470798648ce34c662ca0aa6daae31367667714c9a543384430a3586e5d3", [:rebar3], [], "hexpm", "18d00ca218ae613416e6eecafe1078db86342a66f86277bd45c95f05bf1c8b29"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, + "uid": {:hex, :uid, "1.3.4", "42e30e22908e8e2faa6227e9c261f1954cb540be3c5a139e112369ae6cc451fc", [:rebar3], [], "hexpm", "f8388ef93b16a5d5f9977e1fe814ae0acf5529b1e0ee5d7b18d23cb4c0f87eaa"}, "varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"}, } diff --git a/test/support/async_producer_benchmark.ex b/test/support/async_producer_benchmark.ex new file mode 100644 index 0000000..4499304 --- /dev/null +++ b/test/support/async_producer_benchmark.ex @@ -0,0 +1,178 @@ +defmodule Klife.TestUtils.AsyncProducerBenchmark do + require Logger + + alias Klife.Producer.Controller, as: PController + + @number_of_records 5_000_000 + + def run(clients) do + sample_data = generate_data() + + topics = [ + List.first(sample_data.records_0).topic, + List.first(sample_data.records_1).topic, + List.first(sample_data.records_2).topic + ] + + records = sample_data.records_0 ++ sample_data.records_1 ++ sample_data.records_2 + + client_results = Enum.map(clients, &run_benchmark(&1, topics, records)) + + results = Enum.zip(clients, client_results) |> Map.new() + IO.puts("Client | Result | Compared to klife") + Enum.each(results, fn {client, result} -> + IO.puts( + "#{client}\t| #{result} | x#{results_compared_to_klife(result, results)}" + ) + end) + end + + defp run_benchmark("erlkaf", topics, records) do + :erlkaf.start() + + producer_config = [ + bootstrap_servers: "localhost:19092", + max_in_flight: 1, + enable_idempotence: true, + sticky_partitioning_linger_ms: 0, + batch_size: 512_000 + ] + + :ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config) + + Task.start(fn -> + Enum.map(1..@number_of_records, fn _i -> + erlkaf_msg = Enum.random(records) + + :erlkaf.produce( + :erlkaf_test_producer, + erlkaf_msg.topic, + erlkaf_msg.key, + erlkaf_msg.value + ) + end) + + :ok + end) + + result = measurement_collector(topics) + + :erlkaf.stop() + + result + end + + defp run_benchmark("klife", topics, records) do + {:ok, client_pid} = + Task.start(fn -> + Enum.map(1..@number_of_records, fn _i -> + klife_msg = Enum.random(records) + MyClient.produce_async(klife_msg) + end) + end) + + result = measurement_collector(topics) + + Process.exit(client_pid, :kill) + + result + end + + defp run_benchmark("brod", topics, records) do + Task.async(fn -> + Enum.map(1..@number_of_records, fn _i -> + brod_msg = Enum.random(records) + + :brod.produce( + :kafka_client, + brod_msg.topic, + brod_msg.partition, + brod_msg.key, + brod_msg.value + ) + end) + end) + + result = measurement_collector(topics) + + :brod.stop() + + result + end + + defp measurement_collector(topics) do + starting_offset = get_total_offsets(topics) + + Process.sleep(10000) + + get_total_offsets(topics) - starting_offset + end + + defp get_total_offsets(topics), do: get_offset_by_topic(topics) |> Map.values() |> Enum.sum() + + defp get_offset_by_topic(topics) do + metas = PController.get_all_topics_partitions_metadata(MyClient) + + data_by_topic = + metas + |> Enum.group_by(fn m -> m.leader_id end) + |> Enum.flat_map(fn {leader_id, metas} -> + Klife.Testing.get_latest_offsets(leader_id, metas, MyClient) + end) + |> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end) + |> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end) + |> Enum.map(fn {k, v} -> + {k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()} + end) + |> Map.new() + end + + defp generate_data() do + topic0 = "async_benchmark_topic_0" + topic1 = "async_benchmark_topic_1" + topic2 = "async_benchmark_topic_2" + + max_partition = 30 + + records_0 = + Enum.map(0..(max_partition - 1), fn p -> + %Klife.Record{ + value: :rand.bytes(1_000), + key: :rand.bytes(50), + topic: topic0, + partition: p + } + end) + + records_1 = + Enum.map(0..(max_partition - 1), fn p -> + %Klife.Record{ + value: :rand.bytes(1_000), + key: :rand.bytes(50), + topic: topic1, + partition: p + } + end) + + records_2 = + Enum.map(0..(max_partition - 1), fn p -> + %Klife.Record{ + value: :rand.bytes(1_000), + key: :rand.bytes(50), + topic: topic2, + partition: p + } + end) + + %{ + records_0: records_0, + records_1: records_1, + records_2: records_2, + max_partition: max_partition + } + end + + defp results_compared_to_klife(result, results) do + result / Map.get(results, "klife") |> Float.round(2) + end +end