Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/cloud.google.com/go/pubsub.v1: split tracing code #2852

Merged
merged 10 commits into from
Sep 12, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracing

import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

type config struct {
serviceName string
publishSpanName string
receiveSpanName string
measured bool
}

func defaultConfig() *config {
return &config{
serviceName: namingschema.ServiceNameOverrideV0("", ""),
publishSpanName: namingschema.OpName(namingschema.GCPPubSubOutbound),
receiveSpanName: namingschema.OpName(namingschema.GCPPubSubInbound),
measured: false,
}
}

// Option is used to customize spans started by WrapReceiveHandler or Publish.
type Option func(cfg *config)

// WithServiceName sets the service name tag for traces started by WrapReceiveHandler or Publish.
func WithServiceName(serviceName string) Option {
return func(cfg *config) {
cfg.serviceName = serviceName
}
}

// WithMeasured sets the measured tag for traces started by WrapReceiveHandler or Publish.
func WithMeasured() Option {
return func(cfg *config) {
cfg.measured = true
}
}
120 changes: 120 additions & 0 deletions contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package tracing

import (
"context"
"sync"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
)

const componentName = "cloud.google.com/go/pubsub.v1"

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported(componentName)
}

type Message struct {
ID string
Data []byte
OrderingKey string
Attributes map[string]string
DeliveryAttempt *int
PublishTime time.Time
}

type Topic interface {
String() string
}

type Subscription interface {
String() string
}

func TracePublish(ctx context.Context, topic Topic, msg *Message, opts ...Option) (context.Context, func(serverID string, err error)) {
cfg := defaultConfig()
for _, opt := range opts {
opt(cfg)
}
spanOpts := []ddtrace.StartSpanOption{
tracer.ResourceName(topic.String()),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("ordering_key", msg.OrderingKey),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub),
}
if cfg.serviceName != "" {
spanOpts = append(spanOpts, tracer.ServiceName(cfg.serviceName))
}
if cfg.measured {
spanOpts = append(spanOpts, tracer.Measured())
}
span, ctx := tracer.StartSpanFromContext(
ctx,
cfg.publishSpanName,
spanOpts...,
)
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
if err := tracer.Inject(span.Context(), tracer.TextMapCarrier(msg.Attributes)); err != nil {
log.Debug("contrib/cloud.google.com/go/pubsub.v1/trace: failed injecting tracing attributes: %v", err)
}
span.SetTag("num_attributes", len(msg.Attributes))

var once sync.Once
closeSpan := func(serverID string, err error) {
once.Do(func() {
span.SetTag("server_id", serverID)
span.Finish(tracer.WithError(err))
})
}
return ctx, closeSpan
}

func TraceReceiveFunc(s Subscription, opts ...Option) func(ctx context.Context, msg *Message) (context.Context, func()) {
cfg := defaultConfig()
for _, opt := range opts {
opt(cfg)
}
log.Debug("contrib/cloud.google.com/go/pubsub.v1/trace: Wrapping Receive Handler: %#v", cfg)
return func(ctx context.Context, msg *Message) (context.Context, func()) {
parentSpanCtx, _ := tracer.Extract(tracer.TextMapCarrier(msg.Attributes))
opts := []ddtrace.StartSpanOption{
tracer.ResourceName(s.String()),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("num_attributes", len(msg.Attributes)),
tracer.Tag("ordering_key", msg.OrderingKey),
tracer.Tag("message_id", msg.ID),
tracer.Tag("publish_time", msg.PublishTime.String()),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub),
tracer.ChildOf(parentSpanCtx),
}
if cfg.serviceName != "" {
opts = append(opts, tracer.ServiceName(cfg.serviceName))
}
if cfg.measured {
opts = append(opts, tracer.Measured())
}
span, ctx := tracer.StartSpanFromContext(ctx, cfg.receiveSpanName, opts...)
if msg.DeliveryAttempt != nil {
span.SetTag("delivery_attempt", *msg.DeliveryAttempt)
}
return ctx, func() { span.Finish() }
}
}
36 changes: 6 additions & 30 deletions contrib/cloud.google.com/go/pubsub.v1/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,17 @@
package pubsub

import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
"gopkg.in/DataDog/dd-trace-go.v1/contrib/cloud.google.com/go/pubsub.v1/internal/tracing"
)

type config struct {
serviceName string
publishSpanName string
receiveSpanName string
measured bool
}
// Option is used to customize spans started by WrapReceiveHandler or Publish.
type Option = tracing.Option

func defaultConfig() *config {
return &config{
serviceName: namingschema.ServiceNameOverrideV0("", ""),
publishSpanName: namingschema.OpName(namingschema.GCPPubSubOutbound),
receiveSpanName: namingschema.OpName(namingschema.GCPPubSubInbound),
measured: false,
}
}

// A Option is used to customize spans started by WrapReceiveHandler or Publish.
type Option func(cfg *config)

// A ReceiveOption has been deprecated in favor of Option.
// Deprecated: ReceiveOption has been deprecated in favor of Option.
type ReceiveOption = Option

