diff --git a/CHANGELOG.md b/CHANGELOG.md index 05a1357b..dae990fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +Next +---- +- Breaking: refactor the cloud handler to use the modern pipeline. This removes the `cloudprovider.items_queued` metric, + and now tracks the absolute number of hosts to look up, regardless of type. + 34.0.1 ------ - Forcing a new tag release to allow for docker release diff --git a/METRICS.md b/METRICS.md index 621fa42d..511643a2 100644 --- a/METRICS.md +++ b/METRICS.md @@ -53,8 +53,7 @@ Metrics: | cloudprovider.cache_refresh_negative | gauge (cumulative) | | The cumulative number of refreshes which had an error refreshing and used old data | cloudprovider.cache_hit | gauge (cumulative) | | The cumulative number of cache hits (host was in the cache) | cloudprovider.cache_miss | gauge (cumulative) | | The cumulative number of cache misses -| cloudprovider.hosts_queued | gauge (flush) | type | The absolute number of hosts waiting to be looked up -| cloudprovider.items_queued | gauge (flush) | type | The absolute number of metrics or events waiting for a host lookup to complete +| cloudprovider.hosts_queued | gauge (flush) | | The absolute number of hosts waiting to be looked up | http.forwarder.invalid | counter | | The number of failures to prepare a batch of metrics to forward | http.forwarder.created | counter | | The number of batches prepared for forwarding | http.forwarder.sent | counter | | The number of batches successfully forwarded @@ -70,7 +69,6 @@ Metrics: | version | The git tag of the build | commit | The short git commit of the build | backend | The backend sending a particular metric -| type | Either metric or event for cloudprovider.hosts_queued, or event for cloudprovider.items_queued | result | Success to indicate a batch of metrics was successfully processed, failure to indicate a batch of metrics was not processed, with additional failure tag for why) | failure | The reason a batch of metrics was not processed | server-name | The name of an http-server as specified in the config file diff --git a/pkg/statsd/handler_backend.go b/pkg/statsd/handler_backend.go index ccd17137..efda1a94 100644 --- a/pkg/statsd/handler_backend.go +++ b/pkg/statsd/handler_backend.go @@ -167,7 +167,7 @@ func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) for _, backend := range bh.backends { select { case <-ctx.Done(): - // Not all backends got the event, should decrement the wg counter + // Not all backends got the event, should decrement the wg counter to account for it bh.eventWg.Add(eventsDispatched - len(bh.backends)) return case bh.concurrentEvents <- struct{}{}: diff --git a/pkg/statsd/handler_cloud.go b/pkg/statsd/handler_cloud.go index d39cf4ec..c5130e5c 100644 --- a/pkg/statsd/handler_cloud.go +++ b/pkg/statsd/handler_cloud.go @@ -11,28 +11,28 @@ import ( "github.com/atlassian/gostatsd/pkg/stats" ) +type pendingMetricsAndEvents struct { + metrics *gostatsd.MetricMap + events []*gostatsd.Event +} + // CloudHandler enriches metrics and events with additional information fetched from cloud provider. type CloudHandler struct { // These fields are accessed by any go routine, must use atomic ops statsCacheHit uint64 // Cumulative number of cache hits statsCacheMiss uint64 // Cumulative number of cache misses - // All other stats fields may only be read or written by the main CloudHandler.Run goroutine - statsMetricHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for metrics - statsEventItemsQueued uint64 // Absolute number of events queued, waiting for a CP to respond - statsEventHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for events - cachedInstances gostatsd.CachedInstances handler gostatsd.PipelineHandler incomingMetrics chan *gostatsd.MetricMap incomingEvents chan *gostatsd.Event // emitChan triggers a write of all the current stats when it is given a Statser - emitChan chan stats.Statser - awaitingEvents map[gostatsd.Source][]*gostatsd.Event - awaitingMetrics map[gostatsd.Source]*gostatsd.MetricMap + emitChan chan stats.Statser + + perHostPending map[gostatsd.Source]*pendingMetricsAndEvents toLookupIPs []gostatsd.Source - wg sync.WaitGroup + wgPendingEvents sync.WaitGroup estimatedTags int } @@ -45,8 +45,7 @@ func NewCloudHandler(cachedInstances gostatsd.CachedInstances, handler gostatsd. incomingMetrics: make(chan *gostatsd.MetricMap), incomingEvents: make(chan *gostatsd.Event), emitChan: make(chan stats.Statser), - awaitingEvents: make(map[gostatsd.Source][]*gostatsd.Event), - awaitingMetrics: make(map[gostatsd.Source]*gostatsd.MetricMap), + perHostPending: make(map[gostatsd.Source]*pendingMetricsAndEvents), estimatedTags: handler.EstimatedTags() + cachedInstances.EstimatedTags(), } } @@ -105,17 +104,17 @@ func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) { ch.handler.DispatchEvent(ctx, e) return } - ch.wg.Add(1) // Increment before sending to the channel + ch.wgPendingEvents.Add(1) // Increment before sending to the channel select { case <-ctx.Done(): - ch.wg.Done() + ch.wgPendingEvents.Done() case ch.incomingEvents <- e: } } // WaitForEvents waits for all event-dispatching goroutines to finish. func (ch *CloudHandler) WaitForEvents() { - ch.wg.Wait() + ch.wgPendingEvents.Wait() ch.handler.WaitForEvents() } @@ -160,11 +159,8 @@ func (ch *CloudHandler) emit(statser stats.Statser) { // atomic statser.Gauge("cloudprovider.cache_hit", float64(atomic.LoadUint64(&ch.statsCacheHit)), nil) statser.Gauge("cloudprovider.cache_miss", float64(atomic.LoadUint64(&ch.statsCacheMiss)), nil) - t := gostatsd.Tags{"type:metric"} - statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsMetricHostsQueued), t) - t = gostatsd.Tags{"type:event"} - statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsEventHostsQueued), t) - statser.Gauge("cloudprovider.items_queued", float64(ch.statsEventItemsQueued), t) + // non-atomic + statser.Gauge("cloudprovider.hosts_queued", float64(len(ch.perHostPending)), nil) } func (ch *CloudHandler) Run(ctx context.Context) { @@ -184,10 +180,8 @@ func (ch *CloudHandler) Run(ctx context.Context) { case info := <-infoSource: ch.handleInstanceInfo(ctx, info) case metrics := <-ch.incomingMetrics: - // Add metrics to awaitingMetrics, accumulate IPs to lookup ch.handleIncomingMetrics(metrics) case e := <-ch.incomingEvents: - // Add event to awaitingEvents, accumulate IPs to lookup ch.handleIncomingEvent(e) case statser := <-ch.emitChan: ch.emit(statser) @@ -203,38 +197,46 @@ func (ch *CloudHandler) Run(ctx context.Context) { } func (ch *CloudHandler) handleInstanceInfo(ctx context.Context, info gostatsd.InstanceInfo) { - mm := ch.awaitingMetrics[info.IP] - if mm != nil { - delete(ch.awaitingMetrics, info.IP) - ch.statsMetricHostsQueued-- - go ch.updateAndDispatchMetrics(ctx, info.Instance, mm) + pending, ok := ch.perHostPending[info.IP] + if !ok { + return // got an instance for something we didn't request, ignore it. } - events := ch.awaitingEvents[info.IP] - if len(events) > 0 { - delete(ch.awaitingEvents, info.IP) - ch.statsEventItemsQueued -= uint64(len(events)) - ch.statsEventHostsQueued-- - go ch.updateAndDispatchEvents(ctx, info.Instance, events) + + delete(ch.perHostPending, info.IP) + if pending.metrics != nil { + go ch.updateAndDispatchMetrics(ctx, info.Instance, pending.metrics) + } + if len(pending.events) > 0 { + go ch.updateAndDispatchEvents(ctx, info.Instance, pending.events) } } -// prepareMetricQueue will ensure that ch.awaitingMetrics has a matching MetricMap for -// source, and return it. If it did not have one initially, it will also enqueue source -// for lookup. The functionality is overloaded to minimize code duplication. -func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap { - if queue, ok := ch.awaitingMetrics[source]; ok { - return queue - } - if len(ch.awaitingEvents[source]) == 0 { +// preparePending will return a place to queue things that are waiting to be processed, +// and ensure that source will be looked up if it wasn't already. +func (ch *CloudHandler) preparePending(source gostatsd.Source) *pendingMetricsAndEvents { + if _, ok := ch.perHostPending[source]; !ok { + ch.perHostPending[source] = &pendingMetricsAndEvents{} ch.toLookupIPs = append(ch.toLookupIPs, source) - ch.statsMetricHostsQueued++ } - queue := gostatsd.NewMetricMap() - ch.awaitingMetrics[source] = queue - return queue + return ch.perHostPending[source] +} + +// prepareMetricQueue will ensure that ch.perHostPending has a matching MetricMap for +// the provided source and return it. +func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap { + queue := ch.preparePending(source) + if queue.metrics == nil { + // There might be value in pushing this to preparePending, since the split is + // really only beneficial if a host is only sending events and not metrics, and + // this adds an extra comparison to every lookup. + queue.metrics = gostatsd.NewMetricMap() + } + return queue.metrics } func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) { + // The .Source values could be from different hosts if they were + // forwarded, therefore we need to do a lookup each time. mm.Counters.Each(func(metricName string, tagsKey string, c gostatsd.Counter) { ch.prepareMetricQueue(c.Source).MergeCounter(metricName, tagsKey, c) }) @@ -250,14 +252,8 @@ func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) { } func (ch *CloudHandler) handleIncomingEvent(e *gostatsd.Event) { - queue := ch.awaitingEvents[e.Source] - ch.awaitingEvents[e.Source] = append(queue, e) - if len(queue) == 0 && ch.awaitingMetrics[e.Source] == nil { - // This is the first event for that IP in the queue. Need to fetch an Instance for this IP. - ch.toLookupIPs = append(ch.toLookupIPs, e.Source) - ch.statsEventHostsQueued++ - } - ch.statsEventItemsQueued++ + queue := ch.preparePending(e.Source) + queue.events = append(queue.events, e) } func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *gostatsd.Instance, mmIn *gostatsd.MetricMap) { @@ -284,7 +280,7 @@ func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance * func (ch *CloudHandler) updateAndDispatchEvents(ctx context.Context, instance *gostatsd.Instance, events []*gostatsd.Event) { var dispatched int defer func() { - ch.wg.Add(-dispatched) + ch.wgPendingEvents.Add(-dispatched) }() for _, e := range events { updateInplace(e, instance)