From 0810b20dfe199b0c3354a0875ebe42dd8493a79a Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 27 Jan 2025 17:33:18 +0000 Subject: [PATCH] Upgrade OTEL version to 1.2 Signed-off-by: Krishna Kondaka --- .../otelmetrics/OTelMetricsProtoHelper.java | 16 --- .../plugins/otel/codec/OTelProtoCodec.java | 106 +----------------- settings.gradle | 2 +- 3 files changed, 6 insertions(+), 118 deletions(-) diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsProtoHelper.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsProtoHelper.java index a925bb4730..52392b1103 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsProtoHelper.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsProtoHelper.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.opentelemetry.proto.common.v1.AnyValue; -import io.opentelemetry.proto.common.v1.InstrumentationLibrary; import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; @@ -191,21 +190,6 @@ public static Map getResourceAttributes(final Resource resource) .collect(Collectors.toMap(i -> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue()))); } - /** - * Extracts the name and version of the used instrumentation library used - * - * @return A map, containing information about the instrumentation library - */ - public static Map getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) { - final Map instrumentationAttr = new HashMap<>(); - if (!instrumentationLibrary.getName().isEmpty()) { - instrumentationAttr.put(INSTRUMENTATION_LIBRARY_NAME, instrumentationLibrary.getName()); - } - if (!instrumentationLibrary.getVersion().isEmpty()) { - instrumentationAttr.put(INSTRUMENTATION_LIBRARY_VERSION, instrumentationLibrary.getVersion()); - } - return instrumentationAttr; - } /** * Extracts the name and version of the used instrumentation scope used diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java index 8875e90e53..ddaac079f4 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java @@ -12,7 +12,6 @@ import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.common.v1.AnyValue; -import io.opentelemetry.proto.common.v1.InstrumentationLibrary; import io.opentelemetry.proto.common.v1.InstrumentationScope; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.logs.v1.LogRecord; @@ -20,11 +19,9 @@ import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; -import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.proto.resource.v1.Resource; -import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.opentelemetry.proto.trace.v1.Status; @@ -92,6 +89,7 @@ public class OTelProtoCodec { static final String INSTRUMENTATION_LIBRARY_VERSION = "instrumentationLibrary.version"; static final String STATUS_CODE = "status.code"; static final String STATUS_MESSAGE = "status.message"; + static final String ATTRIBUTES_KEY = "attributes"; /** @@ -198,17 +196,6 @@ protected Collection parseResourceLogs(ResourceLogs rs, final final Map resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource()); final String schemaUrl = rs.getSchemaUrl(); - Stream mappedInstrumentationLibraryLogs = rs.getInstrumentationLibraryLogsList() - .stream() - .map(ils -> - processLogsList(ils.getLogRecordsList(), - serviceName, - OTelProtoCodec.getInstrumentationLibraryAttributes(ils.getInstrumentationLibrary()), - resourceAttributes, - schemaUrl, - timeReceived)) - .flatMap(Collection::stream); - Stream mappedScopeListLogs = rs.getScopeLogsList() .stream() .map(sls -> @@ -220,7 +207,7 @@ protected Collection parseResourceLogs(ResourceLogs rs, final timeReceived)) .flatMap(Collection::stream); - return Stream.concat(mappedInstrumentationLibraryLogs, mappedScopeListLogs).collect(Collectors.toList()); + return mappedScopeListLogs.collect(Collectors.toList()); } protected Map splitResourceSpansByTraceId(final ResourceSpans resourceSpans) { @@ -239,22 +226,6 @@ protected Map splitResourceSpansByTraceId(final ResourceS } } - if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) { - for (Map.Entry> entry: splitInstrumentationLibrarySpansByTraceId(resourceSpans.getInstrumentationLibrarySpansList()).entrySet()) { - ResourceSpans.Builder resourceSpansBuilder; - String traceId = entry.getKey(); - if (resultBuilderMap.containsKey(traceId)) { - resourceSpansBuilder = resultBuilderMap.get(traceId); - } else { - resourceSpansBuilder = ResourceSpans.newBuilder(); - if (hasResource) { - resourceSpansBuilder.setResource(resource); - } - resultBuilderMap.put(traceId, resourceSpansBuilder); - } - resourceSpansBuilder.addAllInstrumentationLibrarySpans(entry.getValue()); - } - } for (Map.Entry entry: resultBuilderMap.entrySet()) { result.put(entry.getKey(), entry.getValue().build()); } @@ -270,10 +241,6 @@ protected List parseResourceSpans(final ResourceSpans resourceSpans, final return parseScopeSpans(resourceSpans.getScopeSpansList(), serviceName, resourceAttributes, timeReceived); } - if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) { - return parseInstrumentationLibrarySpans(resourceSpans.getInstrumentationLibrarySpansList(), serviceName, resourceAttributes, timeReceived); - } - LOG.debug("No spans found to parse from ResourceSpans object: {}", resourceSpans); return Collections.emptyList(); } @@ -306,39 +273,6 @@ private Map> splitScopeSpansByTraceId(final List parseInstrumentationLibrarySpans(final List instrumentationLibrarySpansList, - final String serviceName, final Map resourceAttributes, - final Instant timeReceived) { - return instrumentationLibrarySpansList.stream() - .map(instrumentationLibrarySpans -> parseSpans(instrumentationLibrarySpans.getSpansList(), - instrumentationLibrarySpans.getInstrumentationLibrary(), this::getInstrumentationLibraryAttributes, - serviceName, resourceAttributes, timeReceived)) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - } - - private Map> splitInstrumentationLibrarySpansByTraceId(final List instrumentationLibrarySpansList) { - Map> result = new HashMap<>(); - for (InstrumentationLibrarySpans is: instrumentationLibrarySpansList) { - final boolean hasInstrumentationLibrary = is.hasInstrumentationLibrary(); - final io.opentelemetry.proto.common.v1.InstrumentationLibrary instrumentationLibrary = is.getInstrumentationLibrary(); - for (Map.Entry> entry: splitSpansByTraceId(is.getSpansList()).entrySet()) { - String traceId = entry.getKey(); - InstrumentationLibrarySpans.Builder ilSpansBuilder = InstrumentationLibrarySpans.newBuilder().setSchemaUrl(is.getSchemaUrl()).addAllSpans(entry.getValue()); - if (hasInstrumentationLibrary) { - ilSpansBuilder.setInstrumentationLibrary(instrumentationLibrary); - } - - if (!result.containsKey(traceId)) { - result.put(traceId, new ArrayList<>()); - } - result.get(traceId).add(ilSpansBuilder.build()); - } - } - return result; - } - - private Map> splitSpansByTraceId(final List spans) { Map> result = new HashMap<>(); for (io.opentelemetry.proto.trace.v1.Span span: spans) { @@ -547,17 +481,6 @@ protected TraceGroupFields getTraceGroupFields(final io.opentelemetry.proto.trac return traceGroupFieldsBuilder.build(); } - protected Map getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) { - final Map instrumentationAttr = new HashMap<>(); - if (!instrumentationLibrary.getName().isEmpty()) { - instrumentationAttr.put(INSTRUMENTATION_SCOPE_NAME, instrumentationLibrary.getName()); - } - if (!instrumentationLibrary.getVersion().isEmpty()) { - instrumentationAttr.put(INSTRUMENTATION_SCOPE_VERSION, instrumentationLibrary.getVersion()); - } - return instrumentationAttr; - } - protected Map getSpanStatusAttributes(final Status status) { final Map statusAttr = new HashMap<>(); statusAttr.put(STATUS_CODE, status.getCodeValue()); @@ -600,11 +523,6 @@ public Collection> parseExportMetricsServiceRequest( final Map resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource()); final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null); - for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) { - final Map ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary()); - recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, timeReceived, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); - } - for (ScopeMetrics sm : rs.getScopeMetricsList()) { final Map ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope()); recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, timeReceived, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); @@ -1171,23 +1089,6 @@ public static Map getResourceAttributes(final Resource resource) .collect(Collectors.toMap(i -> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue()))); } - /** - * Extracts the name and version of the used instrumentation library used - * - * @param instrumentationLibrary the instrumentation library - * @return A map, containing information about the instrumentation library - */ - public static Map getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) { - final Map instrumentationAttr = new HashMap<>(); - if (!instrumentationLibrary.getName().isEmpty()) { - instrumentationAttr.put(INSTRUMENTATION_LIBRARY_NAME, instrumentationLibrary.getName()); - } - if (!instrumentationLibrary.getVersion().isEmpty()) { - instrumentationAttr.put(INSTRUMENTATION_LIBRARY_VERSION, instrumentationLibrary.getVersion()); - } - return instrumentationAttr; - } - /** * Extracts the name and version of the used instrumentation scope used * @@ -1202,6 +1103,9 @@ public static Map getInstrumentationScopeAttributes(final Instru if (!instrumentationScope.getVersion().isEmpty()) { instrumentationScopeAttr.put(INSTRUMENTATION_SCOPE_VERSION, instrumentationScope.getVersion()); } + if (!instrumentationScope.getAttributesList().isEmpty()) { + instrumentationScopeAttr.put(ATTRIBUTES_KEY, OTelProtoCodec.unpackKeyValueListLog(instrumentationScope.getAttributesList())); + } return instrumentationScopeAttr; } diff --git a/settings.gradle b/settings.gradle index d86bc7e1da..1121df5b4a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -39,7 +39,7 @@ dependencyResolutionManagement { version('protobuf', '3.24.3') library('protobuf-core', 'com.google.protobuf', 'protobuf-java').versionRef('protobuf') library('protobuf-util', 'com.google.protobuf', 'protobuf-java-util').versionRef('protobuf') - version('opentelemetry', '0.16.0-alpha') + version('opentelemetry', '1.2.0-alpha') library('opentelemetry-proto', 'io.opentelemetry.proto', 'opentelemetry-proto').versionRef('opentelemetry') version('opensearchJava', '2.8.1') library('opensearch-java', 'org.opensearch.client', 'opensearch-java').versionRef('opensearchJava')