diff --git a/test/kayrock/client/produce_test.exs b/test/integration/compression_test.exs
similarity index 53%
rename from test/kayrock/client/produce_test.exs
rename to test/integration/compression_test.exs
index 8a0bc44..dbd0360 100644
--- a/test/kayrock/client/produce_test.exs
+++ b/test/integration/compression_test.exs
@@ -1,95 +1,13 @@
-defmodule Kayrock.Client.ProduceTest do
-  use Kayrock.ClientCase
+defmodule Kayrock.Client.CompressionTest do
+  use Kayrock.IntegrationCase
+  use ExUnit.Case, async: true
 
-  alias Kayrock.RecordBatch
-  alias Kayrock.RecordBatch.Record
-  alias Kayrock.RecordBatch.RecordHeader
+  import Kayrock.TestSupport
+  import Kayrock.RequestFactory
 
-  test "gzip produce works", %{client: client} do
-    {:ok, topic} = ensure_test_topic(client, "simple_produce")
+  container(:kafka, KafkaContainer.new(), shared: true)
 
-    record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :gzip)
-    {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0)
-
-    offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)
-
-    {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)
-
-    [main_resp] = resp.responses
-    [partition_resp] = main_resp.partition_responses
-
-    [
-      %RecordBatch{
-        partition_leader_epoch: partition_leader_epoch,
-        records: [%Record{offset: first_offset} | _]
-      }
-      | _
-    ] = partition_resp.record_set
-
-    assert resp == %Kayrock.Fetch.V4.Response{
-             correlation_id: 4,
-             responses: [
-               %{
-                 partition_responses: [
-                   %{
-                     partition_header: %{
-                       aborted_transactions: [],
-                       error_code: 0,
-                       high_watermark: offset,
-                       last_stable_offset: offset,
-                       partition: 0
-                     },
-                     record_set: [
-                       %Kayrock.RecordBatch{
-                         attributes: 1,
-                         base_sequence: -1,
-                         batch_length: 94,
-                         batch_offset: first_offset,
-                         crc: 1_821_682_799,
-                         first_timestamp: -1,
-                         last_offset_delta: 2,
-                         max_timestamp: -1,
-                         partition_leader_epoch: partition_leader_epoch,
-                         producer_epoch: -1,
-                         producer_id: -1,
-                         records: [
-                           %Kayrock.RecordBatch.Record{
-                             attributes: 0,
-                             headers: [],
-                             key: nil,
-                             offset: first_offset,
-                             timestamp: -1,
-                             value: "foo"
-                           },
-                           %Kayrock.RecordBatch.Record{
-                             attributes: 0,
-                             headers: [],
-                             key: nil,
-                             offset: first_offset + 1,
-                             timestamp: -1,
-                             value: "bar"
-                           },
-                           %Kayrock.RecordBatch.Record{
-                             attributes: 0,
-                             headers: [],
-                             key: nil,
-                             offset: first_offset + 2,
-                             timestamp: -1,
-                             value: "baz"
-                           }
-                         ]
-                       }
-                     ]
-                   }
-                 ],
-                 topic: "simple_produce"
-               }
-             ],
-             throttle_time_ms: 0
-           }
-  end
-
-  describe "with snappy compression" do
+  describe "with compression" do
     setup do
       on_exit(fn ->
         Application.put_env(:kayrock, :snappy_module, :snappy)
@@ -98,25 +16,105 @@ defmodule Kayrock.Client.ProduceTest do
       :ok
     end
 
-    test "using snappyer produce works", %{client: client} do
-      Application.put_env(:kayrock, :snappy_module, :snappyer)
+    test "gzip produce works", %{kafka: kafka} do
+      client_pid = build_client(kafka)
+      topic_name = create_topic(client_pid)
 
-      {:ok, topic} = ensure_test_topic(client, "simple_produce")
+      record_batch = Kayrock.RecordBatch.from_binary_list(["foo", "bar", "baz"], :gzip)
+      {:ok, _resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0)
+      offset = Kayrock.Convenience.partition_last_offset(client_pid, topic_name, 0)
+      {:ok, resp} = Kayrock.fetch(client_pid, topic_name, 0, offset - 1)
+      [main_resp] = resp.responses
+      [partition_resp] = main_resp.partition_responses
 
-      record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
-      {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0)
+      [
+        %Kayrock.RecordBatch{
+          partition_leader_epoch: partition_leader_epoch,
+          records: [%Kayrock.RecordBatch.Record{offset: first_offset} | _]
+        }
+        | _
+      ] = partition_resp.record_set
 
-      offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)
+      assert resp == %Kayrock.Fetch.V4.Response{
+               correlation_id: 4,
+               responses: [
+                 %{
+                   partition_responses: [
+                     %{
+                       partition_header: %{
+                         aborted_transactions: [],
+                         error_code: 0,
+                         high_watermark: offset,
+                         last_stable_offset: offset,
+                         partition: 0
+                       },
+                       record_set: [
+                         %Kayrock.RecordBatch{
+                           attributes: 1,
+                           base_sequence: -1,
+                           batch_length: 94,
+                           batch_offset: first_offset,
+                           crc: 1_821_682_799,
+                           first_timestamp: -1,
+                           last_offset_delta: 2,
+                           max_timestamp: -1,
+                           partition_leader_epoch: partition_leader_epoch,
+                           producer_epoch: -1,
+                           producer_id: -1,
+                           records: [
+                             %Kayrock.RecordBatch.Record{
+                               attributes: 0,
+                               headers: [],
+                               key: nil,
+                               offset: first_offset,
+                               timestamp: -1,
+                               value: "foo"
+                             },
+                             %Kayrock.RecordBatch.Record{
+                               attributes: 0,
+                               headers: [],
+                               key: nil,
+                               offset: first_offset + 1,
+                               timestamp: -1,
+                               value: "bar"
+                             },
+                             %Kayrock.RecordBatch.Record{
+                               attributes: 0,
+                               headers: [],
+                               key: nil,
+                               offset: first_offset + 2,
+                               timestamp: -1,
+                               value: "baz"
+                             }
+                           ]
+                         }
+                       ]
+                     }
+                   ],
+                   topic: topic_name
+                 }
+               ],
+               throttle_time_ms: 0
+             }
+    end
 
