Skip to content

Commit

Permalink
Improve ApiKey and versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 21, 2024
1 parent 6f3dbd8 commit 9a8d839
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 5 deletions.
49 changes: 44 additions & 5 deletions protocol_codegen/src/generate_messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
Expand All @@ -11,13 +11,10 @@ mod generate;
mod parse;
mod spec;

use spec::SpecType;
use spec::{SpecType, VersionSpec};

pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Result<(), Error> {
input_file_paths.sort();
let mut entity_types = BTreeSet::new();
let mut request_types = BTreeMap::new();
let mut response_types = BTreeMap::new();

let module_path = format!("{}.rs", messages_module_dir);

Expand All @@ -38,6 +35,7 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
)?;
writeln!(m, "#[cfg(all(feature = \"client\", feature = \"broker\"))]")?;
writeln!(m, "use crate::protocol::Request;")?;
writeln!(m, "use crate::protocol::VersionRange;")?;
writeln!(m, "use std::convert::TryFrom;")?;
writeln!(m, "#[cfg(feature = \"messages_enums\")]")?;
writeln!(m, "#[cfg(any(feature = \"client\", feature = \"broker\"))]")?;
Expand All @@ -51,15 +49,22 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
writeln!(m, "use anyhow::Context;")?;
writeln!(m)?;

let mut entity_types = BTreeSet::new();
let mut request_types = BTreeMap::new();
let mut response_types = BTreeMap::new();
let mut api_key_to_valid_version: HashMap<i16, VersionSpec> = HashMap::new();

