[elastic/beats] [HttpJson] - Improves request chaining with new 'while' step & client creation per step #32222

Merged
merged 15 commits into from
Jul 14, 2022
153 changes: 152 additions & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -1091,7 +1091,158 @@ request_url using file_name as 'file_2': https://example.com/services/data/v1.0/
+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: `request.url`, `request.method`, `request.transforms`, `request.body`, `response.transforms` and `response.split`.
[float]
==== `chain[].while`

Contains basic request and response configuration for chained while calls. A chained while call repeats its request until a condition is met or the configured maximum number of attempts is exhausted.

[float]
==== `chain[].while.request`

Please refer <<request-parameters,request parameters>>. Required.

Example:

First call: http://example.com/services/data/v1.0/exports

Second call: http://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

Third call: http://example.com/services/data/v1.0/export_ids/1/info

[float]
==== `chain[].while.response.split`

Please refer <<response-split,response split parameter>>.

[float]
==== `chain[].while.replace`

Please refer `chain[].step.replace`.
Contributor

This PR still adds a few "Please refer X" expressions. My understanding is that those are not correct, please correct me if I'm wrong.

I lean towards using "See X" instead of "Please refer to X for more information" as a replacement.

Also, this one uses a `literal` instead of a <<link>>, can it be turned into a link?

Contributor Author

@adriansr updated the docs as per the suggestions. Please review once.


Example:

- First call: http://example.com/services/data/v1.0/exports
+
<<response-json1-while,response>>

- Second call: http://example.com/services/data/v1.0/`$.exportId`/export_ids/status
+
<<response-json2-while,response>>

- Third call: http://example.com/services/data/v1.0/export_ids/`$.files[:].id`/info
+
<<response-json3-while,response 1>> , <<response-json4-while,response 2>>

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
  enabled: true
  # first call
  id: my-httpjson-id
  request.url: http://example.com/services/data/v1.0/exports
  interval: 1h
  chain:
    # second call
    - while:
        request.url: http://example.com/services/data/v1.0/$.exportId/export_ids/status
        request.method: GET
        replace: $.exportId
        till: '[[ eq .last_response.body.status "completed" ]]'
        request.retry.max_attempts: 5
        request.retry.wait_min: 5s
    # third call
    - step:
        request.url: http://example.com/services/data/v1.0/export_ids/$.files[:].id/info
        request.method: GET
        replace: $.files[:].id
----

Example:

- First call to collect export ids

+
request_url: https://example.com/services/data/v1.0/exports

+
response_json:

+
[[response-json1-while]]
["source","json",subs="attributes"]
----
{
    "exportId": "9ef0e6a5"
}
----

- Second call to collect `file_ids` using the id collected from the first call, repeated until `response.body.status == "completed"`. This call continues until the condition is satisfied or the maximum number of attempts is exhausted.

+
request_url using id as '9ef0e6a5': https://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

+
response_json using id as '9ef0e6a5':

+
[[response-json2-while]]
["source","json",subs="attributes"]
----
{
    "status": "completed",
    "files": [
        {
            "id": 1
        },
        {
            "id": 2
        },
        {
            "id": 3
        }
    ]
}
----

- Third call to collect `files` using collected `file_id` from second call.

+
request_url using file_id as '1': https://example.com/services/data/v1.0/export_ids/1/info

+
request_url using file_id as '2': https://example.com/services/data/v1.0/export_ids/2/info

+
response_json using id as '1':

+
[[response-json3-while]]
["source","json",subs="attributes"]
----
{
    "file_name": "file_1",
    "file_data": "some data"
}
----

+
response_json using id as '2':

+
[[response-json4-while]]
["source","json",subs="attributes"]
----
{
    "file_name": "file_2",
    "file_data": "some data"
}
----

+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <<request-parameters, request parameters>>, `response.transforms` and `response.split`.

[[cursor]]
[float]
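
The documented `while` behaviour (substitute the `replace` expression into the URL, then repeat the request until the `till` condition holds or the attempts run out) can be sketched in Go as follows. This is a minimal illustration, not the httpjson implementation: `statusResponse`, `fetchFunc`, `buildURL`, `pollUntil` and `errAttemptsExhausted` are hypothetical names, the HTTP call is replaced by a function value, no wait is applied between attempts, and returning an error on exhaustion is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// statusResponse is a hypothetical, trimmed-down view of the second call's
// response body from the documentation example.
type statusResponse struct {
	Status string
	Files  []int
}

// fetchFunc stands in for the HTTP request; a real input would issue a GET.
type fetchFunc func(url string) (statusResponse, error)

// errAttemptsExhausted is an assumed error value for this sketch only.
var errAttemptsExhausted = errors.New("till condition not met before attempts were exhausted")

// buildURL substitutes the replace expression (for example "$.exportId")
// in the URL template with a value collected from the previous call.
func buildURL(template, replace, value string) string {
	return strings.Replace(template, replace, value, 1)
}

// pollUntil repeats the request until done reports true or maxAttempts
// requests have been made. It returns the last response and the number of
// requests issued.
func pollUntil(url string, maxAttempts int, fetch fetchFunc, done func(statusResponse) bool) (statusResponse, int, error) {
	var last statusResponse
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		resp, err := fetch(url)
		if err != nil {
			return last, attempt, err
		}
		last = resp
		if done(resp) {
			return resp, attempt, nil
		}
	}
	return last, maxAttempts, errAttemptsExhausted
}

func main() {
	url := buildURL("http://example.com/services/data/v1.0/$.exportId/export_ids/status", "$.exportId", "9ef0e6a5")

	// Fake server: reports "pending" twice, then "completed".
	calls := 0
	fake := func(string) (statusResponse, error) {
		calls++
		if calls < 3 {
			return statusResponse{Status: "pending"}, nil
		}
		return statusResponse{Status: "completed", Files: []int{1, 2, 3}}, nil
	}

	completed := func(r statusResponse) bool { return r.Status == "completed" }
	resp, attempts, err := pollUntil(url, 5, fake, completed)
	fmt.Println(url, resp.Status, attempts, err)
}
```

In the real input the condition is the `till` template and the pacing comes from `request.retry.wait_min`; the sketch only shows the control flow.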
24 changes: 10 additions & 14 deletions x-pack/filebeat/input/httpjson/chain.go
@@ -56,35 +56,31 @@

 package httpjson

-import (
-	"github.com/elastic/elastic-agent-libs/mapstr"
-	"github.com/elastic/elastic-agent-libs/transport/httpcommon"
-)
-
 // chainConfig for chain request.
 // Following contains basic call structure for each call after normal httpjson
 // call.
 type chainConfig struct {
-	Step stepConfig `config:"step" validate:"required"`
+	Step *stepConfig `config:"step,omitempty"`
+	While *whileConfig `config:"while,omitempty"`
 }

 // stepConfig will contain basic properties like, request.url,
 // request.method and replace parameter. Each step: request.url
 // will contain replace string with original URL to make a skeleton for the
 // call request.
 type stepConfig struct {
-	Request requestChainConfig `config:"request"`
+	Auth *authConfig `config:"auth"`
+	Request requestConfig `config:"request" validate:"required"`
 	Response responseChainConfig `config:"response,omitempty"`
 	Replace string `config:"replace,omitempty"`
 }

-type requestChainConfig struct {
-	URL *urlConfig `config:"url" validate:"required"`
-	Method string `config:"method" validate:"required"`
-	Body *mapstr.M `config:"body"`
-	Transforms transformsConfig `config:"transforms"`
-
-	Transport httpcommon.HTTPTransportSettings `config:",inline"`
+type whileConfig struct {
+	Auth *authConfig `config:"auth"`
+	Request requestConfig `config:"request" validate:"required"`
+	Response responseChainConfig `config:"response,omitempty"`
+	Replace string `config:"replace,omitempty"`
+	Till *valueTpl `config:"till"`
 }

 type responseChainConfig struct {
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
@@ -36,6 +36,11 @@ func (c config) Validate() error {
 	if c.Interval <= 0 {
 		return errors.New("interval must be greater than 0")
 	}
+	for _, v := range c.Chain {
+		if v.Step == nil && v.While == nil {
+			return errors.New("both step & while blocks in a chain cannot be empty")
+		}
+	}
 	return nil
 }
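
Because `Step` and `While` are now optional pointers, the check above is what rejects a chain entry that sets neither. A minimal, self-contained sketch of that check follows; the structs are hypothetical, trimmed stand-ins for the ones in `chain.go`, and `validateChain` is an illustrative name, not a function from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, trimmed stand-ins for the structs in chain.go.
type stepConfig struct{ Replace string }
type whileConfig struct{ Replace string }

type chainConfig struct {
	Step  *stepConfig
	While *whileConfig
}

// validateChain mirrors the loop added to config.Validate: every chain
// entry must set at least one of step or while.
func validateChain(chain []chainConfig) error {
	for _, v := range chain {
		if v.Step == nil && v.While == nil {
			return errors.New("both step & while blocks in a chain cannot be empty")
		}
	}
	return nil
}

func main() {
	valid := []chainConfig{
		{While: &whileConfig{Replace: "$.exportId"}},
		{Step: &stepConfig{Replace: "$.files[:].id"}},
	}
	fmt.Println(validateChain(valid))
	fmt.Println(validateChain([]chainConfig{{}}))
}
```

Note that, as written in the hunk, the check does not reject an entry that sets both `step` and `while`.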

4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
@@ -49,8 +49,10 @@ func (c retryConfig) getWaitMin() time.Duration {
 }

 func (c retryConfig) getWaitMax() time.Duration {
-	if c.WaitMax == nil {
+	if c.WaitMax == nil && c.WaitMin == nil {
 		return 0
+	} else if c.WaitMax == nil && c.WaitMin != nil {
+		return *c.WaitMin
 	}
 	return *c.WaitMax
 }
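
The effect of this change is that an unset `wait_max` falls back to `wait_min` instead of `0`. The sketch below restates the same decision as a `switch` so the three cases are easier to see; `retryConfig` here is a trimmed stand-in with only the two fields involved, and `seconds` is a hypothetical helper added for the example.

```go
package main

import (
	"fmt"
	"time"
)

// retryConfig is a trimmed stand-in holding only the two fields involved.
type retryConfig struct {
	WaitMin *time.Duration
	WaitMax *time.Duration
}

// getWaitMax returns WaitMax when set, otherwise WaitMin when set,
// otherwise zero. The results match the if/else chain in the hunk.
func (c retryConfig) getWaitMax() time.Duration {
	switch {
	case c.WaitMax != nil:
		return *c.WaitMax
	case c.WaitMin != nil:
		return *c.WaitMin
	default:
		return 0
	}
}

// seconds is a hypothetical helper that returns a pointer to a duration.
func seconds(n int) *time.Duration {
	d := time.Duration(n) * time.Second
	return &d
}

func main() {
	fmt.Println(retryConfig{}.getWaitMax())
	fmt.Println(retryConfig{WaitMin: seconds(5)}.getWaitMax())
	fmt.Println(retryConfig{WaitMin: seconds(5), WaitMax: seconds(10)}.getWaitMax())
}
```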
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/input.go
@@ -112,7 +112,11 @@ func run(
 		return err
 	}

-	requestFactory := newRequestFactory(config, log)
+	requestFactory, err := newRequestFactory(stdCtx, config, log)
+	if err != nil {
+		log.Errorf("Error while creating requestFactory: %v", err)
+		return err
+	}
 	pagination := newPagination(config, httpClient, log)
 	responseProcessor := newResponseProcessor(config, pagination, log)
 	requester := newRequester(httpClient, requestFactory, responseProcessor, log)
115 changes: 115 additions & 0 deletions x-pack/filebeat/input/httpjson/policy.go
@@ -0,0 +1,115 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package httpjson
+
+import (
+	"bytes"
+	"context"
+	"crypto/x509"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"regexp"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+var (
+
+	// A regular expression to match the error returned by net/http when the
+	// configured number of redirects is exhausted. This error isn't typed
+	// specifically so we resort to matching on the error string.
+	redirectsErrorRe = regexp.MustCompile(`stopped after \d+ redirects\z`)
+
+	// A regular expression to match the error returned by net/http when the
+	// scheme specified in the URL is invalid. This error isn't typed
+	// specifically so we resort to matching on the error string.
+	schemeErrorRe = regexp.MustCompile(`unsupported protocol scheme`)
+)
+
+type Evaluate func(expression *valueTpl, data []byte, log *logp.Logger) (bool, error)
+
+// Responsible for maintaining different http client policies
+// Currently just contains a retry policy function
+type Policy struct {
+	fn Evaluate
+	expression *valueTpl
+	log *logp.Logger
+}
+
+func newHTTPPolicy(fn Evaluate, expression *valueTpl, log *logp.Logger) *Policy {
+	return &Policy{
+		fn: fn,
+		expression: expression,
+		log: log,
+	}
+}
+
+// CustomRetryPolicy provides a custom callback for Client.CheckRetry, which
+// will retry on connection errors and server errors.
+func (p *Policy) CustomRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	// do not retry on context.Canceled or context.DeadlineExceeded
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+
+	if err != nil {
+		var v *url.Error
+		if errors.As(err, &v) {
+			// Don't retry if the error was due to too many redirects.
+			if redirectsErrorRe.MatchString(v.Error()) {
+				return false, nil
+			}
+
+			// Don't retry if the error was due to an invalid protocol scheme.
+			if schemeErrorRe.MatchString(v.Error()) {
+				return false, nil
+			}
+
+			// Don't retry if the error was due to TLS cert verification failure.
+			var k x509.UnknownAuthorityError
+			if errors.As(v.Err, &k) {
+				return false, nil
+			}
+		}
+
+		// The error is likely recoverable so retry.
+		return true, nil
+	}
+
+	// Check the response code. We retry on 500-range responses to allow
+	// the server time to recover, as 500's are typically not permanent
+	// errors and may relate to outages on the server side. This will catch
+	// invalid response codes as well, like 0 and 999.
+	if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
+		return true, nil
+	}
+
+	// Evaluate custom
+	if p.fn != nil && p.expression != nil {
+		var retry bool
+
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return retry, fmt.Errorf("failed to read http response body : %w", err)
+		}
+
+		result, err := p.fn(p.expression, body, p.log)
+		if err != nil {
+			return retry, err
+		}
+
+		if !result {
+			retry = true
+		}
+		resp.Body = ioutil.NopCloser(bytes.NewBuffer(body))
+
+		return retry, nil
+	}
+
+	return false, nil
+}
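
The last branch of `CustomRetryPolicy` reads the response body to evaluate the condition and then puts a fresh reader back on `resp.Body` so later stages can still consume it. A minimal sketch of that read-and-restore pattern follows. It is not the PR's code: `peekBody`, `isCompleted`, `shouldRetry` and `newResponse` are hypothetical names, the `till` template is replaced by a plain JSON check, and the sketch uses `io.ReadAll` and `io.NopCloser` (Go 1.16 and later) in place of the `ioutil` equivalents.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// peekBody reads the full response body, then swaps in a fresh reader over
// the same bytes so that later consumers can still read it.
func peekBody(resp *http.Response) ([]byte, error) {
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read http response body: %w", err)
	}
	resp.Body.Close()
	resp.Body = io.NopCloser(bytes.NewReader(body))
	return body, nil
}

// isCompleted is a hypothetical stand-in for the till expression: it
// reports whether the JSON body has status "completed".
func isCompleted(body []byte) bool {
	var v struct {
		Status string `json:"status"`
	}
	return json.Unmarshal(body, &v) == nil && v.Status == "completed"
}

// shouldRetry reports true while the condition is not yet satisfied,
// leaving resp.Body readable for whoever handles the response next.
func shouldRetry(resp *http.Response, done func([]byte) bool) (bool, error) {
	body, err := peekBody(resp)
	if err != nil {
		return false, err
	}
	return !done(body), nil
}

// newResponse builds an in-memory response for the example.
func newResponse(body string) *http.Response {
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       io.NopCloser(strings.NewReader(body)),
	}
}

func main() {
	resp := newResponse(`{"status":"pending"}`)
	retry, err := shouldRetry(resp, isCompleted)

	// The body can be read again after the policy has inspected it.
	rest, _ := io.ReadAll(resp.Body)
	fmt.Println(retry, err, string(rest))
}
```

Unlike the hunk above, this sketch closes the original body before replacing it; whether that matters depends on how the caller manages the connection.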