diff --git a/scylla-cql/src/frame/request/batch.rs b/scylla-cql/src/frame/request/batch.rs index 5b5c2f84b6..89f07b991b 100644 --- a/scylla-cql/src/frame/request/batch.rs +++ b/scylla-cql/src/frame/request/batch.rs @@ -1,11 +1,13 @@ use bytes::{Buf, BufMut}; use std::{borrow::Cow, convert::TryInto}; -use crate::frame::{ - frame_errors::ParseError, - request::{RequestOpcode, SerializableRequest}, - types::{self, SerialConsistency}, - value::{BatchValues, BatchValuesIterator, LegacySerializedValues}, +use crate::{ + frame::{ + frame_errors::ParseError, + request::{RequestOpcode, SerializableRequest}, + types::{self, SerialConsistency}, + }, + types::serialize::row::SerializedValues, }; use super::DeserializableRequest; @@ -16,18 +18,16 @@ const FLAG_WITH_DEFAULT_TIMESTAMP: u8 = 0x20; const ALL_FLAGS: u8 = FLAG_WITH_SERIAL_CONSISTENCY | FLAG_WITH_DEFAULT_TIMESTAMP; #[cfg_attr(test, derive(Debug, PartialEq, Eq))] -pub struct Batch<'b, Statement, Values> +pub struct Batch<'b, Statement> where BatchStatement<'b>: From<&'b Statement>, Statement: Clone, - Values: BatchValues, { pub statements: Cow<'b, [Statement]>, pub batch_type: BatchType, pub consistency: types::Consistency, pub serial_consistency: Option, pub timestamp: Option, - pub values: Values, } /// The type of a batch. @@ -64,15 +64,20 @@ impl TryFrom for BatchType { #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord)] pub enum BatchStatement<'a> { - Query { text: Cow<'a, str> }, - Prepared { id: Cow<'a, [u8]> }, + Query { + text: Cow<'a, str>, + values: Cow<'a, SerializedValues>, + }, + Prepared { + id: Cow<'a, [u8]>, + values: Cow<'a, SerializedValues>, + }, } -impl SerializableRequest for Batch<'_, Statement, Values> +impl SerializableRequest for Batch<'_, Statement> where for<'s> BatchStatement<'s>: From<&'s Statement>, Statement: Clone, - Values: BatchValues, { const OPCODE: RequestOpcode = RequestOpcode::Batch; @@ -83,36 +88,9 @@ where // Serializing queries types::write_short(self.statements.len().try_into()?, buf); - let counts_mismatch_err = |n_values: usize, n_statements: usize| { - ParseError::BadDataToSerialize(format!( - "Length of provided values must be equal to number of batch statements \ - (got {n_values} values, {n_statements} statements)" - )) - }; - let mut n_serialized_statements = 0usize; - let mut value_lists = self.values.batch_values_iter(); - for (idx, statement) in self.statements.iter().enumerate() { - BatchStatement::from(statement).serialize(buf)?; - value_lists - .write_next_to_request(buf) - .ok_or_else(|| counts_mismatch_err(idx, self.statements.len()))??; - n_serialized_statements += 1; - } - // At this point, we have all statements serialized. If any values are still left, we have a mismatch. - if value_lists.skip_next().is_some() { - return Err(counts_mismatch_err( - n_serialized_statements + 1 /*skipped above*/ + value_lists.count(), - n_serialized_statements, - )); - } - if n_serialized_statements != self.statements.len() { - // We want to check this to avoid propagating an invalid construction of self.statements_count as a - // hard-to-debug silent fail - return Err(ParseError::BadDataToSerialize(format!( - "Invalid Batch constructed: not as many statements serialized as announced \ - (batch.statement_count: {announced_statement_count}, {n_serialized_statements}", - announced_statement_count = self.statements.len() - ))); + for statement in self.statements.iter() { + let stmt = BatchStatement::from(statement); + stmt.serialize(buf)?; } // Serializing consistency @@ -146,11 +124,13 @@ impl BatchStatement<'_> { match kind { 0 => { let text = Cow::Owned(types::read_long_string(buf)?.to_owned()); - Ok(BatchStatement::Query { text }) + let values = Cow::Owned(SerializedValues::new_from_frame(buf)?); + Ok(BatchStatement::Query { text, values }) } 1 => { let id = types::read_short_bytes(buf)?.to_vec().into(); - Ok(BatchStatement::Prepared { id }) + let values = Cow::Owned(SerializedValues::new_from_frame(buf)?); + Ok(BatchStatement::Prepared { id, values }) } _ => Err(ParseError::BadIncomingData(format!( "Unexpected batch statement kind: {}", @@ -163,13 +143,15 @@ impl BatchStatement<'_> { impl BatchStatement<'_> { fn serialize(&self, buf: &mut impl BufMut) -> Result<(), ParseError> { match self { - Self::Query { text } => { + Self::Query { text, values } => { buf.put_u8(0); types::write_long_string(text, buf)?; + values.write_to_request(buf); } - Self::Prepared { id } => { + Self::Prepared { id, values } => { buf.put_u8(1); types::write_short_bytes(id, buf)?; + values.write_to_request(buf); } } @@ -180,25 +162,28 @@ impl BatchStatement<'_> { impl<'s, 'b> From<&'s BatchStatement<'b>> for BatchStatement<'s> { fn from(value: &'s BatchStatement) -> Self { match value { - BatchStatement::Query { text } => BatchStatement::Query { text: text.clone() }, - BatchStatement::Prepared { id } => BatchStatement::Prepared { id: id.clone() }, + BatchStatement::Query { text, values } => BatchStatement::Query { + text: text.clone(), + values: values.clone(), + }, + BatchStatement::Prepared { id, values } => BatchStatement::Prepared { + id: id.clone(), + values: values.clone(), + }, } } } -impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec> { +impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>> { fn deserialize(buf: &mut &[u8]) -> Result { let batch_type = buf.get_u8().try_into()?; let statements_count: usize = types::read_short(buf)?.into(); - let statements_with_values = (0..statements_count) + let statements = (0..statements_count) .map(|_| { let batch_statement = BatchStatement::deserialize(buf)?; - // As stated in CQL protocol v4 specification, values names in Batch are broken and should be never used. - let values = LegacySerializedValues::new_from_frame(buf, false)?; - - Ok((batch_statement, values)) + Ok(batch_statement) }) .collect::, ParseError>>()?; @@ -233,16 +218,12 @@ impl<'b> DeserializableRequest for Batch<'b, BatchStatement<'b>, Vec, Vec) = - statements_with_values.into_iter().unzip(); - Ok(Self { + statements: Cow::Owned(statements), batch_type, consistency, serial_consistency, timestamp, - statements: Cow::Owned(statements), - values, }) } } diff --git a/scylla-cql/src/frame/request/mod.rs b/scylla-cql/src/frame/request/mod.rs index 6955ca931c..6286cbb8e8 100644 --- a/scylla-cql/src/frame/request/mod.rs +++ b/scylla-cql/src/frame/request/mod.rs @@ -22,7 +22,6 @@ pub use startup::Startup; use self::batch::BatchStatement; use super::types::SerialConsistency; -use super::value::LegacySerializedValues; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, TryFromPrimitive)] #[repr(u8)] @@ -59,7 +58,7 @@ pub trait DeserializableRequest: SerializableRequest + Sized { pub enum Request<'r> { Query(Query<'r>), Execute(Execute<'r>), - Batch(Batch<'r, BatchStatement<'r>, Vec>), + Batch(Batch<'r, BatchStatement<'r>>), } impl<'r> Request<'r> { @@ -100,7 +99,7 @@ impl<'r> Request<'r> { #[cfg(test)] mod tests { - use std::{borrow::Cow, ops::Deref}; + use std::borrow::Cow; use bytes::Bytes; @@ -176,9 +175,12 @@ mod tests { let statements = vec![ BatchStatement::Query { text: query.contents, + values: query.parameters.values.clone(), }, + // Not execute's values, because named values are not supported in batches. BatchStatement::Prepared { id: Cow::Borrowed(&execute.id), + values: query.parameters.values, }, ]; let batch = Batch { @@ -187,22 +189,6 @@ mod tests { consistency: Consistency::EachQuorum, serial_consistency: Some(SerialConsistency::LocalSerial), timestamp: Some(32432), - - // Not execute's values, because named values are not supported in batches. - values: vec![ - query - .parameters - .values - .deref() - .clone() - .into_old_serialized_values(), - query - .parameters - .values - .deref() - .clone() - .into_old_serialized_values(), - ], }; { let mut buf = Vec::new(); @@ -264,6 +250,7 @@ mod tests { // Batch let statements = vec![BatchStatement::Query { text: query.contents, + values: query.parameters.values, }]; let batch = Batch { statements: Cow::Owned(statements), @@ -271,13 +258,6 @@ mod tests { consistency: Consistency::EachQuorum, serial_consistency: None, timestamp: None, - - values: vec![query - .parameters - .values - .deref() - .clone() - .into_old_serialized_values()], }; { let mut buf = Vec::new(); diff --git a/scylla/src/statement/batch.rs b/scylla/src/statement/batch.rs index 6805dcb275..880ba8acd7 100644 --- a/scylla/src/statement/batch.rs +++ b/scylla/src/statement/batch.rs @@ -1,6 +1,9 @@ use std::borrow::Cow; use std::sync::Arc; +use scylla_cql::types::serialize::row::{SerializeRow, SerializedValues}; +use scylla_cql::types::serialize::SerializationError; + use crate::history::HistoryListener; use crate::retry_policy::RetryPolicy; use crate::statement::{prepared_statement::PreparedStatement, query::Query}; @@ -17,7 +20,7 @@ pub use crate::frame::request::batch::BatchType; pub struct Batch { pub(crate) config: StatementConfig, - pub statements: Vec, + pub(crate) statements: Vec, batch_type: BatchType, } @@ -30,18 +33,21 @@ impl Batch { } } - /// Creates a new, empty `Batch` of `batch_type` type with the provided statements. - pub fn new_with_statements(batch_type: BatchType, statements: Vec) -> Self { - Self { - batch_type, - statements, - ..Default::default() - } + pub fn append_query(&mut self, query: impl Into) { + self.statements.push(BatchStatement::Query(query.into())); } - /// Appends a new statement to the batch. - pub fn append_statement(&mut self, statement: impl Into) { - self.statements.push(statement.into()); + pub fn append_statement( + &mut self, + statement: PreparedStatement, + values: impl SerializeRow, + ) -> Result<(), SerializationError> { + let serialized = statement.serialize_values(&values)?; + self.statements.push(BatchStatement::PreparedStatement { + statement, + values: serialized, + }); + Ok(()) } /// Gets type of batch. @@ -156,27 +162,12 @@ impl Default for Batch { /// This enum represents a CQL statement, that can be part of batch. #[derive(Clone)] -pub enum BatchStatement { +pub(crate) enum BatchStatement { Query(Query), - PreparedStatement(PreparedStatement), -} - -impl From<&str> for BatchStatement { - fn from(s: &str) -> Self { - BatchStatement::Query(Query::from(s)) - } -} - -impl From for BatchStatement { - fn from(q: Query) -> Self { - BatchStatement::Query(q) - } -} - -impl From for BatchStatement { - fn from(p: PreparedStatement) -> Self { - BatchStatement::PreparedStatement(p) - } + PreparedStatement { + statement: PreparedStatement, + values: SerializedValues, + }, } impl<'a: 'b, 'b> From<&'a BatchStatement> @@ -187,11 +178,13 @@ impl<'a: 'b, 'b> From<&'a BatchStatement> BatchStatement::Query(query) => { scylla_cql::frame::request::batch::BatchStatement::Query { text: Cow::Borrowed(&query.contents), + values: Cow::Owned(SerializedValues::new()), } } - BatchStatement::PreparedStatement(prepared) => { + BatchStatement::PreparedStatement { statement, values } => { scylla_cql::frame::request::batch::BatchStatement::Prepared { - id: Cow::Borrowed(prepared.get_id()), + id: Cow::Borrowed(statement.get_id()), + values: Cow::Borrowed(values), } } } diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 034ec8793a..16e69b0090 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -1,5 +1,4 @@ use crate::batch::{Batch, BatchStatement}; -use crate::frame::value::BatchValues; use crate::prepared_statement::PreparedStatement; use crate::query::Query; use crate::transport::errors::QueryError; @@ -10,7 +9,7 @@ use bytes::Bytes; use dashmap::DashMap; use futures::future::try_join_all; use scylla_cql::frame::response::result::PreparedMetadata; -use scylla_cql::types::serialize::row::SerializeRow; +use scylla_cql::types::serialize::row::{SerializeRow, SerializedValues}; use std::collections::hash_map::RandomState; use std::hash::BuildHasher; @@ -105,22 +104,18 @@ where /// Does the same thing as [`Session::batch`] but uses the prepared statement cache\ /// Prepares batch using CachingSession::prepare_batch if needed and then executes it - pub async fn batch( - &self, - batch: &Batch, - values: impl BatchValues, - ) -> Result { + pub async fn batch(&self, batch: &Batch) -> Result { let all_prepared: bool = batch .statements .iter() - .all(|stmt| matches!(stmt, BatchStatement::PreparedStatement(_))); + .all(|stmt| matches!(stmt, BatchStatement::PreparedStatement { .. })); if all_prepared { - self.session.batch(batch, &values).await + self.session.batch(batch).await } else { let prepared_batch: Batch = self.prepare_batch(batch).await?; - self.session.batch(&prepared_batch, &values).await + self.session.batch(&prepared_batch).await } } @@ -137,7 +132,10 @@ where .map(|statement| async move { if let BatchStatement::Query(query) = statement { let prepared = self.add_prepared_statement(&*query).await?; - *statement = BatchStatement::PreparedStatement(prepared); + *statement = BatchStatement::PreparedStatement { + statement: prepared, + values: SerializedValues::new(), + }; } Ok::<(), QueryError>(()) }), @@ -449,7 +447,7 @@ mod tests { let assert_batch_prepared = |b: &Batch| { for stmt in &b.statements { match stmt { - BatchStatement::PreparedStatement(_) => {} + BatchStatement::PreparedStatement { .. } => {} _ => panic!("Unprepared statement in prepared batch!"), } } diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 0c177bcc54..13e6a12d60 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -27,7 +27,7 @@ pub(crate) use ssl_config::SslConfig; use crate::authentication::AuthenticatorProvider; use scylla_cql::frame::response::authenticate::Authenticate; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap}; use std::convert::TryFrom; use std::io::ErrorKind; use std::net::{IpAddr, SocketAddr}; @@ -53,7 +53,6 @@ use crate::frame::{ request::{self, batch, execute, query, register, SerializableRequest}, response::{event::Event, result, NonErrorResponse, Response, ResponseOpcode}, server_event_type::EventType, - value::{BatchValues, BatchValuesIterator}, FrameParams, SerializedRequest, }; use crate::query::Query; @@ -760,14 +759,9 @@ impl Connection { } #[allow(dead_code)] - pub(crate) async fn batch( - &self, - batch: &Batch, - values: impl BatchValues, - ) -> Result { + pub(crate) async fn batch(&self, batch: &Batch) -> Result { self.batch_with_consistency( batch, - values, batch .config .determine_consistency(self.config.default_consistency), @@ -778,16 +772,12 @@ impl Connection { pub(crate) async fn batch_with_consistency( &self, - init_batch: &Batch, - values: impl BatchValues, + batch: &Batch, consistency: Consistency, serial_consistency: Option, ) -> Result { - let batch = self.prepare_batch(init_batch, &values).await?; - let batch_frame = batch::Batch { statements: Cow::Borrowed(&batch.statements), - values, batch_type: batch.get_type(), consistency, serial_consistency, @@ -804,8 +794,10 @@ impl Connection { DbError::Unprepared { statement_id } => { debug!("Connection::batch: got DbError::Unprepared - repreparing statement with id {:?}", statement_id); let prepared_statement = batch.statements.iter().find_map(|s| match s { - BatchStatement::PreparedStatement(s) if *s.get_id() == statement_id => { - Some(s) + BatchStatement::PreparedStatement { statement, .. } + if *statement.get_id() == statement_id => + { + Some(statement) } _ => None, }); @@ -828,58 +820,6 @@ impl Connection { } } - async fn prepare_batch<'b>( - &self, - init_batch: &'b Batch, - values: impl BatchValues, - ) -> Result, QueryError> { - let mut to_prepare = HashSet::<&str>::new(); - - { - let mut values_iter = values.batch_values_iter(); - for stmt in &init_batch.statements { - if let BatchStatement::Query(query) = stmt { - let value = values_iter.next_serialized().transpose()?; - if let Some(v) = value { - if v.len() > 0 { - to_prepare.insert(&query.contents); - } - } - } else { - values_iter.skip_next(); - } - } - } - - if to_prepare.is_empty() { - return Ok(Cow::Borrowed(init_batch)); - } - - let mut prepared_queries = HashMap::<&str, PreparedStatement>::new(); - - for query in &to_prepare { - let prepared = self.prepare(&Query::new(query.to_string())).await?; - prepared_queries.insert(query, prepared); - } - - let mut batch: Cow = Cow::Owned(Default::default()); - batch.to_mut().config = init_batch.config.clone(); - for stmt in &init_batch.statements { - match stmt { - BatchStatement::Query(query) => match prepared_queries.get(query.contents.as_str()) - { - Some(prepared) => batch.to_mut().append_statement(prepared.clone()), - None => batch.to_mut().append_statement(query.clone()), - }, - BatchStatement::PreparedStatement(prepared) => { - batch.to_mut().append_statement(prepared.clone()); - } - } - } - - Ok(batch) - } - pub(crate) async fn use_keyspace( &self, keyspace_name: &VerifiedKeyspaceName, diff --git a/scylla/src/transport/large_batch_statements_test.rs b/scylla/src/transport/large_batch_statements_test.rs index 29482e31ce..2c86c91b30 100644 --- a/scylla/src/transport/large_batch_statements_test.rs +++ b/scylla/src/transport/large_batch_statements_test.rs @@ -52,7 +52,6 @@ async fn create_test_session(session: Session, ks: &String) -> Session { async fn write_batch(session: &Session, n: usize, ks: &String) -> Result { let mut batch_query = Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::new(); let query = format!("INSERT INTO {}.pairs (dummy, k, v) VALUES (0, ?, ?)", ks); let query = Query::new(query); let prepared_statement = session.prepare(query).await.unwrap(); @@ -61,8 +60,7 @@ async fn write_batch(session: &Session, n: usize, ks: &String) -> Result Result { + pub async fn batch(&self, batch: &Batch) -> Result { // Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard // If users batch statements by shard, they will be rewarded with full shard awareness @@ -1180,9 +1175,6 @@ impl Session { BadQuery::TooManyQueriesInBatchStatement(batch_statements_length), )); } - // Extract first serialized_value - let first_serialized_value = values.batch_values_iter().next_serialized().transpose()?; - let first_serialized_value = first_serialized_value.as_deref(); let execution_profile = batch .get_execution_profile_handle() @@ -1199,13 +1191,19 @@ impl Session { .serial_consistency .unwrap_or(execution_profile.serial_consistency); - let statement_info = match (first_serialized_value, batch.statements.first()) { - (Some(first_serialized_value), Some(BatchStatement::PreparedStatement(ps))) => { + let statement_info = match batch.statements.first() { + Some(BatchStatement::PreparedStatement { statement, values }) => { + let token = statement + .extract_partition_key_and_calculate_token( + statement.get_partitioner_name(), + &values, + )? + .map(|(_, t)| t); RoutingInfo { consistency, serial_consistency, - token: ps.calculate_token(first_serialized_value)?, - keyspace: ps.get_keyspace_name(), + token, + keyspace: statement.get_keyspace_name(), is_confirmed_lwt: false, } } @@ -1217,11 +1215,6 @@ impl Session { }; let first_value_token = statement_info.token; - // Reuse first serialized value when serializing query, and delegate to `BatchValues::write_next_to_request` - // directly for others (if they weren't already serialized, possibly don't even allocate the `LegacySerializedValues`) - let values = BatchValuesFirstSerialized::new(&values, first_serialized_value); - let values_ref = &values; - let span = RequestSpan::new_batch(); let run_query_result = self @@ -1246,12 +1239,7 @@ impl Session { .unwrap_or(execution_profile.serial_consistency); async move { connection - .batch_with_consistency( - batch, - values_ref, - consistency, - serial_consistency, - ) + .batch_with_consistency(batch, consistency, serial_consistency) .await } }, @@ -1305,7 +1293,10 @@ impl Session { .map(|statement| async move { if let BatchStatement::Query(query) = statement { let prepared = self.prepare(query.clone()).await?; - *statement = BatchStatement::PreparedStatement(prepared); + *statement = BatchStatement::PreparedStatement { + statement: prepared, + values: SerializedValues::new(), + }; } Ok::<(), QueryError>(()) }), diff --git a/scylla/tests/integration/consistency.rs b/scylla/tests/integration/consistency.rs index e265265335..e3d42cd227 100644 --- a/scylla/tests/integration/consistency.rs +++ b/scylla/tests/integration/consistency.rs @@ -9,7 +9,6 @@ use scylla::test_utils::unique_keyspace_name; use scylla::transport::session::Session; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; -use scylla::statement::batch::BatchStatement; use scylla::statement::query::Query; use scylla::{ batch::{Batch, BatchType}, @@ -100,7 +99,7 @@ async fn batch_consistency_set_directly( let mut batch = batch.clone(); batch.set_consistency(c); batch.set_serial_consistency(sc); - session.batch(&batch, ((1,),)).await.unwrap(); + session.batch(&batch).await.unwrap(); } // The following functions perform a request with consistencies set on a per-statement execution profile. @@ -133,7 +132,7 @@ async fn batch_consistency_set_on_exec_profile( ) { let mut batch = batch.clone(); batch.set_execution_profile_handle(Some(profile)); - session.batch(&batch, ((1,),)).await.unwrap(); + session.batch(&batch).await.unwrap(); } // For all consistencies (as defined by `pairs_of_all_consistencies()`) and every method of setting consistencies @@ -166,10 +165,8 @@ async fn check_for_all_consistencies_and_setting_options< // We will be using these requests: let query = Query::from(QUERY_STR); let prepared = session.prepare(QUERY_STR).await.unwrap(); - let batch = Batch::new_with_statements( - BatchType::Logged, - vec![BatchStatement::Query(Query::from(QUERY_STR))], - ); + let mut batch = Batch::new(BatchType::Logged); + batch.append_statement(prepared.clone(), (1,)).unwrap(); for (consistency, serial_consistency) in pairs_of_all_consistencies() { // Some checks are double, because both non-paged and paged executions are done. @@ -241,10 +238,7 @@ async fn check_for_all_consistencies_and_setting_options< .unwrap(); rx = check_consistencies(consistency, serial_consistency, rx).await; - session_with_consistencies - .batch(&batch, ((1,),)) - .await - .unwrap(); + session_with_consistencies.batch(&batch).await.unwrap(); rx = check_consistencies(consistency, serial_consistency, rx).await; } } diff --git a/scylla/tests/integration/execution_profiles.rs b/scylla/tests/integration/execution_profiles.rs index 119487a609..fb4d2e22c5 100644 --- a/scylla/tests/integration/execution_profiles.rs +++ b/scylla/tests/integration/execution_profiles.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use crate::utils::test_with_3_node_cluster; use assert_matches::assert_matches; -use scylla::batch::BatchStatement; use scylla::batch::{Batch, BatchType}; use scylla::query::Query; use scylla::statement::SerialConsistency; @@ -173,7 +172,8 @@ async fn test_execution_profiles() { let mut query = Query::from(format!("INSERT INTO {}.t (a, b, c) VALUES (1, 2, 'abc')", ks)); let mut prepared = session.prepare(format!("INSERT INTO {}.t (a, b, c) VALUES (1, 2, 'abc')", ks)).await.unwrap(); - let mut batch = Batch::new_with_statements(BatchType::Unlogged, vec![BatchStatement::Query(query.clone())]); + let mut batch = Batch::new(BatchType::Unlogged); + batch.append_query(query.clone()); while profile_rx.try_recv().is_ok() {} consistency_rx.try_recv().unwrap_err(); @@ -194,7 +194,7 @@ async fn test_execution_profiles() { assert_matches!((report1, report2), ((Report::LoadBalancing, 1), (Report::RetryPolicy, 1)) | ((Report::RetryPolicy, 1), (Report::LoadBalancing, 1))); profile_rx.try_recv().unwrap_err(); - session.batch(&batch, ((),)).await.unwrap(); + session.batch(&batch).await.unwrap(); let report1 = profile_rx.recv().await.unwrap(); let report2 = profile_rx.recv().await.unwrap(); assert_matches!((report1, report2), ((Report::LoadBalancing, 1), (Report::RetryPolicy, 1)) | ((Report::RetryPolicy, 1), (Report::LoadBalancing, 1))); @@ -216,7 +216,7 @@ async fn test_execution_profiles() { profile_rx.try_recv().unwrap_err(); batch.set_execution_profile_handle(Some(profile2.clone().into_handle())); - session.batch(&batch, ((),)).await.unwrap(); + session.batch(&batch).await.unwrap(); let report1 = profile_rx.recv().await.unwrap(); let report2 = profile_rx.recv().await.unwrap(); assert_matches!((report1, report2), ((Report::LoadBalancing, 2), (Report::RetryPolicy, 2)) | ((Report::RetryPolicy, 2), (Report::LoadBalancing, 2))); @@ -238,7 +238,7 @@ async fn test_execution_profiles() { profile_rx.try_recv().unwrap_err(); batch.set_execution_profile_handle(None); - session.batch(&batch, ((),)).await.unwrap(); + session.batch(&batch).await.unwrap(); let report1 = profile_rx.recv().await.unwrap(); let report2 = profile_rx.recv().await.unwrap(); assert_matches!((report1, report2), ((Report::LoadBalancing, 1), (Report::RetryPolicy, 1)) | ((Report::RetryPolicy, 1), (Report::LoadBalancing, 1))); @@ -267,7 +267,7 @@ async fn test_execution_profiles() { assert_matches!(report_consistency, Consistency::One); consistency_rx.try_recv().unwrap_err(); - session.batch(&batch, ((),)).await.unwrap_err(); + session.batch(&batch).await.unwrap_err(); let report_consistency = consistency_rx.recv().await.unwrap(); assert_matches!(report_consistency, Consistency::One); consistency_rx.try_recv().unwrap_err(); @@ -286,7 +286,7 @@ async fn test_execution_profiles() { consistency_rx.try_recv().unwrap_err(); batch.set_execution_profile_handle(Some(profile2.clone().into_handle())); - session.batch(&batch, ((),)).await.unwrap_err(); + session.batch(&batch).await.unwrap_err(); let report_consistency = consistency_rx.recv().await.unwrap(); assert_matches!(report_consistency, Consistency::Two); consistency_rx.try_recv().unwrap_err(); @@ -305,7 +305,7 @@ async fn test_execution_profiles() { consistency_rx.try_recv().unwrap_err(); batch.set_consistency(Consistency::Three); - session.batch(&batch, ((),)).await.unwrap_err(); + session.batch(&batch).await.unwrap_err(); let report_consistency = consistency_rx.recv().await.unwrap(); assert_matches!(report_consistency, Consistency::Three); consistency_rx.try_recv().unwrap_err();