Skip to content

Commit

Permalink
Add Environment & Services support (#403)
Browse files Browse the repository at this point in the history
Co-authored-by: Vera Reynolds <[email protected]>
  • Loading branch information
MikeGoldsmith and vreynolds authored Feb 17, 2022
1 parent 4c9612a commit e5aa8cd
Show file tree
Hide file tree
Showing 15 changed files with 595 additions and 81 deletions.
172 changes: 160 additions & 12 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import (
"github.com/honeycombio/refinery/transmit"
)

const legacyAPIKey = "c9945edf5d245834089a1bd6cc9ad01e"
const nonLegacyAPIKey = "d245834089a1bd6cc9ad01e"

type countingWriterSender struct {
transmission.WriterSender

Expand Down Expand Up @@ -113,7 +116,7 @@ func newStartedApp(
GetPeerBufferSizeVal: 10000,
GetListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort),
GetPeerListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort+1),
GetAPIKeysVal: []string{"KEY"},
GetAPIKeysVal: []string{legacyAPIKey, nonLegacyAPIKey},
GetHoneycombAPIVal: "http://api.honeycomb.io",
GetInMemoryCollectorCacheCapacityVal: config.InMemoryCollectorCacheCapacity{CacheCapacity: 10000},
AddHostMetadataToTrace: enableHostMetadata,
Expand Down Expand Up @@ -227,7 +230,48 @@ func TestAppIntegration(t *testing.T) {
"http://localhost:10000/1/batch/dataset",
strings.NewReader(`[{"data":{"trace.trace_id":"1","foo":"bar"}}]`),
)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultTransport.RoundTrip(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
resp.Body.Close()

err = startstop.Stop(graph.Objects(), nil)
assert.NoError(t, err)

// Wait for span to be sent.
deadline := time.After(time.Second)
for {
if out.Len() > 62 {
break
}
select {
case <-deadline:
t.Error("timed out waiting for output")
return
case <-time.After(time.Millisecond):
}
}
assert.Equal(t, `{"data":{"foo":"bar","trace.trace_id":"1"},"dataset":"dataset"}`+"\n", out.String())
}

func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
t.Parallel()

var out bytes.Buffer
a, graph := newStartedApp(t, &transmission.WriterSender{W: &out}, 10500, nil, false)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) {return "test", nil})
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) {return "test", nil})

// Send a root span, it should be sent in short order.
req := httptest.NewRequest(
"POST",
"http://localhost:10500/1/batch/dataset",
strings.NewReader(`[{"data":{"trace.trace_id":"1","foo":"bar"}}]`),
)
req.Header.Set("X-Honeycomb-Team", nonLegacyAPIKey)
req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultTransport.RoundTrip(req)
Expand Down Expand Up @@ -284,7 +328,7 @@ func TestPeerRouting(t *testing.T) {
nil,
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

blob := `[` + string(spans[0]) + `]`
Expand All @@ -295,7 +339,7 @@ func TestPeerRouting(t *testing.T) {
}, 2*time.Second, 2*time.Millisecond)

