Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce usage of unwraps in messenger #112

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ where

headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp);
async move {
publisher.send(version, action, headers, total_len ).await;
if let Err(publish_error) = publisher.send(version, action, headers, total_len ).await {
error!("Failed to publish message for version={version} with error {publish_error}")
}
}
});
join_all(publish_vec).await;
Expand Down
48 changes: 32 additions & 16 deletions packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use ahash::HashMap;
use async_trait::async_trait;
use log::debug;
use rdkafka::producer::ProducerContext;
use talos_certifier::ports::MessageReciever;
use talos_certifier::ports::{errors::MessagePublishError, MessageReciever};
use talos_certifier_adapters::KafkaConsumer;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::MessengerServiceResult,
errors::{MessengerServiceError, MessengerServiceErrorKind, MessengerServiceResult},
services::{MessengerInboundService, MessengerInboundServiceConfig},
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
Expand Down Expand Up @@ -37,13 +37,26 @@ where
PublishActionType::Kafka
}

async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> () {
async fn send(
&self,
version: u64,
payload: Self::Payload,
headers: HashMap<String, String>,
additional_data: Self::AdditionalData,
) -> Result<(), MessagePublishError> {
debug!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

let mut bytes: Vec<u8> = Vec::new();
serde_json::to_writer(&mut bytes, &payload.value).unwrap();
serde_json::to_writer(&mut bytes, &payload.value).map_err(|e| MessagePublishError {
reason: format!("failed to serialize payload for version={version} with error {e}").to_string(),
data: Some(payload.value.to_string()),
})?;

let payload_str = std::str::from_utf8(&bytes).map_err(|e| MessagePublishError {
reason: format!("failed to parse payload for version={version} with error {e}").to_string(),
data: Some(payload.value.to_string()),
})?;

let payload_str = std::str::from_utf8(&bytes).unwrap();
debug!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}");

let delivery_opaque = MessengerProducerDeliveryOpaque {
Expand All @@ -57,16 +70,14 @@ where
headers_to_publish.extend(payload_header);
}

self.publisher
.publish_to_topic(
&payload.topic,
payload.partition,
payload.key.as_deref(),
payload_str,
headers_to_publish,
Box::new(delivery_opaque),
)
.unwrap();
self.publisher.publish_to_topic(
&payload.topic,
payload.partition,
payload.key.as_deref(),
payload_str,
headers_to_publish,
Box::new(delivery_opaque),
)
}
}

Expand Down Expand Up @@ -103,7 +114,12 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu
let kafka_consumer = KafkaConsumer::new(&config.kafka_config);

// Subscribe to topic.
kafka_consumer.subscribe().await.unwrap();
kafka_consumer.subscribe().await.map_err(|e| MessengerServiceError {
kind: MessengerServiceErrorKind::Messaging,
reason: e.reason,
data: e.data,
service: e.service,
})?;

let ChannelBuffers {
actions_channel,
Expand Down
9 changes: 8 additions & 1 deletion packages/talos_messenger_core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use strum::{Display, EnumIter, EnumString};
use talos_certifier::ports::errors::MessagePublishError;

use crate::errors::{MessengerActionError, MessengerServiceResult};

Expand All @@ -23,7 +24,13 @@ pub trait MessengerPublisher {
type Payload;
type AdditionalData;
fn get_publish_type(&self) -> PublishActionType;
async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> ();
async fn send(
&self,
version: u64,
payload: Self::Payload,
headers: HashMap<String, String>,
additional_data: Self::AdditionalData,
) -> Result<(), MessagePublishError>;
}

/// Trait to be implemented by all services.
Expand Down
Loading