Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elastic/beats] [HttpJson] - Improves request chaining with new 'while' step & client creation per step #32222

Merged
merged 15 commits into from
Jul 14, 2022
Merged
157 changes: 156 additions & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,162 @@ request_url using file_name as 'file_2': https://example.com/services/data/v1.0/
+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: `request.url`, `request.method`, `request.transforms`, `request.body`, `response.transforms` and `response.split`.
[float]
==== `chain[].while`

Contains basic request and response configuration for chained while calls. Chained while calls will keep making the requests for a given number of times until a condition is met
or the maximum number of attempts gets exhausted. While chain has an attribute `till` which holds the expression to be evaluated. Ideally the `till` field should always be used
together with the attributes `request.retry.max_attempts` and `request.retry.wait_min` which specifies the maximum number of attempts to evaluate `till` before giving up and the
maximum wait time in between such requests. If `request.retry.max_attempts` is not specified, it will only try to evaluate the expression once and give up if it fails. If
`request.retry.wait_min` is not specified the default wait time will always be `0` as in successive calls will be made immediately.

[float]
==== `chain[].while.request`

Please refer <<request-parameters,request parameters>>. Required.

Example:

First call: http://example.com/services/data/v1.0/exports

Second call: http://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

Third call: http://example.com/services/data/v1.0/export_ids/1/info

[float]
==== `chain[].while.response.split`

Please refer <<response-split,response split parameter>>.

[float]
==== `chain[].while.replace`

Please refer `chain[].step.replace`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR still adds a few "Please refer X" expressions. My understanding is that those are not correct, please correct me if I'm wrong.

I lean towards using "See X" instead of "Please refer to X for more information" as a replacement.

Also, this one uses a `literal` instead of a <<link>>, can it be turned into a link?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriansr updated the docs as per the suggestions. Please review once.


Example:

- First call: http://example.com/services/data/v1.0/exports
+
<<response-json1-while,response>>

- Second call: http://example.com/services/data/v1.0/`$.exportId`/export_ids/status
+
<<response-json2-while,response>>

- Third call: http://example.com/services/data/v1.0/export_ids/`$.files[:].id`/info
+
<<response-json3-while,response 1>> , <<response-json4-while,response 2>>

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
enabled: true
# first call
id: my-httpjson-id
request.url: http://example.com/services/data/v1.0/exports
interval: 1h
chain:
# second call
- while:
request.url: http://example.com/services/data/v1.0/$.exportId/export_ids/status
request.method: GET
replace: $.exportId
till: '[[ eq .last_response.body.status "completed" ]]'
request.retry.max_attempts: 5
request.retry.wait_min: 5s
# third call
- step:
request.url: http://example.com/services/data/v1.0/export_ids/$.files[:].id/info
request.method: GET
replace: $.files[:].id
----

Example:

- First call to collect export ids

+
request_url: https://example.com/services/data/v1.0/exports

+
response_json:

+
[[response-json1-while]]
["source","json",subs="attributes"]
----
{
"exportId": "9ef0e6a5"
}
----

- Second call to collect `file_ids` using collected id from first call when `response.body.sataus == "completed"`. This call continues until the condition is satisfied or the maximum number of attempts gets exhausted.

+
request_url using id as '9ef0e6a5': https://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

+
response_json using id as '9ef0e6a5':

+
[[response-json2-while]]
["source","json",subs="attributes"]
----
{
"status": "completed",
"files": [
{
"id": 1
},
{
"id": 2
},
{
"id": 3
}
]
}
----

- Third call to collect `files` using collected `file_id` from second call.

+
request_url using file_id as '1': https://example.com/services/data/v1.0/export_ids/1/info

+
request_url using file_id as '2': https://example.com/services/data/v1.0/export_ids/2/info

+
response_json using id as '1':

+
[[response-json3-while]]
["source","json",subs="attributes"]
----
{
"file_name": "file_1",
"file_data": "some data"
}
----

+
response_json using id as '2':

+
[[response-json4-while]]
["source","json",subs="attributes"]
----
{
"file_name": "file_2",
"file_data": "some data"
}
----

+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <<request-parameters, request parameters>>, `response.transforms` and `response.split`.

[[cursor]]
[float]
Expand Down
53 changes: 39 additions & 14 deletions x-pack/filebeat/input/httpjson/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,63 @@

package httpjson

import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

// chainConfig for chain request.
// Following contains basic call structure for each call after normal httpjson
// call.
type chainConfig struct {
Step stepConfig `config:"step" validate:"required"`
Step *stepConfig `config:"step,omitempty"`
While *whileConfig `config:"while,omitempty"`
}

// stepConfig will contain basic properties like, request.url,
// request.method and replace parameter. Each step: request.url
// will contain replace string with original URL to make a skeleton for the
// call request.
type stepConfig struct {
Request requestChainConfig `config:"request"`
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
}

