Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
hseeberger committed Oct 11, 2023
1 parent 82d1d74 commit 5118814
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 35 deletions.
33 changes: 17 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ documentation = "https://github.com/hseeberger/pub-sub-client"
publish = true

[workspace.dependencies]
anyhow = { version = "1.0" }
base64 = { version = "0.13" }
goauth = { version = "0.13" }
proc-macro2 = { version = "1.0" }
quote = { version = "1.0" }
reqwest = { version = "0.11" }
serde = { version = "1.0" }
serde_json = { version = "1.0" }
smpl_jwt = { version = "0.7" }
syn = { version = "1.0" }
testcontainers = { version = "0.14" }
thiserror = { version = "1.0" }
time = { version = "0.3" }
tokio = { version = "1.21" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3" }
anyhow = { version = "1.0" }
base64 = { version = "0.21" }
goauth = { version = "0.13" }
proc-macro2 = { version = "1.0" }
quote = { version = "1.0" }
reqwest = { version = "0.11" }
serde = { version = "1.0" }
serde_json = { version = "1.0" }
smpl_jwt = { version = "0.7" }
syn = { version = "2.0" }
testcontainers = { version = "0.15" }
testcontainers-modules = { version = "0.1", features = [ "google_cloud_sdk_emulators" ] }
thiserror = { version = "1.0" }
time = { version = "0.3" }
tokio = { version = "1" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3" }
19 changes: 10 additions & 9 deletions pub-sub-client-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ documentation = { workspace = true }
publish = false

[dev-dependencies]
pub-sub-client = { version = "=0.11.1-alpha.0", path = "../pub-sub-client", features = [ "derive" ] }
anyhow = { workspace = true }
base64 = { workspace = true }
reqwest = { workspace = true, features = [ "json" ] }
serde = { workspace = true, features = [ "derive" ] }
serde_json = { workspace = true }
testcontainers = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "json", "tracing-log" ] }
pub-sub-client = { version = "=0.11.1-alpha.0", path = "../pub-sub-client", features = [ "derive" ] }
anyhow = { workspace = true }
base64 = { workspace = true }
reqwest = { workspace = true, features = [ "json" ] }
serde = { workspace = true, features = [ "derive" ] }
serde_json = { workspace = true }
testcontainers = { workspace = true }
testcontainers-modules = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "json", "tracing-log" ] }
3 changes: 2 additions & 1 deletion pub-sub-client-tests/examples/transform.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::anyhow;
use base64::{engine::general_purpose::STANDARD, Engine};
use pub_sub_client::{
Error, PubSubClient, PublishedMessage, PulledMessage, RawPublishedMessage,
RawPulledMessageEnvelope,
Expand Down Expand Up @@ -40,7 +41,7 @@ async fn run() -> Result<(), Error> {

let messages = vec!["Hello", "from pub-sub-client"]
.iter()
.map(|s| base64::encode(json!({ "text": s }).to_string()))
.map(|s| STANDARD.encode(json!({ "text": s }).to_string()))
.map(|data| {
RawPublishedMessage::new(data)
.with_attributes(HashMap::from([("type".to_string(), "Foo".to_string())]))
Expand Down
9 changes: 4 additions & 5 deletions pub-sub-client-tests/tests/test.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use base64::{engine::general_purpose::STANDARD, Engine};
use pub_sub_client::{PubSubClient, PublishedMessage, RawPublishedMessage};
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{collections::HashMap, env, time::Duration, vec};
use testcontainers::{
clients::Cli,
images::google_cloud_sdk_emulators::{CloudSdk, PUBSUB_PORT},
};
use testcontainers::clients::Cli;
use testcontainers_modules::google_cloud_sdk_emulators::{CloudSdk, PUBSUB_PORT};

const PROJECT_ID: &str = "active-road-365118";
const TOPIC_ID: &str = "test";
Expand Down Expand Up @@ -64,7 +63,7 @@ async fn test() {
let pub_sub_client = pub_sub_client.unwrap();

// Publish raw
let foo = base64::encode(json!({ "Foo": { "text": TEXT } }).to_string());
let foo = STANDARD.encode(json!({ "Foo": { "text": TEXT } }).to_string());
let messages = vec![RawPublishedMessage::new(foo)];
let result = pub_sub_client
.publish_raw(TOPIC_ID, messages, Some(Duration::from_secs(10)))
Expand Down
3 changes: 2 additions & 1 deletion pub-sub-client/src/publisher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{error::Error, PubSubClient};
use base64::{engine::general_purpose::STANDARD, Engine};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Debug, time::Duration};
use tracing::debug;
Expand Down Expand Up @@ -101,7 +102,7 @@ impl PubSubClient {
.map_err(|source| Error::Serialize { source })?
.into_iter()
.map(|(bytes, attributes)| RawPublishedMessage {
data: Some(base64::encode(bytes)),
data: Some(STANDARD.encode(bytes)),
attributes,
ordering_key,
})
Expand Down
12 changes: 9 additions & 3 deletions pub-sub-client/src/subscriber/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{error::Error, PubSubClient};
use base64::{engine::general_purpose::STANDARD, Engine};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, error::Error as StdError, fmt::Debug, time::Duration};
Expand Down Expand Up @@ -164,7 +165,11 @@ where
.data
.as_ref()
.ok_or(Error::NoData)
.and_then(|data| base64::decode(data).map_err(|source| Error::NoBase64 { source }))
.and_then(|data| {
STANDARD
.decode(data)
.map_err(|source| Error::NoBase64 { source })
})
.and_then(|bytes| {
serde_json::from_slice::<Value>(&bytes)
.map_err(|source| Error::Deserialize { source })
Expand Down Expand Up @@ -204,6 +209,7 @@ where
mod tests {
use super::{deserialize, RawPulledMessage, RawPulledMessageEnvelope};
use anyhow::anyhow;
use base64::{engine::general_purpose::STANDARD, Engine};
use serde::Deserialize;
use serde_json::{json, Value};
use std::{collections::HashMap, error::Error as StdError};
Expand All @@ -223,7 +229,7 @@ mod tests {
RawPulledMessageEnvelope {
ack_id: "ack_id".to_string(),
message: RawPulledMessage {
data: Some(base64::encode(json!({"text": "test"}).to_string())),
data: Some(STANDARD.encode(json!({"text": "test"}).to_string())),
attributes: Some(HashMap::from([("type".to_string(), "Foo".to_string())])),
id: "id".to_string(),
publish_time: OffsetDateTime::parse(TIME, &Rfc3339).unwrap(),
Expand All @@ -234,7 +240,7 @@ mod tests {
RawPulledMessageEnvelope {
ack_id: "ack_id".to_string(),
message: RawPulledMessage {
data: Some(base64::encode(json!({"Bar": {"text": "test"}}).to_string())),
data: Some(STANDARD.encode(json!({"Bar": {"text": "test"}}).to_string())),
attributes: Some(HashMap::from([("version".to_string(), "v2".to_string())])),
id: "id".to_string(),
publish_time: OffsetDateTime::parse(TIME, &Rfc3339).unwrap(),
Expand Down

0 comments on commit 5118814

Please sign in to comment.