Skip to content

Commit

Permalink
chore(connector): remove unnecessary Serialize for connector config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Nov 22, 2023
1 parent 11011b1 commit 32717b4
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 20 deletions.
16 changes: 8 additions & 8 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use pulsar::{Authentication, Pulsar, TokioExecutor};
use rdkafka::ClientConfig;
use risingwave_common::error::ErrorCode::InvalidParameterValue;
use risingwave_common::error::{anyhow_error, RwError};
use serde_derive::{Deserialize, Serialize};
use serde_derive::Deserialize;
use serde_with::json::JsonString;
use serde_with::{serde_as, DisplayFromStr};
use tempfile::NamedTempFile;
Expand All @@ -44,7 +44,7 @@ use crate::source::nats::source::NatsOffset;
pub const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct AwsPrivateLinkItem {
pub az_id: Option<String>,
pub port: u16,
Expand All @@ -57,7 +57,7 @@ use aws_types::region::Region;
use aws_types::SdkConfig;

/// A flatten config map for aws auth.
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct AwsAuthProps {
pub region: Option<String>,
#[serde(alias = "endpoint_url")]
Expand Down Expand Up @@ -143,7 +143,7 @@ impl AwsAuthProps {
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, WithOptions)]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,
Expand Down Expand Up @@ -229,7 +229,7 @@ const fn default_kafka_sync_call_timeout() -> Duration {
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, WithOptions)]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesCommon {
/// Maximum Kafka protocol request message size. Due to differing framing overhead between
/// protocol versions the producer is unable to reliably enforce a strict max message limit at
Expand Down Expand Up @@ -434,7 +434,7 @@ impl PulsarCommon {
}
}

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct KinesisCommon {
#[serde(rename = "stream", alias = "kinesis.stream.name")]
pub stream_name: String,
Expand Down Expand Up @@ -486,7 +486,7 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
Expand All @@ -495,7 +495,7 @@ pub struct UpsertMessage<'a> {
}

#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct NatsCommon {
#[serde(rename = "server_url")]
pub server_url: String,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/schema/schema_registry/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use reqwest::Method;
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use serde::de::DeserializeOwned;
use serde_derive::{Deserialize, Serialize};
use serde_derive::Deserialize;
use url::{ParseError, Url};

pub fn handle_sr_list(addr: &str) -> Result<Vec<Url>> {
Expand Down Expand Up @@ -155,7 +155,7 @@ pub struct Subject {

/// One schema can reference another schema
/// (e.g., import "other.proto" in protobuf)
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct SchemaReference {
/// The name of the reference.
pub name: String,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use serde_derive::{Deserialize, Serialize};
use serde_derive::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
use url::Url;
Expand All @@ -46,7 +46,7 @@ use crate::sink::{
pub const BIGQUERY_SINK: &str = "bigquery";
const BIGQUERY_INSERT_MAX_NUMS: usize = 1024;

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
#[serde(rename = "bigquery.local.path")]
pub local_path: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const QUERY_COLUMN: &str =
pub const CLICKHOUSE_SINK: &str = "clickhouse";
const BUFFER_SIZE: usize = 1024;

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ClickHouseCommon {
#[serde(rename = "clickhouse.url")]
pub url: String,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkW

pub const DORIS_SINK: &str = "doris";

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DorisCommon {
#[serde(rename = "doris.url")]
pub url: String,
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::{Deserialize, Serialize};
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
use with_options::WithOptions;
Expand Down Expand Up @@ -65,7 +65,7 @@ const fn _default_max_in_flight_requests_per_connection() -> usize {
5
}

#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
enum CompressionCodec {
None,
Expand All @@ -78,7 +78,7 @@ enum CompressionCodec {
/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
/// for the detailed meaning of these librdkafka producer properties
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, WithOptions)]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesProducer {
/// Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
#[serde(rename = "properties.allow.auto.create.topics")]
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use redis::aio::Connection;
use redis::{Client as RedisClient, Pipeline};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::{Deserialize, Serialize};
use serde_derive::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

Expand All @@ -40,7 +40,7 @@ pub const REDIS_SINK: &str = "redis";
pub const KEY_FORMAT: &str = "key_format";
pub const VALUE_FORMAT: &str = "value_format";

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct RedisCommon {
#[serde(rename = "redis.url")]
pub url: String,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const STARROCK_MYSQL_PREFER_SOCKET: &str = "false";
const STARROCK_MYSQL_MAX_ALLOWED_PACKET: usize = 1024;
const STARROCK_MYSQL_WAIT_TIMEOUT: usize = 28800;

#[derive(Deserialize, Serialize, Debug, Clone)]
#[derive(Deserialize, Debug, Clone)]
pub struct StarrocksCommon {
#[serde(rename = "starrocks.host")]
pub host: String,
Expand Down

0 comments on commit 32717b4

Please sign in to comment.