diff --git a/src/records.rs b/src/records.rs index 1cf0690..6b3645e 100644 --- a/src/records.rs +++ b/src/records.rs @@ -27,7 +27,7 @@ //! for topic in res.responses { //! for partition in topic.partitions { //! let mut records = partition.records.unwrap(); -//! let records = RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)).unwrap(); +//! let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap(); //! } //! } //! @@ -156,13 +156,34 @@ pub struct Record { const MAGIC_BYTE_OFFSET: usize = 16; impl RecordBatchEncoder { + /// Encode records into given buffer, using provided encoding options that select the encoding + /// strategy based on version. + pub fn encode<'a, B, I, CF>( + buf: &mut B, + records: I, + options: &RecordEncodeOptions, + ) -> Result<()> + where + B: ByteBufMut, + I: IntoIterator, + I::IntoIter: Clone, + CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>, + { + Self::encode_with_custom_compression( + buf, + records, + options, + None:: Result<()>>, + ) + } + /// Encode records into given buffer, using provided encoding options that select the encoding /// strategy based on version. /// # Arguments /// * `compressor` - A function that compresses the given batch of records. /// /// If `None`, the right compression algorithm will automatically be selected and applied. - pub fn encode<'a, B, I, CF>( + pub fn encode_with_custom_compression<'a, B, I, CF>( buf: &mut B, records: I, options: &RecordEncodeOptions, @@ -485,12 +506,26 @@ impl RecordBatchEncoder { } impl RecordBatchDecoder { + /// Decode the provided buffer into a vec of records. + pub fn decode(buf: &mut B) -> Result> + where + F: Fn(&mut bytes::Bytes, Compression) -> Result, + { + Self::decode_with_custom_compression( + buf, + None:: Result>, + ) + } + /// Decode the provided buffer into a vec of records. /// # Arguments /// * `decompressor` - A function that decompresses the given batch of records. /// /// If `None`, the right decompression algorithm will automatically be selected and applied. - pub fn decode(buf: &mut B, decompressor: Option) -> Result> + pub fn decode_with_custom_compression( + buf: &mut B, + decompressor: Option, + ) -> Result> where F: Fn(&mut bytes::Bytes, Compression) -> Result, { diff --git a/tests/all_tests/fetch_response.rs b/tests/all_tests/fetch_response.rs index cf11c4f..6806769 100644 --- a/tests/all_tests/fetch_response.rs +++ b/tests/all_tests/fetch_response.rs @@ -86,9 +86,11 @@ mod client_tests { assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0); let mut records = partition.records.unwrap(); - let records = - RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)) - .unwrap(); + let records = RecordBatchDecoder::decode_with_custom_compression( + &mut records, + Some(decompress_record_batch_data), + ) + .unwrap(); assert_eq!(records.len(), 1); for record in records { assert_eq!( @@ -123,9 +125,11 @@ mod client_tests { assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0); let mut records = partition.records.unwrap(); - let records = - RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)) - .unwrap(); + let records = RecordBatchDecoder::decode_with_custom_compression( + &mut records, + Some(decompress_record_batch_data), + ) + .unwrap(); assert_eq!(records.len(), 1); for record in records { assert_eq!( @@ -161,9 +165,11 @@ mod client_tests { assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0); let mut records = partition.records.unwrap(); - let records = - RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)) - .unwrap(); + let records = RecordBatchDecoder::decode_with_custom_compression( + &mut records, + Some(decompress_record_batch_data), + ) + .unwrap(); assert_eq!(records.len(), 1); } } diff --git a/tests/all_tests/produce_fetch.rs b/tests/all_tests/produce_fetch.rs index 13114b8..d04878f 100644 --- a/tests/all_tests/produce_fetch.rs +++ b/tests/all_tests/produce_fetch.rs @@ -32,7 +32,7 @@ fn record_batch_produce_fetch() { ]; let mut encoded = BytesMut::new(); - RecordBatchEncoder::encode( + RecordBatchEncoder::encode_with_custom_compression( &mut encoded, &records, &RecordEncodeOptions { @@ -67,7 +67,7 @@ fn message_set_v1_produce_fetch() { ]; let mut encoded = BytesMut::new(); - RecordBatchEncoder::encode( + RecordBatchEncoder::encode_with_custom_compression( &mut encoded, &records, &RecordEncodeOptions { @@ -196,9 +196,11 @@ fn fetch_records( ); let mut fetched_records = partition_response.records.clone().unwrap(); - let fetched_records = - RecordBatchDecoder::decode(&mut fetched_records, Some(decompress_record_batch_data)) - .unwrap(); + let fetched_records = RecordBatchDecoder::decode_with_custom_compression( + &mut fetched_records, + Some(decompress_record_batch_data), + ) + .unwrap(); eprintln!("{expected:#?}"); eprintln!("{fetched_records:#?}");