diff --git a/contrib/envoyproxy/go-control-plane/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go new file mode 100644 index 0000000000..52279e0138 --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/envoy.go @@ -0,0 +1,350 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane + +import ( + "context" + "errors" + "io" + "math" + "net/http" + "strings" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httptrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/emitter/waf/actions" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + envoycore "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoyextproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoytypes "github.com/envoyproxy/go-control-plane/envoy/type/v3" +) + +const componentName = "envoyproxy/go-control-plane" + +func init() { + telemetry.LoadIntegration(componentName) + tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane") +} + +// appsecEnvoyExternalProcessorServer is a server that implements the Envoy ExternalProcessorServer interface. +type appsecEnvoyExternalProcessorServer struct { + envoyextproc.ExternalProcessorServer +} + +// AppsecEnvoyExternalProcessorServer creates and returns a new instance of appsecEnvoyExternalProcessorServer. +func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer) envoyextproc.ExternalProcessorServer { + return &appsecEnvoyExternalProcessorServer{userImplementation} +} + +type currentRequest struct { + span tracer.Span + afterHandle func() + ctx context.Context + fakeResponseWriter *fakeResponseWriter + wrappedResponseWriter http.ResponseWriter +} + +// Process handles the bidirectional stream that Envoy uses to give the server control +// over what the filter does. It processes incoming requests and sends appropriate responses +// based on the type of request received. +// +// The method receive incoming requests, processes them, and sends responses back to the client. +// It handles different types of requests such as request headers, response headers, request body, +// response body, request trailers, and response trailers. +// +// If the request is blocked, it sends an immediate response and ends the stream. If an error occurs +// during processing, it logs the error and returns an appropriate gRPC status error. +func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.ExternalProcessor_ProcessServer) error { + var ( + ctx = processServer.Context() + blocked bool + currentRequest *currentRequest + processingRequest envoyextproc.ProcessingRequest + processingResponse *envoyextproc.ProcessingResponse + ) + + // Close the span when the request is done processing + defer func() { + if currentRequest == nil { + return + } + + log.Warn("external_processing: stream stopped during a request, making sure the current span is closed\n") + currentRequest.span.Finish() + currentRequest = nil + }() + + for { + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + + return ctx.Err() + default: + // no op + } + + err := processServer.RecvMsg(&processingRequest) + if err != nil { + // Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses, + // so we can't fully rely on it to determine when it will close (cancel) the stream. + if s, ok := status.FromError(err); (ok && s.Code() == codes.Canceled) || err == io.EOF { + return nil + } + + log.Warn("external_processing: error receiving request/response: %v\n", err) + return status.Errorf(codes.Unknown, "Error receiving request/response: %v", err) + } + + processingResponse, err = envoyExternalProcessingRequestTypeAssert(&processingRequest) + if err != nil { + log.Error("external_processing: error asserting request type: %v\n", err) + return status.Errorf(codes.Unknown, "Error asserting request type: %v", err) + } + + switch v := processingRequest.Request.(type) { + case *envoyextproc.ProcessingRequest_RequestHeaders: + processingResponse, currentRequest, blocked, err = processRequestHeaders(ctx, v) + case *envoyextproc.ProcessingRequest_ResponseHeaders: + processingResponse, err = processResponseHeaders(v, currentRequest) + currentRequest = nil // Request is done, reset the current request + } + + if err != nil { + log.Error("external_processing: error processing request: %v\n", err) + return err + } + + // End of stream reached, no more data to process + if processingResponse == nil { + log.Debug("external_processing: end of stream reached") + return nil + } + + if err := processServer.SendMsg(processingResponse); err != nil { + log.Warn("external_processing: error sending response (probably because of an Envoy timeout): %v", err) + return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %v", err) + } + + if !blocked { + continue + } + + log.Debug("external_processing: request blocked, end the stream") + currentRequest = nil + return nil + } +} + +func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingRequest) (*envoyextproc.ProcessingResponse, error) { + switch r := req.Request.(type) { + case *envoyextproc.ProcessingRequest_RequestHeaders, *envoyextproc.ProcessingRequest_ResponseHeaders: + return nil, nil + + case *envoyextproc.ProcessingRequest_RequestBody: + // TODO: Handle request raw body in the WAF + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestBody{ + RequestBody: &envoyextproc.BodyResponse{ + Response: &envoyextproc.CommonResponse{ + Status: envoyextproc.CommonResponse_CONTINUE, + }, + }, + }, + }, nil + + case *envoyextproc.ProcessingRequest_RequestTrailers: + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestTrailers{}, + }, nil + + case *envoyextproc.ProcessingRequest_ResponseBody: + // Note: The end of stream bool value is not reliable + // Sometimes it's not set to true even if there is no more data to process + if r.ResponseBody.GetEndOfStream() { + return nil, nil + } + + // TODO: Handle response raw body in the WAF + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_ResponseBody{}, + }, nil + + case *envoyextproc.ProcessingRequest_ResponseTrailers: + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestTrailers{}, + }, nil + + default: + return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", r) + } +} + +func processRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) { + log.Debug("external_processing: received request headers: %v\n", req.RequestHeaders) + + request, err := newRequest(ctx, req) + if err != nil { + return nil, nil, false, status.Errorf(codes.InvalidArgument, "Error processing request headers from ext_proc: %v", err) + } + + var blocked bool + fakeResponseWriter := newFakeResponseWriter() + wrappedResponseWriter, request, afterHandle, blocked := httptrace.BeforeHandle(&httptrace.ServeConfig{ + SpanOpts: []ddtrace.StartSpanOption{ + tracer.Tag(ext.SpanKind, ext.SpanKindServer), + tracer.Tag(ext.Component, componentName), + }, + }, fakeResponseWriter, request) + + // Block handling: If triggered, we need to block the request, return an immediate response + if blocked { + afterHandle() + return doBlockResponse(fakeResponseWriter), nil, true, nil + } + + span, ok := tracer.SpanFromContext(request.Context()) + if !ok { + return nil, nil, false, status.Errorf(codes.Unknown, "Error getting span from context") + } + + processingResponse, err := propagationRequestHeaderMutation(span) + if err != nil { + return nil, nil, false, err + } + + return processingResponse, ¤tRequest{ + span: span, + ctx: request.Context(), + fakeResponseWriter: fakeResponseWriter, + wrappedResponseWriter: wrappedResponseWriter, + afterHandle: afterHandle, + }, false, nil +} + +func propagationRequestHeaderMutation(span ddtrace.Span) (*envoyextproc.ProcessingResponse, error) { + newHeaders := make(http.Header) + if err := tracer.Inject(span.Context(), tracer.HTTPHeadersCarrier(newHeaders)); err != nil { + return nil, status.Errorf(codes.Unknown, "Error injecting headers: %v", err) + } + + if len(newHeaders) > 0 { + log.Debug("external_processing: injecting propagation headers: %v\n", newHeaders) + } + + headerValueOptions := make([]*envoycore.HeaderValueOption, 0, len(newHeaders)) + for k, v := range newHeaders { + headerValueOptions = append(headerValueOptions, &envoycore.HeaderValueOption{ + Header: &envoycore.HeaderValue{ + Key: k, + RawValue: []byte(strings.Join(v, ",")), + }, + }) + } + + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_RequestHeaders{ + RequestHeaders: &envoyextproc.HeadersResponse{ + Response: &envoyextproc.CommonResponse{ + Status: envoyextproc.CommonResponse_CONTINUE, + HeaderMutation: &envoyextproc.HeaderMutation{ + SetHeaders: headerValueOptions, + }, + }, + }, + }, + }, nil +} + +func processResponseHeaders(res *envoyextproc.ProcessingRequest_ResponseHeaders, currentRequest *currentRequest) (*envoyextproc.ProcessingResponse, error) { + log.Debug("external_processing: received response headers: %v\n", res.ResponseHeaders) + + if err := createFakeResponseWriter(currentRequest.wrappedResponseWriter, res); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "Error processing response headers from ext_proc: %v", err) + } + + var blocked bool + + // Now we need to know if the request has been blocked, but we don't have any other way than to look for the operation and bind a blocking data listener to it + op, ok := dyngo.FromContext(currentRequest.ctx) + if ok { + dyngo.OnData(op, func(_ *actions.BlockHTTP) { + // We already wrote over the response writer, we need to reset it so the blocking handler can write to it + httptrace.ResetStatusCode(currentRequest.wrappedResponseWriter) + currentRequest.fakeResponseWriter.Reset() + blocked = true + }) + } + + currentRequest.afterHandle() + + if blocked { + response := doBlockResponse(currentRequest.fakeResponseWriter) + return response, nil + } + + log.Debug("external_processing: finishing request with status code: %v\n", currentRequest.fakeResponseWriter.status) + + // Note: (cf. comment in the stream error handling) + // The end of stream bool value is not reliable + if res.ResponseHeaders.GetEndOfStream() { + return nil, nil + } + + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &envoyextproc.HeadersResponse{ + Response: &envoyextproc.CommonResponse{ + Status: envoyextproc.CommonResponse_CONTINUE, + }, + }, + }, + }, nil +} + +func doBlockResponse(writer *fakeResponseWriter) *envoyextproc.ProcessingResponse { + var headersMutation []*envoycore.HeaderValueOption + for k, v := range writer.headers { + headersMutation = append(headersMutation, &envoycore.HeaderValueOption{ + Header: &envoycore.HeaderValue{ + Key: k, + RawValue: []byte(strings.Join(v, ",")), + }, + }) + } + + var int32StatusCode int32 = 0 + if writer.status > 0 && writer.status <= math.MaxInt32 { + int32StatusCode = int32(writer.status) + } + + return &envoyextproc.ProcessingResponse{ + Response: &envoyextproc.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &envoyextproc.ImmediateResponse{ + Status: &envoytypes.HttpStatus{ + Code: envoytypes.StatusCode(int32StatusCode), + }, + Headers: &envoyextproc.HeaderMutation{ + SetHeaders: headersMutation, + }, + Body: string(writer.body), + GrpcStatus: &envoyextproc.GrpcStatus{ + Status: 0, + }, + }, + }, + } +} diff --git a/contrib/envoyproxy/go-control-plane/envoy_test.go b/contrib/envoyproxy/go-control-plane/envoy_test.go new file mode 100644 index 0000000000..8af05eaab3 --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/envoy_test.go @@ -0,0 +1,576 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "testing" + + envoyextproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + envoytypes "github.com/envoyproxy/go-control-plane/envoy/type/v3" + + ddgrpc "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec" + + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestAppSec(t *testing.T) { + appsec.Start() + defer appsec.Stop() + if !appsec.Enabled() { + t.Skip("appsec disabled") + } + + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("monitoring-event-on-request", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "GET", map[string]string{"User-Agent": "dd-test-scanner-log"}, map[string]string{}, false) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"ua0-600-55x": 1}) + }) + + t.Run("blocking-event-on-request", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"User-Agent": "dd-test-scanner-log-block"}, "GET", "/"), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(403), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"ua0-600-56x": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) +} + +func TestBlockingWithUserRulesFile(t *testing.T) { + t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/user_rules.json") + appsec.Start() + defer appsec.Stop() + if !appsec.Enabled() { + t.Skip("appsec disabled") + } + + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("blocking-event-on-response", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "OPTION", map[string]string{"User-Agent": "dd-test-scanner-log-block"}, map[string]string{"User-Agent": "match-response-headers"}, true) + + // Handle the immediate response + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(418), res.GetImmediateResponse().GetStatus().Code) // 418 because of the rule file + require.Len(t, res.GetImmediateResponse().GetHeaders().SetHeaders, 1) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"headers-003": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, 1, span.Tag("_dd.appsec.enabled")) + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) + + t.Run("blocking-event-on-request-on-query", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"User-Agent": "Mistake Not..."}, "GET", "/hello?match=match-request-query"), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(418), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"query-002": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) + + t.Run("blocking-event-on-request-on-cookies", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"Cookie": "foo=jdfoSDGFkivRG_234"}, "OPTIONS", "/"), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(418), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"tst-037-008": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) +} + +func TestGeneratedSpan(t *testing.T) { + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("request-span", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/resource-span", "GET", map[string]string{"user-agent": "Mistake Not...", "test-key": "test-value"}, map[string]string{"response-test-key": "response-test-value"}, false) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check for tags + span := finished[0] + require.Equal(t, "http.request", span.OperationName()) + require.Equal(t, "https://datadoghq.com/resource-span", span.Tag("http.url")) + require.Equal(t, "GET", span.Tag("http.method")) + require.Equal(t, "datadoghq.com", span.Tag("http.host")) + // require.Equal(t, "GET /resource-span", span.Tag("resource.name")) + require.Equal(t, "server", span.Tag("span.kind")) + require.Equal(t, "Mistake Not...", span.Tag("http.useragent")) + }) +} + +func TestXForwardedForHeaderClientIp(t *testing.T) { + t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/blocking.json") + appsec.Start() + defer appsec.Stop() + if !appsec.Enabled() { + t.Skip("appsec disabled") + } + + setup := func() (envoyextproc.ExternalProcessorClient, mocktracer.Tracer, func()) { + rig, err := newEnvoyAppsecRig(t, false) + require.NoError(t, err) + + mt := mocktracer.Start() + + return rig.client, mt, func() { + rig.Close() + mt.Stop() + } + } + + t.Run("client-ip", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + end2EndStreamRequest(t, stream, "/", "OPTION", + map[string]string{"User-Agent": "Mistake not...", "X-Forwarded-For": "18.18.18.18"}, + map[string]string{"User-Agent": "match-response-headers"}, + true) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + + // Check for tags + span := finished[0] + require.Equal(t, "18.18.18.18", span.Tag("http.client_ip")) + + // Appsec + require.Equal(t, 1, span.Tag("_dd.appsec.enabled")) + }) + + t.Run("blocking-client-ip", func(t *testing.T) { + client, mt, cleanup := setup() + defer cleanup() + + ctx := context.Background() + stream, err := client.Process(ctx) + require.NoError(t, err) + + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, map[string]string{"User-Agent": "Mistake not...", "X-Forwarded-For": "1.2.3.4"}, "GET", "/"), + }, + }, + }) + require.NoError(t, err) + + // Handle the immediate response + res, err := stream.Recv() + require.Equal(t, uint32(0), res.GetImmediateResponse().GetGrpcStatus().Status) + require.Equal(t, envoytypes.StatusCode(403), res.GetImmediateResponse().GetStatus().Code) + require.Equal(t, "Content-Type", res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().Key) + require.Equal(t, "application/json", string(res.GetImmediateResponse().GetHeaders().SetHeaders[0].GetHeader().RawValue)) + require.NoError(t, err) + + err = stream.CloseSend() + require.NoError(t, err) + stream.Recv() // to flush the spans + + finished := mt.FinishedSpans() + require.Len(t, finished, 1) + checkForAppsecEvent(t, finished, map[string]int{"blk-001-001": 1}) + + // Check for tags + span := finished[0] + require.Equal(t, "1.2.3.4", span.Tag("http.client_ip")) + require.Equal(t, 1, span.Tag("_dd.appsec.enabled")) + require.Equal(t, true, span.Tag("appsec.event")) + require.Equal(t, true, span.Tag("appsec.blocked")) + }) +} + +func newEnvoyAppsecRig(t *testing.T, traceClient bool, interceptorOpts ...ddgrpc.Option) (*envoyAppsecRig, error) { + t.Helper() + + interceptorOpts = append([]ddgrpc.InterceptorOption{ddgrpc.WithServiceName("grpc")}, interceptorOpts...) + + server := grpc.NewServer() + + fixtureServer := new(envoyFixtureServer) + appsecSrv := AppsecEnvoyExternalProcessorServer(fixtureServer) + envoyextproc.RegisterExternalProcessorServer(server, appsecSrv) + + li, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + _, port, _ := net.SplitHostPort(li.Addr().String()) + // start our test fixtureServer. + go server.Serve(li) + + opts := []grpc.DialOption{grpc.WithInsecure()} + if traceClient { + opts = append(opts, + grpc.WithStreamInterceptor(ddgrpc.StreamClientInterceptor(interceptorOpts...)), + ) + } + conn, err := grpc.Dial(li.Addr().String(), opts...) + if err != nil { + return nil, fmt.Errorf("error dialing: %s", err) + } + return &envoyAppsecRig{ + fixtureServer: fixtureServer, + listener: li, + port: port, + server: server, + conn: conn, + client: envoyextproc.NewExternalProcessorClient(conn), + }, err +} + +// rig contains all servers and connections we'd need for a grpc integration test +type envoyAppsecRig struct { + fixtureServer *envoyFixtureServer + server *grpc.Server + port string + listener net.Listener + conn *grpc.ClientConn + client envoyextproc.ExternalProcessorClient +} + +func (r *envoyAppsecRig) Close() { + r.server.Stop() + r.conn.Close() +} + +type envoyFixtureServer struct { + envoyextproc.ExternalProcessorServer +} + +// Helper functions + +func end2EndStreamRequest(t *testing.T, stream envoyextproc.ExternalProcessor_ProcessClient, path string, method string, requestHeaders map[string]string, responseHeaders map[string]string, blockOnResponse bool) { + t.Helper() + + // First part: request + // 1- Send the headers + err := stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestHeaders{ + RequestHeaders: &envoyextproc.HttpHeaders{ + Headers: makeRequestHeaders(t, requestHeaders, method, path), + }, + }, + }) + require.NoError(t, err) + + res, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestHeaders().GetResponse().GetStatus()) + + // 2- Send the body + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestBody{ + RequestBody: &envoyextproc.HttpBody{ + Body: []byte("body"), + }, + }, + }) + require.NoError(t, err) + + res, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetRequestBody().GetResponse().GetStatus()) + + // 3- Send the trailers + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_RequestTrailers{ + RequestTrailers: &envoyextproc.HttpTrailers{ + Trailers: &v3.HeaderMap{ + Headers: []*v3.HeaderValue{ + {Key: "key", Value: "value"}, + }, + }, + }, + }, + }) + require.NoError(t, err) + + res, err = stream.Recv() + require.NoError(t, err) + require.NotNil(t, res.GetRequestTrailers()) + + // Second part: response + // 1- Send the response headers + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_ResponseHeaders{ + ResponseHeaders: &envoyextproc.HttpHeaders{ + Headers: makeResponseHeaders(t, responseHeaders, "200"), + }, + }, + }) + require.NoError(t, err) + + if blockOnResponse { + // Should have received an immediate response for blocking + // Let the test handle the response + return + } + + res, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, envoyextproc.CommonResponse_CONTINUE, res.GetResponseHeaders().GetResponse().GetStatus()) + + // 2- Send the response body + err = stream.Send(&envoyextproc.ProcessingRequest{ + Request: &envoyextproc.ProcessingRequest_ResponseBody{ + ResponseBody: &envoyextproc.HttpBody{ + Body: []byte("body"), + EndOfStream: true, + }, + }, + }) + require.NoError(t, err) + + // The stream should now be closed + _, err = stream.Recv() + require.Equal(t, io.EOF, err) +} + +func checkForAppsecEvent(t *testing.T, finished []mocktracer.Span, expectedRuleIDs map[string]int) { + t.Helper() + + // The request should have the attack attempts + event := finished[len(finished)-1].Tag("_dd.appsec.json") + require.NotNil(t, event, "the _dd.appsec.json tag was not found") + + jsonText := event.(string) + type trigger struct { + Rule struct { + ID string `json:"id"` + } `json:"rule"` + } + var parsed struct { + Triggers []trigger `json:"triggers"` + } + err := json.Unmarshal([]byte(jsonText), &parsed) + require.NoError(t, err) + + histogram := map[string]uint8{} + for _, tr := range parsed.Triggers { + histogram[tr.Rule.ID]++ + } + + for ruleID, count := range expectedRuleIDs { + require.Equal(t, count, int(histogram[ruleID]), "rule %s has been triggered %d times but expected %d") + } + + require.Len(t, parsed.Triggers, len(expectedRuleIDs), "unexpected number of rules triggered") +} + +// Construct request headers +func makeRequestHeaders(t *testing.T, headers map[string]string, method string, path string) *v3.HeaderMap { + t.Helper() + + h := &v3.HeaderMap{} + for k, v := range headers { + h.Headers = append(h.Headers, &v3.HeaderValue{Key: k, RawValue: []byte(v)}) + } + + h.Headers = append(h.Headers, + &v3.HeaderValue{Key: ":method", RawValue: []byte(method)}, + &v3.HeaderValue{Key: ":path", RawValue: []byte(path)}, + &v3.HeaderValue{Key: ":scheme", RawValue: []byte("https")}, + &v3.HeaderValue{Key: ":authority", RawValue: []byte("datadoghq.com")}, + ) + + return h +} + +func makeResponseHeaders(t *testing.T, headers map[string]string, status string) *v3.HeaderMap { + t.Helper() + + h := &v3.HeaderMap{} + for k, v := range headers { + h.Headers = append(h.Headers, &v3.HeaderValue{Key: k, RawValue: []byte(v)}) + } + + h.Headers = append(h.Headers, &v3.HeaderValue{Key: ":status", RawValue: []byte(status)}) + + return h +} diff --git a/contrib/envoyproxy/go-control-plane/example_test.go b/contrib/envoyproxy/go-control-plane/example_test.go new file mode 100644 index 0000000000..f1e255dcaf --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/example_test.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane_test + +import ( + "log" + "net" + + "google.golang.org/grpc" + + extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + gocontrolplane "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane" +) + +// interface fpr external processing server +type envoyExtProcServer struct { + extprocv3.ExternalProcessorServer +} + +func Example_server() { + // Create a listener for the server. + ln, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatal(err) + } + + // Initialize the grpc server as normal, using the envoy server interceptor. + s := grpc.NewServer() + srv := &envoyExtProcServer{} + + // Register the appsec envoy external processor service + appsecSrv := gocontrolplane.AppsecEnvoyExternalProcessorServer(srv) + extprocv3.RegisterExternalProcessorServer(s, appsecSrv) + + // ... register your services + + // Start serving incoming connections. + if err := s.Serve(ln); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/contrib/envoyproxy/go-control-plane/fakehttp.go b/contrib/envoyproxy/go-control-plane/fakehttp.go new file mode 100644 index 0000000000..3f20725e1b --- /dev/null +++ b/contrib/envoyproxy/go-control-plane/fakehttp.go @@ -0,0 +1,189 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package go_control_plane + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "google.golang.org/grpc/metadata" +) + +// checkPseudoRequestHeaders Verify the required HTTP2 headers are present +// Some mandatory headers need to be set. It can happen when it wasn't a real HTTP2 request sent by Envoy, +func checkPseudoRequestHeaders(headers map[string]string) error { + for _, header := range []string{":authority", ":scheme", ":path", ":method"} { + if _, ok := headers[header]; !ok { + return fmt.Errorf("missing required headers: %q", header) + } + } + + return nil +} + +// checkPseudoResponseHeaders verifies the required HTTP2 headers are present +// Some mandatory headers need to be set. It can happen when it wasn't a real HTTP2 request sent by Envoy, +func checkPseudoResponseHeaders(headers map[string]string) error { + if _, ok := headers[":status"]; !ok { + return fmt.Errorf("missing required ':status' headers") + } + + return nil +} + +func getRemoteAddr(md metadata.MD) string { + xfwd := md.Get("x-forwarded-for") + length := len(xfwd) + if length == 0 { + return "" + } + + // Get the first right value of x-forwarded-for headers + // The rightmost IP address is the one that will be used as the remote client IP + // https://datadoghq.atlassian.net/wiki/spaces/TS/pages/2766733526/Sensitive+IP+information#Where-does-the-value-of-the-http.client_ip-tag-come-from%3F + return xfwd[length-1] +} + +// splitPseudoHeaders splits normal headers of the initial request made by the client and the pseudo headers of HTTP/2 +// - Format the headers to be used by the tracer as a map[string][]string +// - Set headers keys to be canonical +func splitPseudoHeaders(receivedHeaders []*corev3.HeaderValue) (headers map[string][]string, pseudoHeaders map[string]string) { + headers = make(map[string][]string, len(receivedHeaders)-4) + pseudoHeaders = make(map[string]string, 4) + for _, v := range receivedHeaders { + key := v.GetKey() + if key == "" { + continue + } + if key[0] == ':' { + pseudoHeaders[key] = string(v.GetRawValue()) + continue + } + + headers[http.CanonicalHeaderKey(key)] = []string{string(v.GetRawValue())} + } + return headers, pseudoHeaders +} + +func createFakeResponseWriter(w http.ResponseWriter, res *extproc.ProcessingRequest_ResponseHeaders) error { + headers, pseudoHeaders := splitPseudoHeaders(res.ResponseHeaders.GetHeaders().GetHeaders()) + + if err := checkPseudoResponseHeaders(pseudoHeaders); err != nil { + return err + } + + status, err := strconv.Atoi(pseudoHeaders[":status"]) + if err != nil { + return fmt.Errorf("error parsing status code %q: %w", pseudoHeaders[":status"], err) + } + + for k, v := range headers { + w.Header().Set(k, strings.Join(v, ",")) + } + + w.WriteHeader(status) + return nil +} + +// newRequest creates a new http.Request from an ext_proc RequestHeaders message +func newRequest(ctx context.Context, req *extproc.ProcessingRequest_RequestHeaders) (*http.Request, error) { + headers, pseudoHeaders := splitPseudoHeaders(req.RequestHeaders.GetHeaders().GetHeaders()) + if err := checkPseudoRequestHeaders(pseudoHeaders); err != nil { + return nil, err + } + + parsedURL, err := url.Parse(fmt.Sprintf("%s://%s%s", pseudoHeaders[":scheme"], pseudoHeaders[":authority"], pseudoHeaders[":path"])) + if err != nil { + return nil, fmt.Errorf( + "error building envoy URI from scheme %q, from host %q and from path %q: %w", + pseudoHeaders[":scheme"], + pseudoHeaders[":host"], + pseudoHeaders[":path"], + err) + } + + var remoteAddr string + md, ok := metadata.FromIncomingContext(ctx) + if ok { + remoteAddr = getRemoteAddr(md) + } + + var tlsState *tls.ConnectionState + if pseudoHeaders[":scheme"] == "https" { + tlsState = &tls.ConnectionState{} + } + + headers["Host"] = append(headers["Host"], pseudoHeaders[":authority"]) + + return (&http.Request{ + Method: pseudoHeaders[":method"], + Host: pseudoHeaders[":authority"], + RequestURI: pseudoHeaders[":path"], + URL: parsedURL, + Header: headers, + RemoteAddr: remoteAddr, + TLS: tlsState, + }).WithContext(ctx), nil +} + +type fakeResponseWriter struct { + mu sync.Mutex + status int + body []byte + headers http.Header +} + +// Reset resets the fakeResponseWriter to its initial state +func (w *fakeResponseWriter) Reset() { + w.mu.Lock() + defer w.mu.Unlock() + w.status = 0 + w.body = nil + w.headers = make(http.Header) +} + +// Status is not in the [http.ResponseWriter] interface, but it is cast into it by the tracing code +func (w *fakeResponseWriter) Status() int { + w.mu.Lock() + defer w.mu.Unlock() + return w.status +} + +func (w *fakeResponseWriter) WriteHeader(status int) { + w.mu.Lock() + defer w.mu.Unlock() + w.status = status +} + +func (w *fakeResponseWriter) Header() http.Header { + w.mu.Lock() + defer w.mu.Unlock() + return w.headers +} + +func (w *fakeResponseWriter) Write(b []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + w.body = append(w.body, b...) + return len(b), nil +} + +var _ http.ResponseWriter = &fakeResponseWriter{} + +// newFakeResponseWriter creates a new fakeResponseWriter that can be used to store the response a [http.Handler] made +func newFakeResponseWriter() *fakeResponseWriter { + return &fakeResponseWriter{ + headers: make(http.Header), + } +} diff --git a/contrib/internal/httptrace/response_writer.go b/contrib/internal/httptrace/response_writer.go index 2bbc31bad7..f44fff762f 100644 --- a/contrib/internal/httptrace/response_writer.go +++ b/contrib/internal/httptrace/response_writer.go @@ -16,6 +16,13 @@ type responseWriter struct { status int } +// ResetStatusCode resets the status code of the response writer. +func ResetStatusCode(w http.ResponseWriter) { + if rw, ok := w.(*responseWriter); ok { + rw.status = 0 + } +} + func newResponseWriter(w http.ResponseWriter) *responseWriter { return &responseWriter{w, 0} } diff --git a/go.mod b/go.mod index f3a841b96d..62452c745f 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/elastic/go-elasticsearch/v8 v8.4.0 github.com/emicklei/go-restful v2.16.0+incompatible github.com/emicklei/go-restful/v3 v3.11.0 + github.com/envoyproxy/go-control-plane v0.12.0 github.com/garyburd/redigo v1.6.4 github.com/gin-gonic/gin v1.9.1 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 @@ -154,6 +155,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect + github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -162,6 +164,7 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/ebitengine/purego v0.6.0-alpha.5 // indirect github.com/elastic/elastic-transport-go/v8 v8.1.0 // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/fatih/color v1.16.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect diff --git a/go.sum b/go.sum index 6471b96ff5..8388b569ef 100644 --- a/go.sum +++ b/go.sum @@ -890,6 +890,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipamEh2BpGYxScCH1TOF1LL1cXc= +github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= @@ -1118,9 +1120,13 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/go-control-plane v0.10.3/go.mod h1:fJJn/j26vwOu972OllsvAgJJM//w9BV6Fxbg2LuVd34= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= diff --git a/internal/appsec/testdata/user_rules.json b/internal/appsec/testdata/user_rules.json index 6acb14089e..a13a0d67ed 100644 --- a/internal/appsec/testdata/user_rules.json +++ b/internal/appsec/testdata/user_rules.json @@ -53,6 +53,33 @@ "block" ] }, + { + "id": "tst-037-008", + "name": "Test block on cookies", + "tags": { + "type": "lfi", + "crs_id": "000008", + "category": "attack_attempt" + }, + "conditions": [ + { + "parameters": { + "inputs": [ + { + "address": "server.request.cookies" + } + ], + "regex": "jdfoSDGFkivRG_234" + }, + "operator": "match_regex" + } + ], + "transformers": [], + "on_match": [ + "block" + ] + }, + { "id": "headers-003", "name": "query match",