Skip to content

Commit

Permalink
Merge pull request #1168 from muzarski/internal-request-error
Browse files Browse the repository at this point in the history
errors: rename RequestError and make it private
  • Loading branch information
wprzytula authored Jan 15, 2025
2 parents 7ac4d45 + f480b0e commit 36a6775
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 37 deletions.
25 changes: 14 additions & 11 deletions scylla/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,8 @@ pub enum BrokenConnectionErrorKind {
KeepaliveTimeout(IpAddr),

/// Driver sent a keepalive request to the database, but request execution failed.
#[error("Failed to execute keepalive query: {0}")]
KeepaliveQueryError(RequestError),
#[error("Failed to execute keepalive request: {0}")]
KeepaliveRequestError(Arc<dyn Error + Sync + Send>),

/// Failed to deserialize response frame header.
#[error("Failed to deserialize frame: {0}")]
Expand Down Expand Up @@ -942,20 +942,20 @@ impl From<response::error::Error> for UserRequestError {
}
}

impl From<RequestError> for UserRequestError {
fn from(value: RequestError) -> Self {
impl From<InternalRequestError> for UserRequestError {
fn from(value: InternalRequestError) -> Self {
match value {
RequestError::CqlRequestSerialization(e) => e.into(),
RequestError::BodyExtensionsParseError(e) => e.into(),
RequestError::CqlResponseParseError(e) => match e {
InternalRequestError::CqlRequestSerialization(e) => e.into(),
InternalRequestError::BodyExtensionsParseError(e) => e.into(),
InternalRequestError::CqlResponseParseError(e) => match e {
// Only possible responses are RESULT and ERROR. If we failed parsing
// other response, treat it as unexpected response.
CqlResponseParseError::CqlErrorParseError(e) => e.into(),
CqlResponseParseError::CqlResultParseError(e) => e.into(),
_ => UserRequestError::UnexpectedResponse(e.to_response_kind()),
},
RequestError::BrokenConnection(e) => e.into(),
RequestError::UnableToAllocStreamId => UserRequestError::UnableToAllocStreamId,
InternalRequestError::BrokenConnection(e) => e.into(),
InternalRequestError::UnableToAllocStreamId => UserRequestError::UnableToAllocStreamId,
}
}
}
Expand All @@ -967,9 +967,12 @@ impl From<RequestError> for UserRequestError {
/// - Response's frame header deserialization error
/// - CQL response (frame body) deserialization error
/// - Driver was unable to allocate a stream id for a request
///
/// This is driver's internal low-level error type. It can occur
/// during any request execution in connection layer.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum RequestError {
pub(crate) enum InternalRequestError {
/// Failed to serialize CQL request.
#[error("Failed to serialize CQL request: {0}")]
CqlRequestSerialization(#[from] CqlRequestSerializationError),
Expand All @@ -991,7 +994,7 @@ pub enum RequestError {
UnableToAllocStreamId,
}

impl From<ResponseParseError> for RequestError {
impl From<ResponseParseError> for InternalRequestError {
fn from(value: ResponseParseError) -> Self {
match value {
ResponseParseError::BodyExtensionsParseError(e) => e.into(),
Expand Down
54 changes: 28 additions & 26 deletions scylla/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::cluster::NodeAddr;
use crate::errors::{
BadKeyspaceName, BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError,
ConnectionSetupRequestError, ConnectionSetupRequestErrorKind, CqlEventHandlingError, DbError,
ProtocolError, QueryError, RequestError, ResponseParseError, SchemaVersionFetchError,
InternalRequestError, ProtocolError, QueryError, ResponseParseError, SchemaVersionFetchError,
TranslationError, UseKeyspaceProtocolError, UserRequestError,
};
use crate::frame::protocol_features::ProtocolFeatures;
Expand Down Expand Up @@ -116,7 +116,7 @@ impl RouterHandle {
request: &impl SerializableRequest,
compression: Option<Compression>,
tracing: bool,
) -> Result<TaskResponse, RequestError> {
) -> Result<TaskResponse, InternalRequestError> {
let serialized_request = SerializedRequest::make(request, compression, tracing)?;
let request_id = self.allocate_request_id();

Expand Down Expand Up @@ -163,7 +163,7 @@ pub(crate) struct ConnectionFeatures {
type RequestId = u64;

struct ResponseHandler {
response_sender: oneshot::Sender<Result<TaskResponse, RequestError>>,
response_sender: oneshot::Sender<Result<TaskResponse, InternalRequestError>>,
request_id: RequestId,
}

Expand Down Expand Up @@ -531,9 +531,9 @@ impl Connection {
}
},
Err(e) => match e {
RequestError::CqlRequestSerialization(e) => return Err(err(e.into())),
RequestError::BodyExtensionsParseError(e) => return Err(err(e.into())),
RequestError::CqlResponseParseError(e) => match e {
InternalRequestError::CqlRequestSerialization(e) => return Err(err(e.into())),
InternalRequestError::BodyExtensionsParseError(e) => return Err(err(e.into())),
InternalRequestError::CqlResponseParseError(e) => match e {
// Parsing of READY response cannot fail, since its body is empty.
// Remaining valid responses are AUTHENTICATE and ERROR.
CqlResponseParseError::CqlAuthenticateParseError(e) => {
Expand All @@ -546,8 +546,8 @@ impl Connection {
)))
}
},
RequestError::BrokenConnection(e) => return Err(err(e.into())),
RequestError::UnableToAllocStreamId => {
InternalRequestError::BrokenConnection(e) => return Err(err(e.into())),
InternalRequestError::UnableToAllocStreamId => {
return Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId))
}
},
Expand Down Expand Up @@ -579,9 +579,9 @@ impl Connection {
}
},
Err(e) => match e {
RequestError::CqlRequestSerialization(e) => return Err(err(e.into())),
RequestError::BodyExtensionsParseError(e) => return Err(err(e.into())),
RequestError::CqlResponseParseError(e) => match e {
InternalRequestError::CqlRequestSerialization(e) => return Err(err(e.into())),
InternalRequestError::BodyExtensionsParseError(e) => return Err(err(e.into())),
InternalRequestError::CqlResponseParseError(e) => match e {
CqlResponseParseError::CqlSupportedParseError(e) => return Err(err(e.into())),
CqlResponseParseError::CqlErrorParseError(e) => return Err(err(e.into())),
_ => {
Expand All @@ -590,8 +590,8 @@ impl Connection {
)))
}
},
RequestError::BrokenConnection(e) => return Err(err(e.into())),
RequestError::UnableToAllocStreamId => {
InternalRequestError::BrokenConnection(e) => return Err(err(e.into())),
InternalRequestError::UnableToAllocStreamId => {
return Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId))
}
},
Expand Down Expand Up @@ -743,9 +743,9 @@ impl Connection {
}
},
Err(e) => match e {
RequestError::CqlRequestSerialization(e) => return Err(err(e.into())),
RequestError::BodyExtensionsParseError(e) => return Err(err(e.into())),
RequestError::CqlResponseParseError(e) => match e {
InternalRequestError::CqlRequestSerialization(e) => return Err(err(e.into())),
InternalRequestError::BodyExtensionsParseError(e) => return Err(err(e.into())),
InternalRequestError::CqlResponseParseError(e) => match e {
CqlResponseParseError::CqlAuthSuccessParseError(e) => {
return Err(err(e.into()))
}
Expand All @@ -759,8 +759,8 @@ impl Connection {
)))
}
},
RequestError::BrokenConnection(e) => return Err(err(e.into())),
RequestError::UnableToAllocStreamId => {
InternalRequestError::BrokenConnection(e) => return Err(err(e.into())),
InternalRequestError::UnableToAllocStreamId => {
return Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId))
}
},
Expand Down Expand Up @@ -1219,17 +1219,17 @@ impl Connection {
))),
},
Err(e) => match e {
RequestError::CqlRequestSerialization(e) => Err(err(e.into())),
RequestError::BodyExtensionsParseError(e) => Err(err(e.into())),
RequestError::CqlResponseParseError(e) => match e {
InternalRequestError::CqlRequestSerialization(e) => Err(err(e.into())),
InternalRequestError::BodyExtensionsParseError(e) => Err(err(e.into())),
InternalRequestError::CqlResponseParseError(e) => match e {
// Parsing the READY response cannot fail. Only remaining valid response is ERROR.
CqlResponseParseError::CqlErrorParseError(e) => Err(err(e.into())),
_ => Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse(
e.to_response_kind(),
))),
},
RequestError::BrokenConnection(e) => Err(err(e.into())),
RequestError::UnableToAllocStreamId => {
InternalRequestError::BrokenConnection(e) => Err(err(e.into())),
InternalRequestError::UnableToAllocStreamId => {
Err(err(ConnectionSetupRequestErrorKind::UnableToAllocStreamId))
}
},
Expand Down Expand Up @@ -1260,7 +1260,7 @@ impl Connection {
compress: bool,
tracing: bool,
cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
) -> Result<QueryResponse, RequestError> {
) -> Result<QueryResponse, InternalRequestError> {
let compression = if compress {
self.config.compression
} else {
Expand Down Expand Up @@ -1503,7 +1503,7 @@ impl Connection {
error!("Could not allocate stream id");
let _ = response_handler
.response_sender
.send(Err(RequestError::UnableToAllocStreamId));
.send(Err(InternalRequestError::UnableToAllocStreamId));
None
}
}
Expand Down Expand Up @@ -1608,7 +1608,9 @@ impl Connection {
.send_request(&Options, None, false)
.await
.map(|_| ())
.map_err(|q_err| BrokenConnectionErrorKind::KeepaliveQueryError(q_err).into())
.map_err(|req_err| {
BrokenConnectionErrorKind::KeepaliveRequestError(Arc::new(req_err)).into()
})
}

if let Some(keepalive_interval) = keepalive_interval {
Expand Down

0 comments on commit 36a6775

Please sign in to comment.