type requestChainConfig struct {
URL *urlConfig `config:"url" validate:"required"`
Method string `config:"method" validate:"required"`
Body *mapstr.M `config:"body"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
// whileConfig will contain basic properties like auth parameters, request parameters,
// response parameters , a replace parameter and an expression parameter called till.
// While is similar to stepConfig with the addition of till. Till holds an expression
// and with the combination of "request.retry.max_attempts" retries a request till the
// expression is evaluated to "true" or request.retry.max_attempts is exhausted. If
// request.retry.max_attempts is not specified , the max_attempts is always 1.
type whileConfig struct {
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
Till *valueTpl `config:"till"`
}

type responseChainConfig struct {
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
}

func defaultChainConfig() config {
chaincfg := defaultConfig()
chaincfg.Chain = []chainConfig{
{
While: &whileConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
Replace: "",
Till: nil,
},
Step: &stepConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
Replace: "",
},
},
}

return chaincfg
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (c config) Validate() error {
if c.Interval <= 0 {
return errors.New("interval must be greater than 0")
}
for _, v := range c.Chain {
if v.Step == nil && v.While == nil {
return errors.New("both step & while blocks in a chain cannot be empty")
}
}
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func (c retryConfig) getWaitMin() time.Duration {
}

func (c retryConfig) getWaitMax() time.Duration {
if c.WaitMax == nil {
if c.WaitMax == nil && c.WaitMin == nil {
return 0
} else if c.WaitMax == nil && c.WaitMin != nil {
return *c.WaitMin
}
return *c.WaitMax
}
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func run(
return err
}

requestFactory := newRequestFactory(config, log)
requestFactory, err := newRequestFactory(stdCtx, config, log)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)
Expand Down
120 changes: 120 additions & 0 deletions x-pack/filebeat/input/httpjson/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"bytes"
"context"
"crypto/x509"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"regexp"

"github.com/elastic/elastic-agent-libs/logp"
)

var (

// A regular expression to match the error returned by net/http when the
// configured number of redirects is exhausted. This error isn't typed
// specifically so we resort to matching on the error string.
redirectsErrorRe = regexp.MustCompile(`stopped after \d+ redirects\z`)

// A regular expression to match the error returned by net/http when the
// scheme specified in the URL is invalid. This error isn't typed
// specifically so we resort to matching on the error string.
schemeErrorRe = regexp.MustCompile(`unsupported protocol scheme`)
)

// Evaluate is a template expression evaluation function which accepts a
// valid go text/template expression and evaluates the expected field value to the
// field value present in data using the defined operator/function in the given expression.
// Example : [[ eq .last_response.body.status "completed" ]] -- which means here data is a http response
// containing a field "status" under the field "body" , and value status should be equal to the string "completed"
type Evaluate func(expression *valueTpl, data []byte, log *logp.Logger) (bool, error)

// Responsible for maintaining different http client policies
// Currently just contains a retry policy function
type Policy struct {
fn Evaluate
expression *valueTpl
log *logp.Logger
}

func newHTTPPolicy(fn Evaluate, expression *valueTpl, log *logp.Logger) *Policy {
return &Policy{
fn: fn,
expression: expression,
log: log,
}
}

// CustomRetryPolicy provides a custom callback for Client.CheckRetry, which
// will retry on connection errors and server errors.
func (p *Policy) CustomRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
// do not retry on context.Canceled or context.DeadlineExceeded
if ctx.Err() != nil {
return false, ctx.Err()
}

if err != nil {
var v *url.Error
if errors.As(err, &v) {
// Don't retry if the error was due to too many redirects.
if redirectsErrorRe.MatchString(v.Error()) {
return false, nil
}

// Don't retry if the error was due to an invalid protocol scheme.
if schemeErrorRe.MatchString(v.Error()) {
return false, nil
}

// Don't retry if the error was due to TLS cert verification failure.
var k x509.UnknownAuthorityError
if errors.As(v.Err, &k) {
return false, nil
}
}

// The error is likely recoverable so retry.
return true, nil
}

// Check the response code. We retry on 500-range responses to allow
// the server time to recover, as 500's are typically not permanent
// errors and may relate to outages on the server side. This will catch
// invalid response codes as well, like 0 and 999.
if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
return true, nil
}

// Evaluate custom expression
if p.fn != nil && p.expression != nil {
var retry bool

body, err := io.ReadAll(resp.Body)
if err != nil {
return retry, fmt.Errorf("failed to read http response body : %w", err)
}

result, err := p.fn(p.expression, body, p.log)
if err != nil {
return retry, err
}

if !result {
retry = true
}
resp.Body = io.NopCloser(bytes.NewBuffer(body))

return retry, nil
}

return false, nil
}
Loading