Skip to content

Commit

Permalink
Refactor cloudwatch worker task allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Apr 15, 2024
1 parent 5b67a54 commit 219e857
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 230 deletions.
134 changes: 104 additions & 30 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,69 @@ import (
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"

awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
)

type cloudwatchPoller struct {
numberOfWorkers int
apiSleep time.Duration
config config
region string
logStreams []*string
logStreamPrefix string
startTime int64
endTime int64
workerSem *awscommon.Sem
log *logp.Logger
metrics *inputMetrics
workersListingMap *sync.Map
workersProcessingMap *sync.Map

// When a worker is ready for its next task, it should
// send to workRequestChan and then read from workResponseChan.
// The worker can cancel the request based on other context
// cancellations, but if the write succeeds it _must_ read from
// workResponseChan to avoid deadlocking the main loop.
workRequestChan chan struct{}
workResponseChan chan workResponse

workerWg sync.WaitGroup
}

// workResponse is one unit of work handed from the main loop to a worker
// goroutine: scan logGroup for events between startTime and endTime.
type workResponse struct {
logGroup string
startTime, endTime time.Time
}

func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
awsRegion string, apiSleep time.Duration,
numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller {
awsRegion string, config config) *cloudwatchPoller {
if metrics == nil {
metrics = newInputMetrics("", nil)
}

return &cloudwatchPoller{
numberOfWorkers: numberOfWorkers,
apiSleep: apiSleep,
region: awsRegion,
logStreams: logStreams,
logStreamPrefix: logStreamPrefix,
startTime: int64(0),
endTime: int64(0),
workerSem: awscommon.NewSem(numberOfWorkers),
log: log,
metrics: metrics,
region: awsRegion,
config: config,
workersListingMap: new(sync.Map),
workersProcessingMap: new(sync.Map),
// workRequestChan is unbuffered to guarantee that
// the worker and main loop agree whether a request
// was sent. workerResponseChan is buffered so the
// main loop doesn't have to block on the workers
// while distributing new data.
workRequestChan: make(chan struct{}),
workResponseChan: make(chan workResponse, 10),
}
}

func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
if err != nil {
var errRequestCanceled *awssdk.RequestCanceledError
if errors.As(err, &errRequestCanceled) {
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err)
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled)
}
p.log.Error("getLogEventsFromCloudWatch failed: ", err)
}
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error {
// construct FilterLogEventsInput
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
Expand All @@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))

// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep)
time.Sleep(p.apiSleep)
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep)
time.Sleep(p.config.APISleep)
p.log.Debug("done sleeping")

p.log.Debugf("Processing #%v events", len(logEvents))
Expand All @@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
StartTime: awssdk.Int64(startTime),
EndTime: awssdk.Int64(endTime),
StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)),
EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)),
}

if len(p.logStreams) > 0 {
for _, stream := range p.logStreams {
if len(p.config.LogStreams) > 0 {
for _, stream := range p.config.LogStreams {
filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream)
}
}

if p.logStreamPrefix != "" {
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix)
if p.config.LogStreamPrefix != "" {
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix)
}
return filterLogEventsInput
}

// startWorkers launches config.NumberOfWorkers goroutines. Each worker
// repeatedly requests a task from the main loop by sending on
// workRequestChan, reads its assignment from workResponseChan, and scans
// the assigned log group. Workers exit on context cancellation and are
// tracked in p.workerWg so callers can wait for them.
func (p *cloudwatchPoller) startWorkers(
	ctx context.Context,
	svc *cloudwatchlogs.Client,
	logProcessor *logProcessor,
) {
	worker := func() {
		defer p.workerWg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case p.workRequestChan <- struct{}{}:
			}
			// The request was accepted, so we must now read the response;
			// abandoning it here would deadlock the main loop.
			work := <-p.workResponseChan

			p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup)
			p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor)
			p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup)
		}
	}

	for n := 0; n < p.config.NumberOfWorkers; n++ {
		p.workerWg.Add(1)
		go worker()
	}
}

