diff --git a/collect/cache/cuckooSentCache.go b/collect/cache/cuckooSentCache.go index 209d87d12f..519c34d840 100644 --- a/collect/cache/cuckooSentCache.go +++ b/collect/cache/cuckooSentCache.go @@ -136,20 +136,25 @@ func (c *cuckooSentCache) Record(trace *types.Trace, keep bool) { } func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, bool) { + sr, ok := c.CheckTraceID(span.TraceID) + // if we kept it, then this span being checked needs counting too + if ok { + sr.Count(span) + } + return sr, ok +} + +func (c *cuckooSentCache) CheckTraceID(traceID string) (TraceSentRecord, bool) { // was it dropped? - if c.dropped.Check(span.TraceID) { + if c.dropped.Check(traceID) { // we recognize it as dropped, so just say so; there's nothing else to do return &cuckooDroppedRecord{}, false } // was it kept? c.keptMut.Lock() defer c.keptMut.Unlock() - if sentRecord, found := c.kept.Get(span.TraceID); found { - if sr, ok := sentRecord.(*cuckooKeptRecord); ok { - // if we kept it, then this span being checked needs counting too - sr.Count(span) - return sr, true - } + if sentRecord, found := c.kept.Get(traceID); found { + return sentRecord.(*cuckooKeptRecord), found } // we have no memory of this place return nil, false diff --git a/collect/cache/traceSentCache.go b/collect/cache/traceSentCache.go index b9e2d8f213..600cea2259 100644 --- a/collect/cache/traceSentCache.go +++ b/collect/cache/traceSentCache.go @@ -26,4 +26,6 @@ type TraceSentCache interface { Stop() // Resize adjusts the size of the cache according to the Config passed in Resize(cfg config.SampleCacheConfig) error + // Ask the cache directly about TraceIDs + CheckTraceID(traceID string) (TraceSentRecord, bool) } diff --git a/collect/collect.go b/collect/collect.go index 0c1f0ea1d3..f60038abb7 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -2,9 +2,12 @@ package collect import ( "errors" + "io" + "net/http" "os" "runtime" "sort" + "strings" "sync" "time" @@ -13,6 +16,7 @@ import ( "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/sample" + "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" "github.com/sirupsen/logrus" @@ -29,6 +33,7 @@ type Collector interface { Stressed() bool GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) + AlreadySeen(traceID string) (keep bool, timeout time.Duration, err error) } func GetCollectorImplementation(c config.Config) Collector { @@ -50,6 +55,7 @@ type InMemCollector struct { Transmission transmit.Transmission `inject:"upstreamTransmission"` Metrics metrics.Metrics `inject:"genericMetrics"` SamplerFactory *sample.SamplerFactory `inject:""` + Sharder sharder.Sharder `inject:""` StressRelief StressReliever `inject:"stressRelief"` // For test use only @@ -261,6 +267,24 @@ func (i *InMemCollector) AddSpan(sp *types.Span) error { return i.add(sp, i.incoming) } +// AlreadySeen(traceID string) (keep bool, timeout uint) +// if timeout and error are 0 and nil, that means kept or not-kept are the answer +// if timeout is greater than zero, it's still waiting to send +// if error is not nil, it didn't get an answer. +func (i *InMemCollector) AlreadySeen(traceID string) (keep bool, timeout_ms time.Duration, err error) { + _, kept := i.sampleTraceCache.CheckTraceID(traceID) + if kept { + return true, 0, nil + } + + trace := i.cache.Get(traceID) + if trace != nil { // trace is in the cache. How long is left? + plzwait := time.Until(trace.SendBy) + return false, plzwait, nil + } + return false, 0, errors.New("trace not found") +} + // AddSpan accepts the incoming span to a queue and returns immediately func (i *InMemCollector) AddSpanFromPeer(sp *types.Span) error { return i.add(sp, i.fromPeer) @@ -365,8 +389,10 @@ func (i *InMemCollector) collect() { func (i *InMemCollector) sendTracesInCache(now time.Time) { traces := i.cache.TakeExpiredTraces(now) + for _, t := range traces { if t.RootSpan != nil { + // if the root span has a link i.send(t, TraceSendGotRoot) } else { i.send(t, TraceSendExpired) @@ -438,6 +464,19 @@ func (i *InMemCollector) processSpan(sp *types.Span) { trace.SendBy = time.Now().Add(timeout) trace.RootSpan = sp } + + // if there are links, throw them in the pile + if i.isSpanLink(sp) { + parentId := i.getParentId(sp) + // root spans shouldn't be able to have links in OTEL land because links are + // turned into a separate event when they arrive at Refinery. + // if someone wants to do a link this way, we should allow it. + if len(parentId) < 1 { + parentId = "*It's*The*Root*Span*" + } + // this should include a reference to the span, not a copy of the span + trace.AddSpanLink(parentId, sp) + } } // ProcessSpanImmediately is an escape hatch used under stressful conditions -- @@ -552,6 +591,41 @@ func (i *InMemCollector) isRootSpan(sp *types.Span) bool { return true } +func (i *InMemCollector) getParentId(sp *types.Span) string { + for _, parentIdFieldName := range i.Config.GetParentIdFieldNames() { + parentId := sp.Data[parentIdFieldName] + if _, ok := parentId.(string); ok { + return parentId.(string) + } + } + return "" +} + +func (i *InMemCollector) isSpanLink(sp *types.Span) bool { + // do we even bother checking for the meta.annotation_type? + // if sp.Data["meta.annotation_type"] = "link" + // Not going to include this today because I don't think it matters. + // if someone wants to use the spanlink functionality with their own fields, that should be fine. + for _, linkIdFieldName := range i.Config.GetLinkFieldNames() { + linkId := sp.Data[linkIdFieldName] + if _, ok := linkId.(string); ok { + return true + } + } + return false +} + +func (i *InMemCollector) getSpanLinkId(sp *types.Span) string { + for _, linkIdFieldName := range i.Config.GetLinkFieldNames() { + linkId := sp.Data[linkIdFieldName] + if _, ok := linkId.(string); ok { + // return the first one we find. If someone is using multiple link field names in a single span, eek. + return linkId.(string) + } + } + return "" +} + func (i *InMemCollector) send(trace *types.Trace, sendReason string) { if trace.Sent { // someone else already sent this so we shouldn't also send it. @@ -592,15 +666,102 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil { trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount()) } - - // use sampler key to find sampler; create and cache if not found - if sampler, found = i.datasetSamplers[samplerKey]; !found { - sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey) - i.datasetSamplers[samplerKey] = sampler + rate, shouldSend, reason, key := uint(1), true, "unprocessed", "" + keep_override := false + linkstrat, err := i.Config.GetLinkStrategy() + if trace.SpanLinks != nil && err == nil && linkstrat == "RootLinkOverride" && trace.RootSpan != nil { + // If the root span has a link, let's check the proper shard + var link_sp *types.Span + var ok bool + // for RootLinkOverride, we just want to look at the root span and its children (because links are children) + if link_sp, ok = trace.SpanLinks[trace.RootSpan.Data["trace.span_id"].(string)]; !ok { + link_sp, ok = trace.SpanLinks["*It's*The*Root*Span*"] + } + // Another mode like AnyLinkOverride would scan the entire trace for links and then return an array of spans to check + if ok { + rate, shouldSend, reason, key = uint(1), true, linkstrat, "" + linked_trace_id := i.getSpanLinkId(link_sp) + linked_shard := i.Sharder.WhichShard(linked_trace_id) + plzwait, retries := 5, 5 + if linked_shard.Equals(i.Sharder.MyShard()) { + for plzwait > 0 && retries > 0 { + keep, plzwait, error := i.AlreadySeen(linked_trace_id) + if err != nil { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local cache check error: %v") + } else if keep && error == nil { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace found") + keep_override = true + plzwait = 0 + } + if plzwait > 0 { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("local linked trace timenot not reached: %v", plzwait) + time.Sleep(time.Duration(plzwait+20) * time.Millisecond) + } + retries = retries - 1 + } + } else { + // we store other shard's linked trace IDs for a bit so let's look + for plzwait > 0 && retries > 0 { + keep, plzwait, error := i.AlreadySeen(linked_trace_id) + if keep && error == nil { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace found in local queue") + keep_override = true + plzwait = 0 + } + if plzwait > 0 { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("remote linked trace timeout in local queue not reached: %v", plzwait) + time.Sleep(time.Duration(plzwait+20) * time.Millisecond) + } + retries = retries - 1 + } + if !keep_override { + i.Logger.Debug().WithField("link.trace_id", linked_trace_id).Logf("linked trace not found in local cache: %v", err) + // we have to ask the proper shard + retries := 5 + for retries > 0 { + url := linked_shard.GetAddress() + "/decision/" + linked_trace_id + resp, err := http.Get(url) + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("checking %v for trace decision", url) + // parse json someday. + retries = retries - 1 + if err == nil { + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("Failed to get decision, retries left: %d", retries) + } else { + if strings.Contains(string(body), "\"decision\":\"kept\"") { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("received decision, keep") + i.sampleTraceCache.Record(&types.Trace{TraceID: linked_trace_id, SampleRate: 1}, true) + keep_override = true + retries = 0 + } else if strings.Contains(string(body), "\"decision\":\"dropped\"") { + keep_override = false + retries = 0 + } else if strings.Contains(string(body), "\"decision\":\"wait\"") { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("Waiting for decision, retries left: %d", retries) + time.Sleep(200) + } else { + i.Logger.Debug().WithField("trace_id", trace.TraceID).Logf("something else happened, retries left: %d\n%v", retries, string(body)) + } + } + } + } + } + } + } } - // make sampling decision and update the trace - rate, shouldSend, reason, key := sampler.GetSampleRate(trace) + if !keep_override { + // use sampler key to find sampler; create and cache if not found + if sampler, found = i.datasetSamplers[samplerKey]; !found { + sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey) + i.datasetSamplers[samplerKey] = sampler + } + + // make sampling decision and update the trace + rate, shouldSend, reason, key = sampler.GetSampleRate(trace) + } trace.SampleRate = rate trace.KeepSample = shouldSend logFields["reason"] = reason @@ -609,7 +770,6 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { } // This will observe sample rate attempts even if the trace is dropped i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate)) - i.sampleTraceCache.Record(trace, shouldSend) // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. diff --git a/collect/collect_test.go b/collect/collect_test.go index 7e615e8b8d..f19df065d6 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -18,6 +18,7 @@ import ( "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/sample" + "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" ) @@ -792,6 +793,7 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: &sample.SamplerFactory{}}, &inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"}, &inject.Object{Value: &peer.MockPeers{}}, + &inject.Object{Value: &sharder.TestSharder{}}, ) if err != nil { t.Error(err) @@ -945,6 +947,10 @@ func TestLateRootGetsSpanCount(t *testing.T) { assert.Equal(t, int64(2), transmission.Events[1].Data["meta.span_count"], "root span metadata should be populated with span count") assert.Equal(t, "late", transmission.Events[1].Data["meta.refinery.reason"], "late spans should have meta.refinery.reason set to late.") transmission.Mux.RUnlock() + + // hitchhiking test for the AlreadySeen functionality related to link-awareness + alreadyseen, _, _ := coll.AlreadySeen(traceID) + assert.Equal(t, true, alreadyseen) } // TestLateRootNotDecorated tests that spans do not get decorated with 'meta.refinery.reason' meta field diff --git a/config/config.go b/config/config.go index ef6669807a..48c7f20a5e 100644 --- a/config/config.go +++ b/config/config.go @@ -88,6 +88,9 @@ type Config interface { // complete before sending it, to allow stragglers to arrive GetSendDelay() (time.Duration, error) + // GetLinkStrategy returns a name for the link strategy to use. + GetLinkStrategy() (string, error) + // GetBatchTimeout returns how often to send off batches in seconds GetBatchTimeout() time.Duration @@ -191,6 +194,8 @@ type Config interface { GetTraceIdFieldNames() []string GetParentIdFieldNames() []string + + GetLinkFieldNames() []string } type ConfigMetadata struct { diff --git a/config/file_config.go b/config/file_config.go index 7dffb70d5c..a68274270a 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -252,6 +252,7 @@ type TracesConfig struct { TraceTimeout Duration `yaml:"TraceTimeout" default:"60s"` MaxBatchSize uint `yaml:"MaxBatchSize" default:"500"` SendTicker Duration `yaml:"SendTicker" default:"100ms"` + LinkStrategy string `yaml:"LinkStrategy" default:"ignore"` } type DebuggingConfig struct { @@ -349,6 +350,7 @@ type SpecializedConfig struct { type IDFieldsConfig struct { TraceNames []string `yaml:"TraceNames" default:"[\"trace.trace_id\",\"traceId\"]"` ParentNames []string `yaml:"ParentNames" default:"[\"trace.parent_id\",\"parentId\"]"` + LinkNames []string `yaml:"LinkNames" default:"[\"link.trace.trace_id\",\"linkTraceId\"]"` } // GRPCServerParameters allow you to configure the GRPC ServerParameters used @@ -872,6 +874,21 @@ func (f *fileConfig) GetSendTickerValue() time.Duration { return time.Duration(f.mainConfig.Traces.SendTicker) } +func (f *fileConfig) GetLinkStrategy() (string, error) { + f.mux.RLock() + defer f.mux.RUnlock() + + switch f.mainConfig.Traces.LinkStrategy { + case "": + case "ignore": + return "ignore", nil + case "RootLinkOverride": + return "RootLinkOverride", nil + } + + return "", errors.New("invalid LinkStrategy") +} + func (f *fileConfig) GetDebugServiceAddr() (string, error) { f.mux.RLock() defer f.mux.RUnlock() @@ -1002,6 +1019,12 @@ func (f *fileConfig) GetTraceIdFieldNames() []string { return f.mainConfig.IDFieldNames.TraceNames } +func (f *fileConfig) GetLinkFieldNames() []string { + f.mux.RLock() + defer f.mux.RUnlock() + + return f.mainConfig.IDFieldNames.LinkNames +} func (f *fileConfig) GetParentIdFieldNames() []string { f.mux.RLock() defer f.mux.RUnlock() diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml index 8cd5f7bf1a..1e9cf0d071 100644 --- a/config/metadata/configMeta.yaml +++ b/config/metadata/configMeta.yaml @@ -292,6 +292,19 @@ groups: events to reduce `incoming_` or `peer_router_dropped` spikes. Decreasing this will check the trace cache for timeouts more frequently. + + - name: LinkStrategy + type: string + valuetype: string + default: ignore + reload: true + summary: tells refinery how to consider span links in sampling decisions. + description: > + Span links relate one trace to another by linking a span from one trace + to a span in another trace. Options include "ignore" or "RootLinkOverride". + "ignore" does not consider span links at all. "RootLinkOverride" considers links + that are attached to a root span. It will request the decision from the + shard that owns the trace identifier set by TraceName. - name: Debugging title: "Debugging" @@ -1154,6 +1167,19 @@ groups: The first field in the list that is present in an event will be used as the parent ID. A trace without a `parent_id` is assumed to be a root span. + + - name: LinkNames + type: stringarray + valuetype: stringarray + example: "trace.link.trace_id,trace.link_id" + reload: true + validations: + - type: elementType + arg: string + summary: is the list of field names to use for link-awareness. + description: > + Spans are checked for these fields to identify if a span link is present. + These must link to a trace identifier from TraceNames configuration. - name: GRPCServerParameters title: "gRPC Server Parameters" diff --git a/config/mock.go b/config/mock.go index f9e7525748..c745a85880 100644 --- a/config/mock.go +++ b/config/mock.go @@ -91,7 +91,9 @@ type MockConfig struct { AdditionalAttributes map[string]string TraceIdFieldNames []string ParentIdFieldNames []string + LinkIdFieldNames []string CfgMetadata []ConfigMetadata + LinkStrategy string Mux sync.RWMutex } @@ -549,3 +551,17 @@ func (f *MockConfig) GetAdditionalAttributes() map[string]string { return f.AdditionalAttributes } + +func (f *MockConfig) GetLinkStrategy() (string, error) { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.LinkStrategy, nil +} + +func (f *MockConfig) GetLinkFieldNames() []string { + f.Mux.RLock() + defer f.Mux.RUnlock() + + return f.ParentIdFieldNames +} diff --git a/route/route.go b/route/route.go index bd83ee4c63..548d1964c3 100644 --- a/route/route.go +++ b/route/route.go @@ -153,6 +153,7 @@ func (r *Router) LnS(incomingOrPeer string) { muxxer.HandleFunc("/alive", r.alive).Name("local health") muxxer.HandleFunc("/panic", r.panic).Name("intentional panic") muxxer.HandleFunc("/version", r.version).Name("report version info") + muxxer.HandleFunc("/decision/{traceID}", r.getTraceDecision).Name("get trace decision") // require a local auth for query usage queryMuxxer := muxxer.PathPrefix("/query/").Methods("GET").Subrouter() @@ -306,6 +307,35 @@ func (r *Router) getConfigMetadata(w http.ResponseWriter, req *http.Request) { r.marshalToFormat(w, cm, "json") } +func (r *Router) getTraceDecision(w http.ResponseWriter, req *http.Request) { + traceID := mux.Vars(req)["traceID"] + shard := r.Sharder.WhichShard(traceID) + if !shard.Equals(r.Sharder.MyShard()) { + w.Write([]byte(fmt.Sprintf("traceid %v is not on this shard, try %v", traceID, shard.GetAddress()))) + w.WriteHeader(http.StatusNotFound) // maybe better to use `http.StatusMisdirectedRequest` + return + } + + // look in the decided traces pile and cache + kept, timeout, err := r.Collector.AlreadySeen(traceID) + if err != nil { + w.Write([]byte(fmt.Sprintf("traceid %v cache lookup failed: %v", traceID, err))) + w.WriteHeader(http.StatusNotFound) + return + } + if timeout > 0 { + w.Write([]byte(fmt.Sprintf(`{"traceID":"%s","decision":"wait","timeout_ms":%d}`, traceID, timeout))) + w.WriteHeader(http.StatusOK) + return + } + decision := "dropped" + if kept { + decision = "kept" + } + w.Write([]byte(fmt.Sprintf(`{"traceID":"%s","decision":"%v"}`, traceID, decision))) + w.WriteHeader(http.StatusOK) +} + func (r *Router) marshalToFormat(w http.ResponseWriter, obj interface{}, format string) { var body []byte var err error diff --git a/route/route_test.go b/route/route_test.go index bed5149da7..02d3c2612b 100644 --- a/route/route_test.go +++ b/route/route_test.go @@ -313,7 +313,7 @@ func TestDebugTrace(t *testing.T) { rr := httptest.NewRecorder() router := &Router{ - Sharder: &TestSharder{}, + Sharder: &sharder.TestSharder{}, } router.debugTrace(rr, req) @@ -425,7 +425,7 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: http.DefaultTransport, Name: "upstreamTransport"}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"}, - &inject.Object{Value: &TestSharder{}}, + &inject.Object{Value: &sharder.TestSharder{}}, &inject.Object{Value: &collect.InMemCollector{}}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "metrics"}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"}, @@ -440,23 +440,6 @@ func TestDependencyInjection(t *testing.T) { } } -type TestSharder struct{} - -func (s *TestSharder) MyShard() sharder.Shard { return nil } - -func (s *TestSharder) WhichShard(string) sharder.Shard { - return &TestShard{ - addr: "http://localhost:12345", - } -} - -type TestShard struct { - addr string -} - -func (s *TestShard) Equals(other sharder.Shard) bool { return true } -func (s *TestShard) GetAddress() string { return s.addr } - func TestEnvironmentCache(t *testing.T) { t.Run("calls getFn on cache miss", func(t *testing.T) { cache := newEnvironmentCache(time.Second, func(key string) (string, error) { diff --git a/sharder/mock.go b/sharder/mock.go new file mode 100644 index 0000000000..81963cc5a0 --- /dev/null +++ b/sharder/mock.go @@ -0,0 +1,18 @@ +package sharder + +type TestSharder struct{} + +func (s *TestSharder) MyShard() Shard { return nil } + +func (s *TestSharder) WhichShard(string) Shard { + return &TestShard{ + addr: "http://localhost:12345", + } +} + +type TestShard struct { + addr string +} + +func (s *TestShard) Equals(other Shard) bool { return true } +func (s *TestShard) GetAddress() string { return s.addr } diff --git a/types/event.go b/types/event.go index d7f37cd7a3..b30a399859 100644 --- a/types/event.go +++ b/types/event.go @@ -54,6 +54,10 @@ type Trace struct { RootSpan *Span + // SpanLinks contains a map of spans that are indexed by the trace.parent_id + // so that they can be recalled by using th RootSpan.span_id in rootoverride. + SpanLinks map[string]*Span + // DataSize is the sum of the DataSize of spans that are added. // It's used to help expire the most expensive traces. DataSize int @@ -120,6 +124,14 @@ func (t *Trace) GetSamplerKey() (string, bool) { return env, false } +func (t *Trace) AddSpanLink(parentId string, sp *Span) error { + if t.SpanLinks == nil { + t.SpanLinks = map[string]*Span{} + } + t.SpanLinks[parentId] = sp + return nil +} + // Span is an event that shows up with a trace ID, so will be part of a Trace type Span struct { Event