Skip to content

Commit

Permalink
feat: Add query options and improve query execution (#39)
Browse files Browse the repository at this point in the history
This commit introduces a new QueryOptions structure that allows users to
specify whether a query should be prepared or not. The execute method in
ScyllaSession has been updated to handle these options and decide
whether to prepare a query or not based on the provided options. This
improves the performance of queries that need to be prepared before
execution.

Additionally, helper methods have been added to handle the execution of
prepared statements and direct queries separately, improving code
readability and maintainability.

The TypeScript definitions and examples have also been updated to
reflect these changes.

Signed-off-by: Daniel Boll <[email protected]>
  • Loading branch information
Daniel-Boll authored Aug 12, 2024
1 parent b571646 commit 048c17a
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 23 deletions.
26 changes: 18 additions & 8 deletions examples/basic.mts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cluster } from "../index.js"
import { Cluster } from "../index.js";

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

Expand All @@ -7,15 +7,25 @@ console.log(`Connecting to ${nodes}`);
const cluster = new Cluster({ nodes });
const session = await cluster.connect();

await session.execute("CREATE KEYSPACE IF NOT EXISTS basic WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
await session.execute(
"CREATE KEYSPACE IF NOT EXISTS basic WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }",
);
await session.useKeyspace("basic");

await session.execute("CREATE TABLE IF NOT EXISTS basic (a int, b int, c text, primary key (a, b))");
await session.execute(
"CREATE TABLE IF NOT EXISTS basic (a int, b int, c text, primary key (a, b))",
);

await session.execute("INSERT INTO basic (a, b, c) VALUES (1, 2, 'abc')");
await session.execute("INSERT INTO basic (a, b, c) VALUES (?, ?, ?)", [3, 4, "def"]);

const prepared = await session.prepare("INSERT INTO basic (a, b, c) VALUES (?, 7, ?)");
await session.execute("INSERT INTO basic (a, b, c) VALUES (?, ?, ?)", [
3,
4,
"def",
]);

const prepared = await session.prepare(
"INSERT INTO basic (a, b, c) VALUES (?, 7, ?)",
);
await session.execute(prepared, [42, "I'm prepared!"]);
await session.execute(prepared, [43, "I'm prepared 2!"]);
await session.execute(prepared, [44, "I'm prepared 3!"]);
Expand All @@ -25,7 +35,7 @@ interface RowData {
b: number;
c: string;
}
const result = await session.execute<RowData>("SELECT a, b, c FROM basic");
const result = await session.execute("SELECT a, b, c FROM basic");
console.log(result);

const metrics = session.metrics();
Expand All @@ -34,4 +44,4 @@ console.log(`Iter queries requested: ${metrics.getQueriesIterNum()}`);
console.log(`Errors occurred: ${metrics.getErrorsNum()}`);
console.log(`Iter errors occurred: ${metrics.getErrorsIterNum()}`);
console.log(`Average latency: ${metrics.getLatencyAvgMs()}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
38 changes: 38 additions & 0 deletions examples/prepared.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Cluster } from "../index.js";

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

console.log(`Connecting to ${nodes}`);

const cluster = new Cluster({ nodes });
const session = await cluster.connect();

await session.execute(
"CREATE KEYSPACE IF NOT EXISTS prepared WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }",
);
await session.useKeyspace("prepared");

await session.execute(
"CREATE TABLE IF NOT EXISTS prepared (a int, b int, c text, primary key (a, b))",
);

const prepared = await session.prepare(
"INSERT INTO basic (a, b, c) VALUES (?, 7, ?)",
);
await session.execute(prepared, [42, "I'm prepared!"]);
await session.execute(prepared, [43, "I'm prepared 2!"]);
await session.execute(prepared, [44, "I'm prepared 3!"]);

await session.execute(
"INSERT INTO basic (a, b, c) VALUES (?, 7, ?)",
[45, "I'm also prepared"],
{ prepare: true },
);

const metrics = session.metrics();
console.log(`Queries requested: ${metrics.getQueriesNum()}`);
console.log(`Iter queries requested: ${metrics.getQueriesIterNum()}`);
console.log(`Errors occurred: ${metrics.getErrorsNum()}`);
console.log(`Iter errors occurred: ${metrics.getErrorsIterNum()}`);
console.log(`Average latency: ${metrics.getLatencyAvgMs()}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
21 changes: 20 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export const enum VerifyMode {
None = 0,
Peer = 1
}
export interface QueryOptions {
prepare?: boolean
}
export interface ScyllaKeyspace {
strategy: ScyllaStrategy
tables: Record<string, ScyllaTable>
Expand Down Expand Up @@ -154,7 +157,23 @@ export class Metrics {
export class ScyllaSession {
metrics(): Metrics
getClusterData(): Promise<ScyllaClusterData>
execute(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null): Promise<any>
/**
* Sends a query to the database and receives a response.\
* Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet)
*
* This is the easiest way to make a query, but performance is worse than that of prepared queries.
*
* It is discouraged to use this method with non-empty values argument. In such case, query first needs to be prepared (on a single connection), so
* driver will perform 2 round trips instead of 1. Please use `PreparedStatement` object or `{ prepared: true }` option instead.
*
* # Notes
*
* ## UDT
* Order of fields in the object must match the order of fields as defined in the UDT. The
* driver does not check it by itself, so incorrect data will be written if the order is
* wrong.
*/
execute(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null, options?: QueryOptions | undefined | null): Promise<any>
query(scyllaQuery: Query, parameters?: Array<number | string | Uuid | Record<string, number | string | Uuid>> | undefined | null): Promise<any>
prepare(query: string): Promise<PreparedStatement>
/**
Expand Down
112 changes: 98 additions & 14 deletions src/session/scylla_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ use crate::query::scylla_prepared_statement::PreparedStatement;
use crate::query::scylla_query::Query;
use crate::types::uuid::Uuid;
use napi::bindgen_prelude::{Either3, Either4};
use napi::Either;

use super::metrics;
use super::topology::ScyllaClusterData;

#[napi(object)]
pub struct QueryOptions {
pub prepare: Option<bool>,
}

#[napi]
pub struct ScyllaSession {
session: scylla::Session,
Expand Down Expand Up @@ -39,6 +45,20 @@ impl ScyllaSession {
cluster_data.into()
}

/// Sends a query to the database and receives a response.\
/// Returns only a single page of results, to receive multiple pages use (TODO: Not implemented yet)
///
/// This is the easiest way to make a query, but performance is worse than that of prepared queries.
///
/// It is discouraged to use this method with non-empty values argument. In such case, query first needs to be prepared (on a single connection), so
/// driver will perform 2 round trips instead of 1. Please use `PreparedStatement` object or `{ prepared: true }` option instead.
///
/// # Notes
///
/// ## UDT
/// Order of fields in the object must match the order of fields as defined in the UDT. The
/// driver does not check it by itself, so incorrect data will be written if the order is
/// wrong.
#[allow(clippy::type_complexity)]
#[napi]
pub async fn execute(
Expand All @@ -47,27 +67,91 @@ impl ScyllaSession {
parameters: Option<
Vec<Either4<u32, String, &Uuid, HashMap<String, Either3<u32, String, &Uuid>>>>,
>,
options: Option<QueryOptions>,
) -> napi::Result<serde_json::Value> {
let values = QueryParameter::parser(parameters.clone()).ok_or(napi::Error::new(
napi::Status::InvalidArg,
format!("Something went wrong with your query parameters. {parameters:?}"),
))?;
let values = QueryParameter::parser(parameters.clone()).ok_or_else(|| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your query parameters. {:?}",
parameters
),
)
})?;

let should_prepare = options.map_or(false, |options| options.prepare.unwrap_or(false));

let query_result = match query.clone() {
Either3::A(query) => self.session.query(query, values).await,
Either3::B(query) => self.session.query(query.query.clone(), values).await,
Either3::C(prepared) => self.session.execute(&prepared.prepared, values).await,
match query {
Either3::A(ref query_str) if should_prepare => {
let prepared = self.session.prepare(query_str.clone()).await.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong preparing your statement. - [{}]\n{}",
query_str, e
),
)
})?;
self.execute_prepared(&prepared, values, query_str).await
}
Either3::A(query_str) => self.execute_query(Either::A(query_str), values).await,
Either3::B(query_ref) => {
self
.execute_query(Either::B(query_ref.query.clone()), values)
.await
}
Either3::C(prepared_ref) => {
self
.execute_prepared(
&prepared_ref.prepared,
values,
prepared_ref.prepared.get_statement(),
)
.await
}
}
}

// Helper method to handle prepared statements
async fn execute_prepared(
&self,
prepared: &scylla::prepared_statement::PreparedStatement,
values: QueryParameter<'_>,
query: &str,
) -> napi::Result<serde_json::Value> {
let query_result = self.session.execute(prepared, values).await.map_err(|e| {
napi::Error::new(
napi::Status::InvalidArg,
format!(
"Something went wrong with your prepared statement. - [{}]\n{}",
query, e
),
)
})?;
Ok(QueryResult::parser(query_result))
}

// Helper method to handle direct queries
async fn execute_query(
&self,
query: Either<String, scylla::query::Query>,
values: QueryParameter<'_>,
) -> napi::Result<serde_json::Value> {
let query_result = match &query {
Either::A(query_str) => self.session.query(query_str.clone(), values).await,
Either::B(query_ref) => self.session.query(query_ref.clone(), values).await,
}
.map_err(|e| {
let query = match query {
Either3::A(query) => query,
Either3::B(query) => query.query.contents.clone(),
Either3::C(prepared) => prepared.prepared.get_statement().to_string(),
let query_str = match query {
Either::A(query_str) => query_str,
Either::B(query_ref) => query_ref.contents.clone(),
};

napi::Error::new(
napi::Status::InvalidArg,
format!("Something went wrong with your query. - [{query}] - {parameters:?}\n{e}"),
format!(
"Something went wrong with your query. - [{}]\n{}",
query_str, e
),
)
})?;

Expand Down

0 comments on commit 048c17a

Please sign in to comment.