-      {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)
+    test "using snappyer produce works", %{kafka: kafka} do
+      Application.put_env(:kayrock, :snappy_module, :snappyer)
+      client_pid = build_client(kafka)
+      topic_name = create_topic(client_pid)
+      record_batch = Kayrock.RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
+      {:ok, _resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0)
+      offset = Kayrock.Convenience.partition_last_offset(client_pid, topic_name, 0)
+
+      {:ok, resp} = Kayrock.fetch(client_pid, topic_name, 0, offset - 1)
       [main_resp] = resp.responses
       [partition_resp] = main_resp.partition_responses
 
       [
-        %RecordBatch{
+        %Kayrock.RecordBatch{
           partition_leader_epoch: partition_leader_epoch,
-          records: [%Record{offset: first_offset} | _]
+          records: [%Kayrock.RecordBatch.Record{offset: first_offset} | _]
         }
         | _
       ] = partition_resp.record_set
@@ -184,23 +182,22 @@ defmodule Kayrock.Client.ProduceTest do
              }
     end
 
-    test "using snappy-erlang-nif produce works", %{client: client} do
-      {:ok, topic} = ensure_test_topic(client, "simple_produce")
-
-      record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
-      {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0)
+    test "using snappy-erlang-nif produce works", %{kafka: kafka} do
+      client_pid = build_client(kafka)
+      topic_name = create_topic(client_pid)
 
-      offset = Kayrock.Convenience.partition_last_offset(client, topic, 0)
-
-      {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1)
+      record_batch = Kayrock.RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy)
+      {:ok, _resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0)
+      offset = Kayrock.Convenience.partition_last_offset(client_pid, topic_name, 0)
+      {:ok, resp} = Kayrock.fetch(client_pid, topic_name, 0, offset - 1)
       [main_resp] = resp.responses
      [partition_resp] = main_resp.partition_responses
 
       [
-        %RecordBatch{
+        %Kayrock.RecordBatch{
          partition_leader_epoch: partition_leader_epoch,
-          records: [%Record{offset: first_offset} | _]
+          records: [%Kayrock.RecordBatch.Record{offset: first_offset} | _]
         }
         | _
       ] = partition_resp.record_set
@@ -268,4 +265,17 @@ defmodule Kayrock.Client.ProduceTest do
              }
     end
   end
+
+  defp build_client(kafka) do
+    uris = [{"localhost", Container.mapped_port(kafka, 9092)}]
+    {:ok, client_pid} = Kayrock.Client.start_link(uris)
+    client_pid
+  end
+
+  defp create_topic(client_pid) do
+    topic_name = unique_string()
+    create_request = create_topic_request(topic_name, 5)
+    {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
+    topic_name
+  end
 end
diff --git a/test/integration/producer_test.exs b/test/integration/producer_test.exs
index 6091ad0..4e5fc89 100644
--- a/test/integration/producer_test.exs
+++ b/test/integration/producer_test.exs
@@ -3,7 +3,6 @@ defmodule Kayrock.Integration.ProducerTest do
   use ExUnit.Case, async: true
 
   import Kayrock.TestSupport
-  import Kayrock.Convenience
   import Kayrock.RequestFactory
 
   container(:kafka, KafkaContainer.new(), shared: true)