From 5034b1b337dde43b55242ca171a1d9a27637ecf8 Mon Sep 17 00:00:00 2001 From: Pablo <2617411+thepalbi@users.noreply.github.com> Date: Sat, 9 Sep 2023 16:35:57 -0300 Subject: [PATCH] Add support for EventBridge s3 events in lambda-promtail (#10449) **What this PR does / why we need it**: Lambda promtail supports s3 events, which are used for scraping several log sources such as ALB access logs. This works by configuring at the S3 bucket level "s3 event notification", that are configured to target the lambda deployment of lambda-promtail. However, if one is configuring this through CloudFormation, there's a known issue with AWS that doesn't allow to configure both the lambda, the bucket, and the notifications in the same stack. See [this issue](https://github.com/aws-cloudformation/cloudformation-coverage-roadmap/issues/79) for details. For that, AWS introduced EventBridge notifications, which can be used to ship s3 events to a lambda deployment as well. This flow looks like: s3 -> eventbridge bus -> eventbridge rule -> lambda EventBridge has it's own message structure for s3 notifications. This PR adds a translation layer, just for `Object created` events (since they are the only ones we should take into account), so that EventBridge events can be received, and trigger the lambda as if they were from s3. **Which issue(s) this PR fixes**: Fixes https://github.com/grafana/loki/issues/10209 **Special notes for your reviewer**: - [x] Pending testing this with an actual deployment of the s3 -> event bridge -> lambda flow - [x] ~~Add CF template for the `s3 -> event bridge -> lambda` deployment~~ Follow up PR **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --- .../lambda-promtail/eventbridge.go | 61 ++++++++++++++++++ .../lambda-promtail/eventbridge_test.go | 63 +++++++++++++++++++ tools/lambda-promtail/lambda-promtail/main.go | 5 +- .../testdata/eventbridge-s3-event.json | 28 +++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 tools/lambda-promtail/lambda-promtail/eventbridge.go create mode 100644 tools/lambda-promtail/lambda-promtail/eventbridge_test.go create mode 100644 tools/lambda-promtail/testdata/eventbridge-s3-event.json diff --git a/tools/lambda-promtail/lambda-promtail/eventbridge.go b/tools/lambda-promtail/lambda-promtail/eventbridge.go new file mode 100644 index 0000000000000..f6c7798a6efad --- /dev/null +++ b/tools/lambda-promtail/lambda-promtail/eventbridge.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/aws/aws-lambda-go/events" + "github.com/go-kit/log" +) + +// S3Detail encodes the message structure in EventBridge s3 notifications. +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html +type S3Detail struct { + Version string `json:"version"` + Bucket struct { + Name string `json:"name"` + } `json:"bucket"` + Object S3ObjectDetail `json:"object"` +} + +type S3ObjectDetail struct { + Key string `json:"key"` + Size int `json:"size"` + ETag string `json:"etag"` + VersionID string `json:"version-id"` + Sequencer string `json:"sequencer"` +} + +type s3EventProcessor func(ctx context.Context, ev *events.S3Event, pc Client, log *log.Logger) error + +func processEventBridgeEvent(ctx context.Context, ev *events.CloudWatchEvent, pc Client, log *log.Logger, process s3EventProcessor) error { + // lambda-promtail should only be used with S3 object creation events, since those indicate that a new file has been + // added to bucket, and need to be fetched and parsed accordingly. + if !(ev.Source == "aws.s3" && ev.DetailType == "Object Created") { + return fmt.Errorf("event bridge event type not supported") + } + + var eventDetail S3Detail + if err := json.Unmarshal(ev.Detail, &eventDetail); err != nil { + return err + } + + // TODO(thepalbi): how to fill bucket owner? + var s3Event = events.S3Event{ + Records: []events.S3EventRecord{ + { + AWSRegion: ev.Region, + S3: events.S3Entity{ + Bucket: events.S3Bucket{ + Name: eventDetail.Bucket.Name, + }, + Object: events.S3Object{ + Key: eventDetail.Object.Key, + }, + }, + }, + }, + } + + return process(ctx, &s3Event, pc, log) +} diff --git a/tools/lambda-promtail/lambda-promtail/eventbridge_test.go b/tools/lambda-promtail/lambda-promtail/eventbridge_test.go new file mode 100644 index 0000000000000..e919bb4d89eb7 --- /dev/null +++ b/tools/lambda-promtail/lambda-promtail/eventbridge_test.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "encoding/json" + "github.com/aws/aws-lambda-go/events" + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +type testPromtailClient struct{} + +func (t testPromtailClient) sendToPromtail(ctx context.Context, b *batch) error { + return nil +} + +func Test_processEventBridgeEvent(t *testing.T) { + logger := log.NewNopLogger() + t.Run("s3 object created event", func(t *testing.T) { + bs, err := os.ReadFile("../testdata/eventbridge-s3-event.json") + require.NoError(t, err) + + var ebEvent events.CloudWatchEvent + require.NoError(t, json.Unmarshal(bs, &ebEvent)) + + processor := s3EventProcessor(func(ctx context.Context, ev *events.S3Event, pc Client, log *log.Logger) error { + require.Len(t, ev.Records, 1) + require.Equal(t, events.S3EventRecord{ + AWSRegion: "us-east-2", + S3: events.S3Entity{ + Bucket: events.S3Bucket{ + Name: "bucket", + }, + Object: events.S3Object{ + Key: "pizza.txt", + }, + }, + }, ev.Records[0]) + return nil + }) + + err = processEventBridgeEvent(context.Background(), &ebEvent, testPromtailClient{}, &logger, processor) + require.NoError(t, err) + + t.Run("s3 object created event", func(t *testing.T) { + var ebEvent = events.CloudWatchEvent{ + Source: "aws.s3", + // picking a different s3 event type + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-mapping-troubleshooting.html + DetailType: "Object Restore Initiated", + } + + processor := s3EventProcessor(func(ctx context.Context, ev *events.S3Event, pc Client, log *log.Logger) error { + return nil + }) + + err = processEventBridgeEvent(context.Background(), &ebEvent, testPromtailClient{}, &logger, processor) + require.Error(t, err, "expected process to fail due to unsupported event type") + }) + }) +} diff --git a/tools/lambda-promtail/lambda-promtail/main.go b/tools/lambda-promtail/lambda-promtail/main.go index e140fb842668f..3d230ca1334fc 100644 --- a/tools/lambda-promtail/lambda-promtail/main.go +++ b/tools/lambda-promtail/lambda-promtail/main.go @@ -139,8 +139,9 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) { var kinesisEvent events.KinesisEvent var sqsEvent events.SQSEvent var snsEvent events.SNSEvent + var eventBridgeEvent events.CloudWatchEvent - types := [...]interface{}{&s3Event, &s3TestEvent, &cwEvent, &kinesisEvent, &sqsEvent, &snsEvent} + types := [...]interface{}{&s3Event, &s3TestEvent, &cwEvent, &kinesisEvent, &sqsEvent, &snsEvent, &eventBridgeEvent} j, _ := json.Marshal(ev) reader := strings.NewReader(string(j)) @@ -185,6 +186,8 @@ func handler(ctx context.Context, ev map[string]interface{}) error { } switch evt := event.(type) { + case *events.CloudWatchEvent: + err = processEventBridgeEvent(ctx, evt, pClient, pClient.log, processS3Event) case *events.S3Event: err = processS3Event(ctx, evt, pClient, pClient.log) case *events.CloudwatchLogsEvent: diff --git a/tools/lambda-promtail/testdata/eventbridge-s3-event.json b/tools/lambda-promtail/testdata/eventbridge-s3-event.json new file mode 100644 index 0000000000000..6f2ae7648214b --- /dev/null +++ b/tools/lambda-promtail/testdata/eventbridge-s3-event.json @@ -0,0 +1,28 @@ +{ + "version": "0", + "id": "52f34d1b-bd8e-5a96-4e0a-0343037b0e63", + "detail-type": "Object Created", + "source": "aws.s3", + "account": "123", + "time": "2023-08-25T17:41:58Z", + "region": "us-east-2", + "resources": [ + "arn:aws:s3:::thepalbi-test" + ], + "detail": { + "version": "0", + "bucket": { + "name": "bucket" + }, + "object": { + "key": "pizza.txt", + "size": 100, + "etag": "8adc5937e635f6c9af646f0b23560fae", + "sequencer": "0064E8E7E6404BABBD" + }, + "request-id": "VHS8EE09Q94HJZDZ", + "requester": "366620023056", + "source-ip-address": "1.167.76.151", + "reason": "PutObject" + } +}