diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 7b85145b99fe..5a3c26c0f756 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -113,8 +113,8 @@ The state has the following elements: - `first_event`: A map representing the first event sent to the output (result from applying transforms to `last_response.body`). - `last_event`: A map representing the last event of the current request in the requests chain (result from applying transforms to `last_response.body`). - `url`: The last requested URL as a raw https://pkg.go.dev/net/url#URL[`url.URL`] Go type. -- `header`: A map containing the headers. References the next request headers when used in <> or <> configuration sections, and to the last response headers when used in <>, <>, or <> configuration sections. -- `body`: A map containing the body. References the next request body when used in <> or <> configuration sections, and to the last response body when used in <> or <> configuration sections. +- `header`: A map containing the headers. References the next request headers when used in <> or <> configuration sections, and to the last response headers when used in <>, <>, or <> configuration sections. +- `body`: A map containing the body. References the next request body when used in <> or <> configuration sections, and to the last response body when used in <> or <> configuration sections. - `cursor`: A map containing any data the user configured to be stored between restarts (See <>). All of the mentioned objects are only stored at runtime, except `cursor`, which has values that are persisted between restarts. @@ -501,7 +501,7 @@ If the `remaining` header is missing from the Response, no rate-limiting will oc The value of the response that specifies the epoch time when the rate limit will reset. It is defined with a Go template value. 
Can read state from: [`.last_response.header`] -[[request-transforms]] +[[request-transforms-headers]] [float] ==== `request.rate_limit.early_limit` @@ -521,6 +521,7 @@ e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occu It is not set by default (by default the rate-limiting as specified in the Response is followed). +[[request-transforms]] [float] ==== `request.transforms` @@ -607,7 +608,7 @@ Defines the field type of the target. Allowed values: `array`, `map`, `string`. [float] ==== `response.split[].transforms` -A set of transforms can be defined. This list will be applied after `response.transforms` and after the object has been modified based on `response.split[].keep_parent` and `response.split[].key_field`. +A set of transforms can be defined. This list will be applied after <> and after the object has been modified based on `response.split[].keep_parent` and `response.split[].key_field`. Available transforms for response: [`append`, `delete`, `set`]. @@ -651,7 +652,7 @@ If set to true, the values in `request.body` are sent for pagination requests. D [float] ==== `response.pagination` -List of transforms that will be applied to the response to every new page request. All the transforms from `request.transform` will be executed and then `response.pagination` will be added to modify the next request as needed. For subsequent responses, the usual `response.transforms` and `response.split` will be executed normally. +List of transforms that will be applied to the response to every new page request. All the transforms from `request.transform` will be executed and then `response.pagination` will be added to modify the next request as needed. For subsequent responses, the usual <> and <> will be executed normally. Available transforms for pagination: [`append`, `delete`, `set`]. @@ -967,7 +968,7 @@ Contains basic request and response configuration for chained calls. [float] ==== `chain[].step.request` -Please refer <>. Required. 
+See <>. Required. Example: @@ -980,12 +981,14 @@ Third call: https://example.com/services/data/v1.0/export_ids/file_1/info [float] ==== `chain[].step.response.split` -Please refer <>. +See <>. ++ +[[chain-step-replace]] [float] ==== `chain[].step.replace` -A [JSONPath](https://goessner.net/articles/JsonPath/index.html#e2[JSONPath]) string to parse values from responses JSON, collected from previous chain steps. Place same replace string in url where collected values from previous call should be placed. Required. +A link:https://goessner.net/articles/JsonPath/index.html#e2[JSONPath] string to parse values from responses JSON, collected from previous chain steps. Place same replace string in url where collected values from previous call should be placed. Required. Example: @@ -1091,7 +1094,162 @@ request_url using file_name as 'file_2': https://example.com/services/data/v1.0/ + Collect and make events from response in any format supported by httpjson for all calls. -NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: `request.url`, `request.method`, `request.transforms`, `request.body`, `response.transforms` and `response.split`. +[float] +==== `chain[].while` + +Contains basic request and response configuration for chained while calls. Chained while calls will keep making the requests for a given number of times until a condition is met +or the maximum number of attempts gets exhausted. While chain has an attribute `until` which holds the expression to be evaluated. Ideally the `until` field should always be used +together with the attributes `request.retry.max_attempts` and `request.retry.wait_min` which specify the maximum number of attempts to evaluate `until` before giving up and the +minimum wait time in between such requests. If `request.retry.max_attempts` is not specified, it will only try to evaluate the expression once and give up if it fails. 
If +`request.retry.wait_min` is not specified the default wait time will always be `0` as in successive calls will be made immediately. + +[float] +==== `chain[].while.request` + +See <> . + +Example: + +First call: http://example.com/services/data/v1.0/exports + +Second call: http://example.com/services/data/v1.0/9ef0e6a5/export_ids/status + +Third call: http://example.com/services/data/v1.0/export_ids/1/info + +[float] +==== `chain[].while.response.split` + +See <> . + +[float] +==== `chain[].while.replace` + +See <> . + +Example: + +- First call: http://example.com/services/data/v1.0/exports ++ +<> + +- Second call: http://example.com/services/data/v1.0/`$.exportId`/export_ids/status ++ +<> + +- Third call: http://example.com/services/data/v1.0/export_ids/`$.files[:].id`/info ++ +<> , <> + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: httpjson + enabled: true + # first call + id: my-httpjson-id + request.url: http://example.com/services/data/v1.0/exports + interval: 1h + chain: + # second call + - while: + request.url: http://example.com/services/data/v1.0/$.exportId/export_ids/status + request.method: GET + replace: $.exportId + until: '[[ eq .last_response.body.status "completed" ]]' + request.retry.max_attempts: 5 + request.retry.wait_min: 5s + # third call + - step: + request.url: http://example.com/services/data/v1.0/export_ids/$.files[:].id/info + request.method: GET + replace: $.files[:].id +---- + +Example: + +- First call to collect export ids + ++ +request_url: https://example.com/services/data/v1.0/exports + ++ +response_json: + ++ +[[response-json1-while]] +["source","json",subs="attributes"] +---- +{ + "exportId": "9ef0e6a5" +} +---- + +- Second call to collect `file_ids` using collected id from first call when `response.body.status == "completed"`. This call continues until the condition is satisfied or the maximum number of attempts gets exhausted. 
+ ++ +request_url using id as '9ef0e6a5': https://example.com/services/data/v1.0/9ef0e6a5/export_ids/status + ++ +response_json using id as '9ef0e6a5': + ++ +[[response-json2-while]] +["source","json",subs="attributes"] +---- +{ + "status": "completed", + "files": [ + { + "id": 1 + }, + { + "id": 2 + }, + { + "id": 3 + } + ] +} +---- + +- Third call to collect `files` using collected `file_id` from second call. + ++ +request_url using file_id as '1': https://example.com/services/data/v1.0/export_ids/1/info + ++ +request_url using file_id as '2': https://example.com/services/data/v1.0/export_ids/2/info + ++ +response_json using id as '1': + ++ +[[response-json3-while]] +["source","json",subs="attributes"] +---- +{ + "file_name": "file_1", + "file_data": "some data" +} +---- + ++ +response_json using id as '2': + ++ +[[response-json4-while]] +["source","json",subs="attributes"] +---- +{ + "file_name": "file_2", + "file_data": "some data" +} +---- + ++ +Collect and make events from response in any format supported by httpjson for all calls. + +NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <>, <> and <>. [[cursor]] [float] @@ -1134,11 +1292,11 @@ filebeat.inputs: image:images/input-httpjson-lifecycle.png[Request lifecycle] . At every defined interval a new request is created. -. The request is transformed using the configured `request.transforms`. +. The request is transformed using the configured <>. . The resulting transformed request is executed. . The server responds (here is where any retry or rate limit policy takes place when configured). -. The response is transformed using the configured `response.transforms` and `response.split`. -. If a chain step is configured. Each step will generate new requests based on collected IDs from responses. 
The requests will be transformed using configured `request.transforms` and the resulting generated transformed requests will be executed. This process will happen for all the steps mentioned in the chain. +. The response is transformed using the configured <> and <>. +. If a chain step is configured. Each step will generate new requests based on collected IDs from responses. The requests will be transformed using configured <> and the resulting generated transformed requests will be executed. This process will happen for all the steps mentioned in the chain. . Each resulting event is published to the output. . If a `response.pagination` is configured and there are more pages, a new request is created using it, otherwise the process ends until the next interval. diff --git a/x-pack/filebeat/input/httpjson/chain.go b/x-pack/filebeat/input/httpjson/chain.go index f877876bee38..14fcafb9ce24 100644 --- a/x-pack/filebeat/input/httpjson/chain.go +++ b/x-pack/filebeat/input/httpjson/chain.go @@ -56,16 +56,12 @@ package httpjson -import ( - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/transport/httpcommon" -) - // chainConfig for chain request. // Following contains basic call structure for each call after normal httpjson // call. type chainConfig struct { - Step stepConfig `config:"step" validate:"required"` + Step *stepConfig `config:"step,omitempty"` + While *whileConfig `config:"while,omitempty"` } // stepConfig will contain basic properties like, request.url, @@ -73,21 +69,47 @@ type chainConfig struct { // will contain replace string with original URL to make a skeleton for the // call request. 
type stepConfig struct { - Request requestChainConfig `config:"request"` + Auth *authConfig `config:"auth"` + Request requestConfig `config:"request" validate:"required"` Response responseChainConfig `config:"response,omitempty"` Replace string `config:"replace,omitempty"` } -type requestChainConfig struct { - URL *urlConfig `config:"url" validate:"required"` - Method string `config:"method" validate:"required"` - Body *mapstr.M `config:"body"` - Transforms transformsConfig `config:"transforms"` - - Transport httpcommon.HTTPTransportSettings `config:",inline"` +// whileConfig will contain basic properties like auth parameters, request parameters, +// response parameters , a replace parameter and an expression parameter called 'until'. +// While is similar to stepConfig with the addition of 'until'. 'until' holds an expression +// and with the combination of "request.retry.max_attempts" retries a request 'until' the +// expression is evaluated to "true" or request.retry.max_attempts is exhausted. If +// request.retry.max_attempts is not specified , the max_attempts is always 1. 
+type whileConfig struct { + Auth *authConfig `config:"auth"` + Request requestConfig `config:"request" validate:"required"` + Response responseChainConfig `config:"response,omitempty"` + Replace string `config:"replace,omitempty"` + Until *valueTpl `config:"until" validate:"required"` } type responseChainConfig struct { Transforms transformsConfig `config:"transforms"` Split *splitConfig `config:"split"` } + +func defaultChainConfig() config { + chaincfg := defaultConfig() + chaincfg.Chain = []chainConfig{ + { + While: &whileConfig{ + Auth: chaincfg.Auth, + Request: *chaincfg.Request, + Response: responseChainConfig{}, + }, + Step: &stepConfig{ + Auth: chaincfg.Auth, + Request: *chaincfg.Request, + Response: responseChainConfig{}, + }, + }, + } + + return chaincfg +} diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index cf8e018fa870..241d8f04b100 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -36,6 +36,11 @@ func (c config) Validate() error { if c.Interval <= 0 { return errors.New("interval must be greater than 0") } + for _, v := range c.Chain { + if v.Step == nil && v.While == nil { + return errors.New("both step & while blocks in a chain cannot be empty") + } + } return nil } diff --git a/x-pack/filebeat/input/httpjson/config_request.go b/x-pack/filebeat/input/httpjson/config_request.go index b4a6c5bfc55e..2e2b2402f9f0 100644 --- a/x-pack/filebeat/input/httpjson/config_request.go +++ b/x-pack/filebeat/input/httpjson/config_request.go @@ -49,8 +49,10 @@ func (c retryConfig) getWaitMin() time.Duration { } func (c retryConfig) getWaitMax() time.Duration { - if c.WaitMax == nil { + if c.WaitMax == nil && c.WaitMin == nil { return 0 + } else if c.WaitMax == nil && c.WaitMin != nil { + return *c.WaitMin } return *c.WaitMax } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index ff4755c93472..84241a800a31 100644 --- 
a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -112,7 +112,11 @@ func run( return err } - requestFactory := newRequestFactory(config, log) + requestFactory, err := newRequestFactory(stdCtx, config, log) + if err != nil { + log.Errorf("Error while creating requestFactory: %v", err) + return err + } pagination := newPagination(config, httpClient, log) responseProcessor := newResponseProcessor(config, pagination, log) requester := newRequester(httpClient, requestFactory, responseProcessor, log) diff --git a/x-pack/filebeat/input/httpjson/policy.go b/x-pack/filebeat/input/httpjson/policy.go new file mode 100644 index 000000000000..0c671cb85bbb --- /dev/null +++ b/x-pack/filebeat/input/httpjson/policy.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "bytes" + "context" + "crypto/x509" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "regexp" + + "github.com/elastic/elastic-agent-libs/logp" +) + +var ( + + // A regular expression to match the error returned by net/http when the + // configured number of redirects is exhausted. This error isn't typed + // specifically so we resort to matching on the error string. + redirectsErrorRe = regexp.MustCompile(`stopped after \d+ redirects\z`) + + // A regular expression to match the error returned by net/http when the + // scheme specified in the URL is invalid. This error isn't typed + // specifically so we resort to matching on the error string. 
+ schemeErrorRe = regexp.MustCompile(`unsupported protocol scheme`) +) + +// Evaluate is a template expression evaluation function which accepts a +// valid go text/template expression and evaluates the expected field value to the +// field value present in data using the defined operator/function in the given expression. +// Example : [[ eq .last_response.body.status "completed" ]] -- which means here data is a http response +// containing a field "status" under the field "body" , and value status should be equal to the string "completed" +type Evaluate func(expression *valueTpl, data []byte, log *logp.Logger) (bool, error) + +// Policy is responsible for maintaining different http client policies +// Currently just contains a retry policy function +type Policy struct { + fn Evaluate + expression *valueTpl + log *logp.Logger +} + +func newHTTPPolicy(fn Evaluate, expression *valueTpl, log *logp.Logger) *Policy { + return &Policy{ + fn: fn, + expression: expression, + log: log, + } +} + +// CustomRetryPolicy provides a custom callback for Client.CheckRetry, which +// will retry on connection errors and server errors. +func (p *Policy) CustomRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) { + // do not retry on context.Canceled or context.DeadlineExceeded + if ctx.Err() != nil { + return false, ctx.Err() + } + + if err != nil { + var v *url.Error + if errors.As(err, &v) { + // Don't retry if the error was due to too many redirects. + if redirectsErrorRe.MatchString(v.Error()) { + return false, nil + } + + // Don't retry if the error was due to an invalid protocol scheme. + if schemeErrorRe.MatchString(v.Error()) { + return false, nil + } + + // Don't retry if the error was due to TLS cert verification failure. + var k x509.UnknownAuthorityError + if errors.As(v.Err, &k) { + return false, nil + } + } + + // The error is likely recoverable so retry. + return true, nil + } + + // Check the response code. 
We retry on 500-range responses to allow + // the server time to recover, as 500's are typically not permanent + // errors and may relate to outages on the server side. This will catch + // invalid response codes as well, like 0 and 999. + if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) { + return true, nil + } + + // Evaluate custom expression + if p.fn != nil && p.expression != nil { + var retry bool + + body, err := io.ReadAll(resp.Body) + if err != nil { + return retry, fmt.Errorf("failed to read http response body : %w", err) + } + + err = resp.Body.Close() + if err != nil { + return retry, fmt.Errorf("error closing response body : %w", err) + } + resp.Body = io.NopCloser(bytes.NewBuffer(body)) + + result, err := p.fn(p.expression, body, p.log) + if err != nil { + return retry, err + } + + if !result { + retry = true + } + + return retry, nil + } + + return false, nil +} diff --git a/x-pack/filebeat/input/httpjson/policy_test.go b/x-pack/filebeat/input/httpjson/policy_test.go new file mode 100644 index 000000000000..656881eeb074 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/policy_test.go @@ -0,0 +1,163 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package httpjson + +import ( + "bytes" + "context" + "crypto/x509" + "io" + "net/http" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestPolicy_CustomRetryPolicy(t *testing.T) { + statusCompleted := `{"status":"completed"}` + statusInitiated := `{"status":"initiated"}` + + exp := &valueTpl{} + err := exp.Unpack(`[[ eq .last_response.body.status "completed" ]]`) + assert.NoError(t, err) + + expErr := &valueTpl{} + err = expErr.Unpack("") + assert.NoError(t, err) + + statusCompletedResponse := getTestResponse(statusCompleted, 200) + defer statusCompletedResponse.Body.Close() + statusInitiatedResponse := getTestResponse(statusInitiated, 200) + defer statusInitiatedResponse.Body.Close() + internalServerErrorResponse := getTestResponse(statusCompleted, 500) + defer internalServerErrorResponse.Body.Close() + + type fields struct { + fn Evaluate + expression *valueTpl + log *logp.Logger + } + type args struct { + ctx context.Context + resp *http.Response + err error + } + tests := []struct { + name string + fields fields + args args + want bool + expectedError string + }{ + { + name: "customRetryPolicy_doNotRetryFurther", + fields: fields{ + fn: evaluateResponse, + expression: exp, + log: logp.NewLogger(""), + }, + args: args{ + ctx: context.Background(), + resp: statusCompletedResponse, + err: nil, + }, + want: false, + expectedError: "", + }, + { + name: "customRetryPolicy_keepRetrying", + fields: fields{ + fn: evaluateResponse, + expression: exp, + log: logp.NewLogger(""), + }, + args: args{ + ctx: context.Background(), + resp: statusInitiatedResponse, + err: nil, + }, + want: true, + expectedError: "", + }, + { + name: "customRetryPolicy_emptyTemplateError", + fields: fields{ + fn: evaluateResponse, + expression: expErr, + log: logp.NewLogger(""), + }, + args: args{ + ctx: context.Background(), + resp: statusCompletedResponse, + err: nil, + }, + want: false, + expectedError: "error while 
evaluating expression : the template result is empty", + }, + { + name: "customRetryPolicy_internalServerError", + fields: fields{ + fn: evaluateResponse, + expression: exp, + log: logp.NewLogger(""), + }, + args: args{ + ctx: context.Background(), + resp: internalServerErrorResponse, + err: nil, + }, + want: true, + expectedError: "", + }, + { + name: "customRetryPolicy_unknownCertError", + fields: fields{ + fn: evaluateResponse, + expression: exp, + log: logp.NewLogger(""), + }, + args: args{ + ctx: context.Background(), + resp: statusCompletedResponse, + err: &url.Error{Err: x509.UnknownAuthorityError{}}, + }, + want: false, + expectedError: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &Policy{ + fn: tt.fields.fn, + expression: tt.fields.expression, + log: tt.fields.log, + } + got, err := p.CustomRetryPolicy(tt.args.ctx, tt.args.resp, tt.args.err) + if err != nil { + assert.EqualError(t, err, tt.expectedError) // compare the message; assert.Error would only use it as a failure note + } else { + assert.Equal(t, tt.want, got) + } + }) + } +} + +func getTestResponse(exprStr string, statusCode int) *http.Response { + resp := &http.Response{ + StatusCode: statusCode, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Body: io.NopCloser(bytes.NewBufferString(exprStr)), + ContentLength: int64(len(exprStr)), + Request: nil, + Header: make(http.Header, 0), + } + return resp +} diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index a51fef1bddd7..a1f773f044aa 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -92,18 +92,21 @@ func (rf *requestFactory) newRequest(ctx *transformContext) (transformable, erro } type requestFactory struct { - url url.URL - method string - body *mapstr.M - transforms []basicTransform - user string - password string - log *logp.Logger - encoder encoderFunc - replace string + url url.URL + method string + body 
*mapstr.M + transforms []basicTransform + user string + 
password string + log *logp.Logger + encoder encoderFunc + replace string + isChain bool + until *valueTpl + chainHTTPClient *httpClient } -func newRequestFactory(config config, log *logp.Logger) []*requestFactory { +func newRequestFactory(ctx context.Context, config config, log *logp.Logger) ([]*requestFactory, error) { // config validation already checked for errors here rfs := make([]*requestFactory, 0, len(config.Chain)+1) ts, _ := newBasicTransformsFromConfig(config.Request.Transforms, requestNamespace, log) @@ -122,20 +125,58 @@ func newRequestFactory(config config, log *logp.Logger) []*requestFactory { } rfs = append(rfs, rf) for _, ch := range config.Chain { + var rf *requestFactory // chain calls requestFactory object - ts, _ := newBasicTransformsFromConfig(ch.Step.Request.Transforms, requestNamespace, log) - rf := &requestFactory{ - url: *ch.Step.Request.URL.URL, - method: ch.Step.Request.Method, - body: ch.Step.Request.Body, - transforms: ts, - log: log, - encoder: registeredEncoders[config.Request.EncodeAs], - replace: ch.Step.Replace, + if ch.Step != nil { + ts, _ := newBasicTransformsFromConfig(ch.Step.Request.Transforms, requestNamespace, log) + ch.Step.Auth = tryAssignAuth(config.Auth, ch.Step.Auth) + httpClient, err := newChainHTTPClient(ctx, ch.Step.Auth, &ch.Step.Request, log) + if err != nil { + return nil, fmt.Errorf("failed in creating chain http client with error : %w", err) + } + rf = &requestFactory{ + url: *ch.Step.Request.URL.URL, + method: ch.Step.Request.Method, + body: ch.Step.Request.Body, + transforms: ts, + log: log, + encoder: registeredEncoders[config.Request.EncodeAs], + replace: ch.Step.Replace, + isChain: true, + chainHTTPClient: httpClient, + } + if ch.Step.Auth != nil && ch.Step.Auth.Basic.isEnabled() { // rf must be allocated before its credentials are set + rf.user = ch.Step.Auth.Basic.User + rf.password = ch.Step.Auth.Basic.Password + } + } else if ch.While != nil { + ts, _ := 
newBasicTransformsFromConfig(ch.While.Request.Transforms, requestNamespace, log) + policy := 
newHTTPPolicy(evaluateResponse, ch.While.Until, log) + ch.While.Auth = tryAssignAuth(config.Auth, ch.While.Auth) + httpClient, err := newChainHTTPClient(ctx, ch.While.Auth, &ch.While.Request, log, policy) + if err != nil { + return nil, fmt.Errorf("failed in creating chain http client with error : %w", err) + } + rf = &requestFactory{ + url: *ch.While.Request.URL.URL, + method: ch.While.Request.Method, + body: ch.While.Request.Body, + transforms: ts, + log: log, + encoder: registeredEncoders[config.Request.EncodeAs], + replace: ch.While.Replace, + until: ch.While.Until, + isChain: true, + chainHTTPClient: httpClient, + } + if ch.While.Auth != nil && ch.While.Auth.Basic.isEnabled() { // rf must be allocated before its credentials are set + rf.user = ch.While.Auth.Basic.User + rf.password = ch.While.Auth.Basic.Password + } } rfs = append(rfs, rf) } - return rfs + return rfs, nil } func (rf *requestFactory) newHTTPRequest(stdCtx context.Context, trCtx *transformContext) (*http.Request, error) { @@ -195,14 +236,26 @@ func newRequester( // collectResponse returns response from provided request func (rf *requestFactory) collectResponse(stdCtx context.Context, trCtx *transformContext, r *requester) (*http.Response, error) { + var err error + var httpResp *http.Response + req, err := rf.newHTTPRequest(stdCtx, trCtx) if err != nil { return nil, fmt.Errorf("failed to create http request: %w", err) } - httpResp, err := r.client.do(stdCtx, req) - if err != nil { - return nil, fmt.Errorf("failed to execute http client.Do: %w", err) + + if rf.isChain && rf.chainHTTPClient != nil { + httpResp, err = rf.chainHTTPClient.do(stdCtx, req) + if err != nil { + return nil, fmt.Errorf("failed to execute chain http client.Do: %w", err) + } + } else { + httpResp, err = r.client.do(stdCtx, req) + if err != nil { + return nil, fmt.Errorf("failed to execute http client.Do: %w", err) + } } + return httpResp, nil } diff --git a/x-pack/filebeat/input/httpjson/request_chain_helper.go 
b/x-pack/filebeat/input/httpjson/request_chain_helper.go new file 
mode 100644 index 000000000000..6f4bb669e715 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/request_chain_helper.go @@ -0,0 +1,93 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + + retryablehttp "github.com/hashicorp/go-retryablehttp" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" +) + +func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) { + // Make retryable HTTP client + netHTTPClient, err := requestCfg.Transport.Client( + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithKeepaliveSettings{Disable: true}, + ) + if err != nil { + return nil, err + } + + netHTTPClient.CheckRedirect = checkRedirect(requestCfg, log) + + var retryPolicyFunc retryablehttp.CheckRetry + if len(p) != 0 { + retryPolicyFunc = p[0].CustomRetryPolicy + } else { + retryPolicyFunc = retryablehttp.DefaultRetryPolicy + } + + client := &retryablehttp.Client{ + HTTPClient: netHTTPClient, + Logger: newRetryLogger(log), + RetryWaitMin: requestCfg.Retry.getWaitMin(), + RetryWaitMax: requestCfg.Retry.getWaitMax(), + RetryMax: requestCfg.Retry.getMaxAttempts(), + CheckRetry: retryPolicyFunc, + Backoff: retryablehttp.DefaultBackoff, + } + + limiter := newRateLimiterFromConfig(requestCfg.RateLimit, log) + + if authCfg != nil && authCfg.OAuth2.isEnabled() { + authClient, err := authCfg.OAuth2.client(ctx, client.StandardClient()) + if err != nil { + return nil, err + } + return &httpClient{client: authClient, limiter: limiter}, nil + } + + return &httpClient{client: client.StandardClient(), limiter: limiter}, nil +} + 
+func evaluateResponse(expression *valueTpl, data []byte, log *logp.Logger) (bool, error) { + var dataMap mapstr.M + + err := json.Unmarshal(data, &dataMap) + if err != nil { + return false, fmt.Errorf("error while unmarshalling data : %w", err) + } + tr := transformable{} + paramCtx := &transformContext{ + firstEvent: &mapstr.M{}, + lastEvent: &mapstr.M{}, + lastResponse: &response{body: dataMap}, + } + + val, err := expression.Execute(paramCtx, tr, nil, log) + if err != nil { + return false, fmt.Errorf("error while evaluating expression : %w", err) + } + result, err := strconv.ParseBool(val) + if err != nil { + return false, fmt.Errorf("error while parsing boolean value of string : %w", err) + } + + return result, nil +} + +func tryAssignAuth(parentConfig *authConfig, childConfig *authConfig) *authConfig { + if parentConfig != nil && childConfig == nil { + return parentConfig + } + return childConfig +} diff --git a/x-pack/filebeat/input/httpjson/request_chain_helper_test.go b/x-pack/filebeat/input/httpjson/request_chain_helper_test.go new file mode 100644 index 000000000000..f01484ec9151 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/request_chain_helper_test.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package httpjson + +import ( + "bytes" + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func Test_newChainHTTPClient(t *testing.T) { + cfg := defaultChainConfig() + ctx := context.Background() + log := logp.NewLogger("newChainClientTestLogger") + + type args struct { + ctx context.Context + authCfg *authConfig + requestCfg *requestConfig + log *logp.Logger + p []*Policy + } + tests := []struct { + name string + args args + }{ + { + name: "newChainClientTest", + args: args{ + ctx: ctx, + authCfg: cfg.Auth, + requestCfg: cfg.Request, + log: log, + p: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newChainHTTPClient(tt.args.ctx, tt.args.authCfg, tt.args.requestCfg, tt.args.log, tt.args.p...) + assert.NoError(t, err) + assert.NotNil(t, got) + }) + } +} + +func Test_evaluateResponse(t *testing.T) { + log := logp.NewLogger("newEvaluateResponseTestLogger") + responseTrue := bytes.NewBufferString(`{"status": "completed"}`).Bytes() + responseFalse := bytes.NewBufferString(`{"status": "initiated"}`).Bytes() + + type args struct { + expression string + data []byte + log *logp.Logger + } + tests := []struct { + name string + args args + expectedError string + want bool + }{ + { + name: "newEvaluateResponse_resultIsTrue", + args: args{ + expression: `[[ eq .last_response.body.status "completed" ]]`, + data: responseTrue, + log: log, + }, + want: true, + expectedError: "", + }, + { + name: "newEvaluateResponse_resultIsFalse", + args: args{ + expression: `[[ eq .last_response.body.status "completed" ]]`, + data: responseFalse, + log: log, + }, + want: false, + expectedError: "", + }, + { + name: "newEvaluateResponse_invalidExpressionError", + args: args{ + expression: `eq .last_response.body.status "completed" ]]`, + data: responseFalse, + log: log, + }, + want: false, + expectedError: "error while parsing boolean value of string : strconv.ParseBool: 
parsing \"eq .last_response.body.status \\\"completed\\\" ]]\": invalid syntax", + }, + { + name: "newEvaluateResponse_emptyExpressionError", + args: args{ + expression: "", + data: responseFalse, + log: log, + }, + want: false, + expectedError: "error while evaluating expression : the template result is empty", + }, + { + name: "newEvaluateResponse_incompleteExpressionError", + args: args{ + expression: `[[.last_response.body.status]]`, + data: responseFalse, + log: log, + }, + want: false, + expectedError: "error while parsing boolean value of string : strconv.ParseBool: parsing \"initiated\": invalid syntax", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + expression := &valueTpl{} + err := expression.Unpack(tt.args.expression) + assert.NoError(t, err) + + got, err := evaluateResponse(expression, tt.args.data, tt.args.log) + if err != nil { + assert.EqualError(t, err, tt.expectedError) + } else { + assert.Equal(t, tt.want, got) + } + }) + } +} diff --git a/x-pack/filebeat/input/httpjson/request_test.go b/x-pack/filebeat/input/httpjson/request_test.go index c17b7cca0374..622d7c4b94bf 100644 --- a/x-pack/filebeat/input/httpjson/request_test.go +++ b/x-pack/filebeat/input/httpjson/request_test.go @@ -62,7 +62,8 @@ func TestCtxAfterDoRequest(t *testing.T) { client, err := newHTTPClient(ctx, config, log) assert.NoError(t, err) - requestFactory := newRequestFactory(config, log) + requestFactory, err := newRequestFactory(ctx, config, log) + assert.NoError(t, err) pagination := newPagination(config, client, log) responseProcessor := newResponseProcessor(config, pagination, log) diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index a4abb6d374f6..469b71f3ff5c 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -81,9 +81,13 @@ func newResponseProcessor(config config, pagination *pagination, log *logp.Logge log: log, } // chain calls 
responseProcessor object - split, _ := newSplitResponse(ch.Step.Response.Split, log) - - rp.split = split + if ch.Step != nil { + split, _ := newSplitResponse(ch.Step.Response.Split, log) + rp.split = split + } else if ch.While != nil { + split, _ := newSplitResponse(ch.While.Response.Split, log) + rp.split = split + } rps = append(rps, rp) }