Skip to content

Commit

Permalink
session: move the batch token awareness module to separate function
Browse files Browse the repository at this point in the history
Simplify the logic of `Session::batch` by moving the parts responsible
for token calculation and wrapping the `BatchValues` argument into a
separate function in the `batch_values` module.
  • Loading branch information
piodul committed Dec 14, 2023
1 parent 3952d18 commit 59534dd
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 29 deletions.
51 changes: 46 additions & 5 deletions scylla/src/statement/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl<'a: 'b, 'b> From<&'a BatchStatement>
}

pub(crate) mod batch_values {
use scylla_cql::errors::QueryError;
use scylla_cql::frame::types::RawValue;
use scylla_cql::types::serialize::batch::BatchValues;
use scylla_cql::types::serialize::batch::BatchValuesIterator;
Expand All @@ -207,18 +208,58 @@ pub(crate) mod batch_values {
use scylla_cql::types::serialize::row::SerializedValues;
use scylla_cql::types::serialize::{RowWriter, SerializationError};

use crate::routing::Token;

use super::BatchStatement;

// Takes an optional reference to the first statement in the batch and
// the batch values, and tries to compute the token for the statement.
// Returns the (optional) token and batch values. If the function needed
// to serialize values for the first statement, the returned batch values
// will cache the results of the serialization.
//
// NOTE: Batch values returned by this function might not type check
// the first statement when it is serialized! However, if they don't,
// then the first row was already checked by the function. It is assumed
// that `statement` holds the first prepared statement of the batch (if
// there is one), and that it will be used later to serialize the values.
pub(crate) fn peek_first_token<'bv>(
values: impl BatchValues + 'bv,
statement: Option<&BatchStatement>,
) -> Result<(Option<Token>, impl BatchValues + 'bv), QueryError> {
let mut values_iter = values.batch_values_iter();
let first_values = values_iter.next();
let (token, first_values) = match (first_values, statement) {
(Some(first_values), Some(BatchStatement::PreparedStatement(ps))) => {
let ctx = RowSerializationContext::from_prepared(ps.get_prepared_metadata());
let first_values = SerializedValues::from_serializable(&ctx, &first_values)?;
let token = ps.calculate_token_untyped(&first_values)?;
(token, Some(first_values))
}
_ => (None, None),
};

// Need to do it explicitly, otherwise the next line will complain
// that `values_iter` still borrows `values`.
std::mem::drop(values_iter);

// Reuse the already serialized first value via `BatchValuesFirstSerialized`.
let values = BatchValuesFirstSerialized::new(values, first_values);

Ok((token, values))
}

struct BatchValuesFirstSerialized<BV> {
// Contains the first value of BV in a serialized form.
// The first value in the iterator returned from `rest` should be skipped!
first: Option<SerializedValues>,
rest: BV,
}

pub(crate) fn new_batch_values_first_serialized(
rest: impl BatchValues,
first: Option<SerializedValues>,
) -> impl BatchValues {
BatchValuesFirstSerialized { first, rest }
impl<BV> BatchValuesFirstSerialized<BV> {
fn new(rest: BV, first: Option<SerializedValues>) -> Self {
Self { first, rest }
}
}

impl<BV> BatchValues for BatchValuesFirstSerialized<BV>
Expand Down
34 changes: 10 additions & 24 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use itertools::{Either, Itertools};
pub use scylla_cql::errors::TranslationError;
use scylla_cql::frame::response::result::{deser_cql_value, ColumnSpec, Rows};
use scylla_cql::frame::response::NonErrorResponse;
use scylla_cql::types::serialize::batch::{BatchValues, BatchValuesIterator};
use scylla_cql::types::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
use scylla_cql::types::serialize::batch::BatchValues;
use scylla_cql::types::serialize::row::SerializeRow;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::Display;
Expand Down Expand Up @@ -1194,23 +1194,15 @@ impl Session {
.serial_consistency
.unwrap_or(execution_profile.serial_consistency);

let (first_serialized_value, first_value_token, keyspace_name) = {
let mut values_iter = values.batch_values_iter();
let first_values = values_iter.next();

// The temporary "p" is necessary because lifetimes
let p = match (first_values, batch.statements.first()) {
(Some(first_values), Some(BatchStatement::PreparedStatement(ps))) => {
let ctx = RowSerializationContext::from_prepared(ps.get_prepared_metadata());
let first_serialized_value =
SerializedValues::from_serializable(&ctx, &first_values)?;
let token = ps.calculate_token_untyped(&first_serialized_value)?;
(Some(first_serialized_value), token, ps.get_keyspace_name())
}
_ => (None, None, None),
};
p
let keyspace_name = match batch.statements.first() {
Some(BatchStatement::PreparedStatement(ps)) => ps.get_keyspace_name(),
_ => None,
};

let (first_value_token, values) =
batch_values::peek_first_token(values, batch.statements.first())?;
let values_ref = &values;

let statement_info = RoutingInfo {
consistency,
serial_consistency,
Expand All @@ -1219,12 +1211,6 @@ impl Session {
is_confirmed_lwt: false,
};

// Reuse first serialized value when serializing query, and delegate to `BatchValues::write_next_to_request`
// directly for others (if they weren't already serialized, possibly don't even allocate the `LegacySerializedValues`)
let values =
batch_values::new_batch_values_first_serialized(&values, first_serialized_value);
let values_ref = &values;

let span = RequestSpan::new_batch();

let run_query_result = self
Expand Down

0 comments on commit 59534dd

Please sign in to comment.