Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed Span nesting for ReadWriteTransactionCallable by using parent SpanContext instead of just parent Context #1495

Merged
merged 6 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -42,6 +42,11 @@
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -53,7 +58,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datastore {

Expand Down Expand Up @@ -106,15 +111,18 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
private volatile TransactionOptions options;
private volatile Transaction transaction;

private final com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext;

ReadWriteTransactionCallable(
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nonnull Context parentTraceContext) {
@Nullable com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
this.parentSpanContext = parentSpanContext;
}

Datastore getDatastore() {
Expand All @@ -135,26 +143,57 @@ void setPrevTransactionId(ByteString transactionId) {
options = options.toBuilder().setReadWrite(readWrite).build();
}

private io.opentelemetry.api.trace.Span startSpanWithParentContext(
String spanName,
com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
datastore.getOptions().getTraceUtil();
SpanBuilder spanBuilder =
otelTraceUtil
.getTracer()
.spanBuilder(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN)
.setSpanKind(SpanKind.PRODUCER)
.setParent(
Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(
parentSpanContext.getSpanContext())));
return spanBuilder.startSpan();
}

@Override
public T call() throws DatastoreException {
com.google.cloud.datastore.telemetry.TraceUtil traceUtil =
datastore.getOptions().getTraceUtil();
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
traceUtil.startSpan(
// TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However,
// the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called
// probably because of some thread-local caching that is getting lost. This needs more
// debugging. The code below works and is idiomatic but could be prettier and more consistent
// with the use of TraceUtil-provided framework.
io.opentelemetry.api.trace.Span span =
startSpanWithParentContext(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN,
datastore.getOptions().getTraceUtil().getCurrentContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
parentSpanContext);
try (io.opentelemetry.context.Scope ignored = span.makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
} catch (Exception ex) {
transaction.rollback();
span.setStatus(StatusCode.ERROR, ex.getMessage());
span.recordException(
ex,
Attributes.builder()
.put("exception.message", ex.getMessage())
.put("exception.type", ex.getClass().getName())
.put("exception.stacktrace", Throwables.getStackTraceAsString(ex))
.build());
span.end();
throw DatastoreException.propagateUserException(ex);
} finally {
if (transaction.isActive()) {
transaction.rollback();
}
span.end();
if (options != null
&& options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
setPrevTransactionId(transaction.getTransactionId());
Expand All @@ -165,42 +204,30 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
try {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, null, otelTraceUtil.getCurrentContext()),
this, callable, null, otelTraceUtil.getCurrentSpanContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
try {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.getCurrentContext()),
this, callable, transactionOptions, otelTraceUtil.getCurrentSpanContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

Expand Down Expand Up @@ -258,11 +285,14 @@ public AggregationResults runAggregation(

com.google.datastore.v1.RunQueryResponse runQuery(
final com.google.datastore.v1.RunQueryRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY);
ReadOptions readOptions = requestPb.getReadOptions();
span.setAttribute(
"isTransactional", readOptions.hasTransaction() || readOptions.hasNewTransaction());
boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction();
String spanName =
(isTransactional
? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY);
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
span.setAttribute("isTransactional", isTransactional);
span.setAttribute("readConsistency", readOptions.getReadConsistency().toString());

try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
Expand All @@ -275,7 +305,7 @@ com.google.datastore.v1.RunQueryResponse runQuery(
: TRANSACTION_OPERATION_EXCEPTION_HANDLER,
getOptions().getClock());
span.addEvent(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed",
spanName + ": Completed",
new ImmutableMap.Builder<String, Object>()
.put("Received", response.getBatch().getEntityResultsCount())
.put("More results", response.getBatch().getMoreResults().toString())
Expand Down Expand Up @@ -687,7 +717,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION,
otelTraceUtil.getCurrentContext());
otelTraceUtil.getCurrentSpanContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.beginTransaction(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +35,13 @@
@InternalApi
public class DisabledTraceUtil implements TraceUtil {

static class SpanContext implements TraceUtil.SpanContext {
@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return null;
}
}

static class Span implements TraceUtil.Span {
@Override
public void end() {}
Expand Down Expand Up @@ -66,6 +77,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return null;
}

@Override
public Scope makeCurrent() {
return new Scope();
Expand Down Expand Up @@ -96,7 +111,7 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
return new Span();
}

Expand All @@ -111,4 +126,15 @@ public TraceUtil.Span getCurrentSpan() {
public TraceUtil.Context getCurrentContext() {
return new Context();
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext();
}

@Override
public Tracer getTracer() {
return TracerProvider.noop().get(LIBRARY_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import com.google.common.base.Throwables;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -70,6 +73,19 @@ public ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> getChannelConfi
return null;
}

static class SpanContext implements TraceUtil.SpanContext {
private final io.opentelemetry.api.trace.SpanContext spanContext;

public SpanContext(io.opentelemetry.api.trace.SpanContext spanContext) {
this.spanContext = spanContext;
}

@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return this.spanContext;
}
}

static class Span implements TraceUtil.Span {
private final io.opentelemetry.api.trace.Span span;
private final String spanName;
Expand Down Expand Up @@ -181,6 +197,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return this.span;
}

@Override
public Scope makeCurrent() {
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
Expand Down Expand Up @@ -286,13 +306,15 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {
assert (parent instanceof Context);
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
SpanBuilder spanBuilder =
tracer
.spanBuilder(spanName)
.setSpanKind(SpanKind.PRODUCER)
.setParent(((Context) parent).context);
.setParent(
io.opentelemetry.context.Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(parentSpanContext.getSpanContext())));
io.opentelemetry.api.trace.Span span =
addSettingsAttributesToCurrentSpan(spanBuilder).startSpan();
return new Span(span, spanName);
Expand All @@ -309,4 +331,15 @@ public TraceUtil.Span getCurrentSpan() {
public TraceUtil.Context getCurrentContext() {
return new Context(io.opentelemetry.context.Context.current());
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext(io.opentelemetry.api.trace.Span.current().getSpanContext());
}

@Override
public Tracer getTracer() {
return this.tracer;
}
}
Loading
Loading