Skip to content

Commit

Permalink
[Breaking change] Fix calling Records::encode / Records::decode w…
Browse files Browse the repository at this point in the history
…ith None (#100)
  • Loading branch information
rukai authored Dec 2, 2024
1 parent 6f3dbd8 commit 565b89a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
41 changes: 38 additions & 3 deletions src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! for topic in res.responses {
//! for partition in topic.partitions {
//! let mut records = partition.records.unwrap();
//! let records = RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)).unwrap();
//! let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap();
//! }
//! }
//!
Expand Down Expand Up @@ -156,13 +156,34 @@ pub struct Record {
const MAGIC_BYTE_OFFSET: usize = 16;

impl RecordBatchEncoder {
/// Encode records into given buffer, using provided encoding options that select the encoding
/// strategy based on version.
pub fn encode<'a, B, I, CF>(
buf: &mut B,
records: I,
options: &RecordEncodeOptions,
) -> Result<()>
where
B: ByteBufMut,
I: IntoIterator<Item = &'a Record>,
I::IntoIter: Clone,
CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
{
Self::encode_with_custom_compression(
buf,
records,
options,
None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>,
)
}

/// Encode records into given buffer, using provided encoding options that select the encoding
/// strategy based on version.
/// # Arguments
/// * `compressor` - A function that compresses the given batch of records.
///
/// If `None`, the right compression algorithm will automatically be selected and applied.
pub fn encode<'a, B, I, CF>(
pub fn encode_with_custom_compression<'a, B, I, CF>(
buf: &mut B,
records: I,
options: &RecordEncodeOptions,
Expand Down Expand Up @@ -485,12 +506,26 @@ impl RecordBatchEncoder {
}

impl RecordBatchDecoder {
/// Decode the provided buffer into a vec of records.
pub fn decode<B: ByteBuf, F>(buf: &mut B) -> Result<Vec<Record>>
where
F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
{
Self::decode_with_custom_compression(
buf,
None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>,
)
}

/// Decode the provided buffer into a vec of records.
/// # Arguments
/// * `decompressor` - A function that decompresses the given batch of records.
///
/// If `None`, the right decompression algorithm will automatically be selected and applied.
pub fn decode<B: ByteBuf, F>(buf: &mut B, decompressor: Option<F>) -> Result<Vec<Record>>
pub fn decode_with_custom_compression<B: ByteBuf, F>(
buf: &mut B,
decompressor: Option<F>,
) -> Result<Vec<Record>>
where
F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
{
Expand Down
24 changes: 15 additions & 9 deletions tests/all_tests/fetch_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ mod client_tests {
assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0);

let mut records = partition.records.unwrap();
let records =
RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data))
.unwrap();
let records = RecordBatchDecoder::decode_with_custom_compression(
&mut records,
Some(decompress_record_batch_data),
)
.unwrap();
assert_eq!(records.len(), 1);
for record in records {
assert_eq!(
Expand Down Expand Up @@ -123,9 +125,11 @@ mod client_tests {
assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0);

let mut records = partition.records.unwrap();
let records =
RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data))
.unwrap();
let records = RecordBatchDecoder::decode_with_custom_compression(
&mut records,
Some(decompress_record_batch_data),
)
.unwrap();
assert_eq!(records.len(), 1);
for record in records {
assert_eq!(
Expand Down Expand Up @@ -161,9 +165,11 @@ mod client_tests {
assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0);

let mut records = partition.records.unwrap();
let records =
RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data))
.unwrap();
let records = RecordBatchDecoder::decode_with_custom_compression(
&mut records,
Some(decompress_record_batch_data),
)
.unwrap();
assert_eq!(records.len(), 1);
}
}
Expand Down
12 changes: 7 additions & 5 deletions tests/all_tests/produce_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn record_batch_produce_fetch() {
];

let mut encoded = BytesMut::new();
RecordBatchEncoder::encode(
RecordBatchEncoder::encode_with_custom_compression(
&mut encoded,
&records,
&RecordEncodeOptions {
Expand Down Expand Up @@ -67,7 +67,7 @@ fn message_set_v1_produce_fetch() {
];

let mut encoded = BytesMut::new();
RecordBatchEncoder::encode(
RecordBatchEncoder::encode_with_custom_compression(
&mut encoded,
&records,
&RecordEncodeOptions {
Expand Down Expand Up @@ -196,9 +196,11 @@ fn fetch_records(
);

let mut fetched_records = partition_response.records.clone().unwrap();
let fetched_records =
RecordBatchDecoder::decode(&mut fetched_records, Some(decompress_record_batch_data))
.unwrap();
let fetched_records = RecordBatchDecoder::decode_with_custom_compression(
&mut fetched_records,
Some(decompress_record_batch_data),
)
.unwrap();

eprintln!("{expected:#?}");
eprintln!("{fetched_records:#?}");
Expand Down

0 comments on commit 565b89a

Please sign in to comment.