From a447fbf76bea96d073c736815e6a3f6fc1da4fdd Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Mon, 10 Feb 2025 18:44:03 +0100 Subject: [PATCH] Option to report trace only once during pagination --- .../api/core/config/DefaultDriverOption.java | 8 +- .../driver/api/core/config/OptionsMap.java | 1 + .../api/core/config/TypedDriverOption.java | 4 + .../driver/internal/core/cql/Conversions.java | 5 +- .../internal/core/cql/CqlRequestHandler.java | 2 +- .../core/cql/DefaultAsyncResultSet.java | 12 ++- core/src/main/resources/reference.conf | 8 ++ .../CqlRequestReactiveProcessorTest.java | 5 +- .../core/AsyncPagingIterableWrapperTest.java | 18 +++- .../core/cql/DefaultAsyncResultSetTest.java | 39 ++++++-- .../oss/driver/core/cql/QueryTraceIT.java | 96 ++++++++++++++++++- 11 files changed, 181 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 11f2702c3cf..f1844c6aba1 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -988,7 +988,13 @@ public enum DefaultDriverOption implements DriverOption { *

Value type: {@link java.util.List List}<{@link String}> */ LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS( - "advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"); + "advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"), + /** + * Report trace for every page fetch request + * + *

Value-type: {@link Boolean} + */ + REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH("advanced.request.trace.report-every-page-fetch"); private final String path; diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 98faf3e590c..aba86753d69 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -289,6 +289,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.REQUEST_WARN_IF_SET_KEYSPACE, true); map.put(TypedDriverOption.REQUEST_TRACE_ATTEMPTS, 5); map.put(TypedDriverOption.REQUEST_TRACE_INTERVAL, Duration.ofMillis(3)); + map.put(TypedDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, true); map.put(TypedDriverOption.REQUEST_TRACE_CONSISTENCY, "ONE"); map.put(TypedDriverOption.REQUEST_LOG_WARNINGS, true); map.put(TypedDriverOption.GRAPH_PAGING_ENABLED, "AUTO"); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index ca60b67f0ba..e57f452e225 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -396,6 +396,10 @@ public String toString() { /** The consistency level to use for trace queries. */ public static final TypedDriverOption REQUEST_TRACE_CONSISTENCY = new TypedDriverOption<>(DefaultDriverOption.REQUEST_TRACE_CONSISTENCY, GenericType.STRING); + /** Report trace for every page fetch request */ + public static final TypedDriverOption REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH = + new TypedDriverOption<>( + DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, GenericType.BOOLEAN); /** Whether or not to publish aggregable histogram for metrics */ public static final TypedDriverOption METRICS_GENERATE_AGGREGABLE_HISTOGRAMS = new TypedDriverOption<>( diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java index ff9384b3e24..078f8bb3c73 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java @@ -320,13 +320,14 @@ public static AsyncResultSet toResultSet( Result result, ExecutionInfo executionInfo, CqlSession session, - InternalDriverContext context) { + InternalDriverContext context, + DriverExecutionProfile executionProfile) { if (result instanceof Rows) { Rows rows = (Rows) result; Statement statement = (Statement) executionInfo.getRequest(); ColumnDefinitions columnDefinitions = getResultDefinitions(rows, statement, context); return new DefaultAsyncResultSet( - columnDefinitions, executionInfo, rows.getData(), session, context); + columnDefinitions, executionInfo, rows.getData(), session, context, executionProfile); } else if (result instanceof Prepared) { // This should never happen throw new IllegalArgumentException("Unexpected PREPARED response to a CQL query"); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 0808bdce63f..a1e9cfac4dd 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -321,7 +321,7 @@ private void setFinalResult( ExecutionInfo executionInfo = buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement); AsyncResultSet resultSet = - Conversions.toResultSet(resultMessage, executionInfo, session, context); + Conversions.toResultSet(resultMessage, executionInfo, session, context, executionProfile); if (result.complete(resultSet)) { cancelScheduledTasks(); throttler.signalSuccess(this); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java index 243e9aeb775..25208be2dec 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java @@ -18,6 +18,8 @@ package com.datastax.oss.driver.internal.core.cql; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -43,6 +45,7 @@ public class DefaultAsyncResultSet implements AsyncResultSet { private final ColumnDefinitions definitions; private final ExecutionInfo executionInfo; + private final DriverExecutionProfile executionProfile; private final CqlSession session; private final CountingIterator iterator; private final Iterable currentPage; @@ -52,9 +55,11 @@ public DefaultAsyncResultSet( ExecutionInfo executionInfo, Queue> data, CqlSession session, - InternalDriverContext context) { + InternalDriverContext context, + DriverExecutionProfile executionProfile) { this.definitions = definitions; this.executionInfo = executionInfo; + this.executionProfile = executionProfile; this.session = session; this.iterator = new CountingIterator(data.size()) { @@ -106,6 +111,11 @@ public CompletionStage fetchNextPage() throws IllegalStateExcept Statement statement = (Statement) executionInfo.getRequest(); LOG.trace("Fetching next page for {}", statement); Statement nextStatement = statement.copy(nextState); + if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH) + && nextStatement.isTracing()) { + // report traces only for first page + nextStatement = nextStatement.setTracing(false); + } return session.executeAsync(nextStatement); } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 7b1c43f8bea..19ff1f706ce 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -1153,6 +1153,14 @@ datastax-java-driver { # Modifiable at runtime: yes, the new value will be used for traces fetched after the change. # Overridable in a profile: yes consistency = ONE + + # Report trace for every page fetch request. If disabled, only one trace entry will be present + # on server side even if client application pages through a large partition. + # + # Required: yes + # Modifiable at runtime: yes, the new value will be used for traces fetched after the change. + # Overridable in a profile: yes + report-every-page-fetch = true } # Whether logging of server warnings generated during query execution should be disabled by the diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java index a7a6bced9e8..118a737be16 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java @@ -29,6 +29,7 @@ import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet; import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow; import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -135,6 +136,7 @@ public void should_complete_multi_page_result(ProtocolVersion version) { CompletableFuture page2Future = new CompletableFuture<>(); when(session.executeAsync(any(Statement.class))).thenAnswer(invocation -> page2Future); ExecutionInfo mockInfo = mock(ExecutionInfo.class); + DriverExecutionProfile mockExecutionProfile = mock(DriverExecutionProfile.class); ReactiveResultSet publisher = new CqlRequestReactiveProcessor(new CqlRequestAsyncProcessor()) @@ -152,7 +154,8 @@ public void should_complete_multi_page_result(ProtocolVersion version) { DseTestFixtures.tenDseRows(2, true), mockInfo, harness.getSession(), - harness.getContext())); + harness.getContext(), + mockExecutionProfile)); List rows = rowsPublisher.toList().blockingGet(); assertThat(rows).hasSize(20); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java index dff9877b62d..3556490e470 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java @@ -24,6 +24,7 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.ColumnDefinition; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -50,6 +51,7 @@ public class AsyncPagingIterableWrapperTest { @Mock private Statement statement; @Mock private CqlSession session; @Mock private InternalDriverContext context; + @Mock private DriverExecutionProfile executionProfile; @Before public void setup() { @@ -74,10 +76,15 @@ public void should_wrap_result_set() throws Exception { ExecutionInfo executionInfo1 = mockExecutionInfo(); DefaultAsyncResultSet resultSet1 = new DefaultAsyncResultSet( - columnDefinitions, executionInfo1, mockData(0, 5), session, context); + columnDefinitions, executionInfo1, mockData(0, 5), session, context, executionProfile); DefaultAsyncResultSet resultSet2 = new DefaultAsyncResultSet( - columnDefinitions, mockExecutionInfo(), mockData(5, 10), session, context); + columnDefinitions, + mockExecutionInfo(), + mockData(5, 10), + session, + context, + executionProfile); // chain them together: ByteBuffer mockPagingState = ByteBuffer.allocate(0); when(executionInfo1.getPagingState()).thenReturn(mockPagingState); @@ -111,7 +118,12 @@ public void should_share_iteration_progress_with_wrapped_result_set() { // Given DefaultAsyncResultSet resultSet = new DefaultAsyncResultSet( - columnDefinitions, mockExecutionInfo(), mockData(0, 10), session, context); + columnDefinitions, + mockExecutionInfo(), + mockData(0, 10), + session, + context, + executionProfile); // When MappedAsyncPagingIterable iterable = resultSet.map(row -> row.getInt("i")); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java index 8ed509caeb7..b3cff9093ae 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java @@ -26,6 +26,7 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ColumnDefinition; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; @@ -55,6 +56,7 @@ public class DefaultAsyncResultSetTest { @Mock private Statement statement; @Mock private CqlSession session; @Mock private InternalDriverContext context; + @Mock private DriverExecutionProfile executionProfile; @Before public void setup() { @@ -73,7 +75,12 @@ public void should_fail_to_fetch_next_page_if_last() { // When DefaultAsyncResultSet resultSet = new DefaultAsyncResultSet( - columnDefinitions, executionInfo, new ArrayDeque<>(), session, context); + columnDefinitions, + executionInfo, + new ArrayDeque<>(), + session, + context, + executionProfile); // Then assertThat(resultSet.hasMorePages()).isFalse(); @@ -95,7 +102,12 @@ public void should_invoke_session_to_fetch_next_page() { // When DefaultAsyncResultSet resultSet = new DefaultAsyncResultSet( - columnDefinitions, executionInfo, new ArrayDeque<>(), session, context); + columnDefinitions, + executionInfo, + new ArrayDeque<>(), + session, + context, + executionProfile); assertThat(resultSet.hasMorePages()).isTrue(); CompletionStage nextPageFuture = resultSet.fetchNextPage(); @@ -113,7 +125,12 @@ public void should_report_applied_if_column_not_present_and_empty() { // When DefaultAsyncResultSet resultSet = new DefaultAsyncResultSet( - columnDefinitions, executionInfo, new ArrayDeque<>(), session, context); + columnDefinitions, + executionInfo, + new ArrayDeque<>(), + session, + context, + executionProfile); // Then assertThat(resultSet.wasApplied()).isTrue(); @@ -128,7 +145,8 @@ public void should_report_applied_if_column_not_present_and_not_empty() { // When DefaultAsyncResultSet resultSet = - new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context); + new DefaultAsyncResultSet( + columnDefinitions, executionInfo, data, session, context, executionProfile); // Then assertThat(resultSet.wasApplied()).isTrue(); @@ -149,7 +167,8 @@ public void should_report_not_applied_if_column_present_and_false() { // When DefaultAsyncResultSet resultSet = - new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context); + new DefaultAsyncResultSet( + columnDefinitions, executionInfo, data, session, context, executionProfile); // Then assertThat(resultSet.wasApplied()).isFalse(); @@ -170,7 +189,8 @@ public void should_report_not_applied_if_column_present_and_true() { // When DefaultAsyncResultSet resultSet = - new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context); + new DefaultAsyncResultSet( + columnDefinitions, executionInfo, data, session, context, executionProfile); // Then assertThat(resultSet.wasApplied()).isTrue(); @@ -187,7 +207,12 @@ public void should_fail_to_report_if_applied_if_column_present_but_empty() { // When DefaultAsyncResultSet resultSet = new DefaultAsyncResultSet( - columnDefinitions, executionInfo, new ArrayDeque<>(), session, context); + columnDefinitions, + executionInfo, + new ArrayDeque<>(), + session, + context, + executionProfile); // Then resultSet.wasApplied(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java index 37a600efbc4..d12c321a496 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java @@ -19,20 +19,31 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; +import static org.awaitility.Awaitility.await; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.QueryTrace; +import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.requirement.BackendType; import com.datastax.oss.driver.api.testinfra.session.SessionRule; +import com.datastax.oss.driver.api.testinfra.session.SessionUtils; import com.datastax.oss.driver.categories.ParallelizableTests; +import com.datastax.oss.driver.internal.core.util.Strings; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -46,8 +57,17 @@ public class QueryTraceIT { private static final SessionRule SESSION_RULE = SessionRule.builder(CCM_RULE).build(); + private static final SessionRule SESSION_RULE_SINGLE_TRACE = + SessionRule.builder(CCM_RULE) + .withConfigLoader( + SessionUtils.configLoaderBuilder() + .withBoolean(DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, false) + .build()) + .build(); + @ClassRule - public static final TestRule CHAIN = RuleChain.outerRule(CCM_RULE).around(SESSION_RULE); + public static final TestRule CHAIN = + RuleChain.outerRule(CCM_RULE).around(SESSION_RULE).around(SESSION_RULE_SINGLE_TRACE); @Test public void should_not_have_tracing_id_when_tracing_disabled() { @@ -131,4 +151,78 @@ public void should_fetch_trace_when_tracing_enabled() { assertThat(sourceAddress0.getPort()).isEqualTo(0); } } + + @Test + public void should_report_trace_once_during_pagination() { + testTraceDuringPagination(SESSION_RULE_SINGLE_TRACE, 1); + } + + @Test + public void should_report_trace_multiple_during_pagination() { + testTraceDuringPagination(SESSION_RULE, 6); + } + + private void testTraceDuringPagination( + SessionRule sessionRule, int traceEventsCount) { + String key = setupPaginationTable(sessionRule); + + String cql = "SELECT v0, v1 FROM trace_pagination WHERE k = ?"; + SimpleStatement query = SimpleStatement.builder(cql).setTracing().setPageSize(2).build(); + PreparedStatement preparedStatement = sessionRule.session().prepare(query); + ResultSet resultSet = sessionRule.session().execute(preparedStatement.bind(key)); + + ExecutionInfo executionInfo = resultSet.getExecutionInfo(); + assertThat(executionInfo.getTracingId()).isNotNull(); + QueryTrace queryTrace = executionInfo.getQueryTrace(); + assertThat(queryTrace.getTracingId()).isEqualTo(executionInfo.getTracingId()); + assertThat(queryTrace.getRequestType()).isEqualTo("Execute CQL3 prepared query"); + + Iterator iterator = resultSet.iterator(); + while (iterator.hasNext()) { + iterator.next(); // iterate over several pages + } + + // assert that only one event for tracing has been recorded + await() + .untilAsserted( + () -> { + List rows = + sessionRule.session().execute("SELECT * FROM system_traces.sessions").all() + .stream() + .filter(row -> isTraceForQuery(row, cql, key)) + .collect(Collectors.toList()); + assertThat(rows).hasSize(traceEventsCount); + }); + } + + private String setupPaginationTable(SessionRule sessionRule) { + String key = UUID.randomUUID().toString(); + sessionRule + .session() + .execute( + SimpleStatement.builder( + "CREATE TABLE IF NOT EXISTS trace_pagination (k text, v0 int, v1 int, PRIMARY KEY(k, v0))") + .setExecutionProfile(sessionRule.slowProfile()) + .build()); + for (int i = 0; i < 10; i++) { + sessionRule + .session() + .execute( + SimpleStatement.builder("INSERT INTO trace_pagination (k, v0, v1) VALUES (?, ?, ?)") + .addPositionalValues(key, i, i) + .build()); + } + return key; + } + + private static boolean isTraceForQuery(Row row, String cql, String key) { + if (!row.getColumnDefinitions().contains("parameters")) { + return false; + } + Map queryParams = row.getMap("parameters", String.class, String.class); + if (queryParams == null || !queryParams.containsKey("query")) { + return false; + } + return queryParams.get("query").contains(cql) && queryParams.containsValue(Strings.quote(key)); + } }