// WithServiceName sets the service name tag for traces started by WrapReceiveHandler or Publish.
func WithServiceName(serviceName string) Option {
return func(cfg *config) {
cfg.serviceName = serviceName
}
}
var WithServiceName = tracing.WithServiceName

// WithMeasured sets the measured tag for traces started by WrapReceiveHandler or Publish.
func WithMeasured() Option {
return func(cfg *config) {
cfg.measured = true
}
}
var WithMeasured = tracing.WithMeasured
108 changes: 26 additions & 82 deletions contrib/cloud.google.com/go/pubsub.v1/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,11 @@ package pubsub

import (
"context"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

"cloud.google.com/go/pubsub"
)

const componentName = "cloud.google.com/go/pubsub.v1"

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported(componentName)
}
"gopkg.in/DataDog/dd-trace-go.v1/contrib/cloud.google.com/go/pubsub.v1/internal/tracing"
)

// Publish publishes a message on the specified topic and returns a PublishResult.
// This function is functionally equivalent to t.Publish(ctx, msg), but it also starts a publish
Expand All @@ -33,96 +21,52 @@ func init() {
// It is required to call (*PublishResult).Get(ctx) on the value returned by Publish to complete
// the span.
func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message, opts ...Option) *PublishResult {
cfg := defaultConfig()
for _, opt := range opts {
opt(cfg)
}
spanOpts := []ddtrace.StartSpanOption{
tracer.ResourceName(t.String()),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("ordering_key", msg.OrderingKey),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub),
}
if cfg.serviceName != "" {
spanOpts = append(spanOpts, tracer.ServiceName(cfg.serviceName))
}
if cfg.measured {
spanOpts = append(spanOpts, tracer.Measured())
}
span, ctx := tracer.StartSpanFromContext(
ctx,
cfg.publishSpanName,
spanOpts...,
)
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
if err := tracer.Inject(span.Context(), tracer.TextMapCarrier(msg.Attributes)); err != nil {
log.Debug("contrib/cloud.google.com/go/pubsub.v1/: failed injecting tracing attributes: %v", err)
}
span.SetTag("num_attributes", len(msg.Attributes))
traceMsg := newTraceMessage(msg)
ctx, closeSpan := tracing.TracePublish(ctx, t, traceMsg, opts...)
msg.Attributes = traceMsg.Attributes

return &PublishResult{
PublishResult: t.Publish(ctx, msg),
span: span,
closeSpan: closeSpan,
}
}

// PublishResult wraps *pubsub.PublishResult
type PublishResult struct {
*pubsub.PublishResult
once sync.Once
span tracer.Span
closeSpan func(serverID string, err error)
}

// Get wraps (pubsub.PublishResult).Get(ctx). When this function returns the publish
// span created in Publish is completed.
func (r *PublishResult) Get(ctx context.Context) (string, error) {
serverID, err := r.PublishResult.Get(ctx)
r.once.Do(func() {
r.span.SetTag("server_id", serverID)
r.span.Finish(tracer.WithError(err))
})
r.closeSpan(serverID, err)
return serverID, err
}

// WrapReceiveHandler returns a receive handler that wraps the supplied handler,
// extracts any tracing metadata attached to the received message, and starts a
// receive span.
func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...Option) func(context.Context, *pubsub.Message) {
cfg := defaultConfig()
for _, opt := range opts {
opt(cfg)
}
log.Debug("contrib/cloud.google.com/go/pubsub.v1: Wrapping Receive Handler: %#v", cfg)
traceFn := tracing.TraceReceiveFunc(s, opts...)
return func(ctx context.Context, msg *pubsub.Message) {
parentSpanCtx, _ := tracer.Extract(tracer.TextMapCarrier(msg.Attributes))
opts := []ddtrace.StartSpanOption{
tracer.ResourceName(s.String()),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag("message_size", len(msg.Data)),
tracer.Tag("num_attributes", len(msg.Attributes)),
tracer.Tag("ordering_key", msg.OrderingKey),
tracer.Tag("message_id", msg.ID),
tracer.Tag("publish_time", msg.PublishTime.String()),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub),
tracer.ChildOf(parentSpanCtx),
}
if cfg.serviceName != "" {
opts = append(opts, tracer.ServiceName(cfg.serviceName))
}
if cfg.measured {
opts = append(opts, tracer.Measured())
}
span, ctx := tracer.StartSpanFromContext(ctx, cfg.receiveSpanName, opts...)
if msg.DeliveryAttempt != nil {
span.SetTag("delivery_attempt", *msg.DeliveryAttempt)
}
defer span.Finish()
ctx, closeSpan := traceFn(ctx, newTraceMessage(msg))
defer closeSpan()
f(ctx, msg)
}
}

func newTraceMessage(msg *pubsub.Message) *tracing.Message {
if msg == nil {
return nil
}
return &tracing.Message{
ID: msg.ID,
Data: msg.Data,
OrderingKey: msg.OrderingKey,
Attributes: msg.Attributes,
DeliveryAttempt: msg.DeliveryAttempt,
PublishTime: msg.PublishTime,
}
}
Loading
Loading