Skip to content

Commit

Permalink
Merge branch 'main' into fosb1
Browse files Browse the repository at this point in the history
  • Loading branch information
khushijain21 authored Dec 19, 2024
2 parents 2870c08 + d42aab1 commit 7f98ce0
Show file tree
Hide file tree
Showing 26 changed files with 474 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .go-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.22.9
1.22.10
8 changes: 4 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ linters-settings:

gosimple:
# Select the Go version to target. The default is '1.13'.
go: "1.22.9"
go: "1.22.10"

nakedret:
# make an issue if func has more lines of code than this setting and it has naked returns; default is 30
Expand All @@ -170,19 +170,19 @@ linters-settings:

staticcheck:
# Select the Go version to target. The default is '1.13'.
go: "1.22.9"
go: "1.22.10"
checks: ["all"]

stylecheck:
# Select the Go version to target. The default is '1.13'.
go: "1.22.9"
go: "1.22.10"
# Disabled:
# ST1005: error strings should not be capitalized
checks: ["all", "-ST1005"]

unused:
# Select the Go version to target. The default is '1.13'.
go: "1.22.9"
go: "1.22.10"

gosec:
excludes:
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]
- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions.
- Update to Go 1.22.7. {pull}41018[41018]
- Update to Go 1.22.10. {pull}42095[42095]
- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942]
- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled
- Add `lowercase` processor. {issue}22254[22254] {pull}41424[41424]
Expand Down Expand Up @@ -424,6 +424,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289]
- Add support for region/zone for Vertex AI service in GCP module {pull}41551[41551]
- Add support for location label as an optional configuration parameter in GCP metrics metricset. {issue}41550[41550] {pull}41626[41626]
- Collect .NET CLR (IIS) Memory, Exceptions and LocksAndThreads metrics {pull}41929[41929]
- Added `tier_preference`, `creation_date` and `version` fields to the `elasticsearch.index` metricset. {pull}41944[41944]
- Add `use_performance_counters` to collect CPU metrics using performance counters on Windows for `system/cpu` and `system/core` {pull}41965[41965]

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12488,11 +12488,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-libs
Version: v0.17.4
Version: v0.17.5
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].4/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].5/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.9
FROM golang:1.22.10

RUN \
apt-get update \
Expand Down
2 changes: 1 addition & 1 deletion dev-tools/kubernetes/filebeat/Dockerfile.debug
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.9 as builder
FROM golang:1.22.10 as builder

ENV PATH=/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin:/go/bin:/usr/local/go/bin

Expand Down
2 changes: 1 addition & 1 deletion dev-tools/kubernetes/metricbeat/Dockerfile.debug
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.9 as builder
FROM golang:1.22.10 as builder

ENV PATH=/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin:/go/bin:/usr/local/go/bin

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/elastic/beats/v7

go 1.22.9
go 1.22.10