for input_file_path in &input_file_paths {
let spec = parse::parse(input_file_path)?;
let spec_meta = (spec.type_, spec.api_key);
let valid_versions = spec.valid_versions;

let outcome = generate::generate(messages_module_dir, spec)?;
if let Some(output) = outcome {
match spec_meta {
(SpecType::Request, Some(k)) => {
request_types.insert(k, output);
api_key_to_valid_version.insert(k, valid_versions);
}
(SpecType::Response, Some(k)) => {
response_types.insert(k, output);
Expand Down Expand Up @@ -179,6 +184,40 @@ pub fn run(messages_module_dir: &str, mut input_file_paths: Vec<PathBuf>) -> Res
}
writeln!(m, " }}")?;
writeln!(m, " }}")?;

writeln!(
m,
" /// Returns the valid versions that can be used with this ApiKey"
)?;
writeln!(m, " pub fn valid_versions(&self) -> VersionRange {{")?;
writeln!(m, " match self {{")?;
for (api_key, request_type) in request_types.iter() {
let valid_versions = api_key_to_valid_version
.get(api_key)
.unwrap()
.range()
.unwrap();
writeln!(
m,
"ApiKey::{} => VersionRange {{ min: {}, max: {} }},",
request_type.replace("Request", "Key"),
valid_versions.start(),
valid_versions.end(),
)?;
}
writeln!(m, " }}")?;
writeln!(m, " }}")?;

writeln!(
m,
r#"
/// Iterate through every ApiKey variant in the order of the internal code.
pub fn iterate_all() -> impl Iterator<Item = ApiKey> {{
(0..i16::MAX).map_while(|i| ApiKey::try_from(i).ok())
}}
"#
)?;

writeln!(m, "}}")?;

writeln!(m, "impl TryFrom<i16> for ApiKey {{")?;
Expand Down
87 changes: 87 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::protocol::Decodable;
use crate::protocol::Encodable;
#[cfg(all(feature = "client", feature = "broker"))]
use crate::protocol::Request;
use crate::protocol::VersionRange;
use crate::protocol::{HeaderVersion, NewType, StrBytes};
#[cfg(feature = "messages_enums")]
#[cfg(any(feature = "client", feature = "broker"))]
Expand Down Expand Up @@ -1346,6 +1347,92 @@ impl ApiKey {
}
}
}
/// Returns the valid versions that can be used with this ApiKey
pub fn valid_versions(&self) -> VersionRange {
match self {
ApiKey::ProduceKey => VersionRange { min: 0, max: 11 },
ApiKey::FetchKey => VersionRange { min: 0, max: 16 },
ApiKey::ListOffsetsKey => VersionRange { min: 0, max: 8 },
ApiKey::MetadataKey => VersionRange { min: 0, max: 12 },
ApiKey::LeaderAndIsrKey => VersionRange { min: 0, max: 7 },
ApiKey::StopReplicaKey => VersionRange { min: 0, max: 4 },
ApiKey::UpdateMetadataKey => VersionRange { min: 0, max: 8 },
ApiKey::ControlledShutdownKey => VersionRange { min: 0, max: 3 },
ApiKey::OffsetCommitKey => VersionRange { min: 0, max: 9 },
ApiKey::OffsetFetchKey => VersionRange { min: 0, max: 9 },
ApiKey::FindCoordinatorKey => VersionRange { min: 0, max: 5 },
ApiKey::JoinGroupKey => VersionRange { min: 0, max: 9 },
ApiKey::HeartbeatKey => VersionRange { min: 0, max: 4 },
ApiKey::LeaveGroupKey => VersionRange { min: 0, max: 5 },
ApiKey::SyncGroupKey => VersionRange { min: 0, max: 5 },
ApiKey::DescribeGroupsKey => VersionRange { min: 0, max: 5 },
ApiKey::ListGroupsKey => VersionRange { min: 0, max: 5 },
ApiKey::SaslHandshakeKey => VersionRange { min: 0, max: 1 },
ApiKey::ApiVersionsKey => VersionRange { min: 0, max: 3 },
ApiKey::CreateTopicsKey => VersionRange { min: 0, max: 7 },
ApiKey::DeleteTopicsKey => VersionRange { min: 0, max: 6 },
ApiKey::DeleteRecordsKey => VersionRange { min: 0, max: 2 },
ApiKey::InitProducerIdKey => VersionRange { min: 0, max: 5 },
ApiKey::OffsetForLeaderEpochKey => VersionRange { min: 0, max: 4 },
ApiKey::AddPartitionsToTxnKey => VersionRange { min: 0, max: 5 },
ApiKey::AddOffsetsToTxnKey => VersionRange { min: 0, max: 4 },
ApiKey::EndTxnKey => VersionRange { min: 0, max: 4 },
ApiKey::WriteTxnMarkersKey => VersionRange { min: 0, max: 1 },
ApiKey::TxnOffsetCommitKey => VersionRange { min: 0, max: 4 },
ApiKey::DescribeAclsKey => VersionRange { min: 0, max: 3 },
ApiKey::CreateAclsKey => VersionRange { min: 0, max: 3 },
ApiKey::DeleteAclsKey => VersionRange { min: 0, max: 3 },
ApiKey::DescribeConfigsKey => VersionRange { min: 0, max: 4 },
ApiKey::AlterConfigsKey => VersionRange { min: 0, max: 2 },
ApiKey::AlterReplicaLogDirsKey => VersionRange { min: 0, max: 2 },
ApiKey::DescribeLogDirsKey => VersionRange { min: 0, max: 4 },
ApiKey::SaslAuthenticateKey => VersionRange { min: 0, max: 2 },
ApiKey::CreatePartitionsKey => VersionRange { min: 0, max: 3 },
ApiKey::CreateDelegationTokenKey => VersionRange { min: 0, max: 3 },
ApiKey::RenewDelegationTokenKey => VersionRange { min: 0, max: 2 },
ApiKey::ExpireDelegationTokenKey => VersionRange { min: 0, max: 2 },
ApiKey::DescribeDelegationTokenKey => VersionRange { min: 0, max: 3 },
ApiKey::DeleteGroupsKey => VersionRange { min: 0, max: 2 },
ApiKey::ElectLeadersKey => VersionRange { min: 0, max: 2 },
ApiKey::IncrementalAlterConfigsKey => VersionRange { min: 0, max: 1 },
ApiKey::AlterPartitionReassignmentsKey => VersionRange { min: 0, max: 0 },
ApiKey::ListPartitionReassignmentsKey => VersionRange { min: 0, max: 0 },
ApiKey::OffsetDeleteKey => VersionRange { min: 0, max: 0 },
ApiKey::DescribeClientQuotasKey => VersionRange { min: 0, max: 1 },
ApiKey::AlterClientQuotasKey => VersionRange { min: 0, max: 1 },
ApiKey::DescribeUserScramCredentialsKey => VersionRange { min: 0, max: 0 },
ApiKey::AlterUserScramCredentialsKey => VersionRange { min: 0, max: 0 },
ApiKey::VoteKey => VersionRange { min: 0, max: 0 },
ApiKey::BeginQuorumEpochKey => VersionRange { min: 0, max: 0 },
ApiKey::EndQuorumEpochKey => VersionRange { min: 0, max: 0 },
ApiKey::DescribeQuorumKey => VersionRange { min: 0, max: 1 },
ApiKey::AlterPartitionKey => VersionRange { min: 0, max: 3 },
ApiKey::UpdateFeaturesKey => VersionRange { min: 0, max: 1 },
ApiKey::EnvelopeKey => VersionRange { min: 0, max: 0 },
ApiKey::FetchSnapshotKey => VersionRange { min: 0, max: 0 },
ApiKey::DescribeClusterKey => VersionRange { min: 0, max: 1 },
ApiKey::DescribeProducersKey => VersionRange { min: 0, max: 0 },
ApiKey::BrokerRegistrationKey => VersionRange { min: 0, max: 3 },
ApiKey::BrokerHeartbeatKey => VersionRange { min: 0, max: 1 },
ApiKey::UnregisterBrokerKey => VersionRange { min: 0, max: 0 },
ApiKey::DescribeTransactionsKey => VersionRange { min: 0, max: 0 },
ApiKey::ListTransactionsKey => VersionRange { min: 0, max: 1 },
ApiKey::AllocateProducerIdsKey => VersionRange { min: 0, max: 0 },
ApiKey::ConsumerGroupHeartbeatKey => VersionRange { min: 0, max: 0 },
ApiKey::ConsumerGroupDescribeKey => VersionRange { min: 0, max: 0 },
ApiKey::ControllerRegistrationKey => VersionRange { min: 0, max: 0 },
ApiKey::GetTelemetrySubscriptionsKey => VersionRange { min: 0, max: 0 },
ApiKey::PushTelemetryKey => VersionRange { min: 0, max: 0 },
ApiKey::AssignReplicasToDirsKey => VersionRange { min: 0, max: 0 },
ApiKey::ListClientMetricsResourcesKey => VersionRange { min: 0, max: 0 },
ApiKey::DescribeTopicPartitionsKey => VersionRange { min: 0, max: 0 },
}
}

/// Iterate through every ApiKey variant in the order of the internal code.
pub fn iterate_all() -> impl Iterator<Item = ApiKey> {
(0..i16::MAX).map_while(|i| ApiKey::try_from(i).ok())
}
}
impl TryFrom<i16> for ApiKey {
type Error = ();
Expand Down

0 comments on commit 9a8d839

Please sign in to comment.