// receive implements the main run loop that distributes tasks to the worker
// goroutines. It accepts a "clock" callback (which on a live input should
// equal time.Now) to allow deterministic unit tests.
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) {
	defer p.workerWg.Wait()

	// startTime and endTime bound the current scanning interval. The end is
	// held back by the configured latency. With start_position "end" the
	// first window begins one ScanFrequency before the present; otherwise
	// startTime remains the zero time, i.e. the beginning of the logs.
	var startTime time.Time
	endTime := clock().Add(-p.config.Latency)
	if p.config.StartPosition == "end" {
		startTime = endTime.Add(-p.config.ScanFrequency)
	}

	for ctx.Err() == nil {
		for _, group := range logGroupNames {
			resp := workResponse{
				logGroup:  group,
				startTime: startTime,
				endTime:   endTime,
			}
			select {
			case <-ctx.Done():
				return
			case <-p.workRequestChan:
				p.workResponseChan <- resp
			}
		}

		// Every group in this time span has been handed out; pause for
		// ScanFrequency before starting the next span. Cancellation only
		// ends the sleep — the loop condition handles shutdown.
		p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency)
		select {
		case <-ctx.Done():
		case <-time.After(p.config.ScanFrequency):
		}
		p.log.Debug("done sleeping")

		// The next span picks up exactly where this one ended.
		startTime, endTime = endTime, clock().Add(-p.config.Latency)
	}
}
101 changes: 4 additions & 97 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package awscloudwatch

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -137,82 +135,12 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
log.Named("cloudwatch_poller"),
in.metrics,
in.awsConfig.Region,
in.config.APISleep,
in.config.NumberOfWorkers,
in.config.LogStreams,
in.config.LogStreamPrefix)
in.config)
logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx)
cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames)))
return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames)
}

func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error {
// This loop tries to keep the workers busy as much as possible while
// honoring the number in config opposed to a simpler loop that does one
// listing, sequentially processes every object and then does another listing
start := true
workerWg := new(sync.WaitGroup)
lastLogGroupOffset := 0
for ctx.Err() == nil {
if !start {
cwPoller.log.Debugf("sleeping for %v before checking new logs", in.config.ScanFrequency)
time.Sleep(in.config.ScanFrequency)
cwPoller.log.Debug("done sleeping")
}
start = false

currentTime := time.Now()
cwPoller.startTime, cwPoller.endTime = getStartPosition(in.config.StartPosition, currentTime, cwPoller.endTime, in.config.ScanFrequency, in.config.Latency)
cwPoller.log.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(cwPoller.startTime/1000, 0), time.Unix(cwPoller.endTime/1000, 0))
availableWorkers, err := cwPoller.workerSem.AcquireContext(in.config.NumberOfWorkers, ctx)
if err != nil {
break
}

if availableWorkers == 0 {
continue
}

workerWg.Add(availableWorkers)
logGroupNamesLength := len(logGroupNames)
runningGoroutines := 0

for i := lastLogGroupOffset; i < logGroupNamesLength; i++ {
if runningGoroutines >= availableWorkers {
break
}

runningGoroutines++
lastLogGroupOffset = i + 1
if lastLogGroupOffset >= logGroupNamesLength {
// release unused workers
cwPoller.workerSem.Release(availableWorkers - runningGoroutines)
for j := 0; j < availableWorkers-runningGoroutines; j++ {
workerWg.Done()
}
lastLogGroupOffset = 0
}

lg := logGroupNames[i]
go func(logGroup string, startTime int64, endTime int64) {
defer func() {
cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup)
workerWg.Done()
cwPoller.workerSem.Release(1)
}()
cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup)
cwPoller.run(svc, logGroup, startTime, endTime, logProcessor)
}(lg, cwPoller.startTime, cwPoller.endTime)
}
}

// Wait for all workers to finish.
workerWg.Wait()
if errors.Is(ctx.Err(), context.Canceled) {
// A canceled context is a normal shutdown.
return nil
}
return ctx.Err()
cwPoller.startWorkers(ctx, svc, logProcessor)
cwPoller.receive(ctx, logGroupNames, time.Now)
return nil
}

func parseARN(logGroupARN string) (string, string, error) {
Expand Down Expand Up @@ -256,24 +184,3 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log
}
return logGroupNames, nil
}

func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) {
if latency != 0 {
// add latency if config is not 0
currentTime = currentTime.Add(latency * -1)
}

switch startPosition {
case "beginning":
if endTime != int64(0) {
return endTime, currentTime.UnixNano() / int64(time.Millisecond)
}
return 0, currentTime.UnixNano() / int64(time.Millisecond)
case "end":
if endTime != int64(0) {
return endTime, currentTime.UnixNano() / int64(time.Millisecond)
}
return currentTime.Add(-scanFrequency).UnixNano() / int64(time.Millisecond), currentTime.UnixNano() / int64(time.Millisecond)
}
return 0, 0
}
Loading

0 comments on commit 219e857

Please sign in to comment.