Skip to content

Commit

Permalink
Add helper methods to RequestKind and ResponseKind (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 28, 2024
1 parent bef3a72 commit d24e282
Show file tree
Hide file tree
Showing 5 changed files with 916 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- run: cargo clippy --workspace -- -D warnings
- run: cargo clippy --workspace --all-features -- -D warnings
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ documentation = "https://docs.rs/kafka-protocol"
readme = "README.md"
keywords = ["kafka"]

[features]
# adds the ResponseKind + RequestKind enums with variants for every message type.
# disabled by default since it doubles clean release build times due to lots of generated code.
messages_enum = []

[dependencies]
bytes = "1.0.1"
Expand All @@ -34,3 +38,8 @@ anyhow = "1.0.80"

[dev-dependencies]
testcontainers = { version = "0.20.1", features = ["blocking", "watchdog"] }

# Display required features for items when rendering for docs.rs
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
135 changes: 135 additions & 0 deletions protocol_codegen/src/generate_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ pub fn run() -> Result<(), Error> {
"use crate::protocol::{{NewType, Request, StrBytes, HeaderVersion}};"
)?;
writeln!(module_file, "use std::convert::TryFrom;")?;
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "use crate::protocol::Encodable;")?;
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "use crate::protocol::Decodable;")?;
writeln!(module_file, "use anyhow::Result;")?;
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "use anyhow::Context;")?;
writeln!(module_file)?;

for input_file_path in &input_file_paths {
Expand Down Expand Up @@ -252,6 +259,7 @@ pub fn run() -> Result<(), Error> {
module_file,
"/// Wrapping enum for all requests in the Kafka protocol."
)?;
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "#[non_exhaustive]")?;
writeln!(module_file, "#[derive(Debug, Clone, PartialEq)]")?;
writeln!(module_file, "pub enum RequestKind {{")?;
Expand All @@ -267,7 +275,47 @@ pub fn run() -> Result<(), Error> {
writeln!(module_file, "}}")?;
writeln!(module_file)?;

writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "impl RequestKind {{")?;
writeln!(module_file, "/// Encode the message into the target buffer")?;
writeln!(
module_file,
"pub fn encode(&self, bytes: &mut bytes::BytesMut, version: i16) -> anyhow::Result<()> {{"
)?;
writeln!(module_file, "match self {{")?;
for (_, request_type) in request_types.iter() {
let variant = request_type.trim_end_matches("Request");
writeln!(
module_file,
"RequestKind::{variant}(x) => encode(x, bytes, version),"
)?;
}
writeln!(module_file, "}}")?;
writeln!(module_file, "}}")?;

writeln!(
module_file,
"/// Decode the message from the provided buffer and version"
)?;
writeln!(
module_file,
"pub fn decode(api_key: ApiKey, bytes: &mut bytes::Bytes, version: i16) -> anyhow::Result<RequestKind> {{"
)?;
writeln!(module_file, "match api_key {{")?;
for (_, request_type) in request_types.iter() {
let variant = request_type.trim_end_matches("Request");
writeln!(
module_file,
"ApiKey::{variant}Key => Ok(RequestKind::{variant}(decode(bytes, version)?)),"
)?;
}
writeln!(module_file, "}}")?;
writeln!(module_file, "}}")?;

writeln!(module_file, "}}")?;

for (_, request_type) in request_types.iter() {
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "impl From<{request_type}> for RequestKind {{")?;
writeln!(
module_file,
Expand All @@ -280,12 +328,40 @@ pub fn run() -> Result<(), Error> {
writeln!(module_file)?;
}

writeln!(
module_file,
r#"
#[cfg(feature = "messages_enum")]
fn decode<T: Decodable>(bytes: &mut bytes::Bytes, version: i16) -> Result<T> {{
T::decode(bytes, version).with_context(|| {{
format!(
"Failed to decode {{}} v{{}} body",
std::any::type_name::<T>(),
version
)
}})
}}
#[cfg(feature = "messages_enum")]
fn encode<T: Encodable>(encodable: &T, bytes: &mut bytes::BytesMut, version: i16) -> Result<()> {{
encodable.encode(bytes, version).with_context(|| {{
format!(
"Failed to encode {{}} v{{}} body",
std::any::type_name::<T>(),
version
)
}})
}}
"#
)?;

writeln!(
module_file,
"/// Wrapping enum for all responses in the Kafka protocol."
)?;
writeln!(module_file, "#[non_exhaustive]")?;
writeln!(module_file, "#[derive(Debug, Clone, PartialEq)]")?;
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "pub enum ResponseKind {{")?;
for (_, response_type) in response_types.iter() {
writeln!(module_file, " /// {},", response_type)?;
Expand All @@ -299,7 +375,66 @@ pub fn run() -> Result<(), Error> {
writeln!(module_file, "}}")?;
writeln!(module_file)?;

writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(module_file, "impl ResponseKind {{")?;
writeln!(module_file, "/// Encode the message into the target buffer")?;
writeln!(
module_file,
"pub fn encode(&self, bytes: &mut bytes::BytesMut, version: i16) -> anyhow::Result<()> {{"
)?;
writeln!(module_file, "match self {{")?;
for (_, response_type) in response_types.iter() {
let variant = response_type.trim_end_matches("Response");
writeln!(
module_file,
"ResponseKind::{variant}(x) => encode(x, bytes, version),"
)?;
}
writeln!(module_file, "}}")?;
writeln!(module_file, "}}")?;

writeln!(
module_file,
"/// Decode the message from the provided buffer and version"
)?;
writeln!(
module_file,
"pub fn decode(api_key: ApiKey, bytes: &mut bytes::Bytes, version: i16) -> anyhow::Result<ResponseKind> {{"
)?;
writeln!(module_file, "match api_key {{")?;
for (_, response_type) in response_types.iter() {
let variant = response_type.trim_end_matches("Response");
writeln!(
module_file,
"ApiKey::{variant}Key => Ok(ResponseKind::{variant}(decode(bytes, version)?)),"
)?;
}
writeln!(module_file, "}}")?;
writeln!(module_file, "}}")?;

writeln!(
module_file,
"/// Get the version of request header that needs to be prepended to this message"
)?;
writeln!(
module_file,
"pub fn header_version(&self, version: i16) -> i16 {{"
)?;
writeln!(module_file, "match self {{")?;
for (_, response_type) in response_types.iter() {
let variant = response_type.trim_end_matches("Response");
writeln!(
module_file,
"ResponseKind::{variant}(_) => {response_type}::header_version(version),"
)?;
}
writeln!(module_file, "}}")?;
writeln!(module_file, "}}")?;
writeln!(module_file, "}}")?;
writeln!(module_file)?;

for (_, response_type) in response_types.iter() {
writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?;
writeln!(
module_file,
"impl From<{response_type}> for ResponseKind {{"
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
//! }
//! ```
#![deny(missing_docs)]
// Display required features for items when rendering for docs.rs
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

pub mod compression;
pub mod error;
Expand Down
Loading

0 comments on commit d24e282

Please sign in to comment.