expectedEvent := &transmission.Event{
APIKey: "KEY",
APIKey: legacyAPIKey,
Dataset: "dataset",
SampleRate: 2,
APIHost: "http://api.honeycomb.io",
Expand Down Expand Up @@ -330,7 +374,7 @@ func TestPeerRouting(t *testing.T) {
nil,
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

req.Body = ioutil.NopCloser(strings.NewReader(blob))
Expand All @@ -354,7 +398,7 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
"http://localhost:14000/1/batch/dataset",
strings.NewReader(`[{"data":{"foo":"bar","trace.trace_id":"1"}}]`),
)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultTransport.RoundTrip(req)
Expand Down Expand Up @@ -415,7 +459,7 @@ func TestEventsEndpoint(t *testing.T) {
bytes.NewReader(blob),
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "zstd")
req.Header.Set("X-Honeycomb-Event-Time", now.Format(time.RFC3339Nano))
Expand All @@ -429,7 +473,7 @@ func TestEventsEndpoint(t *testing.T) {
assert.Equal(
t,
&transmission.Event{
APIKey: "KEY",
APIKey: legacyAPIKey,
Dataset: "dataset",
SampleRate: 10,
APIHost: "http://api.honeycomb.io",
Expand Down Expand Up @@ -457,7 +501,111 @@ func TestEventsEndpoint(t *testing.T) {
buf,
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("X-Honeycomb-Event-Time", now.Format(time.RFC3339Nano))
req.Header.Set("X-Honeycomb-Samplerate", "10")

post(t, req)
assert.Eventually(t, func() bool {
return len(senders[1].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
t,
&transmission.Event{
APIKey: legacyAPIKey,
Dataset: "dataset",
SampleRate: 10,
APIHost: "http://api.honeycomb.io",
Timestamp: now,
Data: map[string]interface{}{
"trace.trace_id": "1",
"foo": "bar",
},
},
senders[1].Events()[0],
)
}

func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
"http://localhost:15001",
"http://localhost:15003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
basePort := 15000 + (i * 2)
senders[i] = &transmission.MockSender{}
app, graph := newStartedApp(t, senders[i], basePort, peers, false)
app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil})
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil})
apps[i] = app
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
zEnc, _ := zstd.NewWriter(nil)
blob := zEnc.EncodeAll([]byte(`{"foo":"bar","trace.trace_id":"1"}`), nil)
req, err := http.NewRequest(
"POST",
"http://localhost:15002/1/events/dataset",
bytes.NewReader(blob),
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", nonLegacyAPIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "zstd")
req.Header.Set("X-Honeycomb-Event-Time", now.Format(time.RFC3339Nano))
req.Header.Set("X-Honeycomb-Samplerate", "10")

post(t, req)
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
t,
&transmission.Event{
APIKey: nonLegacyAPIKey,
Dataset: "dataset",
SampleRate: 10,
APIHost: "http://api.honeycomb.io",
Timestamp: now,
Data: map[string]interface{}{
"trace.trace_id": "1",
"foo": "bar",
},
},
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should not be
// passed to host 0.

blob = blob[:0]
buf := bytes.NewBuffer(blob)
gz := gzip.NewWriter(buf)
gz.Write([]byte(`{"foo":"bar","trace.trace_id":"1"}`))
gz.Close()

req, err = http.NewRequest(
"POST",
"http://localhost:15003/1/events/dataset",
buf,
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", nonLegacyAPIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("X-Honeycomb-Event-Time", now.Format(time.RFC3339Nano))
Expand All @@ -471,7 +619,7 @@ func TestEventsEndpoint(t *testing.T) {
assert.Equal(
t,
&transmission.Event{
APIKey: "KEY",
APIKey: nonLegacyAPIKey,
Dataset: "dataset",
SampleRate: 10,
APIHost: "http://api.honeycomb.io",
Expand Down Expand Up @@ -553,7 +701,7 @@ func BenchmarkTraces(b *testing.B) {
nil,
)
assert.NoError(b, err)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

b.Run("single", func(b *testing.B) {
Expand Down Expand Up @@ -656,7 +804,7 @@ func BenchmarkDistributedTraces(b *testing.B) {
nil,
)
assert.NoError(b, err)
req.Header.Set("X-Honeycomb-Team", "KEY")
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

b.Run("single", func(b *testing.B) {
Expand Down
26 changes: 19 additions & 7 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
"github.com/sirupsen/logrus"
)

var ErrWouldBlock = errors.New("not adding span, channel buffer is full")
Expand Down Expand Up @@ -437,10 +438,21 @@ func (i *InMemCollector) send(trace *types.Trace) {
var sampler sample.Sampler
var found bool

if sampler, found = i.datasetSamplers[trace.Dataset]; !found {
sampler = i.SamplerFactory.GetSamplerImplementationForDataset(trace.Dataset)
// save sampler for later
i.datasetSamplers[trace.Dataset] = sampler
// get sampler key (dataset for legacy keys, environment for new keys)
samplerKey, isLegacyKey := trace.GetSamplerKey()
logFields := logrus.Fields{
"trace_id": trace.TraceID,
}
if isLegacyKey {
logFields["dataset"] = samplerKey
} else {
logFields["environment"] = samplerKey
}

// use sampler key to find sampler, crete and cache if not found
if sampler, found = i.datasetSamplers[samplerKey]; !found {
sampler = i.SamplerFactory.GetSamplerImplementationForDataset(samplerKey)
i.datasetSamplers[samplerKey] = sampler
}

// make sampling decision and update the trace
Expand All @@ -458,16 +470,16 @@ func (i *InMemCollector) send(trace *types.Trace) {
// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
if !shouldSend && !i.Config.GetIsDryRun() {
i.Metrics.Increment("trace_send_dropped")
i.Logger.Info().WithString("trace_id", trace.TraceID).WithString("dataset", trace.Dataset).Logf("Dropping trace because of sampling, trace to dataset")
i.Logger.Info().WithFields(logFields).Logf("Dropping trace because of sampling")
return
}
i.Metrics.Increment("trace_send_kept")

// ok, we're not dropping this trace; send all the spans
if i.Config.GetIsDryRun() && !shouldSend {
i.Logger.Info().WithString("trace_id", trace.TraceID).WithString("dataset", trace.Dataset).Logf("Trace would have been dropped, but dry run mode is enabled")
i.Logger.Info().WithFields(logFields).Logf("Trace would have been dropped, but dry run mode is enabled")
}
i.Logger.Info().WithString("trace_id", trace.TraceID).WithString("dataset", trace.Dataset).Logf("Sending trace to dataset")
i.Logger.Info().WithFields(logFields).Logf("Sending trace")
for _, sp := range trace.GetSpans() {
if sp.SampleRate < 1 {
sp.SampleRate = 1
Expand Down
Loading

0 comments on commit e5aa8cd

Please sign in to comment.