Skip to content

Commit

Permalink
Add histogram and output metrics for output latency (#37445)
Browse files Browse the repository at this point in the history
* add metrics for output latency

* plz linter

* use milliseconds

* move around timekeeping statements

* move logstash latency window
  • Loading branch information
fearful-symmetry authored Jan 3, 2024
1 parent 6f192c0 commit c9f1426
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 42 deletions.
1 change: 1 addition & 0 deletions libbeat/docs/metrics-in-logs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ endif::[]
| `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it.
| `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it.
| `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later.
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. | This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, Redis, and Logstash outputs.
|===
ifeval::["{beatname_lc}"=="filebeat"]
Expand Down
8 changes: 6 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) {
span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
defer span.End()
begin := time.Now()

st := client.observer

if st != nil {
Expand All @@ -246,8 +246,10 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
return nil, nil
}

begin := time.Now()
params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"}
status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems)
timeSinceSend := time.Since(begin)

if sendErr != nil {
if status == http.StatusRequestEntityTooLarge {
Expand All @@ -265,7 +267,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)

client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
pubCount,
time.Since(begin))
timeSinceSend)

// check response for transient errors
var failedEvents []publisher.Event
Expand All @@ -289,6 +291,8 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
st.Dropped(dropped)
st.Duplicate(duplicates)
st.ErrTooMany(stats.tooMany)
st.ReportLatency(timeSinceSend)

}

if failed > 0 {
Expand Down
6 changes: 6 additions & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"os"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
st.NewBatch(len(events))

dropped := 0

for i := range events {
event := &events[i]

Expand All @@ -135,6 +137,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
continue
}

begin := time.Now()
if _, err = out.rotator.Write(append(serializedEvent, '\n')); err != nil {
st.WriteError(err)

Expand All @@ -149,9 +152,12 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
}

st.WriteBytes(len(serializedEvent) + 1)
took := time.Since(begin)
st.ReportLatency(took)
}

st.Dropped(dropped)

st.Acked(len(events) - dropped)

return nil
Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {
}

for len(events) > 0 {

// check if we need to reconnect
if c.ticker != nil {
select {
Expand All @@ -136,12 +137,14 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {
err error
)

begin := time.Now()
if c.win == nil {
n, err = c.sendEvents(events)
} else {
n, err = c.publishWindowed(events)
}

took := time.Since(begin)
st.ReportLatency(took)
c.log.Debugf("%v events out of %v events sent to logstash host %s. Continue sending",
n, len(events), c.Host())

Expand All @@ -163,6 +166,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {

return err
}

}

batch.ACK()
Expand Down
21 changes: 19 additions & 2 deletions libbeat/outputs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package outputs

import "github.com/elastic/elastic-agent-libs/monitoring"
import (
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// Stats implements the Observer interface, for collecting metrics on common
// outputs events.
Expand Down Expand Up @@ -46,13 +53,15 @@ type Stats struct {

readBytes *monitoring.Uint // total amount of bytes read
readErrors *monitoring.Uint // total number of errors while waiting for response on output

sendLatencyMillis metrics.Sample
}

// NewStats creates a new Stats instance using a backing monitoring registry.
// This function will create and register a number of metrics with the registry passed.
// The registry must not be null.
func NewStats(reg *monitoring.Registry) *Stats {
return &Stats{
obj := &Stats{
batches: monitoring.NewUint(reg, "events.batches"),
events: monitoring.NewUint(reg, "events.total"),
acked: monitoring.NewUint(reg, "events.acked"),
Expand All @@ -69,7 +78,11 @@ func NewStats(reg *monitoring.Registry) *Stats {

readBytes: monitoring.NewUint(reg, "read.bytes"),
readErrors: monitoring.NewUint(reg, "read.errors"),

sendLatencyMillis: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "write.latency", adapter.Accept).Register("histogram", metrics.NewHistogram(obj.sendLatencyMillis))
return obj
}

// NewBatch updates active batch and event metrics.
Expand All @@ -81,6 +94,10 @@ func (s *Stats) NewBatch(n int) {
}
}

// ReportLatency records how long a send to the output took. The duration is
// converted to an integer millisecond count and added to the latency sample
// that backs the "write.latency" histogram.
//
// Like Acked, it is a no-op when called on a nil *Stats, so callers holding an
// unset observer do not panic.
func (s *Stats) ReportLatency(d time.Duration) {
	// The parameter is named d rather than time so it does not shadow the
	// time package inside the method body.
	if s != nil {
		s.sendLatencyMillis.Update(d.Milliseconds())
	}
}

// Acked updates active and acked event metrics.
func (s *Stats) Acked(n int) {
if s != nil {
Expand Down
52 changes: 28 additions & 24 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@

package outputs

import "time"

// Observer provides an interface used by outputs to report common events on
// documents/events being published and I/O workload.
type Observer interface {
NewBatch(int) // report new batch being processed with number of events
Acked(int) // report number of acked events
Failed(int) // report number of failed events
Dropped(int) // report number of dropped events
Duplicate(int) // report number of events detected as duplicates (e.g. on resends)
Cancelled(int) // report number of cancelled events
Split() // report a batch was split for being too large to ingest
WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
NewBatch(int) // report new batch being processed with number of events
ReportLatency(time.Duration) // report the duration a send to the output takes
Acked(int) // report number of acked events
Failed(int) // report number of failed events
Dropped(int) // report number of dropped events
Duplicate(int) // report number of events detected as duplicates (e.g. on resends)
Cancelled(int) // report number of cancelled events
Split() // report a batch was split for being too large to ingest
WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
}

type emptyObserver struct{}
Expand All @@ -43,15 +46,16 @@ func NewNilObserver() Observer {
return nilObserver
}

func (*emptyObserver) NewBatch(int) {}
func (*emptyObserver) Acked(int) {}
func (*emptyObserver) Duplicate(int) {}
func (*emptyObserver) Failed(int) {}
func (*emptyObserver) Dropped(int) {}
func (*emptyObserver) Cancelled(int) {}
func (*emptyObserver) Split() {}
func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}
func (*emptyObserver) NewBatch(int) {}
func (*emptyObserver) ReportLatency(_ time.Duration) {}
func (*emptyObserver) Acked(int) {}
func (*emptyObserver) Duplicate(int) {}
func (*emptyObserver) Failed(int) {}
func (*emptyObserver) Dropped(int) {}
func (*emptyObserver) Cancelled(int) {}
func (*emptyObserver) Split() {}
func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}
5 changes: 4 additions & 1 deletion libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,11 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn {
return nil, nil
}

start := time.Now()
// RPUSH returns total length of list -> fail and retry all on error
_, err := conn.Do(command, args...)
took := time.Since(start)
c.observer.ReportLatency(took)
if err != nil {
c.log.Errorf("Failed to %v to redis list with: %+v", command, err)
return okEvents, err
Expand Down Expand Up @@ -283,7 +286,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF
for i := range serialized {
_, err := conn.Receive()
if err != nil {
if _, ok := err.(redis.Error); ok {
if _, ok := err.(redis.Error); ok { //nolint:errorlint //this line checks against a type, not an instance of an error
c.log.Errorf("Failed to %v event to list with %+v",
command, err)
failed = append(failed, data[i])
Expand Down
25 changes: 13 additions & 12 deletions libbeat/outputs/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,15 +637,16 @@ type TestObserver struct {
errTooMany int
}

func (to *TestObserver) NewBatch(batch int) { to.batch += batch }
func (to *TestObserver) Acked(acked int) { to.acked += acked }
func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
func (to *TestObserver) Failed(failed int) { to.failed += failed }
func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped }
func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
func (to *TestObserver) Split() { to.split++ }
func (to *TestObserver) WriteError(we error) { to.writeError = we }
func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb }
func (to *TestObserver) ReadError(re error) { to.readError = re }
func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb }
func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }
func (to *TestObserver) NewBatch(batch int) { to.batch += batch }
func (to *TestObserver) Acked(acked int) { to.acked += acked }
func (to *TestObserver) ReportLatency(_ time.Duration) {}
func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
func (to *TestObserver) Failed(failed int) { to.failed += failed }
func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped }
func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
func (to *TestObserver) Split() { to.split++ }
func (to *TestObserver) WriteError(we error) { to.writeError = we }
func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb }
func (to *TestObserver) ReadError(re error) { to.readError = re }
func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb }
// ErrTooMany accumulates the reported number of "too many requests" responses.
// The count is added to the running total (+=), in line with the other
// TestObserver counters, instead of overwriting it with the latest value.
func (to *TestObserver) ErrTooMany(err int) { to.errTooMany += err }

0 comments on commit c9f1426

Please sign in to comment.