From c9f1426f12d9f9cdfb317c747118e2744abdccd7 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Wed, 3 Jan 2024 10:26:07 -0800 Subject: [PATCH] Add histogram and output metrics for output latency (#37445) * add metrics for output latency * plz linter * use milliseconds * move around timekeeping statements * move logstash latency window --- libbeat/docs/metrics-in-logs.asciidoc | 1 + libbeat/outputs/elasticsearch/client.go | 8 +++- libbeat/outputs/fileout/file.go | 6 +++ libbeat/outputs/logstash/sync.go | 6 ++- libbeat/outputs/metrics.go | 21 +++++++++- libbeat/outputs/observer.go | 52 +++++++++++++------------ libbeat/outputs/redis/client.go | 5 ++- libbeat/outputs/shipper/shipper_test.go | 25 ++++++------ 8 files changed, 82 insertions(+), 42 deletions(-) diff --git a/libbeat/docs/metrics-in-logs.asciidoc b/libbeat/docs/metrics-in-logs.asciidoc index c499e7462f4d..97aac4f3a302 100644 --- a/libbeat/docs/metrics-in-logs.asciidoc +++ b/libbeat/docs/metrics-in-logs.asciidoc @@ -170,6 +170,7 @@ endif::[] | `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it. | `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it. | `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. +| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |=== ifeval::["{beatname_lc}"=="filebeat"] diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 4996dba887e2..8aeef2c623e7 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -221,7 +221,7 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) { span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() - begin := time.Now() + st := client.observer if st != nil { @@ -246,8 +246,10 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) return nil, nil } + begin := time.Now() params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"} status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems) + timeSinceSend := time.Since(begin) if sendErr != nil { if status == http.StatusRequestEntityTooLarge { @@ -265,7 +267,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", pubCount, - time.Since(begin)) + timeSinceSend) // check response for transient errors var failedEvents []publisher.Event @@ -289,6 +291,8 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) st.Dropped(dropped) st.Duplicate(duplicates) st.ErrTooMany(stats.tooMany) + st.ReportLatency(timeSinceSend) + } if failed > 0 { diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index d12a11b25c3c..4ddc5955d6ef 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -21,6 +21,7 @@ import ( "context" "os" "path/filepath" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" @@ -119,6 +120,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { st.NewBatch(len(events)) dropped := 0 + for i := range events { event := &events[i] @@ -135,6 +137,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { continue } + begin := time.Now() if _, err = out.rotator.Write(append(serializedEvent, '\n')); err != nil { st.WriteError(err) @@ -149,9 +152,12 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { } st.WriteBytes(len(serializedEvent) + 1) + took := time.Since(begin) + st.ReportLatency(took) } st.Dropped(dropped) + st.Acked(len(events) - dropped) return nil diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index ad4293eb9f79..2a49324c46f9 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -114,6 +114,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { } for len(events) > 0 { + // check if we need to reconnect if c.ticker != nil { select { @@ -136,12 +137,14 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { err error ) + begin := time.Now() if c.win == nil { n, err = c.sendEvents(events) } else { n, err = c.publishWindowed(events) } - + took := time.Since(begin) + st.ReportLatency(took) c.log.Debugf("%v events out of %v events sent to logstash host %s. Continue sending", n, len(events), c.Host()) @@ -163,6 +166,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { return err } + } batch.ACK() diff --git a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go index 77374df3e61e..5502c4e4ae06 100644 --- a/libbeat/outputs/metrics.go +++ b/libbeat/outputs/metrics.go @@ -17,7 +17,14 @@ package outputs -import "github.com/elastic/elastic-agent-libs/monitoring" +import ( + "time" + + "github.com/rcrowley/go-metrics" + + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) // Stats implements the Observer interface, for collecting metrics on common // outputs events. @@ -46,13 +53,15 @@ type Stats struct { readBytes *monitoring.Uint // total amount of bytes read readErrors *monitoring.Uint // total number of errors while waiting for response on output + + sendLatencyMillis metrics.Sample } // NewStats creates a new Stats instance using a backing monitoring registry. // This function will create and register a number of metrics with the registry passed. // The registry must not be null. func NewStats(reg *monitoring.Registry) *Stats { - return &Stats{ + obj := &Stats{ batches: monitoring.NewUint(reg, "events.batches"), events: monitoring.NewUint(reg, "events.total"), acked: monitoring.NewUint(reg, "events.acked"), @@ -69,7 +78,11 @@ func NewStats(reg *monitoring.Registry) *Stats { readBytes: monitoring.NewUint(reg, "read.bytes"), readErrors: monitoring.NewUint(reg, "read.errors"), + + sendLatencyMillis: metrics.NewUniformSample(1024), } + _ = adapter.NewGoMetrics(reg, "write.latency", adapter.Accept).Register("histogram", metrics.NewHistogram(obj.sendLatencyMillis)) + return obj } // NewBatch updates active batch and event metrics. @@ -81,6 +94,10 @@ func (s *Stats) NewBatch(n int) { } } +func (s *Stats) ReportLatency(time time.Duration) { + s.sendLatencyMillis.Update(time.Milliseconds()) +} + // Acked updates active and acked event metrics. func (s *Stats) Acked(n int) { if s != nil { diff --git a/libbeat/outputs/observer.go b/libbeat/outputs/observer.go index 9d7a3aec4a0c..3a330e4a43ac 100644 --- a/libbeat/outputs/observer.go +++ b/libbeat/outputs/observer.go @@ -17,21 +17,24 @@ package outputs +import "time" + // Observer provides an interface used by outputs to report common events on // documents/events being published and I/O workload. type Observer interface { - NewBatch(int) // report new batch being processed with number of events - Acked(int) // report number of acked events - Failed(int) // report number of failed events - Dropped(int) // report number of dropped events - Duplicate(int) // report number of events detected as duplicates (e.g. on resends) - Cancelled(int) // report number of cancelled events - Split() // report a batch was split for being too large to ingest - WriteError(error) // report an I/O error on write - WriteBytes(int) // report number of bytes being written - ReadError(error) // report an I/O error on read - ReadBytes(int) // report number of bytes being read - ErrTooMany(int) // report too many requests response + NewBatch(int) // report new batch being processed with number of events + ReportLatency(time.Duration) // report the duration a send to the output takes + Acked(int) // report number of acked events + Failed(int) // report number of failed events + Dropped(int) // report number of dropped events + Duplicate(int) // report number of events detected as duplicates (e.g. on resends) + Cancelled(int) // report number of cancelled events + Split() // report a batch was split for being too large to ingest + WriteError(error) // report an I/O error on write + WriteBytes(int) // report number of bytes being written + ReadError(error) // report an I/O error on read + ReadBytes(int) // report number of bytes being read + ErrTooMany(int) // report too many requests response } type emptyObserver struct{} @@ -43,15 +46,16 @@ func NewNilObserver() Observer { return nilObserver } -func (*emptyObserver) NewBatch(int) {} -func (*emptyObserver) Acked(int) {} -func (*emptyObserver) Duplicate(int) {} -func (*emptyObserver) Failed(int) {} -func (*emptyObserver) Dropped(int) {} -func (*emptyObserver) Cancelled(int) {} -func (*emptyObserver) Split() {} -func (*emptyObserver) WriteError(error) {} -func (*emptyObserver) WriteBytes(int) {} -func (*emptyObserver) ReadError(error) {} -func (*emptyObserver) ReadBytes(int) {} -func (*emptyObserver) ErrTooMany(int) {} +func (*emptyObserver) NewBatch(int) {} +func (*emptyObserver) ReportLatency(_ time.Duration) {} +func (*emptyObserver) Acked(int) {} +func (*emptyObserver) Duplicate(int) {} +func (*emptyObserver) Failed(int) {} +func (*emptyObserver) Dropped(int) {} +func (*emptyObserver) Cancelled(int) {} +func (*emptyObserver) Split() {} +func (*emptyObserver) WriteError(error) {} +func (*emptyObserver) WriteBytes(int) {} +func (*emptyObserver) ReadError(error) {} +func (*emptyObserver) ReadBytes(int) {} +func (*emptyObserver) ErrTooMany(int) {} diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 5165d894f654..5a299749aac8 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -233,8 +233,11 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { return nil, nil } + start := time.Now() // RPUSH returns total length of list -> fail and retry all on error _, err := conn.Do(command, args...) + took := time.Since(start) + c.observer.ReportLatency(took) if err != nil { c.log.Errorf("Failed to %v to redis list with: %+v", command, err) return okEvents, err @@ -283,7 +286,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF for i := range serialized { _, err := conn.Receive() if err != nil { - if _, ok := err.(redis.Error); ok { + if _, ok := err.(redis.Error); ok { //nolint:errorlint //this line checks against a type, not an instance of an error c.log.Errorf("Failed to %v event to list with %+v", command, err) failed = append(failed, data[i]) diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index ef6b628ba8c9..e26d44635aff 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -637,15 +637,16 @@ type TestObserver struct { errTooMany int } -func (to *TestObserver) NewBatch(batch int) { to.batch += batch } -func (to *TestObserver) Acked(acked int) { to.acked += acked } -func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate } -func (to *TestObserver) Failed(failed int) { to.failed += failed } -func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped } -func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled } -func (to *TestObserver) Split() { to.split++ } -func (to *TestObserver) WriteError(we error) { to.writeError = we } -func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb } -func (to *TestObserver) ReadError(re error) { to.readError = re } -func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb } -func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err } +func (to *TestObserver) NewBatch(batch int) { to.batch += batch } +func (to *TestObserver) Acked(acked int) { to.acked += acked } +func (to *TestObserver) ReportLatency(_ time.Duration) {} +func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate } +func (to *TestObserver) Failed(failed int) { to.failed += failed } +func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped } +func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled } +func (to *TestObserver) Split() { to.split++ } +func (to *TestObserver) WriteError(we error) { to.writeError = we } +func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb } +func (to *TestObserver) ReadError(re error) { to.readError = re } +func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb } +func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }