Skip to content

Commit

Permalink
Refactoring opentracing out of core code, moving context unit tests i…
Browse files Browse the repository at this point in the history
…nto their own test
  • Loading branch information
sdwr98 committed Sep 14, 2020
1 parent b24cd30 commit ac2bff4
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/** Support for OpenTracing spans */
public class OpenTracingContextPropagator implements ContextPropagator {

private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class);

private static ThreadLocal<SpanContext> currentOpenTracingSpanContext = new ThreadLocal<>();
private static ThreadLocal<Span> currentOpenTracingSpan = new ThreadLocal<>();
private static ThreadLocal<Scope> currentOpenTracingScope = new ThreadLocal<>();
Expand All @@ -59,8 +63,10 @@ public String getName() {
public Map<String, byte[]> serializeContext(Object context) {
Map<String, byte[]> serializedContext = new HashMap<>();
Map<String, String> contextMap = (Map<String, String>) context;
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
}
}
return serializedContext;
}
Expand All @@ -76,11 +82,14 @@ public Object deserializeContext(Map<String, byte[]> context) {

@Override
public Object getCurrentContext() {
log.debug("Getting current context");
Tracer currentTracer = GlobalTracer.get();
Span currentSpan = currentTracer.scopeManager().activeSpan();
if (currentSpan != null) {
HashMapTextMap contextTextMap = new HashMapTextMap();
currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap);
log.debug(
"Retrieving current span data as current context: " + contextTextMap.getBackingMap());
return contextTextMap.getBackingMap();
} else {
return null;
Expand All @@ -89,9 +98,11 @@ public Object getCurrentContext() {

@Override
public void setCurrentContext(Object context) {
log.debug("Setting current context");
Tracer currentTracer = GlobalTracer.get();
Map<String, String> contextAsMap = (Map<String, String>) context;
if (contextAsMap != null) {
log.debug("setting current context to " + contextAsMap);
HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap);
setCurrentOpenTracingSpanContext(
currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap));
Expand All @@ -100,6 +111,7 @@ public void setCurrentContext(Object context) {

@Override
public void setUp() {
log.debug("Starting a new opentracing span");
Tracer openTracingTracer = GlobalTracer.get();
Tracer.SpanBuilder builder =
openTracingTracer
Expand All @@ -111,6 +123,7 @@ public void setUp() {
}

Span span = builder.start();
log.debug("New span: " + span);
openTracingTracer.activateSpan(span);
currentOpenTracingSpan.set(span);
Scope scope = openTracingTracer.activateSpan(span);
Expand All @@ -134,8 +147,36 @@ public void onError(Throwable t) {
public void finish(boolean successful) {
Scope currentScope = currentOpenTracingScope.get();
Span currentSpan = currentOpenTracingSpan.get();

log.debug("Closing currently open span " + currentSpan.context().toSpanId());
currentScope.close();
currentSpan.finish();
currentOpenTracingScope.remove();
currentOpenTracingSpan.remove();
currentOpenTracingSpanContext.remove();
}

/** Just check for other instances of the same class */
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (this == obj) {
return true;
}

if (this.getClass().equals(obj.getClass())) {
return true;
}

return false;
}

@Override
public int hashCode() {
return this.getClass().hashCode();
}

private class HashMapTextMap implements TextMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static void onErrorContextPropagators(Throwable t) {
}

/**
* Calls {@link ContextPropagator#finish(boolean))} for each propagator
* Calls {@link ContextPropagator#finish(boolean)} for each propagator
*
* @param successful True if the workflow/activity completed without unhandled exception, false
* otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class WorkflowContext {

Expand Down Expand Up @@ -166,7 +167,17 @@ Map<String, Object> getPropagatedContexts() {

Map<String, Object> contextData = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
// Only send the context propagator the fields that belong to them
// Change the map from MyPropagator:foo -> bar to foo -> bar
Map<String, byte[]> filteredData =
headerData
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith(propagator.getName()))
.collect(
Collectors.toMap(
e -> e.getKey().substring(e.getKey().indexOf(":") + 1), Map.Entry::getValue));
contextData.put(propagator.getName(), propagator.deserializeContext(filteredData));
}

return contextData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -449,7 +450,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

class WorkflowStubImpl implements WorkflowStub {

Expand Down Expand Up @@ -188,7 +189,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.thrift.TException;
import org.slf4j.MDC;

Expand Down Expand Up @@ -252,7 +253,18 @@ void propagateContext(PollForActivityTaskResponse response) {
});

for (ContextPropagator propagator : options.getContextPropagators()) {
propagator.setCurrentContext(propagator.deserializeContext(headerData));
// Only send the context propagator the fields that belong to them
// Change the map from MyPropagator:foo -> bar to foo -> bar
Map<String, byte[]> filteredData =
headerData
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith(propagator.getName()))
.collect(
Collectors.toMap(
e -> e.getKey().substring(e.getKey().indexOf(":") + 1),
Map.Entry::getValue));
propagator.setCurrentContext(propagator.deserializeContext(filteredData));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.uber.cadence.MarkerRecordedEventAttributes;
import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.context.ContextPropagator;
import com.uber.cadence.internal.common.LocalActivityMarkerData;
import com.uber.cadence.internal.metrics.MetricsTag;
import com.uber.cadence.internal.metrics.MetricsType;
Expand All @@ -38,6 +39,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

public final class LocalActivityWorker implements SuspendableWorker {

Expand Down Expand Up @@ -271,9 +273,18 @@ private void propagateContext(ExecuteLocalActivityParameters params) {
}

private void restoreContext(Map<String, byte[]> context) {
options
.getContextPropagators()
.forEach(
propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
for (ContextPropagator propagator : options.getContextPropagators()) {
// Only send the context propagator the fields that belong to them
// Change the map from MyPropagator:foo -> bar to foo -> bar
Map<String, byte[]> filteredData =
context
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith(propagator.getName()))
.collect(
Collectors.toMap(
e -> e.getKey().substring(e.getKey().indexOf(":") + 1), Map.Entry::getValue));
propagator.setCurrentContext(propagator.deserializeContext(filteredData));
}
}
}
7 changes: 5 additions & 2 deletions src/main/java/com/uber/cadence/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -958,8 +958,11 @@ private FactoryOptions(
this.contextPropagators = new ArrayList<>();
}

// Add the OpenTracing propagator
this.contextPropagators.add(new OpenTracingContextPropagator());
// Add the OpenTracing propagator if it's not already there
OpenTracingContextPropagator openTracing = new OpenTracingContextPropagator();
if (!this.contextPropagators.contains(openTracing)) {
this.contextPropagators.add(openTracing);
}
}
}
}
Loading

0 comments on commit ac2bff4

Please sign in to comment.