feat: Draft: Add link-awareness #882

Draft · wants to merge 4 commits into base: 3.x-work-branch
19 changes: 12 additions & 7 deletions collect/cache/cuckooSentCache.go
@@ -136,20 +136,25 @@ func (c *cuckooSentCache) Record(trace *types.Trace, keep bool) {
}

func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, bool) {
sr, ok := c.CheckTraceID(span.TraceID)
// if we kept it, then this span being checked needs counting too
if ok {
sr.Count(span)
}
return sr, ok
}

func (c *cuckooSentCache) CheckTraceID(traceID string) (TraceSentRecord, bool) {
// was it dropped?
-if c.dropped.Check(span.TraceID) {
+if c.dropped.Check(traceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, false
}
// was it kept?
c.keptMut.Lock()
defer c.keptMut.Unlock()
-if sentRecord, found := c.kept.Get(span.TraceID); found {
-if sr, ok := sentRecord.(*cuckooKeptRecord); ok {
-// if we kept it, then this span being checked needs counting too
-sr.Count(span)
-return sr, true
-}
+if sentRecord, found := c.kept.Get(traceID); found {
+return sentRecord.(*cuckooKeptRecord), found
}
// we have no memory of this place
return nil, false
2 changes: 2 additions & 0 deletions collect/cache/traceSentCache.go
@@ -26,4 +26,6 @@ type TraceSentCache interface {
Stop()
// Resize adjusts the size of the cache according to the Config passed in
Resize(cfg config.SampleCacheConfig) error
// CheckTraceID asks the cache directly whether a decision has been made about a trace ID
CheckTraceID(traceID string) (TraceSentRecord, bool)
}
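
Per the cuckooSentCache implementation above, the new method encodes three outcomes in its two return values. A minimal sketch of how a caller might tell them apart (the cache variable and comments here are illustrative, not part of this diff):

	sr, kept := cache.CheckTraceID(traceID)
	switch {
	case kept:
		// the trace was kept; sr describes the decision
	case sr != nil:
		// a record with kept == false means the trace was dropped
	default:
		// nil record: the cache has no memory of this trace ID
	}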
176 changes: 168 additions & 8 deletions collect/collect.go
@@ -2,9 +2,12 @@ package collect

import (
"errors"
"io"
"net/http"
"os"
"runtime"
"sort"
"strings"
"sync"
"time"

@@ -13,6 +16,7 @@ import (
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
"github.com/sirupsen/logrus"
@@ -29,6 +33,7 @@ type Collector interface {
Stressed() bool
GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string)
ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string)
AlreadySeen(traceID string) (keep bool, timeout time.Duration, err error)
}

func GetCollectorImplementation(c config.Config) Collector {
@@ -50,6 +55,7 @@ type InMemCollector struct {
Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
Sharder sharder.Sharder `inject:""`
StressRelief StressReliever `inject:"stressRelief"`

// For test use only
@@ -261,6 +267,24 @@ func (i *InMemCollector) AddSpan(sp *types.Span) error {
return i.add(sp, i.incoming)
}

// AlreadySeen reports whether this collector has already made a decision about a trace.
// if timeout is 0 and err is nil, keep is the answer (kept or dropped)
// if timeout is greater than zero, the trace is still waiting to send
// if err is not nil, this collector has no record of the trace
func (i *InMemCollector) AlreadySeen(traceID string) (keep bool, timeout time.Duration, err error) {
// a non-nil record means the sent cache knows this trace, whether it was kept or dropped
if sr, kept := i.sampleTraceCache.CheckTraceID(traceID); sr != nil {
return kept, 0, nil
}

trace := i.cache.Get(traceID)
if trace != nil { // trace is still in the cache; how long is left before it sends?
return false, time.Until(trace.SendBy), nil
}
return false, 0, errors.New("trace not found")
}

// AddSpanFromPeer accepts a span from a peer into a queue and returns immediately
func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) error {
return i.add(sp, i.fromPeer)
@@ -365,8 +389,10 @@ func (i *InMemCollector) collect() {

func (i *InMemCollector) sendTracesInCache(now time.Time) {
traces := i.cache.TakeExpiredTraces(now)

for _, t := range traces {
if t.RootSpan != nil {
// if the root span has a link
i.send(t, TraceSendGotRoot)
} else {
i.send(t, TraceSendExpired)
@@ -438,6 +464,19 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
trace.SendBy = time.Now().Add(timeout)
trace.RootSpan = sp
}

// if there are links, throw them in the pile
if i.isSpanLink(sp) {
parentId := i.getParentId(sp)
// root spans shouldn't be able to have links in OTel land, because links are
// turned into a separate event when they arrive at Refinery, but if someone
// wants to do a link this way, we should allow it.
if parentId == "" {
parentId = "*It's*The*Root*Span*"
}
// this should store a reference to the span, not a copy of the span
trace.AddSpanLink(parentId, sp)
}
}

// ProcessSpanImmediately is an escape hatch used under stressful conditions --
@@ -552,6 +591,41 @@ func (i *InMemCollector) isRootSpan(sp *types.Span) bool {
return true
}

func (i *InMemCollector) getParentId(sp *types.Span) string {
for _, parentIdFieldName := range i.Config.GetParentIdFieldNames() {
if parentId, ok := sp.Data[parentIdFieldName].(string); ok {
return parentId
}
}
return ""
}

// isSpanLink reports whether this span carries a link ID field.
// We could also check whether sp.Data["meta.annotation_type"] == "link", but we
// skip that today because it doesn't seem to matter; if someone wants to use the
// span-link functionality with their own fields, that should be fine.
func (i *InMemCollector) isSpanLink(sp *types.Span) bool {
for _, linkIdFieldName := range i.Config.GetLinkFieldNames() {
if _, ok := sp.Data[linkIdFieldName].(string); ok {
return true
}
}
return false
}

func (i *InMemCollector) getSpanLinkId(sp *types.Span) string {
for _, linkIdFieldName := range i.Config.GetLinkFieldNames() {
// return the first one we find; if someone is using multiple link field names in a single span, eek.
if linkId, ok := sp.Data[linkIdFieldName].(string); ok {
return linkId
}
}
return ""
}

func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if trace.Sent {
// someone else already sent this so we shouldn't also send it.
@@ -592,15 +666,102 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil {
trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount())
}

-// use sampler key to find sampler; create and cache if not found
-if sampler, found = i.datasetSamplers[samplerKey]; !found {
-sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey)
-i.datasetSamplers[samplerKey] = sampler
-}
rate, shouldSend, reason, key := uint(1), true, "unprocessed", ""
keep_override := false
linkstrat, err := i.Config.GetLinkStrategy()
if trace.SpanLinks != nil && err == nil && linkstrat == "RootLinkOverride" && trace.RootSpan != nil {
// If the root span has a link, let's check with the proper shard
var link_sp *types.Span
var ok bool
// for RootLinkOverride, we only look at the root span and its children (because links are children)
rootSpanId, _ := trace.RootSpan.Data["trace.span_id"].(string)
if link_sp, ok = trace.SpanLinks[rootSpanId]; !ok {
link_sp, ok = trace.SpanLinks["*It's*The*Root*Span*"]
}
// Another mode like AnyLinkOverride would scan the entire trace for links and then return an array of spans to check
if ok {
rate, shouldSend, reason, key = uint(1), true, linkstrat, ""
linked_trace_id := i.getSpanLinkId(link_sp)
linked_shard := i.Sharder.WhichShard(linked_trace_id)
var keep bool
plzwait, retries := time.Duration(1), 5 // seed plzwait so we enter the loop at least once
if linked_shard.Equals(i.Sharder.MyShard()) {
for plzwait > 0 && retries > 0 {
keep, plzwait, err = i.AlreadySeen(linked_trace_id)
if err != nil {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local cache check error: %v", err)
} else if keep {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace found")
keep_override = true
plzwait = 0
}
if plzwait > 0 {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace timeout not reached: %v", plzwait)
time.Sleep(plzwait + 20*time.Millisecond)
}
retries--
}
} else {
// we store other shards' linked trace IDs for a bit, so check the local cache first
for plzwait > 0 && retries > 0 {
keep, plzwait, err = i.AlreadySeen(linked_trace_id)
if keep && err == nil {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace found in local queue")
keep_override = true
plzwait = 0
}
if plzwait > 0 {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace timeout in local queue not reached: %v", plzwait)
time.Sleep(plzwait + 20*time.Millisecond)
}
retries--
}
if !keep_override {
i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("linked trace not found in local cache: %v", err)
// we have to ask the proper shard
retries := 5
for retries > 0 {
url := linked_shard.GetAddress() + "/decision/" + linked_trace_id
resp, geterr := http.Get(url)
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("checking %v for trace decision", url)
// TODO: parse the JSON properly someday
retries--
if geterr == nil {
body, readerr := io.ReadAll(resp.Body)
resp.Body.Close() // close inside the loop; a deferred close would pile up across retries
if readerr != nil {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("failed to read decision, retries left: %d", retries)
} else if strings.Contains(string(body), "\"decision\":\"kept\"") {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("received decision, keep")
i.sampleTraceCache.Record(&types.Trace{TraceID: linked_trace_id, SampleRate: 1}, true)
keep_override = true
retries = 0
} else if strings.Contains(string(body), "\"decision\":\"dropped\"") {
keep_override = false
retries = 0
} else if strings.Contains(string(body), "\"decision\":\"wait\"") {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("waiting for decision, retries left: %d", retries)
time.Sleep(200 * time.Millisecond)
} else {
i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("something else happened, retries left: %d\n%v", retries, string(body))
}
}
}
}
}
}
}

-// make sampling decision and update the trace
-rate, shouldSend, reason, key := sampler.GetSampleRate(trace)
if !keep_override {
// use sampler key to find sampler; create and cache if not found
if sampler, found = i.datasetSamplers[samplerKey]; !found {
sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey)
i.datasetSamplers[samplerKey] = sampler
}

// make sampling decision and update the trace
rate, shouldSend, reason, key = sampler.GetSampleRate(trace)
}
trace.SampleRate = rate
trace.KeepSample = shouldSend
logFields["reason"] = reason
@@ -609,7 +770,6 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
}
// This will observe sample rate attempts even if the trace is dropped
i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate))

i.sampleTraceCache.Record(trace, shouldSend)

// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
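
The remote-shard check above matches decision strings with strings.Contains and carries a "parse the JSON properly someday" note. A sketch of what that parsing might look like, assuming the /decision/ endpoint answers with a body shaped like {"decision":"kept"} — the handler itself is not part of this diff, so the response shape is inferred from the string matching:

	// Sketch only; not part of this diff.
	import "encoding/json"

	// decisionResponse mirrors the assumed body of GET <shard>/decision/<traceID>.
	type decisionResponse struct {
		Decision string `json:"decision"` // "kept", "dropped", or "wait"
	}

	// parseDecision extracts the decision field from a response body.
	func parseDecision(body []byte) (string, error) {
		var dr decisionResponse
		if err := json.Unmarshal(body, &dr); err != nil {
			return "", err
		}
		return dr.Decision, nil
	}

The retry loop in send() could then switch on the returned string, which also tolerates JSON whitespace (e.g. {"decision": "kept"}) that the substring match would miss.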
6 changes: 6 additions & 0 deletions collect/collect_test.go
@@ -18,6 +18,7 @@ import (
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
)
@@ -792,6 +793,7 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &sample.SamplerFactory{}},
&inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"},
&inject.Object{Value: &peer.MockPeers{}},
&inject.Object{Value: &sharder.TestSharder{}},
)
if err != nil {
t.Error(err)
@@ -945,6 +947,10 @@ func TestLateRootGetsSpanCount(t *testing.T) {
assert.Equal(t, int64(2), transmission.Events[1].Data["meta.span_count"], "root span metadata should be populated with span count")
assert.Equal(t, "late", transmission.Events[1].Data["meta.refinery.reason"], "late spans should have meta.refinery.reason set to late.")
transmission.Mux.RUnlock()

// hitchhiking test for the AlreadySeen functionality related to link-awareness
alreadyseen, _, _ := coll.AlreadySeen(traceID)
assert.True(t, alreadyseen, "a trace that was just sent should already be known to the collector")
}

// TestLateRootNotDecorated tests that spans do not get decorated with 'meta.refinery.reason' meta field
5 changes: 5 additions & 0 deletions config/config.go
@@ -88,6 +88,9 @@ type Config interface {
// complete before sending it, to allow stragglers to arrive
GetSendDelay() (time.Duration, error)

// GetLinkStrategy returns the name of the link strategy to use.
GetLinkStrategy() (string, error)

// GetBatchTimeout returns how often to send off batches in seconds
GetBatchTimeout() time.Duration

@@ -191,6 +194,8 @@ type Config interface {
GetTraceIdFieldNames() []string

GetParentIdFieldNames() []string

GetLinkFieldNames() []string
}

type ConfigMetadata struct {
23 changes: 23 additions & 0 deletions config/file_config.go
@@ -252,6 +252,7 @@ type TracesConfig struct {
TraceTimeout Duration `yaml:"TraceTimeout" default:"60s"`
MaxBatchSize uint `yaml:"MaxBatchSize" default:"500"`
SendTicker Duration `yaml:"SendTicker" default:"100ms"`
LinkStrategy string `yaml:"LinkStrategy" default:"ignore"`
}

type DebuggingConfig struct {
@@ -349,6 +350,7 @@ type SpecializedConfig struct {
type IDFieldsConfig struct {
TraceNames []string `yaml:"TraceNames" default:"[\"trace.trace_id\",\"traceId\"]"`
ParentNames []string `yaml:"ParentNames" default:"[\"trace.parent_id\",\"parentId\"]"`
LinkNames []string `yaml:"LinkNames" default:"[\"link.trace.trace_id\",\"linkTraceId\"]"`
}

// GRPCServerParameters allow you to configure the GRPC ServerParameters used
@@ -872,6 +874,21 @@ func (f *fileConfig) GetSendTickerValue() time.Duration {
return time.Duration(f.mainConfig.Traces.SendTicker)
}

func (f *fileConfig) GetLinkStrategy() (string, error) {
f.mux.RLock()
defer f.mux.RUnlock()

switch f.mainConfig.Traces.LinkStrategy {
case "", "ignore":
// an unset value falls back to the default, "ignore"
return "ignore", nil
case "RootLinkOverride":
return "RootLinkOverride", nil
}

return "", errors.New("invalid LinkStrategy")
}

func (f *fileConfig) GetDebugServiceAddr() (string, error) {
f.mux.RLock()
defer f.mux.RUnlock()
@@ -1002,6 +1019,12 @@ func (f *fileConfig) GetTraceIdFieldNames() []string {
return f.mainConfig.IDFieldNames.TraceNames
}

func (f *fileConfig) GetLinkFieldNames() []string {
f.mux.RLock()
defer f.mux.RUnlock()

return f.mainConfig.IDFieldNames.LinkNames
}

func (f *fileConfig) GetParentIdFieldNames() []string {
f.mux.RLock()
defer f.mux.RUnlock()
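
Taken together, the config changes add two knobs: Traces.LinkStrategy and IDFieldNames.LinkNames. A minimal sketch of how they might look in a Refinery config file, assuming top-level YAML keys that match the struct fields above (this PR includes no config sample, so the exact section names are an assumption):

	Traces:
	  LinkStrategy: RootLinkOverride # "ignore" (the default) or "RootLinkOverride"
	IDFieldNames:
	  LinkNames:
	    - link.trace.trace_id
	    - linkTraceId

The LinkNames values shown are just the defaults from IDFieldsConfig; with LinkStrategy left at "ignore", link fields are still collected per processSpan, but they never override a sampling decision.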