Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

errors: Narrow return error type of Session::[query/execute]_iter #1191

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scylla/src/client/caching_session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::batch::{Batch, BatchStatement};
use crate::errors::{ExecutionError, PrepareError};
use crate::errors::{ExecutionError, PagerExecutionError, PrepareError};
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
use crate::response::query_result::QueryResult;
Expand Down Expand Up @@ -108,7 +108,7 @@ where
&self,
query: impl Into<Query>,
values: impl SerializeRow,
) -> Result<QueryPager, ExecutionError> {
) -> Result<QueryPager, PagerExecutionError> {
let query = query.into();
let prepared = self.add_prepared_statement_owned(query).await?;
self.session.execute_iter(prepared, values).await
Expand Down
15 changes: 6 additions & 9 deletions scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ impl QueryPager {
execution_profile: Arc<ExecutionProfileInner>,
cluster_state: Arc<ClusterState>,
metrics: Arc<Metrics>,
) -> Result<Self, NextRowError> {
) -> Result<Self, NextPageError> {
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

let consistency = query
Expand Down Expand Up @@ -743,7 +743,7 @@ impl QueryPager {

pub(crate) async fn new_for_prepared_statement(
config: PreparedIteratorConfig,
) -> Result<Self, NextRowError> {
) -> Result<Self, NextPageError> {
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

let consistency = config
Expand Down Expand Up @@ -866,7 +866,7 @@ impl QueryPager {
connection: Arc<Connection>,
consistency: Consistency,
serial_consistency: Option<SerialConsistency>,
) -> Result<Self, NextRowError> {
) -> Result<Self, NextPageError> {
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

let page_size = query.get_validated_page_size();
Expand Down Expand Up @@ -896,7 +896,7 @@ impl QueryPager {
connection: Arc<Connection>,
consistency: Consistency,
serial_consistency: Option<SerialConsistency>,
) -> Result<Self, NextRowError> {
) -> Result<Self, NextPageError> {
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);

let page_size = prepared.get_validated_page_size();
Expand Down Expand Up @@ -924,7 +924,7 @@ impl QueryPager {
async fn new_from_worker_future(
worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
mut receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
) -> Result<Self, NextRowError> {
) -> Result<Self, NextPageError> {
tokio::task::spawn(worker_task);

// This unwrap is safe because:
Expand All @@ -933,10 +933,7 @@ impl QueryPager {
// - That future is polled in a tokio::task which isn't going to be
// cancelled
let page_received = receiver.recv().await.unwrap()?;
let raw_rows_with_deserialized_metadata =
page_received.rows.deserialize_metadata().map_err(|err| {
NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err))
})?;
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;

Ok(Self {
current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
Expand Down
18 changes: 9 additions & 9 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::cluster::node::CloudEndpoint;
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
BadQuery, ExecutionError, MetadataError, NewSessionError, PrepareError, ProtocolError,
RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
ProtocolError, RequestAttemptError, RequestError, TracingError, UseKeyspaceError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
Expand Down Expand Up @@ -511,7 +511,7 @@ impl Session {
&self,
query: impl Into<Query>,
values: impl SerializeRow,
) -> Result<QueryPager, ExecutionError> {
) -> Result<QueryPager, PagerExecutionError> {
self.do_query_iter(query.into(), values).await
}

Expand Down Expand Up @@ -675,7 +675,7 @@ impl Session {
&self,
prepared: impl Into<PreparedStatement>,
values: impl SerializeRow,
) -> Result<QueryPager, ExecutionError> {
) -> Result<QueryPager, PagerExecutionError> {
self.do_execute_iter(prepared.into(), values).await
}

Expand Down Expand Up @@ -1035,7 +1035,7 @@ impl Session {
&self,
query: Query,
values: impl SerializeRow,
) -> Result<QueryPager, ExecutionError> {
) -> Result<QueryPager, PagerExecutionError> {
let execution_profile = query
.get_execution_profile_handle()
.unwrap_or_else(|| self.get_default_execution_profile_handle())
Expand All @@ -1049,7 +1049,7 @@ impl Session {
self.metrics.clone(),
)
.await
.map_err(ExecutionError::from)
.map_err(PagerExecutionError::NextPageError)
} else {
// Making QueryPager::new_for_query work with values is too hard (if even possible)
// so instead of sending one prepare to a specific connection on each iterator query,
Expand All @@ -1064,7 +1064,7 @@ impl Session {
metrics: self.metrics.clone(),
})
.await
.map_err(ExecutionError::from)
.map_err(PagerExecutionError::NextPageError)
}
}

Expand Down Expand Up @@ -1307,7 +1307,7 @@ impl Session {
&self,
prepared: PreparedStatement,
values: impl SerializeRow,
) -> Result<QueryPager, ExecutionError> {
) -> Result<QueryPager, PagerExecutionError> {
let serialized_values = prepared.serialize_values(&values)?;

let execution_profile = prepared
Expand All @@ -1323,7 +1323,7 @@ impl Session {
metrics: self.metrics.clone(),
})
.await
.map_err(ExecutionError::from)
.map_err(PagerExecutionError::NextPageError)
}

async fn do_batch(
Expand Down
26 changes: 18 additions & 8 deletions scylla/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use scylla_cql::{

use thiserror::Error;

use crate::client::pager::NextPageError;
use crate::{authentication::AuthError, frame::response};

use crate::client::pager::NextRowError;
Expand Down Expand Up @@ -110,14 +111,6 @@ pub enum ExecutionError {
/// 'USE KEYSPACE <>' request failed.
#[error("'USE KEYSPACE <>' request failed: {0}")]
UseKeyspaceError(#[from] UseKeyspaceError),

// TODO: This should not belong here, but it requires changes to error types
// returned in async iterator API. This should be handled in separate PR.
// The reason this needs to be included is that topology.rs makes use of iter API and returns ExecutionError.
// Once iter API is adjusted, we can then adjust errors returned by topology module (e.g. refactor MetadataError and not include it in ExecutionError).
/// An error occurred during async iteration over rows of result.
#[error("An error occurred during async iteration over rows of result: {0}")]
NextRowError(#[from] NextRowError),
}

impl From<SerializationError> for ExecutionError {
Expand Down Expand Up @@ -151,6 +144,23 @@ pub enum PrepareError {
PreparedStatementIdsMismatch,
}

/// An error that occurred during construction of [`QueryPager`][crate::client::pager::QueryPager].
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum PagerExecutionError {
/// Failed to prepare the statement.
#[error("Failed to prepare the statement to be used by the pager: {0}")]
PrepareError(#[from] PrepareError),

/// Failed to serialize statement parameters.
#[error("Failed to serialize statement parameters: {0}")]
SerializationError(#[from] SerializationError),

/// Failed to fetch the first page of the result.
#[error("Failed to fetch the first page of the result: {0}")]
NextPageError(#[from] NextPageError),
}

/// Error that occurred during session creation
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ impl Connection {

QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
.await
.map_err(NextRowError::NextPageError)
}

/// Executes a prepared statements and fetches its results over multiple pages, using
Expand All @@ -1039,6 +1040,7 @@ impl Connection {
serial_consistency,
)
.await
.map_err(NextRowError::NextPageError)
}

#[allow(dead_code)]
Expand Down