diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 25140eb..e5b5817 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -32,4 +32,5 @@ jobs: - uses: dtolnay/rust-toolchain@stable with: components: clippy + - run: cargo clippy --workspace -- -D warnings - run: cargo clippy --workspace --all-features -- -D warnings diff --git a/Cargo.toml b/Cargo.toml index 1e53455..24b7967 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,10 @@ documentation = "https://docs.rs/kafka-protocol" readme = "README.md" keywords = ["kafka"] +[features] +# adds the ResponseKind + RequestKind enums with variants for every message type. +# disabled by default since it doubles clean release build times due to lots of generated code. +messages_enum = [] [dependencies] bytes = "1.0.1" @@ -34,3 +38,8 @@ anyhow = "1.0.80" [dev-dependencies] testcontainers = { version = "0.20.1", features = ["blocking", "watchdog"] } + +# Display required features for items when rendering for docs.rs +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/protocol_codegen/src/generate_messages.rs b/protocol_codegen/src/generate_messages.rs index 51cbcd5..6b3d88c 100644 --- a/protocol_codegen/src/generate_messages.rs +++ b/protocol_codegen/src/generate_messages.rs @@ -95,6 +95,13 @@ pub fn run() -> Result<(), Error> { "use crate::protocol::{{NewType, Request, StrBytes, HeaderVersion}};" )?; writeln!(module_file, "use std::convert::TryFrom;")?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; + writeln!(module_file, "use crate::protocol::Encodable;")?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; + writeln!(module_file, "use crate::protocol::Decodable;")?; + writeln!(module_file, "use anyhow::Result;")?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; + writeln!(module_file, "use anyhow::Context;")?; writeln!(module_file)?; for input_file_path in &input_file_paths { @@ -252,6 +259,7 @@ pub fn run() -> Result<(), Error> { module_file, "/// Wrapping enum for all requests in the Kafka protocol." )?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; writeln!(module_file, "#[non_exhaustive]")?; writeln!(module_file, "#[derive(Debug, Clone, PartialEq)]")?; writeln!(module_file, "pub enum RequestKind {{")?; @@ -267,7 +275,47 @@ pub fn run() -> Result<(), Error> { writeln!(module_file, "}}")?; writeln!(module_file)?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; + writeln!(module_file, "impl RequestKind {{")?; + writeln!(module_file, "/// Encode the message into the target buffer")?; + writeln!( + module_file, + "pub fn encode(&self, bytes: &mut bytes::BytesMut, version: i16) -> anyhow::Result<()> {{" + )?; + writeln!(module_file, "match self {{")?; + for (_, request_type) in request_types.iter() { + let variant = request_type.trim_end_matches("Request"); + writeln!( + module_file, + "RequestKind::{variant}(x) => encode(x, bytes, version)," + )?; + } + writeln!(module_file, "}}")?; + writeln!(module_file, "}}")?; + + writeln!( + module_file, + "/// Decode the message from the provided buffer and version" + )?; + writeln!( + module_file, + "pub fn decode(api_key: ApiKey, bytes: &mut bytes::Bytes, version: i16) -> anyhow::Result {{" + )?; + writeln!(module_file, "match api_key {{")?; for (_, request_type) in request_types.iter() { + let variant = request_type.trim_end_matches("Request"); + writeln!( + module_file, + "ApiKey::{variant}Key => Ok(RequestKind::{variant}(decode(bytes, version)?))," + )?; + } + writeln!(module_file, "}}")?; + writeln!(module_file, "}}")?; + + writeln!(module_file, "}}")?; + + for (_, request_type) in request_types.iter() { + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; writeln!(module_file, "impl From<{request_type}> for RequestKind {{")?; writeln!( module_file, @@ -280,12 +328,40 @@ pub fn run() -> Result<(), Error> { writeln!(module_file)?; } + writeln!( + module_file, + r#" +#[cfg(feature = "messages_enum")] +fn decode(bytes: &mut bytes::Bytes, version: i16) -> Result {{ + T::decode(bytes, version).with_context(|| {{ + format!( + "Failed to decode {{}} v{{}} body", + std::any::type_name::(), + version + ) + }}) +}} + +#[cfg(feature = "messages_enum")] +fn encode(encodable: &T, bytes: &mut bytes::BytesMut, version: i16) -> Result<()> {{ + encodable.encode(bytes, version).with_context(|| {{ + format!( + "Failed to encode {{}} v{{}} body", + std::any::type_name::(), + version + ) + }}) +}} + "# + )?; + writeln!( module_file, "/// Wrapping enum for all responses in the Kafka protocol." )?; writeln!(module_file, "#[non_exhaustive]")?; writeln!(module_file, "#[derive(Debug, Clone, PartialEq)]")?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; writeln!(module_file, "pub enum ResponseKind {{")?; for (_, response_type) in response_types.iter() { writeln!(module_file, " /// {},", response_type)?; @@ -299,7 +375,66 @@ pub fn run() -> Result<(), Error> { writeln!(module_file, "}}")?; writeln!(module_file)?; + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; + writeln!(module_file, "impl ResponseKind {{")?; + writeln!(module_file, "/// Encode the message into the target buffer")?; + writeln!( + module_file, + "pub fn encode(&self, bytes: &mut bytes::BytesMut, version: i16) -> anyhow::Result<()> {{" + )?; + writeln!(module_file, "match self {{")?; + for (_, response_type) in response_types.iter() { + let variant = response_type.trim_end_matches("Response"); + writeln!( + module_file, + "ResponseKind::{variant}(x) => encode(x, bytes, version)," + )?; + } + writeln!(module_file, "}}")?; + writeln!(module_file, "}}")?; + + writeln!( + module_file, + "/// Decode the message from the provided buffer and version" + )?; + writeln!( + module_file, + "pub fn decode(api_key: ApiKey, bytes: &mut bytes::Bytes, version: i16) -> anyhow::Result {{" + )?; + writeln!(module_file, "match api_key {{")?; + for (_, response_type) in response_types.iter() { + let variant = response_type.trim_end_matches("Response"); + writeln!( + module_file, + "ApiKey::{variant}Key => Ok(ResponseKind::{variant}(decode(bytes, version)?))," + )?; + } + writeln!(module_file, "}}")?; + writeln!(module_file, "}}")?; + + writeln!( + module_file, + "/// Get the version of request header that needs to be prepended to this message" + )?; + writeln!( + module_file, + "pub fn header_version(&self, version: i16) -> i16 {{" + )?; + writeln!(module_file, "match self {{")?; + for (_, response_type) in response_types.iter() { + let variant = response_type.trim_end_matches("Response"); + writeln!( + module_file, + "ResponseKind::{variant}(_) => {response_type}::header_version(version)," + )?; + } + writeln!(module_file, "}}")?; + writeln!(module_file, "}}")?; + writeln!(module_file, "}}")?; + writeln!(module_file)?; + for (_, response_type) in response_types.iter() { + writeln!(module_file, "#[cfg(feature = \"messages_enum\")]")?; writeln!( module_file, "impl From<{response_type}> for ResponseKind {{" diff --git a/src/lib.rs b/src/lib.rs index dfc8aa4..cc02870 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,6 +103,8 @@ //! } //! ``` #![deny(missing_docs)] +// Display required features for items when rendering for docs.rs +#![cfg_attr(docsrs, feature(doc_auto_cfg))] pub mod compression; pub mod error; diff --git a/src/messages.rs b/src/messages.rs index d2336f5..23e5584 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -3,7 +3,14 @@ //! These messages are generated programmatically. See the [Kafka's protocol documentation](https://kafka.apache.org/protocol.html) for more information about a given message type. // WARNING: the items of this module are generated and should not be edited directly. +#[cfg(feature = "messages_enum")] +use crate::protocol::Decodable; +#[cfg(feature = "messages_enum")] +use crate::protocol::Encodable; use crate::protocol::{HeaderVersion, NewType, Request, StrBytes}; +#[cfg(feature = "messages_enum")] +use anyhow::Context; +use anyhow::Result; use std::convert::TryFrom; pub mod consumer_protocol_assignment; @@ -1324,6 +1331,7 @@ impl TryFrom for ApiKey { } /// Wrapping enum for all requests in the Kafka protocol. +#[cfg(feature = "messages_enum")] #[non_exhaustive] #[derive(Debug, Clone, PartialEq)] pub enum RequestKind { @@ -1477,453 +1485,773 @@ pub enum RequestKind { ListClientMetricsResources(ListClientMetricsResourcesRequest), } +#[cfg(feature = "messages_enum")] +impl RequestKind { + /// Encode the message into the target buffer + pub fn encode(&self, bytes: &mut bytes::BytesMut, version: i16) -> anyhow::Result<()> { + match self { + RequestKind::Produce(x) => encode(x, bytes, version), + RequestKind::Fetch(x) => encode(x, bytes, version), + RequestKind::ListOffsets(x) => encode(x, bytes, version), + RequestKind::Metadata(x) => encode(x, bytes, version), + RequestKind::LeaderAndIsr(x) => encode(x, bytes, version), + RequestKind::StopReplica(x) => encode(x, bytes, version), + RequestKind::UpdateMetadata(x) => encode(x, bytes, version), + RequestKind::ControlledShutdown(x) => encode(x, bytes, version), + RequestKind::OffsetCommit(x) => encode(x, bytes, version), + RequestKind::OffsetFetch(x) => encode(x, bytes, version), + RequestKind::FindCoordinator(x) => encode(x, bytes, version), + RequestKind::JoinGroup(x) => encode(x, bytes, version), + RequestKind::Heartbeat(x) => encode(x, bytes, version), + RequestKind::LeaveGroup(x) => encode(x, bytes, version), + RequestKind::SyncGroup(x) => encode(x, bytes, version), + RequestKind::DescribeGroups(x) => encode(x, bytes, version), + RequestKind::ListGroups(x) => encode(x, bytes, version), + RequestKind::SaslHandshake(x) => encode(x, bytes, version), + RequestKind::ApiVersions(x) => encode(x, bytes, version), + RequestKind::CreateTopics(x) => encode(x, bytes, version), + RequestKind::DeleteTopics(x) => encode(x, bytes, version), + RequestKind::DeleteRecords(x) => encode(x, bytes, version), + RequestKind::InitProducerId(x) => encode(x, bytes, version), + RequestKind::OffsetForLeaderEpoch(x) => encode(x, bytes, version), + RequestKind::AddPartitionsToTxn(x) => encode(x, bytes, version), + RequestKind::AddOffsetsToTxn(x) => encode(x, bytes, version), + RequestKind::EndTxn(x) => encode(x, bytes, version), + RequestKind::WriteTxnMarkers(x) => encode(x, bytes, version), + RequestKind::TxnOffsetCommit(x) => encode(x, bytes, version), + RequestKind::DescribeAcls(x) => encode(x, bytes, version), + RequestKind::CreateAcls(x) => encode(x, bytes, version), + RequestKind::DeleteAcls(x) => encode(x, bytes, version), + RequestKind::DescribeConfigs(x) => encode(x, bytes, version), + RequestKind::AlterConfigs(x) => encode(x, bytes, version), + RequestKind::AlterReplicaLogDirs(x) => encode(x, bytes, version), + RequestKind::DescribeLogDirs(x) => encode(x, bytes, version), + RequestKind::SaslAuthenticate(x) => encode(x, bytes, version), + RequestKind::CreatePartitions(x) => encode(x, bytes, version), + RequestKind::CreateDelegationToken(x) => encode(x, bytes, version), + RequestKind::RenewDelegationToken(x) => encode(x, bytes, version), + RequestKind::ExpireDelegationToken(x) => encode(x, bytes, version), + RequestKind::DescribeDelegationToken(x) => encode(x, bytes, version), + RequestKind::DeleteGroups(x) => encode(x, bytes, version), + RequestKind::ElectLeaders(x) => encode(x, bytes, version), + RequestKind::IncrementalAlterConfigs(x) => encode(x, bytes, version), + RequestKind::AlterPartitionReassignments(x) => encode(x, bytes, version), + RequestKind::ListPartitionReassignments(x) => encode(x, bytes, version), + RequestKind::OffsetDelete(x) => encode(x, bytes, version), + RequestKind::DescribeClientQuotas(x) => encode(x, bytes, version), + RequestKind::AlterClientQuotas(x) => encode(x, bytes, version), + RequestKind::DescribeUserScramCredentials(x) => encode(x, bytes, version), + RequestKind::AlterUserScramCredentials(x) => encode(x, bytes, version), + RequestKind::Vote(x) => encode(x, bytes, version), + RequestKind::BeginQuorumEpoch(x) => encode(x, bytes, version), + RequestKind::EndQuorumEpoch(x) => encode(x, bytes, version), + RequestKind::DescribeQuorum(x) => encode(x, bytes, version), + RequestKind::AlterPartition(x) => encode(x, bytes, version), + RequestKind::UpdateFeatures(x) => encode(x, bytes, version), + RequestKind::Envelope(x) => encode(x, bytes, version), + RequestKind::FetchSnapshot(x) => encode(x, bytes, version), + RequestKind::DescribeCluster(x) => encode(x, bytes, version), + RequestKind::DescribeProducers(x) => encode(x, bytes, version), + RequestKind::BrokerRegistration(x) => encode(x, bytes, version), + RequestKind::BrokerHeartbeat(x) => encode(x, bytes, version), + RequestKind::UnregisterBroker(x) => encode(x, bytes, version), + RequestKind::DescribeTransactions(x) => encode(x, bytes, version), + RequestKind::ListTransactions(x) => encode(x, bytes, version), + RequestKind::AllocateProducerIds(x) => encode(x, bytes, version), + RequestKind::ConsumerGroupHeartbeat(x) => encode(x, bytes, version), + RequestKind::ControllerRegistration(x) => encode(x, bytes, version), + RequestKind::GetTelemetrySubscriptions(x) => encode(x, bytes, version), + RequestKind::PushTelemetry(x) => encode(x, bytes, version), + RequestKind::AssignReplicasToDirs(x) => encode(x, bytes, version), + RequestKind::ListClientMetricsResources(x) => encode(x, bytes, version), + } + } + /// Decode the message from the provided buffer and version + pub fn decode( + api_key: ApiKey, + bytes: &mut bytes::Bytes, + version: i16, + ) -> anyhow::Result { + match api_key { + ApiKey::ProduceKey => Ok(RequestKind::Produce(decode(bytes, version)?)), + ApiKey::FetchKey => Ok(RequestKind::Fetch(decode(bytes, version)?)), + ApiKey::ListOffsetsKey => Ok(RequestKind::ListOffsets(decode(bytes, version)?)), + ApiKey::MetadataKey => Ok(RequestKind::Metadata(decode(bytes, version)?)), + ApiKey::LeaderAndIsrKey => Ok(RequestKind::LeaderAndIsr(decode(bytes, version)?)), + ApiKey::StopReplicaKey => Ok(RequestKind::StopReplica(decode(bytes, version)?)), + ApiKey::UpdateMetadataKey => Ok(RequestKind::UpdateMetadata(decode(bytes, version)?)), + ApiKey::ControlledShutdownKey => { + Ok(RequestKind::ControlledShutdown(decode(bytes, version)?)) + } + ApiKey::OffsetCommitKey => Ok(RequestKind::OffsetCommit(decode(bytes, version)?)), + ApiKey::OffsetFetchKey => Ok(RequestKind::OffsetFetch(decode(bytes, version)?)), + ApiKey::FindCoordinatorKey => Ok(RequestKind::FindCoordinator(decode(bytes, version)?)), + ApiKey::JoinGroupKey => Ok(RequestKind::JoinGroup(decode(bytes, version)?)), + ApiKey::HeartbeatKey => Ok(RequestKind::Heartbeat(decode(bytes, version)?)), + ApiKey::LeaveGroupKey => Ok(RequestKind::LeaveGroup(decode(bytes, version)?)), + ApiKey::SyncGroupKey => Ok(RequestKind::SyncGroup(decode(bytes, version)?)), + ApiKey::DescribeGroupsKey => Ok(RequestKind::DescribeGroups(decode(bytes, version)?)), + ApiKey::ListGroupsKey => Ok(RequestKind::ListGroups(decode(bytes, version)?)), + ApiKey::SaslHandshakeKey => Ok(RequestKind::SaslHandshake(decode(bytes, version)?)), + ApiKey::ApiVersionsKey => Ok(RequestKind::ApiVersions(decode(bytes, version)?)), + ApiKey::CreateTopicsKey => Ok(RequestKind::CreateTopics(decode(bytes, version)?)), + ApiKey::DeleteTopicsKey => Ok(RequestKind::DeleteTopics(decode(bytes, version)?)), + ApiKey::DeleteRecordsKey => Ok(RequestKind::DeleteRecords(decode(bytes, version)?)), + ApiKey::InitProducerIdKey => Ok(RequestKind::InitProducerId(decode(bytes, version)?)), + ApiKey::OffsetForLeaderEpochKey => { + Ok(RequestKind::OffsetForLeaderEpoch(decode(bytes, version)?)) + } + ApiKey::AddPartitionsToTxnKey => { + Ok(RequestKind::AddPartitionsToTxn(decode(bytes, version)?)) + } + ApiKey::AddOffsetsToTxnKey => Ok(RequestKind::AddOffsetsToTxn(decode(bytes, version)?)), + ApiKey::EndTxnKey => Ok(RequestKind::EndTxn(decode(bytes, version)?)), + ApiKey::WriteTxnMarkersKey => Ok(RequestKind::WriteTxnMarkers(decode(bytes, version)?)), + ApiKey::TxnOffsetCommitKey => Ok(RequestKind::TxnOffsetCommit(decode(bytes, version)?)), + ApiKey::DescribeAclsKey => Ok(RequestKind::DescribeAcls(decode(bytes, version)?)), + ApiKey::CreateAclsKey => Ok(RequestKind::CreateAcls(decode(bytes, version)?)), + ApiKey::DeleteAclsKey => Ok(RequestKind::DeleteAcls(decode(bytes, version)?)), + ApiKey::DescribeConfigsKey => Ok(RequestKind::DescribeConfigs(decode(bytes, version)?)), + ApiKey::AlterConfigsKey => Ok(RequestKind::AlterConfigs(decode(bytes, version)?)), + ApiKey::AlterReplicaLogDirsKey => { + Ok(RequestKind::AlterReplicaLogDirs(decode(bytes, version)?)) + } + ApiKey::DescribeLogDirsKey => Ok(RequestKind::DescribeLogDirs(decode(bytes, version)?)), + ApiKey::SaslAuthenticateKey => { + Ok(RequestKind::SaslAuthenticate(decode(bytes, version)?)) + } + ApiKey::CreatePartitionsKey => { + Ok(RequestKind::CreatePartitions(decode(bytes, version)?)) + } + ApiKey::CreateDelegationTokenKey => { + Ok(RequestKind::CreateDelegationToken(decode(bytes, version)?)) + } + ApiKey::RenewDelegationTokenKey => { + Ok(RequestKind::RenewDelegationToken(decode(bytes, version)?)) + } + ApiKey::ExpireDelegationTokenKey => { + Ok(RequestKind::ExpireDelegationToken(decode(bytes, version)?)) + } + ApiKey::DescribeDelegationTokenKey => Ok(RequestKind::DescribeDelegationToken(decode( + bytes, version, + )?)), + ApiKey::DeleteGroupsKey => Ok(RequestKind::DeleteGroups(decode(bytes, version)?)), + ApiKey::ElectLeadersKey => Ok(RequestKind::ElectLeaders(decode(bytes, version)?)), + ApiKey::IncrementalAlterConfigsKey => Ok(RequestKind::IncrementalAlterConfigs(decode( + bytes, version, + )?)), + ApiKey::AlterPartitionReassignmentsKey => Ok(RequestKind::AlterPartitionReassignments( + decode(bytes, version)?, + )), + ApiKey::ListPartitionReassignmentsKey => Ok(RequestKind::ListPartitionReassignments( + decode(bytes, version)?, + )), + ApiKey::OffsetDeleteKey => Ok(RequestKind::OffsetDelete(decode(bytes, version)?)), + ApiKey::DescribeClientQuotasKey => { + Ok(RequestKind::DescribeClientQuotas(decode(bytes, version)?)) + } + ApiKey::AlterClientQuotasKey => { + Ok(RequestKind::AlterClientQuotas(decode(bytes, version)?)) + } + ApiKey::DescribeUserScramCredentialsKey => Ok( + RequestKind::DescribeUserScramCredentials(decode(bytes, version)?), + ), + ApiKey::AlterUserScramCredentialsKey => Ok(RequestKind::AlterUserScramCredentials( + decode(bytes, version)?, + )), + ApiKey::VoteKey => Ok(RequestKind::Vote(decode(bytes, version)?)), + ApiKey::BeginQuorumEpochKey => { + Ok(RequestKind::BeginQuorumEpoch(decode(bytes, version)?)) + } + ApiKey::EndQuorumEpochKey => Ok(RequestKind::EndQuorumEpoch(decode(bytes, version)?)), + ApiKey::DescribeQuorumKey => Ok(RequestKind::DescribeQuorum(decode(bytes, version)?)), + ApiKey::AlterPartitionKey => Ok(RequestKind::AlterPartition(decode(bytes, version)?)), + ApiKey::UpdateFeaturesKey => Ok(RequestKind::UpdateFeatures(decode(bytes, version)?)), + ApiKey::EnvelopeKey => Ok(RequestKind::Envelope(decode(bytes, version)?)), + ApiKey::FetchSnapshotKey => Ok(RequestKind::FetchSnapshot(decode(bytes, version)?)), + ApiKey::DescribeClusterKey => Ok(RequestKind::DescribeCluster(decode(bytes, version)?)), + ApiKey::DescribeProducersKey => { + Ok(RequestKind::DescribeProducers(decode(bytes, version)?)) + } + ApiKey::BrokerRegistrationKey => { + Ok(RequestKind::BrokerRegistration(decode(bytes, version)?)) + } + ApiKey::BrokerHeartbeatKey => Ok(RequestKind::BrokerHeartbeat(decode(bytes, version)?)), + ApiKey::UnregisterBrokerKey => { + Ok(RequestKind::UnregisterBroker(decode(bytes, version)?)) + } + ApiKey::DescribeTransactionsKey => { + Ok(RequestKind::DescribeTransactions(decode(bytes, version)?)) + } + ApiKey::ListTransactionsKey => { + Ok(RequestKind::ListTransactions(decode(bytes, version)?)) + } + ApiKey::AllocateProducerIdsKey => { + Ok(RequestKind::AllocateProducerIds(decode(bytes, version)?)) + } + ApiKey::ConsumerGroupHeartbeatKey => { + Ok(RequestKind::ConsumerGroupHeartbeat(decode(bytes, version)?)) + } + ApiKey::ControllerRegistrationKey => { + Ok(RequestKind::ControllerRegistration(decode(bytes, version)?)) + } + ApiKey::GetTelemetrySubscriptionsKey => Ok(RequestKind::GetTelemetrySubscriptions( + decode(bytes, version)?, + )), + ApiKey::PushTelemetryKey => Ok(RequestKind::PushTelemetry(decode(bytes, version)?)), + ApiKey::AssignReplicasToDirsKey => { + Ok(RequestKind::AssignReplicasToDirs(decode(bytes, version)?)) + } + ApiKey::ListClientMetricsResourcesKey => Ok(RequestKind::ListClientMetricsResources( + decode(bytes, version)?, + )), + } + } +} +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ProduceRequest) -> RequestKind { RequestKind::Produce(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: FetchRequest) -> RequestKind { RequestKind::Fetch(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ListOffsetsRequest) -> RequestKind { RequestKind::ListOffsets(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: MetadataRequest) -> RequestKind { RequestKind::Metadata(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: LeaderAndIsrRequest) -> RequestKind { RequestKind::LeaderAndIsr(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: StopReplicaRequest) -> RequestKind { RequestKind::StopReplica(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: UpdateMetadataRequest) -> RequestKind { RequestKind::UpdateMetadata(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ControlledShutdownRequest) -> RequestKind { RequestKind::ControlledShutdown(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: OffsetCommitRequest) -> RequestKind { RequestKind::OffsetCommit(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: OffsetFetchRequest) -> RequestKind { RequestKind::OffsetFetch(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: FindCoordinatorRequest) -> RequestKind { RequestKind::FindCoordinator(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: JoinGroupRequest) -> RequestKind { RequestKind::JoinGroup(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: HeartbeatRequest) -> RequestKind { RequestKind::Heartbeat(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: LeaveGroupRequest) -> RequestKind { RequestKind::LeaveGroup(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: SyncGroupRequest) -> RequestKind { RequestKind::SyncGroup(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeGroupsRequest) -> RequestKind { RequestKind::DescribeGroups(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ListGroupsRequest) -> RequestKind { RequestKind::ListGroups(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: SaslHandshakeRequest) -> RequestKind { RequestKind::SaslHandshake(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ApiVersionsRequest) -> RequestKind { RequestKind::ApiVersions(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: CreateTopicsRequest) -> RequestKind { RequestKind::CreateTopics(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DeleteTopicsRequest) -> RequestKind { RequestKind::DeleteTopics(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DeleteRecordsRequest) -> RequestKind { RequestKind::DeleteRecords(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: InitProducerIdRequest) -> RequestKind { RequestKind::InitProducerId(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: OffsetForLeaderEpochRequest) -> RequestKind { RequestKind::OffsetForLeaderEpoch(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AddPartitionsToTxnRequest) -> RequestKind { RequestKind::AddPartitionsToTxn(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AddOffsetsToTxnRequest) -> RequestKind { RequestKind::AddOffsetsToTxn(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: EndTxnRequest) -> RequestKind { RequestKind::EndTxn(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: WriteTxnMarkersRequest) -> RequestKind { RequestKind::WriteTxnMarkers(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: TxnOffsetCommitRequest) -> RequestKind { RequestKind::TxnOffsetCommit(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeAclsRequest) -> RequestKind { RequestKind::DescribeAcls(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: CreateAclsRequest) -> RequestKind { RequestKind::CreateAcls(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DeleteAclsRequest) -> RequestKind { RequestKind::DeleteAcls(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeConfigsRequest) -> RequestKind { RequestKind::DescribeConfigs(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AlterConfigsRequest) -> RequestKind { RequestKind::AlterConfigs(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AlterReplicaLogDirsRequest) -> RequestKind { RequestKind::AlterReplicaLogDirs(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeLogDirsRequest) -> RequestKind { RequestKind::DescribeLogDirs(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: SaslAuthenticateRequest) -> RequestKind { RequestKind::SaslAuthenticate(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: CreatePartitionsRequest) -> RequestKind { RequestKind::CreatePartitions(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: CreateDelegationTokenRequest) -> RequestKind { RequestKind::CreateDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: RenewDelegationTokenRequest) -> RequestKind { RequestKind::RenewDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ExpireDelegationTokenRequest) -> RequestKind { RequestKind::ExpireDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeDelegationTokenRequest) -> RequestKind { RequestKind::DescribeDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DeleteGroupsRequest) -> RequestKind { RequestKind::DeleteGroups(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ElectLeadersRequest) -> RequestKind { RequestKind::ElectLeaders(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: IncrementalAlterConfigsRequest) -> RequestKind { RequestKind::IncrementalAlterConfigs(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AlterPartitionReassignmentsRequest) -> RequestKind { RequestKind::AlterPartitionReassignments(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ListPartitionReassignmentsRequest) -> RequestKind { RequestKind::ListPartitionReassignments(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: OffsetDeleteRequest) -> RequestKind { RequestKind::OffsetDelete(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeClientQuotasRequest) -> RequestKind { RequestKind::DescribeClientQuotas(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AlterClientQuotasRequest) -> RequestKind { RequestKind::AlterClientQuotas(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeUserScramCredentialsRequest) -> RequestKind { RequestKind::DescribeUserScramCredentials(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AlterUserScramCredentialsRequest) -> RequestKind { RequestKind::AlterUserScramCredentials(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: VoteRequest) -> RequestKind { RequestKind::Vote(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: BeginQuorumEpochRequest) -> RequestKind { RequestKind::BeginQuorumEpoch(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: EndQuorumEpochRequest) -> RequestKind { RequestKind::EndQuorumEpoch(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeQuorumRequest) -> RequestKind { RequestKind::DescribeQuorum(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AlterPartitionRequest) -> RequestKind { RequestKind::AlterPartition(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: UpdateFeaturesRequest) -> RequestKind { RequestKind::UpdateFeatures(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: EnvelopeRequest) -> RequestKind { RequestKind::Envelope(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: FetchSnapshotRequest) -> RequestKind { RequestKind::FetchSnapshot(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeClusterRequest) -> RequestKind { RequestKind::DescribeCluster(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeProducersRequest) -> RequestKind { RequestKind::DescribeProducers(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: BrokerRegistrationRequest) -> RequestKind { RequestKind::BrokerRegistration(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: BrokerHeartbeatRequest) -> RequestKind { RequestKind::BrokerHeartbeat(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: UnregisterBrokerRequest) -> RequestKind { RequestKind::UnregisterBroker(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: DescribeTransactionsRequest) -> RequestKind { RequestKind::DescribeTransactions(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ListTransactionsRequest) -> RequestKind { RequestKind::ListTransactions(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AllocateProducerIdsRequest) -> RequestKind { RequestKind::AllocateProducerIds(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ConsumerGroupHeartbeatRequest) -> RequestKind { RequestKind::ConsumerGroupHeartbeat(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ControllerRegistrationRequest) -> RequestKind { RequestKind::ControllerRegistration(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: GetTelemetrySubscriptionsRequest) -> RequestKind { RequestKind::GetTelemetrySubscriptions(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: PushTelemetryRequest) -> RequestKind { RequestKind::PushTelemetry(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: AssignReplicasToDirsRequest) -> RequestKind { RequestKind::AssignReplicasToDirs(value) } } +#[cfg(feature = "messages_enum")] impl From for RequestKind { fn from(value: ListClientMetricsResourcesRequest) -> RequestKind { RequestKind::ListClientMetricsResources(value) } } +#[cfg(feature = "messages_enum")] +fn decode(bytes: &mut bytes::Bytes, version: i16) -> Result { + T::decode(bytes, version).with_context(|| { + format!( + "Failed to decode {} v{} body", + std::any::type_name::(), + version + ) + }) +} + +#[cfg(feature = "messages_enum")] +fn encode(encodable: &T, bytes: &mut bytes::BytesMut, version: i16) -> Result<()> { + encodable.encode(bytes, version).with_context(|| { + format!( + "Failed to encode {} v{} body", + std::any::type_name::(), + version + ) + }) +} + /// Wrapping enum for all responses in the Kafka protocol. #[non_exhaustive] #[derive(Debug, Clone, PartialEq)] +#[cfg(feature = "messages_enum")] pub enum ResponseKind { /// ProduceResponse, Produce(ProduceResponse), @@ -2075,444 +2403,885 @@ pub enum ResponseKind { ListClientMetricsResources(ListClientMetricsResourcesResponse), } +#[cfg(feature = "messages_enum")] +impl ResponseKind { + /// Encode the message into the target buffer + pub fn encode(&self, bytes: &mut bytes::BytesMut, version: i16) -> anyhow::Result<()> { + match self { + ResponseKind::Produce(x) => encode(x, bytes, version), + ResponseKind::Fetch(x) => encode(x, bytes, version), + ResponseKind::ListOffsets(x) => encode(x, bytes, version), + ResponseKind::Metadata(x) => encode(x, bytes, version), + ResponseKind::LeaderAndIsr(x) => encode(x, bytes, version), + ResponseKind::StopReplica(x) => encode(x, bytes, version), + ResponseKind::UpdateMetadata(x) => encode(x, bytes, version), + ResponseKind::ControlledShutdown(x) => encode(x, bytes, version), + ResponseKind::OffsetCommit(x) => encode(x, bytes, version), + ResponseKind::OffsetFetch(x) => encode(x, bytes, version), + ResponseKind::FindCoordinator(x) => encode(x, bytes, version), + ResponseKind::JoinGroup(x) => encode(x, bytes, version), + ResponseKind::Heartbeat(x) => encode(x, bytes, version), + ResponseKind::LeaveGroup(x) => encode(x, bytes, version), + ResponseKind::SyncGroup(x) => encode(x, bytes, version), + ResponseKind::DescribeGroups(x) => encode(x, bytes, version), + ResponseKind::ListGroups(x) => encode(x, bytes, version), + ResponseKind::SaslHandshake(x) => encode(x, bytes, version), + ResponseKind::ApiVersions(x) => encode(x, bytes, version), + ResponseKind::CreateTopics(x) => encode(x, bytes, version), + ResponseKind::DeleteTopics(x) => encode(x, bytes, version), + ResponseKind::DeleteRecords(x) => encode(x, bytes, version), + ResponseKind::InitProducerId(x) => encode(x, bytes, version), + ResponseKind::OffsetForLeaderEpoch(x) => encode(x, bytes, version), + ResponseKind::AddPartitionsToTxn(x) => encode(x, bytes, version), + ResponseKind::AddOffsetsToTxn(x) => encode(x, bytes, version), + ResponseKind::EndTxn(x) => encode(x, bytes, version), + ResponseKind::WriteTxnMarkers(x) => encode(x, bytes, version), + ResponseKind::TxnOffsetCommit(x) => encode(x, bytes, version), + ResponseKind::DescribeAcls(x) => encode(x, bytes, version), + ResponseKind::CreateAcls(x) => encode(x, bytes, version), + ResponseKind::DeleteAcls(x) => encode(x, bytes, version), + ResponseKind::DescribeConfigs(x) => encode(x, bytes, version), + ResponseKind::AlterConfigs(x) => encode(x, bytes, version), + ResponseKind::AlterReplicaLogDirs(x) => encode(x, bytes, version), + ResponseKind::DescribeLogDirs(x) => encode(x, bytes, version), + ResponseKind::SaslAuthenticate(x) => encode(x, bytes, version), + ResponseKind::CreatePartitions(x) => encode(x, bytes, version), + ResponseKind::CreateDelegationToken(x) => encode(x, bytes, version), + ResponseKind::RenewDelegationToken(x) => encode(x, bytes, version), + ResponseKind::ExpireDelegationToken(x) => encode(x, bytes, version), + ResponseKind::DescribeDelegationToken(x) => encode(x, bytes, version), + ResponseKind::DeleteGroups(x) => encode(x, bytes, version), + ResponseKind::ElectLeaders(x) => encode(x, bytes, version), + ResponseKind::IncrementalAlterConfigs(x) => encode(x, bytes, version), + ResponseKind::AlterPartitionReassignments(x) => encode(x, bytes, version), + ResponseKind::ListPartitionReassignments(x) => encode(x, bytes, version), + ResponseKind::OffsetDelete(x) => encode(x, bytes, version), + ResponseKind::DescribeClientQuotas(x) => encode(x, bytes, version), + ResponseKind::AlterClientQuotas(x) => encode(x, bytes, version), + ResponseKind::DescribeUserScramCredentials(x) => encode(x, bytes, version), + ResponseKind::AlterUserScramCredentials(x) => encode(x, bytes, version), + ResponseKind::Vote(x) => encode(x, bytes, version), + ResponseKind::BeginQuorumEpoch(x) => encode(x, bytes, version), + ResponseKind::EndQuorumEpoch(x) => encode(x, bytes, version), + ResponseKind::DescribeQuorum(x) => encode(x, bytes, version), + ResponseKind::AlterPartition(x) => encode(x, bytes, version), + ResponseKind::UpdateFeatures(x) => encode(x, bytes, version), + ResponseKind::Envelope(x) => encode(x, bytes, version), + ResponseKind::FetchSnapshot(x) => encode(x, bytes, version), + ResponseKind::DescribeCluster(x) => encode(x, bytes, version), + ResponseKind::DescribeProducers(x) => encode(x, bytes, version), + ResponseKind::BrokerRegistration(x) => encode(x, bytes, version), + ResponseKind::BrokerHeartbeat(x) => encode(x, bytes, version), + ResponseKind::UnregisterBroker(x) => encode(x, bytes, version), + ResponseKind::DescribeTransactions(x) => encode(x, bytes, version), + ResponseKind::ListTransactions(x) => encode(x, bytes, version), + ResponseKind::AllocateProducerIds(x) => encode(x, bytes, version), + ResponseKind::ConsumerGroupHeartbeat(x) => encode(x, bytes, version), + ResponseKind::ControllerRegistration(x) => encode(x, bytes, version), + ResponseKind::GetTelemetrySubscriptions(x) => encode(x, bytes, version), + ResponseKind::PushTelemetry(x) => encode(x, bytes, version), + ResponseKind::AssignReplicasToDirs(x) => encode(x, bytes, version), + ResponseKind::ListClientMetricsResources(x) => encode(x, bytes, version), + } + } + /// Decode the message from the provided buffer and version + pub fn decode( + api_key: ApiKey, + bytes: &mut bytes::Bytes, + version: i16, + ) -> anyhow::Result { + match api_key { + ApiKey::ProduceKey => Ok(ResponseKind::Produce(decode(bytes, version)?)), + ApiKey::FetchKey => Ok(ResponseKind::Fetch(decode(bytes, version)?)), + ApiKey::ListOffsetsKey => Ok(ResponseKind::ListOffsets(decode(bytes, version)?)), + ApiKey::MetadataKey => Ok(ResponseKind::Metadata(decode(bytes, version)?)), + ApiKey::LeaderAndIsrKey => Ok(ResponseKind::LeaderAndIsr(decode(bytes, version)?)), + ApiKey::StopReplicaKey => Ok(ResponseKind::StopReplica(decode(bytes, version)?)), + ApiKey::UpdateMetadataKey => Ok(ResponseKind::UpdateMetadata(decode(bytes, version)?)), + ApiKey::ControlledShutdownKey => { + Ok(ResponseKind::ControlledShutdown(decode(bytes, version)?)) + } + ApiKey::OffsetCommitKey => Ok(ResponseKind::OffsetCommit(decode(bytes, version)?)), + ApiKey::OffsetFetchKey => Ok(ResponseKind::OffsetFetch(decode(bytes, version)?)), + ApiKey::FindCoordinatorKey => { + Ok(ResponseKind::FindCoordinator(decode(bytes, version)?)) + } + ApiKey::JoinGroupKey => Ok(ResponseKind::JoinGroup(decode(bytes, version)?)), + ApiKey::HeartbeatKey => Ok(ResponseKind::Heartbeat(decode(bytes, version)?)), + ApiKey::LeaveGroupKey => Ok(ResponseKind::LeaveGroup(decode(bytes, version)?)), + ApiKey::SyncGroupKey => Ok(ResponseKind::SyncGroup(decode(bytes, version)?)), + ApiKey::DescribeGroupsKey => Ok(ResponseKind::DescribeGroups(decode(bytes, version)?)), + ApiKey::ListGroupsKey => Ok(ResponseKind::ListGroups(decode(bytes, version)?)), + ApiKey::SaslHandshakeKey => Ok(ResponseKind::SaslHandshake(decode(bytes, version)?)), + ApiKey::ApiVersionsKey => Ok(ResponseKind::ApiVersions(decode(bytes, version)?)), + ApiKey::CreateTopicsKey => Ok(ResponseKind::CreateTopics(decode(bytes, version)?)), + ApiKey::DeleteTopicsKey => Ok(ResponseKind::DeleteTopics(decode(bytes, version)?)), + ApiKey::DeleteRecordsKey => Ok(ResponseKind::DeleteRecords(decode(bytes, version)?)), + ApiKey::InitProducerIdKey => Ok(ResponseKind::InitProducerId(decode(bytes, version)?)), + ApiKey::OffsetForLeaderEpochKey => { + Ok(ResponseKind::OffsetForLeaderEpoch(decode(bytes, version)?)) + } + ApiKey::AddPartitionsToTxnKey => { + Ok(ResponseKind::AddPartitionsToTxn(decode(bytes, version)?)) + } + ApiKey::AddOffsetsToTxnKey => { + Ok(ResponseKind::AddOffsetsToTxn(decode(bytes, version)?)) + } + ApiKey::EndTxnKey => Ok(ResponseKind::EndTxn(decode(bytes, version)?)), + ApiKey::WriteTxnMarkersKey => { + Ok(ResponseKind::WriteTxnMarkers(decode(bytes, version)?)) + } + ApiKey::TxnOffsetCommitKey => { + Ok(ResponseKind::TxnOffsetCommit(decode(bytes, version)?)) + } + ApiKey::DescribeAclsKey => Ok(ResponseKind::DescribeAcls(decode(bytes, version)?)), + ApiKey::CreateAclsKey => Ok(ResponseKind::CreateAcls(decode(bytes, version)?)), + ApiKey::DeleteAclsKey => Ok(ResponseKind::DeleteAcls(decode(bytes, version)?)), + ApiKey::DescribeConfigsKey => { + Ok(ResponseKind::DescribeConfigs(decode(bytes, version)?)) + } + ApiKey::AlterConfigsKey => Ok(ResponseKind::AlterConfigs(decode(bytes, version)?)), + ApiKey::AlterReplicaLogDirsKey => { + Ok(ResponseKind::AlterReplicaLogDirs(decode(bytes, version)?)) + } + ApiKey::DescribeLogDirsKey => { + Ok(ResponseKind::DescribeLogDirs(decode(bytes, version)?)) + } + ApiKey::SaslAuthenticateKey => { + Ok(ResponseKind::SaslAuthenticate(decode(bytes, version)?)) + } + ApiKey::CreatePartitionsKey => { + Ok(ResponseKind::CreatePartitions(decode(bytes, version)?)) + } + ApiKey::CreateDelegationTokenKey => { + Ok(ResponseKind::CreateDelegationToken(decode(bytes, version)?)) + } + ApiKey::RenewDelegationTokenKey => { + Ok(ResponseKind::RenewDelegationToken(decode(bytes, version)?)) + } + ApiKey::ExpireDelegationTokenKey => { + Ok(ResponseKind::ExpireDelegationToken(decode(bytes, version)?)) + } + ApiKey::DescribeDelegationTokenKey => Ok(ResponseKind::DescribeDelegationToken( + decode(bytes, version)?, + )), + ApiKey::DeleteGroupsKey => Ok(ResponseKind::DeleteGroups(decode(bytes, version)?)), + ApiKey::ElectLeadersKey => Ok(ResponseKind::ElectLeaders(decode(bytes, version)?)), + ApiKey::IncrementalAlterConfigsKey => Ok(ResponseKind::IncrementalAlterConfigs( + decode(bytes, version)?, + )), + ApiKey::AlterPartitionReassignmentsKey => Ok( + ResponseKind::AlterPartitionReassignments(decode(bytes, version)?), + ), + ApiKey::ListPartitionReassignmentsKey => Ok(ResponseKind::ListPartitionReassignments( + decode(bytes, version)?, + )), + ApiKey::OffsetDeleteKey => Ok(ResponseKind::OffsetDelete(decode(bytes, version)?)), + ApiKey::DescribeClientQuotasKey => { + Ok(ResponseKind::DescribeClientQuotas(decode(bytes, version)?)) + } + ApiKey::AlterClientQuotasKey => { + Ok(ResponseKind::AlterClientQuotas(decode(bytes, version)?)) + } + ApiKey::DescribeUserScramCredentialsKey => Ok( + ResponseKind::DescribeUserScramCredentials(decode(bytes, version)?), + ), + ApiKey::AlterUserScramCredentialsKey => Ok(ResponseKind::AlterUserScramCredentials( + decode(bytes, version)?, + )), + ApiKey::VoteKey => Ok(ResponseKind::Vote(decode(bytes, version)?)), + ApiKey::BeginQuorumEpochKey => { + Ok(ResponseKind::BeginQuorumEpoch(decode(bytes, version)?)) + } + ApiKey::EndQuorumEpochKey => Ok(ResponseKind::EndQuorumEpoch(decode(bytes, version)?)), + ApiKey::DescribeQuorumKey => Ok(ResponseKind::DescribeQuorum(decode(bytes, version)?)), + ApiKey::AlterPartitionKey => Ok(ResponseKind::AlterPartition(decode(bytes, version)?)), + ApiKey::UpdateFeaturesKey => Ok(ResponseKind::UpdateFeatures(decode(bytes, version)?)), + ApiKey::EnvelopeKey => Ok(ResponseKind::Envelope(decode(bytes, version)?)), + ApiKey::FetchSnapshotKey => Ok(ResponseKind::FetchSnapshot(decode(bytes, version)?)), + ApiKey::DescribeClusterKey => { + Ok(ResponseKind::DescribeCluster(decode(bytes, version)?)) + } + ApiKey::DescribeProducersKey => { + Ok(ResponseKind::DescribeProducers(decode(bytes, version)?)) + } + ApiKey::BrokerRegistrationKey => { + Ok(ResponseKind::BrokerRegistration(decode(bytes, version)?)) + } + ApiKey::BrokerHeartbeatKey => { + Ok(ResponseKind::BrokerHeartbeat(decode(bytes, version)?)) + } + ApiKey::UnregisterBrokerKey => { + Ok(ResponseKind::UnregisterBroker(decode(bytes, version)?)) + } + ApiKey::DescribeTransactionsKey => { + Ok(ResponseKind::DescribeTransactions(decode(bytes, version)?)) + } + ApiKey::ListTransactionsKey => { + Ok(ResponseKind::ListTransactions(decode(bytes, version)?)) + } + ApiKey::AllocateProducerIdsKey => { + Ok(ResponseKind::AllocateProducerIds(decode(bytes, version)?)) + } + ApiKey::ConsumerGroupHeartbeatKey => Ok(ResponseKind::ConsumerGroupHeartbeat(decode( + bytes, version, + )?)), + ApiKey::ControllerRegistrationKey => Ok(ResponseKind::ControllerRegistration(decode( + bytes, version, + )?)), + ApiKey::GetTelemetrySubscriptionsKey => Ok(ResponseKind::GetTelemetrySubscriptions( + decode(bytes, version)?, + )), + ApiKey::PushTelemetryKey => Ok(ResponseKind::PushTelemetry(decode(bytes, version)?)), + ApiKey::AssignReplicasToDirsKey => { + Ok(ResponseKind::AssignReplicasToDirs(decode(bytes, version)?)) + } + ApiKey::ListClientMetricsResourcesKey => Ok(ResponseKind::ListClientMetricsResources( + decode(bytes, version)?, + )), + } + } + /// Get the version of request header that needs to be prepended to this message + pub fn header_version(&self, version: i16) -> i16 { + match self { + ResponseKind::Produce(_) => ProduceResponse::header_version(version), + ResponseKind::Fetch(_) => FetchResponse::header_version(version), + ResponseKind::ListOffsets(_) => ListOffsetsResponse::header_version(version), + ResponseKind::Metadata(_) => MetadataResponse::header_version(version), + ResponseKind::LeaderAndIsr(_) => LeaderAndIsrResponse::header_version(version), + ResponseKind::StopReplica(_) => StopReplicaResponse::header_version(version), + ResponseKind::UpdateMetadata(_) => UpdateMetadataResponse::header_version(version), + ResponseKind::ControlledShutdown(_) => { + ControlledShutdownResponse::header_version(version) + } + ResponseKind::OffsetCommit(_) => OffsetCommitResponse::header_version(version), + ResponseKind::OffsetFetch(_) => OffsetFetchResponse::header_version(version), + ResponseKind::FindCoordinator(_) => FindCoordinatorResponse::header_version(version), + ResponseKind::JoinGroup(_) => JoinGroupResponse::header_version(version), + ResponseKind::Heartbeat(_) => HeartbeatResponse::header_version(version), + ResponseKind::LeaveGroup(_) => LeaveGroupResponse::header_version(version), + ResponseKind::SyncGroup(_) => SyncGroupResponse::header_version(version), + ResponseKind::DescribeGroups(_) => DescribeGroupsResponse::header_version(version), + ResponseKind::ListGroups(_) => ListGroupsResponse::header_version(version), + ResponseKind::SaslHandshake(_) => SaslHandshakeResponse::header_version(version), + ResponseKind::ApiVersions(_) => ApiVersionsResponse::header_version(version), + ResponseKind::CreateTopics(_) => CreateTopicsResponse::header_version(version), + ResponseKind::DeleteTopics(_) => DeleteTopicsResponse::header_version(version), + ResponseKind::DeleteRecords(_) => DeleteRecordsResponse::header_version(version), + ResponseKind::InitProducerId(_) => InitProducerIdResponse::header_version(version), + ResponseKind::OffsetForLeaderEpoch(_) => { + OffsetForLeaderEpochResponse::header_version(version) + } + ResponseKind::AddPartitionsToTxn(_) => { + AddPartitionsToTxnResponse::header_version(version) + } + ResponseKind::AddOffsetsToTxn(_) => AddOffsetsToTxnResponse::header_version(version), + ResponseKind::EndTxn(_) => EndTxnResponse::header_version(version), + ResponseKind::WriteTxnMarkers(_) => WriteTxnMarkersResponse::header_version(version), + ResponseKind::TxnOffsetCommit(_) => TxnOffsetCommitResponse::header_version(version), + ResponseKind::DescribeAcls(_) => DescribeAclsResponse::header_version(version), + ResponseKind::CreateAcls(_) => CreateAclsResponse::header_version(version), + ResponseKind::DeleteAcls(_) => DeleteAclsResponse::header_version(version), + ResponseKind::DescribeConfigs(_) => DescribeConfigsResponse::header_version(version), + ResponseKind::AlterConfigs(_) => AlterConfigsResponse::header_version(version), + ResponseKind::AlterReplicaLogDirs(_) => { + AlterReplicaLogDirsResponse::header_version(version) + } + ResponseKind::DescribeLogDirs(_) => DescribeLogDirsResponse::header_version(version), + ResponseKind::SaslAuthenticate(_) => SaslAuthenticateResponse::header_version(version), + ResponseKind::CreatePartitions(_) => CreatePartitionsResponse::header_version(version), + ResponseKind::CreateDelegationToken(_) => { + CreateDelegationTokenResponse::header_version(version) + } + ResponseKind::RenewDelegationToken(_) => { + RenewDelegationTokenResponse::header_version(version) + } + ResponseKind::ExpireDelegationToken(_) => { + ExpireDelegationTokenResponse::header_version(version) + } + ResponseKind::DescribeDelegationToken(_) => { + DescribeDelegationTokenResponse::header_version(version) + } + ResponseKind::DeleteGroups(_) => DeleteGroupsResponse::header_version(version), + ResponseKind::ElectLeaders(_) => ElectLeadersResponse::header_version(version), + ResponseKind::IncrementalAlterConfigs(_) => { + IncrementalAlterConfigsResponse::header_version(version) + } + ResponseKind::AlterPartitionReassignments(_) => { + AlterPartitionReassignmentsResponse::header_version(version) + } + ResponseKind::ListPartitionReassignments(_) => { + ListPartitionReassignmentsResponse::header_version(version) + } + ResponseKind::OffsetDelete(_) => OffsetDeleteResponse::header_version(version), + ResponseKind::DescribeClientQuotas(_) => { + DescribeClientQuotasResponse::header_version(version) + } + ResponseKind::AlterClientQuotas(_) => { + AlterClientQuotasResponse::header_version(version) + } + ResponseKind::DescribeUserScramCredentials(_) => { + DescribeUserScramCredentialsResponse::header_version(version) + } + ResponseKind::AlterUserScramCredentials(_) => { + AlterUserScramCredentialsResponse::header_version(version) + } + ResponseKind::Vote(_) => VoteResponse::header_version(version), + ResponseKind::BeginQuorumEpoch(_) => BeginQuorumEpochResponse::header_version(version), + ResponseKind::EndQuorumEpoch(_) => EndQuorumEpochResponse::header_version(version), + ResponseKind::DescribeQuorum(_) => DescribeQuorumResponse::header_version(version), + ResponseKind::AlterPartition(_) => AlterPartitionResponse::header_version(version), + ResponseKind::UpdateFeatures(_) => UpdateFeaturesResponse::header_version(version), + ResponseKind::Envelope(_) => EnvelopeResponse::header_version(version), + ResponseKind::FetchSnapshot(_) => FetchSnapshotResponse::header_version(version), + ResponseKind::DescribeCluster(_) => DescribeClusterResponse::header_version(version), + ResponseKind::DescribeProducers(_) => { + DescribeProducersResponse::header_version(version) + } + ResponseKind::BrokerRegistration(_) => { + BrokerRegistrationResponse::header_version(version) + } + ResponseKind::BrokerHeartbeat(_) => BrokerHeartbeatResponse::header_version(version), + ResponseKind::UnregisterBroker(_) => UnregisterBrokerResponse::header_version(version), + ResponseKind::DescribeTransactions(_) => { + DescribeTransactionsResponse::header_version(version) + } + ResponseKind::ListTransactions(_) => ListTransactionsResponse::header_version(version), + ResponseKind::AllocateProducerIds(_) => { + AllocateProducerIdsResponse::header_version(version) + } + ResponseKind::ConsumerGroupHeartbeat(_) => { + ConsumerGroupHeartbeatResponse::header_version(version) + } + ResponseKind::ControllerRegistration(_) => { + ControllerRegistrationResponse::header_version(version) + } + ResponseKind::GetTelemetrySubscriptions(_) => { + GetTelemetrySubscriptionsResponse::header_version(version) + } + ResponseKind::PushTelemetry(_) => PushTelemetryResponse::header_version(version), + ResponseKind::AssignReplicasToDirs(_) => { + AssignReplicasToDirsResponse::header_version(version) + } + ResponseKind::ListClientMetricsResources(_) => { + ListClientMetricsResourcesResponse::header_version(version) + } + } + } +} + +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ProduceResponse) -> ResponseKind { ResponseKind::Produce(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: FetchResponse) -> ResponseKind { ResponseKind::Fetch(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ListOffsetsResponse) -> ResponseKind { ResponseKind::ListOffsets(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: MetadataResponse) -> ResponseKind { ResponseKind::Metadata(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: LeaderAndIsrResponse) -> ResponseKind { ResponseKind::LeaderAndIsr(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: StopReplicaResponse) -> ResponseKind { ResponseKind::StopReplica(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: UpdateMetadataResponse) -> ResponseKind { ResponseKind::UpdateMetadata(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ControlledShutdownResponse) -> ResponseKind { ResponseKind::ControlledShutdown(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: OffsetCommitResponse) -> ResponseKind { ResponseKind::OffsetCommit(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: OffsetFetchResponse) -> ResponseKind { ResponseKind::OffsetFetch(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: FindCoordinatorResponse) -> ResponseKind { ResponseKind::FindCoordinator(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: JoinGroupResponse) -> ResponseKind { ResponseKind::JoinGroup(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: HeartbeatResponse) -> ResponseKind { ResponseKind::Heartbeat(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: LeaveGroupResponse) -> ResponseKind { ResponseKind::LeaveGroup(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: SyncGroupResponse) -> ResponseKind { ResponseKind::SyncGroup(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeGroupsResponse) -> ResponseKind { ResponseKind::DescribeGroups(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ListGroupsResponse) -> ResponseKind { ResponseKind::ListGroups(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: SaslHandshakeResponse) -> ResponseKind { ResponseKind::SaslHandshake(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ApiVersionsResponse) -> ResponseKind { ResponseKind::ApiVersions(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: CreateTopicsResponse) -> ResponseKind { ResponseKind::CreateTopics(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DeleteTopicsResponse) -> ResponseKind { ResponseKind::DeleteTopics(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DeleteRecordsResponse) -> ResponseKind { ResponseKind::DeleteRecords(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: InitProducerIdResponse) -> ResponseKind { ResponseKind::InitProducerId(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: OffsetForLeaderEpochResponse) -> ResponseKind { ResponseKind::OffsetForLeaderEpoch(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AddPartitionsToTxnResponse) -> ResponseKind { ResponseKind::AddPartitionsToTxn(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AddOffsetsToTxnResponse) -> ResponseKind { ResponseKind::AddOffsetsToTxn(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: EndTxnResponse) -> ResponseKind { ResponseKind::EndTxn(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: WriteTxnMarkersResponse) -> ResponseKind { ResponseKind::WriteTxnMarkers(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: TxnOffsetCommitResponse) -> ResponseKind { ResponseKind::TxnOffsetCommit(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeAclsResponse) -> ResponseKind { ResponseKind::DescribeAcls(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: CreateAclsResponse) -> ResponseKind { ResponseKind::CreateAcls(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DeleteAclsResponse) -> ResponseKind { ResponseKind::DeleteAcls(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeConfigsResponse) -> ResponseKind { ResponseKind::DescribeConfigs(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AlterConfigsResponse) -> ResponseKind { ResponseKind::AlterConfigs(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AlterReplicaLogDirsResponse) -> ResponseKind { ResponseKind::AlterReplicaLogDirs(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeLogDirsResponse) -> ResponseKind { ResponseKind::DescribeLogDirs(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: SaslAuthenticateResponse) -> ResponseKind { ResponseKind::SaslAuthenticate(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: CreatePartitionsResponse) -> ResponseKind { ResponseKind::CreatePartitions(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: CreateDelegationTokenResponse) -> ResponseKind { ResponseKind::CreateDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: RenewDelegationTokenResponse) -> ResponseKind { ResponseKind::RenewDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ExpireDelegationTokenResponse) -> ResponseKind { ResponseKind::ExpireDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeDelegationTokenResponse) -> ResponseKind { ResponseKind::DescribeDelegationToken(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DeleteGroupsResponse) -> ResponseKind { ResponseKind::DeleteGroups(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ElectLeadersResponse) -> ResponseKind { ResponseKind::ElectLeaders(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: IncrementalAlterConfigsResponse) -> ResponseKind { ResponseKind::IncrementalAlterConfigs(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AlterPartitionReassignmentsResponse) -> ResponseKind { ResponseKind::AlterPartitionReassignments(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ListPartitionReassignmentsResponse) -> ResponseKind { ResponseKind::ListPartitionReassignments(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: OffsetDeleteResponse) -> ResponseKind { ResponseKind::OffsetDelete(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeClientQuotasResponse) -> ResponseKind { ResponseKind::DescribeClientQuotas(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AlterClientQuotasResponse) -> ResponseKind { ResponseKind::AlterClientQuotas(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeUserScramCredentialsResponse) -> ResponseKind { ResponseKind::DescribeUserScramCredentials(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AlterUserScramCredentialsResponse) -> ResponseKind { ResponseKind::AlterUserScramCredentials(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: VoteResponse) -> ResponseKind { ResponseKind::Vote(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: BeginQuorumEpochResponse) -> ResponseKind { ResponseKind::BeginQuorumEpoch(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: EndQuorumEpochResponse) -> ResponseKind { ResponseKind::EndQuorumEpoch(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeQuorumResponse) -> ResponseKind { ResponseKind::DescribeQuorum(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AlterPartitionResponse) -> ResponseKind { ResponseKind::AlterPartition(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: UpdateFeaturesResponse) -> ResponseKind { ResponseKind::UpdateFeatures(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: EnvelopeResponse) -> ResponseKind { ResponseKind::Envelope(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: FetchSnapshotResponse) -> ResponseKind { ResponseKind::FetchSnapshot(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeClusterResponse) -> ResponseKind { ResponseKind::DescribeCluster(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeProducersResponse) -> ResponseKind { ResponseKind::DescribeProducers(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: BrokerRegistrationResponse) -> ResponseKind { ResponseKind::BrokerRegistration(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: BrokerHeartbeatResponse) -> ResponseKind { ResponseKind::BrokerHeartbeat(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: UnregisterBrokerResponse) -> ResponseKind { ResponseKind::UnregisterBroker(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: DescribeTransactionsResponse) -> ResponseKind { ResponseKind::DescribeTransactions(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ListTransactionsResponse) -> ResponseKind { ResponseKind::ListTransactions(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AllocateProducerIdsResponse) -> ResponseKind { ResponseKind::AllocateProducerIds(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ConsumerGroupHeartbeatResponse) -> ResponseKind { ResponseKind::ConsumerGroupHeartbeat(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ControllerRegistrationResponse) -> ResponseKind { ResponseKind::ControllerRegistration(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: GetTelemetrySubscriptionsResponse) -> ResponseKind { ResponseKind::GetTelemetrySubscriptions(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: PushTelemetryResponse) -> ResponseKind { ResponseKind::PushTelemetry(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: AssignReplicasToDirsResponse) -> ResponseKind { ResponseKind::AssignReplicasToDirs(value) } } +#[cfg(feature = "messages_enum")] impl From for ResponseKind { fn from(value: ListClientMetricsResourcesResponse) -> ResponseKind { ResponseKind::ListClientMetricsResources(value)