From 4b4870c705178b65d2f37395a668cfc7cbf98db4 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 16 Dec 2024 12:46:47 +0530 Subject: [PATCH] feat: instrumentation of fasthttp client methods (#974) * feat: instrumentation of fasthttp client methods * Delete instrumentation/instafasthttp/coverage.out * code refactoring * code refactoring * code refactoring * code refactoring * added tests * code refactoring * code refactoring * code refactoring * code refactoring * code refactoring * fix in examples * added extra tests * readme * code comments. * code comments * readme * review comments --- example/basic_usage_fasthttp/README.md | 1 + example/basic_usage_fasthttp/go.mod | 2 +- example/basic_usage_fasthttp/main.go | 54 ++ instrumentation/instafasthttp/README.md | 55 +- instrumentation/instafasthttp/client.go | 231 ++++++ instrumentation/instafasthttp/client_test.go | 717 ++++++++++++++++++ .../instafasthttp/instrumentation_helper.go | 176 +++++ instrumentation/instafasthttp/roundtripper.go | 37 + .../instafasthttp/rounttripper_test.go | 312 ++++++++ .../{instrumentation.go => server.go} | 157 +--- ...instrumentation_test.go => server_test.go} | 297 -------- 11 files changed, 1593 insertions(+), 446 deletions(-) create mode 100644 instrumentation/instafasthttp/client.go create mode 100644 instrumentation/instafasthttp/client_test.go create mode 100644 instrumentation/instafasthttp/instrumentation_helper.go create mode 100644 instrumentation/instafasthttp/roundtripper.go create mode 100644 instrumentation/instafasthttp/rounttripper_test.go rename instrumentation/instafasthttp/{instrumentation.go => server.go} (57%) rename instrumentation/instafasthttp/{instrumentation_test.go => server_test.go} (71%) diff --git a/example/basic_usage_fasthttp/README.md b/example/basic_usage_fasthttp/README.md index 123d91387..89d800902 100644 --- a/example/basic_usage_fasthttp/README.md +++ b/example/basic_usage_fasthttp/README.md @@ -21,5 +21,6 @@ The available routes are, - [localhost:8080/round-trip](http://localhost:7070/round-trip) - [localhost:8080/error-handler](http://localhost:7070/error-handler) - [localhost:8080/panic-handler](http://localhost:7070/panic-handler) +- [localhost:8080/client-call-handler](http://localhost:7070/client-call-handler) After issuing a couple of API requests, you will be able to see the call traces in the Instana dashboard. diff --git a/example/basic_usage_fasthttp/go.mod b/example/basic_usage_fasthttp/go.mod index 04086b797..5520f1cbd 100644 --- a/example/basic_usage_fasthttp/go.mod +++ b/example/basic_usage_fasthttp/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/instana/go-sensor v1.66.0 - github.com/instana/go-sensor/instrumentation/instafasthttp v0.0.0-20241021051914-d1fd3525c5b5 + github.com/instana/go-sensor/instrumentation/instafasthttp v0.1.0 github.com/instana/go-sensor/instrumentation/instagorm v1.15.0 github.com/valyala/fasthttp v1.58.0 gorm.io/driver/postgres v1.5.11 diff --git a/example/basic_usage_fasthttp/main.go b/example/basic_usage_fasthttp/main.go index 4fae28dc2..70aa325ac 100644 --- a/example/basic_usage_fasthttp/main.go +++ b/example/basic_usage_fasthttp/main.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "time" instana "github.com/instana/go-sensor" "github.com/instana/go-sensor/instrumentation/instafasthttp" @@ -68,6 +69,8 @@ func fastHTTPHandler(ctx *fasthttp.RequestCtx) { instafasthttp.TraceHandler(sensor, "panic-handler", "/panic-handler", panicHandler)(ctx) case "/round-trip": instafasthttp.TraceHandler(sensor, "round-trip", "/round-trip", roundTripHandler)(ctx) + case "/client-call-handler": + instafasthttp.TraceHandler(sensor, "client-call-handler", "/client-call-handler", clientCallHandler)(ctx) default: ctx.Error("Unsupported path", fasthttp.StatusNotFound) } @@ -122,6 +125,57 @@ func roundTripHandler(ctx *fasthttp.RequestCtx) { } +func clientCallHandler(ctx *fasthttp.RequestCtx) { + uCtx := instafasthttp.UserContext(ctx) + + url := fasthttp.AcquireURI() + url.Parse(nil, []byte("http://localhost:7070/greet")) + + // You may read the timeouts from some config + readTimeout, _ := time.ParseDuration("500ms") + writeTimeout, _ := time.ParseDuration("500ms") + maxIdleConnDuration, _ := time.ParseDuration("1h") + c := &fasthttp.Client{ + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + MaxIdleConnDuration: maxIdleConnDuration, + NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp + DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this + DisablePathNormalizing: true, + // increase DNS cache time to an hour instead of default minute + Dial: (&fasthttp.TCPDialer{ + Concurrency: 4096, + DNSCacheDuration: time.Hour, + }).Dial, + } + + // create instana instrumented client + ic := instafasthttp.GetInstrumentedClient(sensor, c) + + req := fasthttp.AcquireRequest() + defer fasthttp.ReleaseRequest(req) + req.SetURI(url) + fasthttp.ReleaseURI(url) // now you may release the URI + req.Header.SetMethod(fasthttp.MethodGet) + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.Do(uCtx, req, resp) + if err != nil { + log.Fatalf("failed to GET http://localhost:7070/greet: %s", err) + } + + bs := string(resp.Body()) + + fmt.Println(bs) + + ctx.SetStatusCode(fasthttp.StatusOK) + fmt.Fprintf(ctx, bs) + +} + func panicHandler(ctx *fasthttp.RequestCtx) { fmt.Fprintf(ctx, "This is a panic!\n") panic(errors.New("This is a panic!")) diff --git a/instrumentation/instafasthttp/README.md b/instrumentation/instafasthttp/README.md index 886d69973..7958a786b 100644 --- a/instrumentation/instafasthttp/README.md +++ b/instrumentation/instafasthttp/README.md @@ -1,4 +1,4 @@ -Instana instrumentation for fasthttp +instafasthttp - Instana instrumentation for fasthttp ===================================== This package provides Instana instrumentation for the [`fasthttp`](https://pkg.go.dev/github.com/valyala/fasthttp) package. @@ -67,7 +67,7 @@ func greetEndpointHandler(ctx *fasthttp.RequestCtx) { } ``` -### RoundTripper +### HostClient The `instafasthttp.RoundTripper` provides an implementation of the `fasthttp.RoundTripper` interface. It can be used to instrument client calls with the help of `instafasthttp.HostClient`. Refer to the details below for more information. @@ -125,3 +125,54 @@ func fastHTTPHandler(ctx *fasthttp.RequestCtx) { log.Fatal(fasthttp.ListenAndServe(":7070", fastHTTPHandler)) ``` +### Client + +The `client.Do` and related methods can be traced using Instana. However, the usage differs slightly from that of the standard HostClient. Below are the steps to use an Instana instrumented client. + +- To enable tracing, you must create an instrumented client using the `instafasthttp.GetInstrumentedClient` method as shown below: + +```go + // fasthttp client + client := &fasthttp.Client{ + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + MaxIdleConnDuration: maxIdleConnDuration, + NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp + DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this + DisablePathNormalizing: true, + // increase DNS cache time to an hour instead of default minute + Dial: (&fasthttp.TCPDialer{ + Concurrency: 4096, + DNSCacheDuration: time.Hour, + }).Dial, + } + + // create instana instrumented client + ic := instafasthttp.GetInstrumentedClient(sensor, client) +``` +- Use the instrumented client(ic) for all requests instead of the original client. +- Tracing is supported for the following methods, where an additional `context.Context` parameter is required as the first argument. Ensure the context is set properly for span correlation: +1. Do +2. DoTimeout +3. DoDeadline +4. DoRedirects + +```go + req := fasthttp.AcquireRequest() + defer fasthttp.ReleaseRequest(req) + req.SetURI(url) + fasthttp.ReleaseURI(url) // now you may release the URI + req.Header.SetMethod(fasthttp.MethodGet) + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.Do(uCtx, req, resp) + if err != nil { + log.Fatalf("failed to GET http://localhost:7070/greet: %s", err) + } +``` + +- For methods other than the four listed above, use the usual method signatures without passing a context. These methods will not support tracing. +- Use the `Unwrap()` method if you require the original fasthttp.Client instance. However, avoid using the unwrapped instance directly for the above four methods, as Instana tracing will not be applied in such cases. diff --git a/instrumentation/instafasthttp/client.go b/instrumentation/instafasthttp/client.go new file mode 100644 index 000000000..132351b69 --- /dev/null +++ b/instrumentation/instafasthttp/client.go @@ -0,0 +1,231 @@ +// (c) Copyright IBM Corp. 2024 + +package instafasthttp + +import ( + "context" + "time" + + instana "github.com/instana/go-sensor" + "github.com/valyala/fasthttp" +) + +// GetInstrumentedClient returns an instrumented instafasthttp.Client instance derived from a *fasthttp.Client instance +func GetInstrumentedClient(sensor instana.TracerLogger, orgClient *fasthttp.Client) Client { + return &instaClient{ + Client: orgClient, + sensor: sensor, + } +} + +// Instrumented fasthttp.Client +// +// Most of the methods are the same as those in fasthttp.Client. +// +// Only Do, DoTimeout, and DoDeadline differ from fasthttp.Client. +// Please use these methods instead of their fasthttp.Client counterparts to enable tracing. +type Client interface { + // The following methods are from the original *fasthttp.Client; there is no need to implement them. + + // Get returns the status code and body of url. + // + // The contents of dst will be replaced by the body and returned, if the dst + // is too small a new slice will be allocated. + // + // The function follows redirects. Use Do* for manually handling redirects. + Get(dst []byte, url string) (statusCode int, body []byte, err error) + + // GetTimeout returns the status code and body of url. + // + // The contents of dst will be replaced by the body and returned, if the dst + // is too small a new slice will be allocated. + // + // The function follows redirects. Use Do* for manually handling redirects. + // + // ErrTimeout error is returned if url contents couldn't be fetched + // during the given timeout. + GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) + + // GetDeadline returns the status code and body of url. + // + // The contents of dst will be replaced by the body and returned, if the dst + // is too small a new slice will be allocated. + // + // The function follows redirects. Use Do* for manually handling redirects. + // + // ErrTimeout error is returned if url contents couldn't be fetched + // until the given deadline. + GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) + + // Post sends POST request to the given url with the given POST arguments. + // + // The contents of dst will be replaced by the body and returned, if the dst + // is too small a new slice will be allocated. + // + // The function follows redirects. Use Do* for manually handling redirects. + // + // Empty POST body is sent if postArgs is nil. + Post(dst []byte, url string, postArgs *fasthttp.Args) (statusCode int, body []byte, err error) + + // CloseIdleConnections closes any connections which were previously + // connected from previous requests but are now sitting idle in a + // "keep-alive" state. It does not interrupt any connections currently + // in use. + CloseIdleConnections() + + // DoTimeout performs the given request and waits for response during + // the given timeout duration. + // + // Request must contain at least non-zero RequestURI with full url (including + // scheme and host) or non-zero Host header + RequestURI. + // + // Client determines the server to be requested in the following order: + // + // - from RequestURI if it contains full url with scheme and host; + // - from Host header otherwise. + // + // The function doesn't follow redirects. Use Get* for following redirects. + // + // Response is ignored if resp is nil. + // + // ErrTimeout is returned if the response wasn't returned during + // the given timeout. + // Immediately returns ErrTimeout if timeout value is negative. + // + // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections + // to the requested host are busy. + // + // It is recommended obtaining req and resp via Acquire + // + // Pass a valid context as the first argument for for span correlation + DoTimeout(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error + + // DoDeadline performs the given request and waits for response until + // the given deadline. + // + // Request must contain at least non-zero RequestURI with full url (including + // scheme and host) or non-zero Host header + RequestURI. + // + // Client determines the server to be requested in the following order: + // + // - from RequestURI if it contains full url with scheme and host; + // - from Host header otherwise. + // + // The function doesn't follow redirects. Use Get* for following redirects. + // + // Response is ignored if resp is nil. + // + // ErrTimeout is returned if the response wasn't returned until + // the given deadline. + // Immediately returns ErrTimeout if the deadline has already been reached. + // + // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections + // to the requested host are busy. + // + // It is recommended obtaining req and resp via AcquireRequest + // and AcquireResponse in performance-critical code. + // + // Pass a valid context as the first argument for for span correlation + DoDeadline(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error + + // DoRedirects performs the given http request and fills the given http response, + // following up to maxRedirectsCount redirects. When the redirect count exceeds + // maxRedirectsCount, ErrTooManyRedirects is returned. + // + // Request must contain at least non-zero RequestURI with full url (including + // scheme and host) or non-zero Host header + RequestURI. + // + // Client determines the server to be requested in the following order: + // + // - from RequestURI if it contains full url with scheme and host; + // - from Host header otherwise. + // + // Response is ignored if resp is nil. + // + // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections + // to the requested host are busy. + // + // It is recommended obtaining req and resp via AcquireRequest + // and AcquireResponse in performance-critical code. + // + // Pass a valid context as the first argument for for span correlation + DoRedirects(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, maxRedirectsCount int) error + + // Do performs the given http request and fills the given http response. + // + // Request must contain at least non-zero RequestURI with full url (including + // scheme and host) or non-zero Host header + RequestURI. + // + // Client determines the server to be requested in the following order: + // + // - from RequestURI if it contains full url with scheme and host; + // - from Host header otherwise. + // + // Response is ignored if resp is nil. + // + // The function doesn't follow redirects. Use Get* for following redirects. + // + // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections + // to the requested host are busy. + // + // It is recommended obtaining req and resp via AcquireRequest + // and AcquireResponse in performance-critical code. + // + // Pass a valid context as the first argument for for span correlation + Do(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response) error + + // Unwrap returns the original *fasthttp.Client + Unwrap() *fasthttp.Client +} + +type instaClient struct { + *fasthttp.Client + sensor instana.TracerLogger +} + +func (ic *instaClient) Unwrap() *fasthttp.Client { + return ic.Client +} + +func (ic *instaClient) DoTimeout(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, timeout time.Duration) error { + cfp := &clientFuncParams{ + sensor: ic.sensor, + ic: ic, + clientFuncType: doWithTimeoutFunc, + timeout: timeout, + } + _, err := instrumentClient(ctx, req, resp, cfp) + return err +} + +func (ic *instaClient) DoDeadline(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, deadline time.Time) error { + cfp := &clientFuncParams{ + sensor: ic.sensor, + ic: ic, + clientFuncType: doWithDeadlineFunc, + deadline: deadline, + } + _, err := instrumentClient(ctx, req, resp, cfp) + return err +} + +func (ic *instaClient) DoRedirects(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, maxRedirectsCount int) error { + cfp := &clientFuncParams{ + sensor: ic.sensor, + ic: ic, + clientFuncType: doWithRedirectsFunc, + maxRedirectsCount: maxRedirectsCount, + } + _, err := instrumentClient(ctx, req, resp, cfp) + return err +} + +func (ic *instaClient) Do(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response) error { + cfp := &clientFuncParams{ + sensor: ic.sensor, + ic: ic, + clientFuncType: doFunc, + } + _, err := instrumentClient(ctx, req, resp, cfp) + return err +} diff --git a/instrumentation/instafasthttp/client_test.go b/instrumentation/instafasthttp/client_test.go new file mode 100644 index 000000000..43b9e0cbe --- /dev/null +++ b/instrumentation/instafasthttp/client_test.go @@ -0,0 +1,717 @@ +// (c) Copyright IBM Corp. 2024 + +package instafasthttp_test + +import ( + "context" + "net" + "testing" + "time" + + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instafasthttp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/fasthttputil" +) + +func TestClient_Do(t *testing.T) { + recorder := instana.NewTestRecorder() + opts := &instana.Options{ + Service: "test-service", + Tracer: instana.TracerOptions{ + CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, + }, + AgentClient: alwaysReadyClient{}, + } + tracer := instana.NewTracerWithEverything(opts, recorder) + s := instana.NewSensorWithTracer(tracer) + + parentSpan := tracer.StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + var fieldTFrmHeader, fieldSFrmHeader string + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + // get the header span and trace id from request header + fieldTFrmHeader = string(ctx.Request.Header.Peek(instana.FieldT)) + fieldSFrmHeader = string(ctx.Request.Header.Peek(instana.FieldS)) + ctx.Response.Header.Add("X-Response", "true") + ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("X-Custom-Header-1", "request") + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&sensitive_key=s3cr3t&myPassword=qwerty&SECRET_VALUE=1") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.Do(ctx, r, resp) + + require.NoError(t, err) + + parentSpan.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + cSpan, pSpan := spans[0], spans[1] + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + + assert.Equal(t, instana.FormatID(cSpan.TraceID), fieldTFrmHeader) + assert.Equal(t, instana.FormatID(cSpan.SpanID), fieldSFrmHeader) + + require.IsType(t, instana.HTTPSpanData{}, cSpan.Data) + data := cSpan.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + Status: fasthttp.StatusOK, + URL: "http://example.com/hello", + Params: "SECRET_VALUE=%3Credacted%3E&myPassword=%3Credacted%3E&q=term&sensitive_key=%3Credacted%3E", + Headers: map[string]string{ + "x-custom-header-1": "request", + "x-custom-header-2": "response", + }, + }, data.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestClient_Do_Error(t *testing.T) { + + recorder := instana.NewTestRecorder() + s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) + + parentSpan := s.Tracer().StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + ln.Close() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&key=s3cr3t") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.Do(ctx, r, resp) + + assert.Error(t, err) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, logSpan := spans[0], spans[1] + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + require.IsType(t, instana.HTTPSpanData{}, span.Data) + data := span.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + URL: "http://example.com/hello", + Params: "key=%3Credacted%3E&q=term", + Error: "InmemoryListener is already closed: use of closed network connection", + }, data.Tags) + + assert.Equal(t, span.TraceID, logSpan.TraceID) + assert.Equal(t, span.SpanID, logSpan.ParentID) + assert.Equal(t, "log.go", logSpan.Name) + + // assert that log message has been recorded within the span interval + assert.GreaterOrEqual(t, logSpan.Timestamp, span.Timestamp) + assert.LessOrEqual(t, logSpan.Duration, span.Duration) + + require.IsType(t, instana.LogSpanData{}, logSpan.Data) + logData := logSpan.Data.(instana.LogSpanData) + + assert.Equal(t, instana.LogSpanTags{ + Level: "ERROR", + Message: `error.object: "InmemoryListener is already closed: use of closed network connection"`, + }, logData.Tags) +} + +func TestClient_DoTimeout(t *testing.T) { + recorder := instana.NewTestRecorder() + opts := &instana.Options{ + Service: "test-service", + Tracer: instana.TracerOptions{ + CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, + }, + AgentClient: alwaysReadyClient{}, + } + tracer := instana.NewTracerWithEverything(opts, recorder) + s := instana.NewSensorWithTracer(tracer) + + parentSpan := tracer.StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + var fieldTFrmHeader, fieldSFrmHeader string + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + // get the header span and trace id from request header + fieldTFrmHeader = string(ctx.Request.Header.Peek(instana.FieldT)) + fieldSFrmHeader = string(ctx.Request.Header.Peek(instana.FieldS)) + ctx.Response.Header.Add("X-Response", "true") + ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("X-Custom-Header-1", "request") + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&sensitive_key=s3cr3t&myPassword=qwerty&SECRET_VALUE=1") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.DoTimeout(ctx, r, resp, time.Minute*10) + + require.NoError(t, err) + + parentSpan.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + cSpan, pSpan := spans[0], spans[1] + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + + assert.Equal(t, instana.FormatID(cSpan.TraceID), fieldTFrmHeader) + assert.Equal(t, instana.FormatID(cSpan.SpanID), fieldSFrmHeader) + + require.IsType(t, instana.HTTPSpanData{}, cSpan.Data) + data := cSpan.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + Status: fasthttp.StatusOK, + URL: "http://example.com/hello", + Params: "SECRET_VALUE=%3Credacted%3E&myPassword=%3Credacted%3E&q=term&sensitive_key=%3Credacted%3E", + Headers: map[string]string{ + "x-custom-header-1": "request", + "x-custom-header-2": "response", + }, + }, data.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestClient_DoTimeout_Error(t *testing.T) { + + recorder := instana.NewTestRecorder() + s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) + + parentSpan := s.Tracer().StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + ln.Close() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&key=s3cr3t") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.DoTimeout(ctx, r, resp, time.Minute*10) + + assert.Error(t, err) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, logSpan := spans[0], spans[1] + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + require.IsType(t, instana.HTTPSpanData{}, span.Data) + data := span.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + URL: "http://example.com/hello", + Params: "key=%3Credacted%3E&q=term", + Error: "InmemoryListener is already closed: use of closed network connection", + }, data.Tags) + + assert.Equal(t, span.TraceID, logSpan.TraceID) + assert.Equal(t, span.SpanID, logSpan.ParentID) + assert.Equal(t, "log.go", logSpan.Name) + + // assert that log message has been recorded within the span interval + assert.GreaterOrEqual(t, logSpan.Timestamp, span.Timestamp) + assert.LessOrEqual(t, logSpan.Duration, span.Duration) + + require.IsType(t, instana.LogSpanData{}, logSpan.Data) + logData := logSpan.Data.(instana.LogSpanData) + + assert.Equal(t, instana.LogSpanTags{ + Level: "ERROR", + Message: `error.object: "InmemoryListener is already closed: use of closed network connection"`, + }, logData.Tags) +} + +func TestClient_DoDeadline(t *testing.T) { + recorder := instana.NewTestRecorder() + opts := &instana.Options{ + Service: "test-service", + Tracer: instana.TracerOptions{ + CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, + }, + AgentClient: alwaysReadyClient{}, + } + tracer := instana.NewTracerWithEverything(opts, recorder) + s := instana.NewSensorWithTracer(tracer) + + parentSpan := tracer.StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + var fieldTFrmHeader, fieldSFrmHeader string + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + // get the header span and trace id from request header + fieldTFrmHeader = string(ctx.Request.Header.Peek(instana.FieldT)) + fieldSFrmHeader = string(ctx.Request.Header.Peek(instana.FieldS)) + ctx.Response.Header.Add("X-Response", "true") + ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("X-Custom-Header-1", "request") + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&sensitive_key=s3cr3t&myPassword=qwerty&SECRET_VALUE=1") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.DoDeadline(ctx, r, resp, time.Now().Add(time.Minute*10)) + + require.NoError(t, err) + + parentSpan.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + cSpan, pSpan := spans[0], spans[1] + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + + assert.Equal(t, instana.FormatID(cSpan.TraceID), fieldTFrmHeader) + assert.Equal(t, instana.FormatID(cSpan.SpanID), fieldSFrmHeader) + + require.IsType(t, instana.HTTPSpanData{}, cSpan.Data) + data := cSpan.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + Status: fasthttp.StatusOK, + URL: "http://example.com/hello", + Params: "SECRET_VALUE=%3Credacted%3E&myPassword=%3Credacted%3E&q=term&sensitive_key=%3Credacted%3E", + Headers: map[string]string{ + "x-custom-header-1": "request", + "x-custom-header-2": "response", + }, + }, data.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestClient_DoDeadline_Error(t *testing.T) { + + recorder := instana.NewTestRecorder() + s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) + + parentSpan := s.Tracer().StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + ln.Close() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&key=s3cr3t") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.DoDeadline(ctx, r, resp, time.Now().Add(time.Minute*10)) + + assert.Error(t, err) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, logSpan := spans[0], spans[1] + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + require.IsType(t, instana.HTTPSpanData{}, span.Data) + data := span.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + URL: "http://example.com/hello", + Params: "key=%3Credacted%3E&q=term", + Error: "InmemoryListener is already closed: use of closed network connection", + }, data.Tags) + + assert.Equal(t, span.TraceID, logSpan.TraceID) + assert.Equal(t, span.SpanID, logSpan.ParentID) + assert.Equal(t, "log.go", logSpan.Name) + + // assert that log message has been recorded within the span interval + assert.GreaterOrEqual(t, logSpan.Timestamp, span.Timestamp) + assert.LessOrEqual(t, logSpan.Duration, span.Duration) + + require.IsType(t, instana.LogSpanData{}, logSpan.Data) + logData := logSpan.Data.(instana.LogSpanData) + + assert.Equal(t, instana.LogSpanTags{ + Level: "ERROR", + Message: `error.object: "InmemoryListener is already closed: use of closed network connection"`, + }, logData.Tags) +} + +func TestClient_DoRedirects(t *testing.T) { + recorder := instana.NewTestRecorder() + opts := &instana.Options{ + Service: "test-service", + Tracer: instana.TracerOptions{ + CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, + }, + AgentClient: alwaysReadyClient{}, + } + tracer := instana.NewTracerWithEverything(opts, recorder) + s := instana.NewSensorWithTracer(tracer) + + parentSpan := tracer.StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + var fieldTFrmHeader, fieldSFrmHeader string + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + // get the header span and trace id from request header + fieldTFrmHeader = string(ctx.Request.Header.Peek(instana.FieldT)) + fieldSFrmHeader = string(ctx.Request.Header.Peek(instana.FieldS)) + ctx.Response.Header.Add("X-Response", "true") + ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("X-Custom-Header-1", "request") + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&sensitive_key=s3cr3t&myPassword=qwerty&SECRET_VALUE=1") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.DoRedirects(ctx, r, resp, 2) + + require.NoError(t, err) + + parentSpan.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + cSpan, pSpan := spans[0], spans[1] + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + + assert.Equal(t, instana.FormatID(cSpan.TraceID), fieldTFrmHeader) + assert.Equal(t, instana.FormatID(cSpan.SpanID), fieldSFrmHeader) + + require.IsType(t, instana.HTTPSpanData{}, cSpan.Data) + data := cSpan.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + Status: fasthttp.StatusOK, + URL: "http://example.com/hello", + Params: "SECRET_VALUE=%3Credacted%3E&myPassword=%3Credacted%3E&q=term&sensitive_key=%3Credacted%3E", + Headers: map[string]string{ + "x-custom-header-1": "request", + "x-custom-header-2": "response", + }, + }, data.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestClient_DoRedirects_Error(t *testing.T) { + + recorder := instana.NewTestRecorder() + s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) + + parentSpan := s.Tracer().StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + ln.Close() + + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&key=s3cr3t") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := ic.DoRedirects(ctx, r, resp, 2) + + assert.Error(t, err) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, logSpan := spans[0], spans[1] + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + require.IsType(t, instana.HTTPSpanData{}, span.Data) + data := span.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + URL: "http://example.com/hello", + Params: "key=%3Credacted%3E&q=term", + Error: "InmemoryListener is already closed: use of closed network connection", + }, data.Tags) + + assert.Equal(t, span.TraceID, logSpan.TraceID) + assert.Equal(t, span.SpanID, logSpan.ParentID) + assert.Equal(t, "log.go", logSpan.Name) + + // assert that log message has been recorded within the span interval + assert.GreaterOrEqual(t, logSpan.Timestamp, span.Timestamp) + assert.LessOrEqual(t, logSpan.Duration, span.Duration) + + require.IsType(t, instana.LogSpanData{}, logSpan.Data) + logData := logSpan.Data.(instana.LogSpanData) + + assert.Equal(t, instana.LogSpanTags{ + Level: "ERROR", + Message: `error.object: "InmemoryListener is already closed: use of closed network connection"`, + }, logData.Tags) +} + +func Test_Client_Unwrap(t *testing.T) { + recorder := instana.NewTestRecorder() + opts := &instana.Options{ + Service: "test-service", + Tracer: instana.TracerOptions{ + CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, + }, + AgentClient: alwaysReadyClient{}, + } + tracer := instana.NewTracerWithEverything(opts, recorder) + s := instana.NewSensorWithTracer(tracer) + + ln := fasthttputil.NewInmemoryListener() + c := &fasthttp.Client{ + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + ic := instafasthttp.GetInstrumentedClient(s, c) + + org := ic.Unwrap() + + assert.IsType(t, &fasthttp.Client{}, org) + assert.NotNil(t, org) +} diff --git a/instrumentation/instafasthttp/instrumentation_helper.go b/instrumentation/instafasthttp/instrumentation_helper.go new file mode 100644 index 000000000..0cb6d83bf --- /dev/null +++ b/instrumentation/instafasthttp/instrumentation_helper.go @@ -0,0 +1,176 @@ +// (c) Copyright IBM Corp. 2024 + +package instafasthttp + +import ( + "context" + "net/http" + "net/url" + "strings" + "time" + + instana "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/valyala/fasthttp" +) + +type clientFuncType int + +const ( + doFunc clientFuncType = iota + doWithTimeoutFunc + doWithDeadlineFunc + doWithRedirectsFunc + + doRoundTripFunc +) + +type clientFuncParams struct { + sensor instana.TracerLogger + + hc *fasthttp.HostClient + rt fasthttp.RoundTripper + + ic *instaClient + clientFuncType clientFuncType + timeout time.Duration + deadline time.Time + maxRedirectsCount int +} + +func instrumentClient(ctx context.Context, req *fasthttp.Request, resp *fasthttp.Response, cfp *clientFuncParams) (bool, error) { + sanitizedURL := new(fasthttp.URI) + req.URI().CopyTo(sanitizedURL) + sanitizedURL.SetUsername("") + sanitizedURL.SetPassword("") + sanitizedURL.SetQueryString("") + + opts := []ot.StartSpanOption{ + ext.SpanKindRPCClient, + ot.Tags{ + "http.url": sanitizedURL.String(), + "http.method": req.Header.Method(), + }, + } + + tracer := cfp.sensor.Tracer() + parentSpan, ok := instana.SpanFromContext(ctx) + if ok { + tracer = parentSpan.Tracer() + opts = append(opts, ot.ChildOf(parentSpan.Context())) + } + + span := tracer.StartSpan("http", opts...) + defer span.Finish() + + // clone the request since the RoundTrip should not modify the original one + reqClone := &fasthttp.Request{} + req.CopyTo(reqClone) + + // Inject the span details to the headers + h := make(ot.HTTPHeadersCarrier) + tracer.Inject(span.Context(), ot.HTTPHeaders, h) + for k, v := range h { + reqClone.Header.Del(k) + reqClone.Header.Set(k, strings.Join(v, ",")) + } + + var params url.Values + + var collectableHTTPHeaders []string + if t, ok := tracer.(instana.Tracer); ok { + opts := t.Options() + params = collectHTTPParamsFastHttp(req, opts.Secrets) + collectableHTTPHeaders = opts.CollectableHTTPHeaders + } + + collectedHeaders := make(map[string]string, len(collectableHTTPHeaders)) + + // ensure collected headers/params are sent in case of panic/error + defer setHeadersAndParamsToSpan(span, collectedHeaders, params) + + reqHeaders := collectAllHeaders(&req.Header) + collectHeadersFastHTTP(reqHeaders, collectableHTTPHeaders, collectedHeaders) + + var err error + var retry bool + + switch cfp.clientFuncType { + case doWithRedirectsFunc: + err = cfp.ic.Client.DoRedirects(reqClone, resp, cfp.maxRedirectsCount) + case doWithDeadlineFunc: + err = cfp.ic.Client.DoDeadline(reqClone, resp, cfp.deadline) + case doWithTimeoutFunc: + err = cfp.ic.Client.DoTimeout(reqClone, resp, cfp.timeout) + case doFunc: + err = cfp.ic.Client.Do(reqClone, resp) + case doRoundTripFunc: + retry, err = cfp.rt.RoundTrip(cfp.hc, reqClone, resp) + } + + if err != nil { + span.SetTag("http.error", err.Error()) + span.LogFields(otlog.Error(err)) + return retry, err + } + + resHeaders := collectAllHeaders(&resp.Header) + collectHeadersFastHTTP(resHeaders, collectableHTTPHeaders, collectedHeaders) + + span.SetTag(string(ext.HTTPStatusCode), resp.StatusCode()) + + return retry, err +} + +// interface for req and res headers +// used to collect headers +type headerVisiter interface { + VisitAll(f func(key, value []byte)) +} + +func collectAllHeaders(header headerVisiter) http.Header { + headers := make(http.Header, 0) + + header.VisitAll(func(key, value []byte) { + headerKey := make([]byte, len(key)) + copy(headerKey, key) + + headerVal := make([]byte, len(value)) + copy(headerVal, value) + + headers.Add(string(headerKey), string(headerVal)) + }) + + return headers +} + +func collectHeadersFastHTTP(headers http.Header, collectableHTTPHeaders []string, collectedHeaders map[string]string) { + for _, h := range collectableHTTPHeaders { + if v := headers.Get(h); v != "" { + collectedHeaders[h] = v + } + } +} + +func collectHTTPParamsFastHttp(req *fasthttp.Request, matcher instana.Matcher) url.Values { + params, _ := url.ParseQuery(string(req.URI().QueryString())) + + for k := range params { + if matcher.Match(k) { + params[k] = []string{""} + } + } + + return params +} + +func setHeadersAndParamsToSpan(span ot.Span, headers map[string]string, params url.Values) { + if len(headers) > 0 { + span.SetTag("http.header", headers) + } + if len(params) > 0 { + span.SetTag("http.params", params.Encode()) + } +} diff --git a/instrumentation/instafasthttp/roundtripper.go b/instrumentation/instafasthttp/roundtripper.go new file mode 100644 index 000000000..8a4b698da --- /dev/null +++ b/instrumentation/instafasthttp/roundtripper.go @@ -0,0 +1,37 @@ +// (c) Copyright IBM Corp. 2024 + +package instafasthttp + +import ( + "context" + + instana "github.com/instana/go-sensor" + "github.com/valyala/fasthttp" +) + +type tracingRoundTripper func(*fasthttp.HostClient, *fasthttp.Request, *fasthttp.Response) (bool, error) + +func (rt tracingRoundTripper) RoundTrip(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) (retry bool, err error) { + return rt(hc, req, resp) +} + +// RoundTripper wraps an existing fasthttp.RoundTripper and injects the tracing headers into the outgoing request. +// If the original RoundTripper is nil, the fasthttp.DefaultTransport will be used. +func RoundTripper(ctx context.Context, sensor instana.TracerLogger, original fasthttp.RoundTripper) fasthttp.RoundTripper { + if ctx == nil { + ctx = context.Background() + } + if original == nil { + original = fasthttp.DefaultTransport + } + return tracingRoundTripper(func(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) (bool, error) { + cfp := &clientFuncParams{ + sensor: sensor, + hc: hc, + rt: original, + clientFuncType: doRoundTripFunc, + } + retry, err := instrumentClient(ctx, req, resp, cfp) + return retry, err + }) +} diff --git a/instrumentation/instafasthttp/rounttripper_test.go b/instrumentation/instafasthttp/rounttripper_test.go new file mode 100644 index 000000000..b8196c6f3 --- /dev/null +++ b/instrumentation/instafasthttp/rounttripper_test.go @@ -0,0 +1,312 @@ +// (c) Copyright IBM Corp. 2024 + +package instafasthttp_test + +import ( + "bufio" + "context" + "errors" + "net" + "testing" + + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/acceptor" + "github.com/instana/go-sensor/autoprofile" + "github.com/instana/go-sensor/instrumentation/instafasthttp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/fasthttputil" +) + +func TestRoundTripper(t *testing.T) { + recorder := instana.NewTestRecorder() + opts := &instana.Options{ + Service: "test-service", + Tracer: instana.TracerOptions{ + CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, + }, + AgentClient: alwaysReadyClient{}, + } + tracer := instana.NewTracerWithEverything(opts, recorder) + s := instana.NewSensorWithTracer(tracer) + + parentSpan := tracer.StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + ctx.Response.Header.Add("X-Response", "true") + ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + testT := func() fasthttp.RoundTripper { + c, _ := ln.Dial() + br := bufio.NewReader(c) + bw := bufio.NewWriter(c) + return &transportTest{br: br, bw: bw} + }() + + hc := &fasthttp.HostClient{ + Transport: instafasthttp.RoundTripper(ctx, s, testT), + Addr: "example.com", + } + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("X-Custom-Header-1", "request") + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&sensitive_key=s3cr3t&myPassword=qwerty&SECRET_VALUE=1") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := hc.Do(r, resp) + + require.NoError(t, err) + + parentSpan.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + cSpan, pSpan := spans[0], spans[1] + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + + assert.Equal(t, instana.FormatID(cSpan.TraceID), testT.(*transportTest).traceIDHeader) + assert.Equal(t, instana.FormatID(cSpan.SpanID), testT.(*transportTest).spanIDHeader) + + require.IsType(t, instana.HTTPSpanData{}, cSpan.Data) + data := cSpan.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + Status: fasthttp.StatusOK, + URL: "http://example.com/hello", + Params: "SECRET_VALUE=%3Credacted%3E&myPassword=%3Credacted%3E&q=term&sensitive_key=%3Credacted%3E", + Headers: map[string]string{ + "x-custom-header-1": "request", + "x-custom-header-2": "response", + }, + }, data.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestRoundTripper_Error(t *testing.T) { + + recorder := instana.NewTestRecorder() + s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) + + parentSpan := s.Tracer().StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + ctx.Response.Header.Add("X-Response", "true") + ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + testT := func() fasthttp.RoundTripper { + c, _ := ln.Dial() + br := bufio.NewReader(c) + bw := bufio.NewWriter(c) + return &transportTest{br: br, bw: bw, isErr: true} + }() + + hc := &fasthttp.HostClient{ + Transport: instafasthttp.RoundTripper(ctx, s, testT), + Addr: "example.com", + } + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetQueryString("q=term&key=s3cr3t") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := hc.Do(r, resp) + + assert.Error(t, err) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, logSpan := spans[0], spans[1] + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + require.IsType(t, instana.HTTPSpanData{}, span.Data) + data := span.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Method: "GET", + URL: "http://example.com/hello", + Params: "key=%3Credacted%3E&q=term", + Error: "something went wrong", + }, data.Tags) + + assert.Equal(t, span.TraceID, logSpan.TraceID) + assert.Equal(t, span.SpanID, logSpan.ParentID) + assert.Equal(t, "log.go", logSpan.Name) + + // assert that log message has been recorded within the span interval + assert.GreaterOrEqual(t, logSpan.Timestamp, span.Timestamp) + assert.LessOrEqual(t, logSpan.Duration, span.Duration) + + require.IsType(t, instana.LogSpanData{}, logSpan.Data) + logData := logSpan.Data.(instana.LogSpanData) + + assert.Equal(t, instana.LogSpanTags{ + Level: "ERROR", + Message: `error.object: "something went wrong"`, + }, logData.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestRoundTripper_DefaultTransport(t *testing.T) { + recorder := instana.NewTestRecorder() + s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) + var numCalls int + parentSpan := s.Tracer().StartSpan("parent") + ctx := instana.ContextWithSpan(context.Background(), parentSpan) + + server := &fasthttp.Server{ + Handler: func(ctx *fasthttp.RequestCtx) { + numCalls++ + // ctx.Response.Header.Add("X-Response", "true") + // ctx.Response.Header.Add("X-Custom-Header-2", "response") + ctx.Success("aaa/bbb", []byte("Ok response!")) + }, + } + + ln := fasthttputil.NewInmemoryListener() + + go func() { + if err := server.Serve(ln); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + hc := &fasthttp.HostClient{ + Transport: instafasthttp.RoundTripper(ctx, s, nil), + Addr: "example.com", + Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, + } + + r := &fasthttp.Request{} + r.Header.SetMethod(fasthttp.MethodGet) + r.Header.Set("Authorization", "Basic blah") + r.URI().SetPath("/hello") + r.URI().SetHost("example.com") + + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + // Make the request + err := hc.Do(r, resp) + + require.NoError(t, err) + assert.Equal(t, fasthttp.StatusOK, resp.StatusCode()) + + assert.Equal(t, 1, numCalls) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, 0, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + require.IsType(t, instana.HTTPSpanData{}, span.Data) + data := span.Data.(instana.HTTPSpanData) + + assert.Equal(t, instana.HTTPSpanTags{ + Status: fasthttp.StatusOK, + Method: "GET", + URL: "http://example.com/hello", + }, data.Tags) + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +type alwaysReadyClient struct{} + +func (alwaysReadyClient) Ready() bool { return true } +func (alwaysReadyClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (alwaysReadyClient) SendEvent(event *instana.EventData) error { return nil } +func (alwaysReadyClient) SendSpans(spans []instana.Span) error { return nil } +func (alwaysReadyClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (alwaysReadyClient) Flush(context.Context) error { return nil } + +type transportTest struct { + // If the transport is expected to return an error + isErr bool + + br *bufio.Reader + bw *bufio.Writer + + // for extracting tracer headers from request + traceIDHeader string + spanIDHeader string +} + +func (t *transportTest) RoundTrip(hc *fasthttp.HostClient, req *fasthttp.Request, res *fasthttp.Response) (retry bool, err error) { + if t.isErr { + serverErr := errors.New("something went wrong") + return false, serverErr + } + + if err = req.Write(t.bw); err != nil { + return false, err + } + if err = t.bw.Flush(); err != nil { + return false, err + } + + // extract tracer specific headers + t.traceIDHeader = string(req.Header.Peek(instana.FieldT)) + t.spanIDHeader = string(req.Header.Peek(instana.FieldS)) + + err = res.Read(t.br) + return err != nil, err +} diff --git a/instrumentation/instafasthttp/instrumentation.go b/instrumentation/instafasthttp/server.go similarity index 57% rename from instrumentation/instafasthttp/instrumentation.go rename to instrumentation/instafasthttp/server.go index b528083f3..e91b59e41 100644 --- a/instrumentation/instafasthttp/instrumentation.go +++ b/instrumentation/instafasthttp/server.go @@ -6,7 +6,6 @@ package instafasthttp import ( "context" "errors" - "net/http" "net/url" "strings" @@ -72,7 +71,6 @@ func TraceHandler(sensor instana.TracerLogger, routeID, pathTemplate string, han defer span.Finish() var params url.Values - collectedHeaders := make(map[string]string) var collectableHTTPHeaders []string if t, ok := tracer.(instana.Tracer); ok { @@ -81,8 +79,10 @@ func TraceHandler(sensor instana.TracerLogger, routeID, pathTemplate string, han collectableHTTPHeaders = opts.CollectableHTTPHeaders } + collectedHeaders := make(map[string]string, len(collectableHTTPHeaders)) + // ensure collected headers/params are sent in case of panic/error - defer sendParamsAndHeaders(span, params, collectedHeaders) + defer setHeadersAndParamsToSpan(span, collectedHeaders, params) collectHeadersFastHTTP(reqHeaders, collectableHTTPHeaders, collectedHeaders) @@ -136,50 +136,6 @@ func initSpanOptionsFastHttp(req *fasthttp.Request, routeID string) []ot.StartSp return opts } -// interface for req and res headers -// used to collect headers -type headerVisiter interface { - VisitAll(f func(key, value []byte)) -} - -func collectAllHeaders(header headerVisiter) http.Header { - headers := make(http.Header, 0) - - header.VisitAll(func(key, value []byte) { - headerKey := make([]byte, len(key)) - copy(headerKey, key) - - headerVal := make([]byte, len(value)) - copy(headerVal, value) - - headers.Add(string(headerKey), string(headerVal)) - }) - - return headers -} - -func processResponseStatusFasthttp(response *fasthttp.Response, span ot.Span) { - stCode := response.StatusCode() - if stCode > 0 { - if stCode >= fasthttp.StatusInternalServerError { - statusText := fasthttp.StatusMessage(stCode) - - span.SetTag("http.error", statusText) - span.LogFields(otlog.Object("error", statusText)) - } - - span.SetTag("http.status", stCode) - } -} - -func collectHeadersFastHTTP(headers http.Header, collectableHTTPHeaders []string, collectedHeaders map[string]string) { - for _, h := range collectableHTTPHeaders { - if v := headers.Get(h); v != "" { - collectedHeaders[h] = v - } - } -} - func extractStartSpanOptionsFromHeadersFastHttp(tracer ot.Tracer, req *fasthttp.Request, headers map[string][]string, sensor instana.TracerLogger) []ot.StartSpanOption { var opts []ot.StartSpanOption wireContext, err := tracer.Extract(ot.HTTPHeaders, ot.HTTPHeadersCarrier(headers)) @@ -196,107 +152,16 @@ func extractStartSpanOptionsFromHeadersFastHttp(tracer ot.Tracer, req *fasthttp. return opts } -func collectHTTPParamsFastHttp(req *fasthttp.Request, matcher instana.Matcher) url.Values { - params, _ := url.ParseQuery(string(req.URI().QueryString())) +func processResponseStatusFasthttp(response *fasthttp.Response, span ot.Span) { + stCode := response.StatusCode() + if stCode > 0 { + if stCode >= fasthttp.StatusInternalServerError { + statusText := fasthttp.StatusMessage(stCode) - for k := range params { - if matcher.Match(k) { - params[k] = []string{""} + span.SetTag("http.error", statusText) + span.LogFields(otlog.Object("error", statusText)) } - } - - return params -} - -func sendParamsAndHeaders(span ot.Span, params url.Values, collectedHeaders map[string]string) { - if len(collectedHeaders) > 0 { - span.SetTag("http.header", collectedHeaders) - } - if len(params) > 0 { - span.SetTag("http.params", params.Encode()) - } -} - -type tracingRoundTripper func(*fasthttp.HostClient, *fasthttp.Request, *fasthttp.Response) (bool, error) -func (rt tracingRoundTripper) RoundTrip(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) (retry bool, err error) { - return rt(hc, req, resp) -} - -// RoundTripper wraps an existing fasthttp.RoundTripper and injects the tracing headers into the outgoing request. -// If the original RoundTripper is nil, the fasthttp.DefaultTransport will be used. -func RoundTripper(ctx context.Context, sensor instana.TracerLogger, original fasthttp.RoundTripper) fasthttp.RoundTripper { - if ctx == nil { - ctx = context.Background() - } - if original == nil { - original = fasthttp.DefaultTransport + span.SetTag("http.status", stCode) } - return tracingRoundTripper(func(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) (bool, error) { - sanitizedURL := new(fasthttp.URI) - req.URI().CopyTo(sanitizedURL) - sanitizedURL.SetUsername("") - sanitizedURL.SetPassword("") - sanitizedURL.SetQueryString("") - - opts := []ot.StartSpanOption{ - ext.SpanKindRPCClient, - ot.Tags{ - "http.url": sanitizedURL.String(), - "http.method": req.Header.Method(), - }, - } - - tracer := sensor.Tracer() - parentSpan, ok := instana.SpanFromContext(ctx) - if ok { - tracer = parentSpan.Tracer() - opts = append(opts, ot.ChildOf(parentSpan.Context())) - } - - span := tracer.StartSpan("http", opts...) - defer span.Finish() - - // clone the request since the RoundTrip should not modify the original one - reqClone := &fasthttp.Request{} - req.CopyTo(reqClone) - - // Inject the span details to the headers - h := make(ot.HTTPHeadersCarrier) - tracer.Inject(span.Context(), ot.HTTPHeaders, h) - for k, v := range h { - reqClone.Header.Del(k) - reqClone.Header.Set(k, strings.Join(v, ",")) - } - - var params url.Values - collectedHeaders := make(map[string]string) - - var collectableHTTPHeaders []string - if t, ok := tracer.(instana.Tracer); ok { - opts := t.Options() - params = collectHTTPParamsFastHttp(req, opts.Secrets) - collectableHTTPHeaders = opts.CollectableHTTPHeaders - } - - // ensure collected headers/params are sent in case of panic/error - defer sendParamsAndHeaders(span, params, collectedHeaders) - - reqHeaders := collectAllHeaders(&req.Header) - collectHeadersFastHTTP(reqHeaders, collectableHTTPHeaders, collectedHeaders) - - retry, err := original.RoundTrip(hc, reqClone, resp) - if err != nil { - span.SetTag("http.error", err.Error()) - span.LogFields(otlog.Error(err)) - return retry, err - } - - resHeaders := collectAllHeaders(&resp.Header) - collectHeadersFastHTTP(resHeaders, collectableHTTPHeaders, collectedHeaders) - - span.SetTag(string(ext.HTTPStatusCode), resp.StatusCode()) - - return retry, err - }) } diff --git a/instrumentation/instafasthttp/instrumentation_test.go b/instrumentation/instafasthttp/server_test.go similarity index 71% rename from instrumentation/instafasthttp/instrumentation_test.go rename to instrumentation/instafasthttp/server_test.go index 0a9f24b5b..03c99dc08 100644 --- a/instrumentation/instafasthttp/instrumentation_test.go +++ b/instrumentation/instafasthttp/server_test.go @@ -5,16 +5,11 @@ package instafasthttp_test import ( "bufio" "bytes" - "context" - "errors" "fmt" - "net" "strings" "testing" instana "github.com/instana/go-sensor" - "github.com/instana/go-sensor/acceptor" - "github.com/instana/go-sensor/autoprofile" "github.com/instana/go-sensor/instrumentation/instafasthttp" "github.com/instana/go-sensor/w3ctrace" "github.com/stretchr/testify/assert" @@ -629,256 +624,6 @@ func TestTracingHandlerFunc_PanicHandling(t *testing.T) { }, logData.Tags) } -func TestRoundTripper(t *testing.T) { - recorder := instana.NewTestRecorder() - opts := &instana.Options{ - Service: "test-service", - Tracer: instana.TracerOptions{ - CollectableHTTPHeaders: []string{"x-custom-header-1", "x-custom-header-2"}, - }, - AgentClient: alwaysReadyClient{}, - } - tracer := instana.NewTracerWithEverything(opts, recorder) - s := instana.NewSensorWithTracer(tracer) - - parentSpan := tracer.StartSpan("parent") - ctx := instana.ContextWithSpan(context.Background(), parentSpan) - - server := &fasthttp.Server{ - Handler: func(ctx *fasthttp.RequestCtx) { - ctx.Response.Header.Add("X-Response", "true") - ctx.Response.Header.Add("X-Custom-Header-2", "response") - ctx.Success("aaa/bbb", []byte("Ok response!")) - }, - } - - ln := fasthttputil.NewInmemoryListener() - - go func() { - if err := server.Serve(ln); err != nil { - t.Errorf("unexpected error: %v", err) - } - }() - - testT := func() fasthttp.RoundTripper { - c, _ := ln.Dial() - br := bufio.NewReader(c) - bw := bufio.NewWriter(c) - return &transportTest{br: br, bw: bw} - }() - - hc := &fasthttp.HostClient{ - Transport: instafasthttp.RoundTripper(ctx, s, testT), - Addr: "example.com", - } - - r := &fasthttp.Request{} - r.Header.SetMethod(fasthttp.MethodGet) - r.Header.Set("X-Custom-Header-1", "request") - r.Header.Set("Authorization", "Basic blah") - r.URI().SetPath("/hello") - r.URI().SetQueryString("q=term&sensitive_key=s3cr3t&myPassword=qwerty&SECRET_VALUE=1") - r.URI().SetHost("example.com") - - resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(resp) - - // Make the request - err := hc.Do(r, resp) - - require.NoError(t, err) - - parentSpan.Finish() - - spans := recorder.GetQueuedSpans() - require.Len(t, spans, 2) - - cSpan, pSpan := spans[0], spans[1] - assert.Equal(t, 0, cSpan.Ec) - assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) - - assert.Equal(t, pSpan.TraceID, cSpan.TraceID) - assert.Equal(t, pSpan.SpanID, cSpan.ParentID) - - assert.Equal(t, instana.FormatID(cSpan.TraceID), testT.(*transportTest).traceIDHeader) - assert.Equal(t, instana.FormatID(cSpan.SpanID), testT.(*transportTest).spanIDHeader) - - require.IsType(t, instana.HTTPSpanData{}, cSpan.Data) - data := cSpan.Data.(instana.HTTPSpanData) - - assert.Equal(t, instana.HTTPSpanTags{ - Method: "GET", - Status: fasthttp.StatusOK, - URL: "http://example.com/hello", - Params: "SECRET_VALUE=%3Credacted%3E&myPassword=%3Credacted%3E&q=term&sensitive_key=%3Credacted%3E", - Headers: map[string]string{ - "x-custom-header-1": "request", - "x-custom-header-2": "response", - }, - }, data.Tags) - - if err := ln.Close(); err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestRoundTripper_Error(t *testing.T) { - - recorder := instana.NewTestRecorder() - s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) - - parentSpan := s.Tracer().StartSpan("parent") - ctx := instana.ContextWithSpan(context.Background(), parentSpan) - - server := &fasthttp.Server{ - Handler: func(ctx *fasthttp.RequestCtx) { - ctx.Response.Header.Add("X-Response", "true") - ctx.Response.Header.Add("X-Custom-Header-2", "response") - ctx.Success("aaa/bbb", []byte("Ok response!")) - }, - } - - ln := fasthttputil.NewInmemoryListener() - - go func() { - if err := server.Serve(ln); err != nil { - t.Errorf("unexpected error: %v", err) - } - }() - - testT := func() fasthttp.RoundTripper { - c, _ := ln.Dial() - br := bufio.NewReader(c) - bw := bufio.NewWriter(c) - return &transportTest{br: br, bw: bw, isErr: true} - }() - - hc := &fasthttp.HostClient{ - Transport: instafasthttp.RoundTripper(ctx, s, testT), - Addr: "example.com", - } - - r := &fasthttp.Request{} - r.Header.SetMethod(fasthttp.MethodGet) - r.Header.Set("Authorization", "Basic blah") - r.URI().SetPath("/hello") - r.URI().SetQueryString("q=term&key=s3cr3t") - r.URI().SetHost("example.com") - - resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(resp) - - // Make the request - err := hc.Do(r, resp) - - assert.Error(t, err) - - spans := recorder.GetQueuedSpans() - require.Len(t, spans, 2) - - span, logSpan := spans[0], spans[1] - assert.Equal(t, 1, span.Ec) - assert.EqualValues(t, instana.ExitSpanKind, span.Kind) - - require.IsType(t, instana.HTTPSpanData{}, span.Data) - data := span.Data.(instana.HTTPSpanData) - - assert.Equal(t, instana.HTTPSpanTags{ - Method: "GET", - URL: "http://example.com/hello", - Params: "key=%3Credacted%3E&q=term", - Error: "something went wrong", - }, data.Tags) - - assert.Equal(t, span.TraceID, logSpan.TraceID) - assert.Equal(t, span.SpanID, logSpan.ParentID) - assert.Equal(t, "log.go", logSpan.Name) - - // assert that log message has been recorded within the span interval - assert.GreaterOrEqual(t, logSpan.Timestamp, span.Timestamp) - assert.LessOrEqual(t, logSpan.Duration, span.Duration) - - require.IsType(t, instana.LogSpanData{}, logSpan.Data) - logData := logSpan.Data.(instana.LogSpanData) - - assert.Equal(t, instana.LogSpanTags{ - Level: "ERROR", - Message: `error.object: "something went wrong"`, - }, logData.Tags) - - if err := ln.Close(); err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - -func TestRoundTripper_DefaultTransport(t *testing.T) { - recorder := instana.NewTestRecorder() - s := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{AgentClient: alwaysReadyClient{}}, recorder)) - var numCalls int - parentSpan := s.Tracer().StartSpan("parent") - ctx := instana.ContextWithSpan(context.Background(), parentSpan) - - server := &fasthttp.Server{ - Handler: func(ctx *fasthttp.RequestCtx) { - numCalls++ - // ctx.Response.Header.Add("X-Response", "true") - // ctx.Response.Header.Add("X-Custom-Header-2", "response") - ctx.Success("aaa/bbb", []byte("Ok response!")) - }, - } - - ln := fasthttputil.NewInmemoryListener() - - go func() { - if err := server.Serve(ln); err != nil { - t.Errorf("unexpected error: %v", err) - } - }() - - hc := &fasthttp.HostClient{ - Transport: instafasthttp.RoundTripper(ctx, s, nil), - Addr: "example.com", - Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, - } - - r := &fasthttp.Request{} - r.Header.SetMethod(fasthttp.MethodGet) - r.Header.Set("Authorization", "Basic blah") - r.URI().SetPath("/hello") - r.URI().SetHost("example.com") - - resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(resp) - - // Make the request - err := hc.Do(r, resp) - - require.NoError(t, err) - assert.Equal(t, fasthttp.StatusOK, resp.StatusCode()) - - assert.Equal(t, 1, numCalls) - - spans := recorder.GetQueuedSpans() - require.Len(t, spans, 1) - - span := spans[0] - assert.Equal(t, 0, span.Ec) - assert.EqualValues(t, instana.ExitSpanKind, span.Kind) - - require.IsType(t, instana.HTTPSpanData{}, span.Data) - data := span.Data.(instana.HTTPSpanData) - - assert.Equal(t, instana.HTTPSpanTags{ - Status: fasthttp.StatusOK, - Method: "GET", - URL: "http://example.com/hello", - }, data.Tags) - - if err := ln.Close(); err != nil { - t.Fatalf("unexpected error: %v", err) - } -} - func verifyResponse(t *testing.T, r *bufio.Reader, expectedStatusCode int, expectedContentType, expectedBody string) *fasthttp.Response { var resp fasthttp.Response if err := resp.Read(r); err != nil { @@ -906,45 +651,3 @@ func verifyResponseHeader(t *testing.T, h *fasthttp.ResponseHeader, expectedStat t.Fatalf("Unexpected content encoding %q. Expected %q", h.ContentEncoding(), expectedContentEncoding) } } - -type alwaysReadyClient struct{} - -func (alwaysReadyClient) Ready() bool { return true } -func (alwaysReadyClient) SendMetrics(data acceptor.Metrics) error { return nil } -func (alwaysReadyClient) SendEvent(event *instana.EventData) error { return nil } -func (alwaysReadyClient) SendSpans(spans []instana.Span) error { return nil } -func (alwaysReadyClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } -func (alwaysReadyClient) Flush(context.Context) error { return nil } - -type transportTest struct { - // If the transport is expected to return an error - isErr bool - - br *bufio.Reader - bw *bufio.Writer - - // for extracting tracer headers from request - traceIDHeader string - spanIDHeader string -} - -func (t *transportTest) RoundTrip(hc *fasthttp.HostClient, req *fasthttp.Request, res *fasthttp.Response) (retry bool, err error) { - if t.isErr { - serverErr := errors.New("something went wrong") - return false, serverErr - } - - if err = req.Write(t.bw); err != nil { - return false, err - } - if err = t.bw.Flush(); err != nil { - return false, err - } - - // extract tracer specific headers - t.traceIDHeader = string(req.Header.Peek(instana.FieldT)) - t.spanIDHeader = string(req.Header.Peek(instana.FieldS)) - - err = res.Read(t.br) - return err != nil, err -}