require (
cloud.google.com/go/bigquery v1.62.0
Expand Down Expand Up @@ -177,7 +177,7 @@ require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.9.0
github.com/elastic/elastic-agent-libs v0.17.4
github.com/elastic/elastic-agent-libs v0.17.5
github.com/elastic/elastic-agent-system-metrics v0.11.5
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-quark v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ github.com/elastic/elastic-agent-autodiscover v0.9.0 h1:+iWIKh0u3e8I+CJa3FfWe9h0
github.com/elastic/elastic-agent-autodiscover v0.9.0/go.mod h1:5iUxLHhVdaGSWYTveSwfJEY4RqPXTG13LPiFoxcpFd4=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.17.4 h1:kWK5Kn2EQjM97yHqbeXv+cFAIti4IiI9Qj8huM+lZzE=
github.com/elastic/elastic-agent-libs v0.17.4/go.mod h1:5CR02awPrBr+tfmjBBK+JI+dMmHNQjpVY24J0wjbC7M=
github.com/elastic/elastic-agent-libs v0.17.5 h1:oyv5BohMia+49tZnsOmTyRWp5LoZbH8iOmGa7c4TqTs=
github.com/elastic/elastic-agent-libs v0.17.5/go.mod h1:5CR02awPrBr+tfmjBBK+JI+dMmHNQjpVY24J0wjbC7M=
github.com/elastic/elastic-agent-system-metrics v0.11.5 h1:JSjXFEn8uYZ9hoC/GxZNMgJ622UoP96sjYP/49/Uvuo=
github.com/elastic/elastic-agent-system-metrics v0.11.5/go.mod h1:nzkrGajQA29YNcfP62gfzhxX9an3/xdQ3RmfQNw9YTI=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.9
FROM golang:1.22.10

RUN \
apt-get update \
Expand Down
2 changes: 1 addition & 1 deletion libbeat/docs/version.asciidoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
:stack-version: 9.0.0
:doc-branch: main
:go-version: 1.22.9
:go-version: 1.22.10
:release-state: unreleased
:python: 3.7
:docker: 1.12
Expand Down
59 changes: 25 additions & 34 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ type asyncClient struct {
}

type msgRef struct {
client *asyncClient
count atomic.Uint32
batch publisher.Batch
slice []publisher.Event
err error
win *window
batchSize int
client *asyncClient
count atomic.Uint32
batch publisher.Batch
slice []publisher.Event
err error
win *window
batchSize int
deadlockListener *deadlockListener
}

func newAsyncClient(
Expand Down Expand Up @@ -146,12 +147,13 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error {
}

ref := &msgRef{
client: c,
batch: batch,
slice: events,
batchSize: len(events),
win: c.win,
err: nil,
client: c,
batch: batch,
slice: events,
batchSize: len(events),
win: c.win,
err: nil,
deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout),
}
ref.count.Store(1)
defer ref.dec()
Expand Down Expand Up @@ -229,34 +231,21 @@ func (c *asyncClient) getClient() *v2.AsyncClient {
return client
}

func (r *msgRef) callback(seq uint32, err error) {
if err != nil {
r.fail(seq, err)
} else {
r.done(seq)
}
}

func (r *msgRef) done(n uint32) {
func (r *msgRef) callback(n uint32, err error) {
r.client.observer.AckedEvents(int(n))
r.slice = r.slice[n:]
if r.win != nil {
r.win.tryGrowWindow(r.batchSize)
}
r.dec()
}

func (r *msgRef) fail(n uint32, err error) {
r.deadlockListener.ack(int(n))
if r.err == nil {
r.err = err
}
r.slice = r.slice[n:]
// If publishing is windowed, update the window size.
if r.win != nil {
r.win.shrinkWindow()
if err != nil {
r.win.shrinkWindow()
} else {
r.win.tryGrowWindow(r.batchSize)
}
}

r.client.observer.AckedEvents(int(n))

r.dec()
}

Expand All @@ -266,6 +255,8 @@ func (r *msgRef) dec() {
return
}

r.deadlockListener.close()

if L := len(r.slice); L > 0 {
r.client.observer.RetryableErrors(L)
}
Expand Down
95 changes: 95 additions & 0 deletions libbeat/outputs/logstash/deadlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package logstash

import (
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

type deadlockListener struct {
log *logp.Logger
timeout time.Duration
ticker *time.Ticker

ackChan chan int

doneChan chan struct{}
}

const logstashDeadlockTimeout = 5 * time.Minute

func newDeadlockListener(log *logp.Logger, timeout time.Duration) *deadlockListener {
if timeout <= 0 {
return nil
}
r := &deadlockListener{
log: log,
timeout: timeout,
ticker: time.NewTicker(timeout),

ackChan: make(chan int),
doneChan: make(chan struct{}),
}
go r.run()
return r
}

func (r *deadlockListener) run() {
defer r.ticker.Stop()
defer close(r.doneChan)
for {
select {
case n, ok := <-r.ackChan:
if !ok {
// Listener has been closed
return
}
if n > 0 {
// If progress was made, reset the countdown.
r.ticker.Reset(r.timeout)
}
case <-r.ticker.C:
// No progress was made within the timeout, log error so users
// know there is likely a problem with the upstream host
r.log.Errorf("Logstash batch hasn't reported progress in the last %v, the Logstash host may be stalled. This problem can be prevented by configuring Logstash to use PipelineBusV1 or by upgrading Logstash to 8.17+, for details see https://github.com/elastic/logstash/issues/16657", r.timeout)
return
}
}
}

func (r *deadlockListener) ack(n int) {
if r == nil {
return
}
// Send the new ack to the run loop, unless it has already shut down in
// which case it can be safely ignored.
select {
case r.ackChan <- n:
case <-r.doneChan:
}
}

func (r *deadlockListener) close() {
if r == nil {
return
}
// Signal the run loop to shut down
close(r.ackChan)
}
51 changes: 51 additions & 0 deletions libbeat/outputs/logstash/deadlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package logstash

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
)

func TestDeadlockListener(t *testing.T) {
const timeout = 5 * time.Millisecond
log := logp.NewLogger("test")
listener := newDeadlockListener(log, timeout)

// Verify that the listener doesn't trigger when receiving regular acks
for i := 0; i < 5; i++ {
time.Sleep(timeout / 2)
listener.ack(1)
}
select {
case <-listener.doneChan:
require.Fail(t, "Deadlock listener should not trigger unless there is no progress for the configured time interval")
case <-time.After(timeout / 2):
}

// Verify that the listener does trigger when the acks stop
select {
case <-time.After(timeout):
require.Fail(t, "Deadlock listener should trigger when there is no progress for the configured time interval")
case <-listener.doneChan:
}
}
7 changes: 4 additions & 3 deletions libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {
return nil
}

deadlockListener := newDeadlockListener(c.log, logstashDeadlockTimeout)
defer deadlockListener.close()
for len(events) > 0 {

// check if we need to reconnect
Expand Down Expand Up @@ -150,13 +152,11 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {

events = events[n:]
st.AckedEvents(n)
deadlockListener.ack(n)
if err != nil {
// return batch to pipeline before reporting/counting error
batch.RetryEvents(events)

if c.win != nil {
c.win.shrinkWindow()
}
_ = c.Close()

c.log.Errorf("Failed to publish events caused by: %+v", err)
Expand Down Expand Up @@ -186,6 +186,7 @@ func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) {

n, err := c.sendEvents(events)
if err != nil {
c.win.shrinkWindow()
return n, err
}

Expand Down
Loading

0 comments on commit 7f98ce0

Please sign in to comment.