Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to report trace only once during pagination #2015

Open
wants to merge 1 commit into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,13 @@ public enum DefaultDriverOption implements DriverOption {
* <p>Value type: {@link java.util.List List}&#60;{@link String}&#62;
*/
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS(
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs");
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"),
/**
* Report trace for every page fetch request
*
* <p>Value-type: {@link Boolean}
*/
REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH("advanced.request.trace.report-every-page-fetch");

private final String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.REQUEST_WARN_IF_SET_KEYSPACE, true);
map.put(TypedDriverOption.REQUEST_TRACE_ATTEMPTS, 5);
map.put(TypedDriverOption.REQUEST_TRACE_INTERVAL, Duration.ofMillis(3));
map.put(TypedDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, true);
map.put(TypedDriverOption.REQUEST_TRACE_CONSISTENCY, "ONE");
map.put(TypedDriverOption.REQUEST_LOG_WARNINGS, true);
map.put(TypedDriverOption.GRAPH_PAGING_ENABLED, "AUTO");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ public String toString() {
/** The consistency level to use for trace queries. */
public static final TypedDriverOption<String> REQUEST_TRACE_CONSISTENCY =
new TypedDriverOption<>(DefaultDriverOption.REQUEST_TRACE_CONSISTENCY, GenericType.STRING);
/** Report trace for every page fetch request */
public static final TypedDriverOption<Boolean> REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH =
new TypedDriverOption<>(
DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, GenericType.BOOLEAN);
/** Whether or not to publish aggregable histogram for metrics */
public static final TypedDriverOption<Boolean> METRICS_GENERATE_AGGREGABLE_HISTOGRAMS =
new TypedDriverOption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,14 @@ public static AsyncResultSet toResultSet(
Result result,
ExecutionInfo executionInfo,
CqlSession session,
InternalDriverContext context) {
InternalDriverContext context,
DriverExecutionProfile executionProfile) {
if (result instanceof Rows) {
Rows rows = (Rows) result;
Statement<?> statement = (Statement<?>) executionInfo.getRequest();
ColumnDefinitions columnDefinitions = getResultDefinitions(rows, statement, context);
return new DefaultAsyncResultSet(
columnDefinitions, executionInfo, rows.getData(), session, context);
columnDefinitions, executionInfo, rows.getData(), session, context, executionProfile);
} else if (result instanceof Prepared) {
// This should never happen
throw new IllegalArgumentException("Unexpected PREPARED response to a CQL query");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private void setFinalResult(
ExecutionInfo executionInfo =
buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement);
AsyncResultSet resultSet =
Conversions.toResultSet(resultMessage, executionInfo, session, context);
Conversions.toResultSet(resultMessage, executionInfo, session, context, executionProfile);
if (result.complete(resultSet)) {
cancelScheduledTasks();
throttler.signalSuccess(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.datastax.oss.driver.internal.core.cql;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
Expand All @@ -43,6 +45,7 @@ public class DefaultAsyncResultSet implements AsyncResultSet {

private final ColumnDefinitions definitions;
private final ExecutionInfo executionInfo;
private final DriverExecutionProfile executionProfile;
private final CqlSession session;
private final CountingIterator<Row> iterator;
private final Iterable<Row> currentPage;
Expand All @@ -52,9 +55,11 @@ public DefaultAsyncResultSet(
ExecutionInfo executionInfo,
Queue<List<ByteBuffer>> data,
CqlSession session,
InternalDriverContext context) {
InternalDriverContext context,
DriverExecutionProfile executionProfile) {
this.definitions = definitions;
this.executionInfo = executionInfo;
this.executionProfile = executionProfile;
this.session = session;
this.iterator =
new CountingIterator<Row>(data.size()) {
Expand Down Expand Up @@ -106,6 +111,11 @@ public CompletionStage<AsyncResultSet> fetchNextPage() throws IllegalStateExcept
Statement<?> statement = (Statement<?>) executionInfo.getRequest();
LOG.trace("Fetching next page for {}", statement);
Statement<?> nextStatement = statement.copy(nextState);
if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH)
&& nextStatement.isTracing()) {
// report traces only for first page
nextStatement = nextStatement.setTracing(false);
}
return session.executeAsync(nextStatement);
}

Expand Down
8 changes: 8 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,14 @@ datastax-java-driver {
# Modifiable at runtime: yes, the new value will be used for traces fetched after the change.
# Overridable in a profile: yes
consistency = ONE

# Report trace for every page fetch request. If disabled, only one trace entry will be present
# on server side even if client application pages through a large partition.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for traces fetched after the change.
# Overridable in a profile: yes
report-every-page-fetch = true
}

# Whether logging of server warnings generated during query execution should be disabled by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
Expand Down Expand Up @@ -135,6 +136,7 @@ public void should_complete_multi_page_result(ProtocolVersion version) {
CompletableFuture<AsyncResultSet> page2Future = new CompletableFuture<>();
when(session.executeAsync(any(Statement.class))).thenAnswer(invocation -> page2Future);
ExecutionInfo mockInfo = mock(ExecutionInfo.class);
DriverExecutionProfile mockExecutionProfile = mock(DriverExecutionProfile.class);

ReactiveResultSet publisher =
new CqlRequestReactiveProcessor(new CqlRequestAsyncProcessor())
Expand All @@ -152,7 +154,8 @@ public void should_complete_multi_page_result(ProtocolVersion version) {
DseTestFixtures.tenDseRows(2, true),
mockInfo,
harness.getSession(),
harness.getContext()));
harness.getContext(),
mockExecutionProfile));

List<ReactiveRow> rows = rowsPublisher.toList().blockingGet();
assertThat(rows).hasSize(20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
Expand All @@ -50,6 +51,7 @@ public class AsyncPagingIterableWrapperTest {
@Mock private Statement<?> statement;
@Mock private CqlSession session;
@Mock private InternalDriverContext context;
@Mock private DriverExecutionProfile executionProfile;

@Before
public void setup() {
Expand All @@ -74,10 +76,15 @@ public void should_wrap_result_set() throws Exception {
ExecutionInfo executionInfo1 = mockExecutionInfo();
DefaultAsyncResultSet resultSet1 =
new DefaultAsyncResultSet(
columnDefinitions, executionInfo1, mockData(0, 5), session, context);
columnDefinitions, executionInfo1, mockData(0, 5), session, context, executionProfile);
DefaultAsyncResultSet resultSet2 =
new DefaultAsyncResultSet(
columnDefinitions, mockExecutionInfo(), mockData(5, 10), session, context);
columnDefinitions,
mockExecutionInfo(),
mockData(5, 10),
session,
context,
executionProfile);
// chain them together:
ByteBuffer mockPagingState = ByteBuffer.allocate(0);
when(executionInfo1.getPagingState()).thenReturn(mockPagingState);
Expand Down Expand Up @@ -111,7 +118,12 @@ public void should_share_iteration_progress_with_wrapped_result_set() {
// Given
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
columnDefinitions, mockExecutionInfo(), mockData(0, 10), session, context);
columnDefinitions,
mockExecutionInfo(),
mockData(0, 10),
session,
context,
executionProfile);

// When
MappedAsyncPagingIterable<Integer> iterable = resultSet.map(row -> row.getInt("i"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class DefaultAsyncResultSetTest {
@Mock private Statement<?> statement;
@Mock private CqlSession session;
@Mock private InternalDriverContext context;
@Mock private DriverExecutionProfile executionProfile;

@Before
public void setup() {
Expand All @@ -73,7 +75,12 @@ public void should_fail_to_fetch_next_page_if_last() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
columnDefinitions,
executionInfo,
new ArrayDeque<>(),
session,
context,
executionProfile);

// Then
assertThat(resultSet.hasMorePages()).isFalse();
Expand All @@ -95,7 +102,12 @@ public void should_invoke_session_to_fetch_next_page() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
columnDefinitions,
executionInfo,
new ArrayDeque<>(),
session,
context,
executionProfile);
assertThat(resultSet.hasMorePages()).isTrue();
CompletionStage<AsyncResultSet> nextPageFuture = resultSet.fetchNextPage();

Expand All @@ -113,7 +125,12 @@ public void should_report_applied_if_column_not_present_and_empty() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
columnDefinitions,
executionInfo,
new ArrayDeque<>(),
session,
context,
executionProfile);

// Then
assertThat(resultSet.wasApplied()).isTrue();
Expand All @@ -128,7 +145,8 @@ public void should_report_applied_if_column_not_present_and_not_empty() {

// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, data, session, context, executionProfile);

// Then
assertThat(resultSet.wasApplied()).isTrue();
Expand All @@ -149,7 +167,8 @@ public void should_report_not_applied_if_column_present_and_false() {

// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, data, session, context, executionProfile);

// Then
assertThat(resultSet.wasApplied()).isFalse();
Expand All @@ -170,7 +189,8 @@ public void should_report_not_applied_if_column_present_and_true() {

// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, data, session, context, executionProfile);

// Then
assertThat(resultSet.wasApplied()).isTrue();
Expand All @@ -187,7 +207,12 @@ public void should_fail_to_report_if_applied_if_column_present_but_empty() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
columnDefinitions,
executionInfo,
new ArrayDeque<>(),
session,
context,
executionProfile);

// Then
resultSet.wasApplied();
Expand Down
Loading