From 4da1cae24592395eeaf33f33c6b81ae2a7eda4af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Va=C5=A1ko?= Date: Tue, 3 Sep 2024 08:51:20 +0200 Subject: [PATCH 1/2] feat: Add server response header for HTTP source node --- .../stream/source/httpsource/httpsource.go | 1 + .../stream/source/httpsource/httpsource_test.go | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/pkg/service/stream/source/httpsource/httpsource.go b/internal/pkg/service/stream/source/httpsource/httpsource.go index 9a28c324eb..21c31997d3 100644 --- a/internal/pkg/service/stream/source/httpsource/httpsource.go +++ b/internal/pkg/service/stream/source/httpsource/httpsource.go @@ -70,6 +70,7 @@ func Start(ctx context.Context, d dependencies, cfg Config) error { // Route import requests to the dispatcher router.Post("/stream///", func(c *routing.Context) error { + c.Response.Header.Set("Server", "Keboola stream HTTP source") // Get parameters projectIDStr := c.Param("projectID") projectIDInt, err := strconv.Atoi(projectIDStr) diff --git a/internal/pkg/service/stream/source/httpsource/httpsource_test.go b/internal/pkg/service/stream/source/httpsource/httpsource_test.go index 5293b6c8cf..9f4ba98a0d 100644 --- a/internal/pkg/service/stream/source/httpsource/httpsource_test.go +++ b/internal/pkg/service/stream/source/httpsource/httpsource_test.go @@ -378,6 +378,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedStatusCode: http.StatusAccepted, ExpectedHeaders: map[string]string{ "Content-Type": "text/plain", + "Server": "Keboola stream HTTP source", }, ExpectedBody: "OK", }, @@ -395,6 +396,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedStatusCode: http.StatusOK, ExpectedHeaders: map[string]string{ "Content-Type": "text/plain", + "Server": "Keboola stream HTTP source", }, ExpectedBody: "OK", }, @@ -411,6 +413,9 @@ func testCases(t *testing.T, ts *testState) []TestCase { Query: "verbose=true", Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusAccepted, + ExpectedHeaders: map[string]string{ + "Server": "Keboola stream HTTP source", + }, ExpectedBody: ` { "statusCode": 202, @@ -460,6 +465,9 @@ func testCases(t *testing.T, ts *testState) []TestCase { Query: "verbose=true", Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusOK, + ExpectedHeaders: map[string]string{ + "Server": "Keboola stream HTTP source", + }, ExpectedBody: ` { "statusCode": 200, @@ -502,7 +510,8 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/123/my-source-1/" + ts.validSecret, Headers: map[string]string{"foo": strings.Repeat(".", ts.maxHeaderSize+1)}, ExpectedStatusCode: http.StatusRequestEntityTooLarge, - ExpectedLogs: `{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`, + // No expected Server headers, fasthttp internal return + ExpectedLogs: `{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`, ExpectedBody: ` { "statusCode": 413, @@ -516,7 +525,8 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/123/my-source/" + ts.validSecret, Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize+1)), ExpectedStatusCode: http.StatusRequestEntityTooLarge, - ExpectedLogs: `{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`, + // No expected Server headers, fasthttp internal return + ExpectedLogs: `{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`, ExpectedBody: ` { "statusCode": 413, @@ -544,6 +554,9 @@ func testCases(t *testing.T, ts *testState) []TestCase { Query: "verbose=true", Body: strings.NewReader("foo"), ExpectedStatusCode: http.StatusOK, + ExpectedHeaders: map[string]string{ + "Server": "Keboola stream HTTP source", + }, ExpectedBody: ` { "statusCode": 200, From 58ff0a50717d99af737a315231d4018a951a416a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Tue, 3 Sep 2024 09:42:45 +0200 Subject: [PATCH 2/2] fix: Add Server header to all responses --- .../service/stream/source/httpsource/error.go | 2 ++ .../stream/source/httpsource/httpsource.go | 6 +++- .../source/httpsource/httpsource_test.go | 34 +++++++++++-------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/internal/pkg/service/stream/source/httpsource/error.go b/internal/pkg/service/stream/source/httpsource/error.go index e4970b4cae..95f779f7fb 100644 --- a/internal/pkg/service/stream/source/httpsource/error.go +++ b/internal/pkg/service/stream/source/httpsource/error.go @@ -28,6 +28,8 @@ func newErrorHandler(cfg Config, logger log.Logger) func(c *fasthttp.RequestCtx, // Error handler errorWriter := newErrorWriter(logger) return func(c *fasthttp.RequestCtx, err error) { + c.Response.Header.Set("Server", ServerHeader) + var smallBufferErr *fasthttp.ErrSmallBuffer switch { diff --git a/internal/pkg/service/stream/source/httpsource/httpsource.go b/internal/pkg/service/stream/source/httpsource/httpsource.go index 21c31997d3..47a3901061 100644 --- a/internal/pkg/service/stream/source/httpsource/httpsource.go +++ b/internal/pkg/service/stream/source/httpsource/httpsource.go @@ -26,6 +26,7 @@ import ( ) const ( + ServerHeader = "Keboola/Stream/HTTPSource" gracefulShutdownTimeout = 30 * time.Second ) @@ -53,6 +54,10 @@ func Start(ctx context.Context, d dependencies, cfg Config) error { // Static routes router := routing.New() + router.Use(func(c *routing.Context) error { + c.Response.Header.Set("Server", ServerHeader) + return nil + }) router.NotFound(routing.MethodNotAllowedHandler, func(c *routing.Context) error { errorHandler(c.RequestCtx, svcErrors.NewRouteNotFound(errors.New("not found, please send data using POST /stream///"))) return nil @@ -70,7 +75,6 @@ func Start(ctx context.Context, d dependencies, cfg Config) error { // Route import requests to the dispatcher router.Post("/stream///", func(c *routing.Context) error { - c.Response.Header.Set("Server", "Keboola stream HTTP source") // Get parameters projectIDStr := c.Param("projectID") projectIDInt, err := strconv.Atoi(projectIDStr) diff --git a/internal/pkg/service/stream/source/httpsource/httpsource_test.go b/internal/pkg/service/stream/source/httpsource/httpsource_test.go index 9f4ba98a0d..0bb8de38d3 100644 --- a/internal/pkg/service/stream/source/httpsource/httpsource_test.go +++ b/internal/pkg/service/stream/source/httpsource/httpsource_test.go @@ -23,6 +23,7 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/pipeline" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/source/httpsource" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy" "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" @@ -166,6 +167,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Method: http.MethodGet, Path: "/health-check", ExpectedStatusCode: http.StatusOK, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: "OK\n", }, { @@ -173,6 +175,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Method: http.MethodGet, Path: "/foo", ExpectedStatusCode: http.StatusNotFound, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedLogs: `{"level":"info","message":"not found, please send data using POST /stream///"}`, ExpectedBody: ` { @@ -189,6 +192,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedHeaders: map[string]string{ "Allow": "OPTIONS, POST", "Content-Length": "0", + "Server": httpsource.ServerHeader, }, }, { @@ -197,6 +201,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/foo/my-source/my-secret", Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusBadRequest, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 400, @@ -210,6 +215,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/1111/my-source/my-secret", Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusNotFound, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 404, @@ -223,6 +229,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/123/my-source-1/" + ts.invalidSecret, Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusNotFound, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 404, @@ -236,6 +243,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/123/my-source-2/" + ts.validSecret, Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusNotFound, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 404, @@ -257,6 +265,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedStatusCode: http.StatusInternalServerError, ExpectedHeaders: map[string]string{ "Content-Type": "application/json", + "Server": httpsource.ServerHeader, }, ExpectedLogs: ` {"level":"error","message":"write record error: cannot open sink pipeline: some open error, next attempt after %s","component":"sink.router"} @@ -318,6 +327,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedStatusCode: http.StatusInternalServerError, ExpectedHeaders: map[string]string{ "Content-Type": "application/json", + "Server": httpsource.ServerHeader, }, ExpectedLogs: ` {"level":"error","message":"write record error: some write error","component":"sink.router"} @@ -378,7 +388,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedStatusCode: http.StatusAccepted, ExpectedHeaders: map[string]string{ "Content-Type": "text/plain", - "Server": "Keboola stream HTTP source", + "Server": httpsource.ServerHeader, }, ExpectedBody: "OK", }, @@ -396,7 +406,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { ExpectedStatusCode: http.StatusOK, ExpectedHeaders: map[string]string{ "Content-Type": "text/plain", - "Server": "Keboola stream HTTP source", + "Server": httpsource.ServerHeader, }, ExpectedBody: "OK", }, @@ -413,9 +423,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Query: "verbose=true", Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusAccepted, - ExpectedHeaders: map[string]string{ - "Server": "Keboola stream HTTP source", - }, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 202, @@ -465,9 +473,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Query: "verbose=true", Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize)), ExpectedStatusCode: http.StatusOK, - ExpectedHeaders: map[string]string{ - "Server": "Keboola stream HTTP source", - }, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 200, @@ -510,8 +516,8 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/123/my-source-1/" + ts.validSecret, Headers: map[string]string{"foo": strings.Repeat(".", ts.maxHeaderSize+1)}, ExpectedStatusCode: http.StatusRequestEntityTooLarge, - // No expected Server headers, fasthttp internal return - ExpectedLogs: `{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, + ExpectedLogs: `{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`, ExpectedBody: ` { "statusCode": 413, @@ -525,8 +531,8 @@ func testCases(t *testing.T, ts *testState) []TestCase { Path: "/stream/123/my-source/" + ts.validSecret, Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize+1)), ExpectedStatusCode: http.StatusRequestEntityTooLarge, - // No expected Server headers, fasthttp internal return - ExpectedLogs: `{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, + ExpectedLogs: `{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`, ExpectedBody: ` { "statusCode": 413, @@ -554,9 +560,7 @@ func testCases(t *testing.T, ts *testState) []TestCase { Query: "verbose=true", Body: strings.NewReader("foo"), ExpectedStatusCode: http.StatusOK, - ExpectedHeaders: map[string]string{ - "Server": "Keboola stream HTTP source", - }, + ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader}, ExpectedBody: ` { "statusCode": 200,