diff --git a/lib/kayrock/record_batch.ex b/lib/kayrock/record_batch.ex index 8093fdc..ea6d8aa 100644 --- a/lib/kayrock/record_batch.ex +++ b/lib/kayrock/record_batch.ex @@ -185,6 +185,17 @@ defmodule Kayrock.RecordBatch do defp deserialize(rest, acc, batch_offset, batch_length, partition_leader_epoch) do # we already parsed off 5 bytes in get_magic_byte real_size = batch_length - 5 + + deserialize(real_size, rest, acc, batch_offset, batch_length, partition_leader_epoch) + end + + # If the expected size to fetch is less than the rest of the body, we have + # fetched an incomplete record. Cowardly refuse to parse this record. + defp deserialize(real_size, rest, acc, _, _, _) when real_size > byte_size(rest) do + Enum.reverse(acc) + end + + defp deserialize(real_size, rest, acc, batch_offset, batch_length, partition_leader_epoch) do <<batch_data::size(real_size)-binary, body_rest::binary>> = rest <<crc::32-signed, attributes::16-signed, last_offset_delta::32-signed, @@ -236,15 +247,12 @@ defmodule Kayrock.RecordBatch do acc = [record_batch | acc] - case body_rest do - "" -> - Enum.reverse(acc) + case get_magic_byte(body_rest) do + {2, batch_offset, batch_length, partition_leader_epoch, new_rest} -> + deserialize(new_rest, acc, batch_offset, batch_length, partition_leader_epoch) _ -> - {2, batch_offset, batch_length, partition_leader_epoch, new_rest} = - get_magic_byte(body_rest) - - deserialize(new_rest, acc, batch_offset, batch_length, partition_leader_epoch) + Enum.reverse(acc) end end @@ -346,6 +354,9 @@ defmodule Kayrock.RecordBatch do # message_size: int32 # first_message crc: int32 # first_message magic: int8 + # Return early if we do not have a complete 17 bytes to parse from the record + defp get_magic_byte(msg_set_data) when byte_size(msg_set_data) < 17, do: nil + defp get_magic_byte(msg_set_data) do <<first_offset::64-signed, batch_length_or_message_size::32-signed, partition_leader_epoch_or_first_crc::32-signed, magic::8-signed, rest::bits>> = msg_set_data diff --git a/test/kayrock/fetch_test.exs b/test/kayrock/fetch_test.exs index fdf4082..c624887 100644 --- a/test/kayrock/fetch_test.exs +++ b/test/kayrock/fetch_test.exs @@ -1351,6 +1351,122 @@ defmodule Kayrock.FetchTest do assert got == expect end + test "test deserializing a record batch with an incomplete record" do + data = + <<0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 231, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, + 0, 255, 255, 255, 255, 0, 0, 1, 34, 0, 0, 0, 0, 0, 0, 0, 228, 0, 0, 0, 78, 0, 0, 0, 33, 2, + 106, 8, 42, 102, 0, 2, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 0, 0, 0, 1, 27, 104, 52, 0, 0, 0, 0, 40, 75, 81, 76, 84, 67, 69, 67, 88, 68, 81, 66, + 82, 81, 67, 89, 73, 65, 68, 84, 79, 0, 0, 0, 0, 0, 0, 0, 0, 229, 0, 0, 0, 76, 0, 0, 0, 33, + 2, 232, 120, 45, 102, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 0, 0, 0, 1, 52, 0, 0, 0, 0, 40, 84, 87, 73, 73, 83, 71, 73, 73, 69, 72, 72, 72, + 79, 88, 70, 79, 74, 73, 86, 79, 0, 0, 0, 0, 0, 0, 0, 0, 230, 0, 0, 0, 98, 0, 0, 0, 33, 2, + 35, 151, 146, 115, 0, 2, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 0, 0, 0, 1, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 29, + 27, 104, 52, 0, 0, 0, 0, 40, 72, 67, 85, 80, 72, 73, 85, 78, 75, 84, 67, 86, 82, 80, 66, + 86, 70, 77, 73, 82, 0, 0, 0>> + + expect = %Kayrock.Fetch.V5.Response{ + correlation_id: 4, + responses: [ + %{ + partition_responses: [ + %{ + partition_header: %{ + aborted_transactions: [], + error_code: 0, + high_watermark: 231, + last_stable_offset: -1, + log_start_offset: 0, + partition: 0 + }, + record_set: [ + %Kayrock.RecordBatch{ + attributes: 2, + base_sequence: -1, + batch_length: 78, + batch_offset: 228, + crc: 1_778_920_038, + first_timestamp: -1, + last_offset_delta: 0, + max_timestamp: -1, + partition_leader_epoch: 33, + producer_epoch: -1, + producer_id: -1, + records: [ + %Kayrock.RecordBatch.Record{ + attributes: 0, + headers: [], + key: "", + offset: 228, + timestamp: -1, + value: "KQLTCECXDQBRQCYIADTO" + } + ] + }, + %Kayrock.RecordBatch{ + attributes: 0, + base_sequence: -1, + batch_length: 76, + batch_offset: 229, + crc: -394_777_242, + first_timestamp: -1, + last_offset_delta: 0, + max_timestamp: -1, + partition_leader_epoch: 33, + producer_epoch: -1, + producer_id: -1, + records: [ + %Kayrock.RecordBatch.Record{ + attributes: 0, + headers: [], + key: "", + offset: 229, + timestamp: -1, + value: "TWIISGIIEHHHOXFOJIVO" + } + ] + }, + %Kayrock.RecordBatch{ + attributes: 2, + base_sequence: -1, + batch_length: 98, + batch_offset: 230, + crc: 597_135_987, + first_timestamp: -1, + last_offset_delta: 0, + max_timestamp: -1, + partition_leader_epoch: 33, + producer_epoch: -1, + producer_id: -1, + records: [ + %Kayrock.RecordBatch.Record{ + attributes: 0, + headers: [], + key: "", + offset: 230, + timestamp: -1, + value: "HCUPHIUNKTCVRPBVFMIR" + } + ] + } + ] + } + ], + topic: "food" + } + ], + throttle_time_ms: 0 + } + + {got, ""} = Kayrock.Fetch.V5.Response.deserialize(data) + assert got == expect + end + test "correctly handle timestamps for LogAppend" do data = <<0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 25, 116, 101, 115, 116, 95, 108, 111, 103, 95, 97,