Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elastic/beats] [HttpJson] - Improves request chaining with new 'while' step & client creation per step #32222

Merged
merged 15 commits into from
Jul 14, 2022
Merged
182 changes: 170 additions & 12 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ The state has the following elements:
- `first_event`: A map representing the first event sent to the output (result from applying transforms to `last_response.body`).
- `last_event`: A map representing the last event of the current request in the requests chain (result from applying transforms to `last_response.body`).
- `url`: The last requested URL as a raw https://pkg.go.dev/net/url#URL[`url.URL`] Go type.
- `header`: A map containing the headers. References the next request headers when used in <<request-transforms>> or <<response-pagination>> configuration sections, and to the last response headers when used in <<response-transforms>>, <<response-split>>, or <<request-rate-limit>> configuration sections.
- `body`: A map containing the body. References the next request body when used in <<request-transforms>> or <<response-pagination>> configuration sections, and to the last response body when used in <<response-transforms>> or <<response-split>> configuration sections.
- `header`: A map containing the headers. References the next request headers when used in <<request-transforms-headers>> or <<response-pagination>> configuration sections, and to the last response headers when used in <<response-transforms>>, <<response-split>>, or <<request-rate-limit>> configuration sections.
- `body`: A map containing the body. References the next request body when used in <<request-transforms-headers>> or <<response-pagination>> configuration sections, and to the last response body when used in <<response-transforms>> or <<response-split>> configuration sections.
- `cursor`: A map containing any data the user configured to be stored between restarts (See <<cursor>>).

All of the mentioned objects are only stored at runtime, except `cursor`, which has values that are persisted between restarts.
Expand Down Expand Up @@ -501,7 +501,7 @@ If the `remaining` header is missing from the Response, no rate-limiting will oc
The value of the response that specifies the epoch time when the rate limit will reset.
It is defined with a Go template value. Can read state from: [`.last_response.header`]

[[request-transforms]]
[[request-transforms-headers]]
[float]
==== `request.rate_limit.early_limit`

Expand All @@ -521,6 +521,7 @@ e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occu

It is not set by default (by default the rate-limiting as specified in the Response is followed).

[[request-transforms]]
[float]
==== `request.transforms`

Expand Down Expand Up @@ -607,7 +608,7 @@ Defines the field type of the target. Allowed values: `array`, `map`, `string`.
[float]
==== `response.split[].transforms`

A set of transforms can be defined. This list will be applied after `response.transforms` and after the object has been modified based on `response.split[].keep_parent` and `response.split[].key_field`.
A set of transforms can be defined. This list will be applied after <<response-transforms, response.transforms>> and after the object has been modified based on `response.split[].keep_parent` and `response.split[].key_field`.

Available transforms for response: [`append`, `delete`, `set`].

Expand Down Expand Up @@ -651,7 +652,7 @@ If set to true, the values in `request.body` are sent for pagination requests. D
[float]
==== `response.pagination`

List of transforms that will be applied to the response to every new page request. All the transforms from `request.transform` will be executed and then `response.pagination` will be added to modify the next request as needed. For subsequent responses, the usual `response.transforms` and `response.split` will be executed normally.
List of transforms that will be applied to the response to every new page request. All the transforms from `request.transform` will be executed and then `response.pagination` will be added to modify the next request as needed. For subsequent responses, the usual <<response-transforms, response.transforms>> and <<response-split, response.split>> will be executed normally.

Available transforms for pagination: [`append`, `delete`, `set`].

Expand Down Expand Up @@ -967,7 +968,7 @@ Contains basic request and response configuration for chained calls.
[float]
==== `chain[].step.request`

Please refer <<request-parameters,request parameters>>. Required.
See <<request-parameters,request parameters>>. Required.

Example:

Expand All @@ -980,12 +981,14 @@ Third call: https://example.com/services/data/v1.0/export_ids/file_1/info
[float]
==== `chain[].step.response.split`

Please refer <<response-split,response split parameter>>.
See <<response-split,response split parameter>>.

+
[[chain-step-replace]]
[float]
==== `chain[].step.replace`

A [JSONPath](https://goessner.net/articles/JsonPath/index.html#e2[JSONPath]) string to parse values from responses JSON, collected from previous chain steps. Place same replace string in url where collected values from previous call should be placed. Required.
A link:https://goessner.net/articles/JsonPath/index.html#e2[JSONPath] string to parse values from responses JSON, collected from previous chain steps. Place same replace string in url where collected values from previous call should be placed. Required.

Example:

Expand Down Expand Up @@ -1091,7 +1094,162 @@ request_url using file_name as 'file_2': https://example.com/services/data/v1.0/
+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: `request.url`, `request.method`, `request.transforms`, `request.body`, `response.transforms` and `response.split`.
[float]
==== `chain[].while`

Contains basic request and response configuration for chained while calls. Chained while calls will keep making the requests for a given number of times until a condition is met
or the maximum number of attempts gets exhausted. While chain has an attribute `until` which holds the expression to be evaluated. Ideally the `until` field should always be used
together with the attributes `request.retry.max_attempts` and `request.retry.wait_min` which specifies the maximum number of attempts to evaluate `until` before giving up and the
maximum wait time in between such requests. If `request.retry.max_attempts` is not specified, it will only try to evaluate the expression once and give up if it fails. If
`request.retry.wait_min` is not specified the default wait time will always be `0` as in successive calls will be made immediately.

[float]
==== `chain[].while.request`

See <<request-parameters,request parameters>> .

Example:

First call: http://example.com/services/data/v1.0/exports

Second call: http://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

Third call: http://example.com/services/data/v1.0/export_ids/1/info

[float]
==== `chain[].while.response.split`

See <<response-split,response split parameter>> .

[float]
==== `chain[].while.replace`

See <<chain-step-replace, chain[].step.replace>> .

Example:

- First call: http://example.com/services/data/v1.0/exports
+
<<response-json1-while,response>>

- Second call: http://example.com/services/data/v1.0/`$.exportId`/export_ids/status
+
<<response-json2-while,response>>

- Third call: http://example.com/services/data/v1.0/export_ids/`$.files[:].id`/info
+
<<response-json3-while,response 1>> , <<response-json4-while,response 2>>

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
enabled: true
# first call
id: my-httpjson-id
request.url: http://example.com/services/data/v1.0/exports
interval: 1h
chain:
# second call
- while:
request.url: http://example.com/services/data/v1.0/$.exportId/export_ids/status
request.method: GET
replace: $.exportId
until: '[[ eq .last_response.body.status "completed" ]]'
request.retry.max_attempts: 5
request.retry.wait_min: 5s
# third call
- step:
request.url: http://example.com/services/data/v1.0/export_ids/$.files[:].id/info
request.method: GET
replace: $.files[:].id
----

Example:

- First call to collect export ids

+
request_url: https://example.com/services/data/v1.0/exports

+
response_json:

+
[[response-json1-while]]
["source","json",subs="attributes"]
----
{
"exportId": "9ef0e6a5"
}
----

- Second call to collect `file_ids` using collected id from first call when `response.body.sataus == "completed"`. This call continues until the condition is satisfied or the maximum number of attempts gets exhausted.

+
request_url using id as '9ef0e6a5': https://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

+
response_json using id as '9ef0e6a5':

+
[[response-json2-while]]
["source","json",subs="attributes"]
----
{
"status": "completed",
"files": [
{
"id": 1
},
{
"id": 2
},
{
"id": 3
}
]
}
----

- Third call to collect `files` using collected `file_id` from second call.

+
request_url using file_id as '1': https://example.com/services/data/v1.0/export_ids/1/info

+
request_url using file_id as '2': https://example.com/services/data/v1.0/export_ids/2/info

+
response_json using id as '1':

+
[[response-json3-while]]
["source","json",subs="attributes"]
----
{
"file_name": "file_1",
"file_data": "some data"
}
----

+
response_json using id as '2':

+
[[response-json4-while]]
["source","json",subs="attributes"]
----
{
"file_name": "file_2",
"file_data": "some data"
}
----

+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <<request-parameters, request parameters>>, <<response-transforms, response.transforms>> and <<response-split, response.split>>.

[[cursor]]
[float]
Expand Down Expand Up @@ -1134,11 +1292,11 @@ filebeat.inputs:
image:images/input-httpjson-lifecycle.png[Request lifecycle]

. At every defined interval a new request is created.
. The request is transformed using the configured `request.transforms`.
. The request is transformed using the configured <<request-transforms, request.transforms>>.
. The resulting transformed request is executed.
. The server responds (here is where any retry or rate limit policy takes place when configured).
. The response is transformed using the configured `response.transforms` and `response.split`.
. If a chain step is configured. Each step will generate new requests based on collected IDs from responses. The requests will be transformed using configured `request.transforms` and the resulting generated transformed requests will be executed. This process will happen for all the steps mentioned in the chain.
. The response is transformed using the configured <<response-transforms, response.transforms>> and <<response-split, response.split>>.
. If a chain step is configured. Each step will generate new requests based on collected IDs from responses. The requests will be transformed using configured <<request-transforms, request.transforms>> and the resulting generated transformed requests will be executed. This process will happen for all the steps mentioned in the chain.
. Each resulting event is published to the output.
. If a `response.pagination` is configured and there are more pages, a new request is created using it, otherwise the process ends until the next interval.

Expand Down
50 changes: 36 additions & 14 deletions x-pack/filebeat/input/httpjson/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,60 @@

package httpjson

import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

// chainConfig for chain request.
// Following contains basic call structure for each call after normal httpjson
// call.
type chainConfig struct {
Step stepConfig `config:"step" validate:"required"`
Step *stepConfig `config:"step,omitempty"`
While *whileConfig `config:"while,omitempty"`
}

// stepConfig will contain basic properties like, request.url,
// request.method and replace parameter. Each step: request.url
// will contain replace string with original URL to make a skeleton for the
// call request.
type stepConfig struct {
Request requestChainConfig `config:"request"`
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
}

type requestChainConfig struct {
URL *urlConfig `config:"url" validate:"required"`
Method string `config:"method" validate:"required"`
Body *mapstr.M `config:"body"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
// whileConfig will contain basic properties like auth parameters, request parameters,
// response parameters , a replace parameter and an expression parameter called 'until'.
// While is similar to stepConfig with the addition of 'until'. 'until' holds an expression
// and with the combination of "request.retry.max_attempts" retries a request 'until' the
// expression is evaluated to "true" or request.retry.max_attempts is exhausted. If
// request.retry.max_attempts is not specified , the max_attempts is always 1.
type whileConfig struct {
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
Until *valueTpl `config:"until" validate:"required"`
}

type responseChainConfig struct {
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
}

func defaultChainConfig() config {
chaincfg := defaultConfig()
chaincfg.Chain = []chainConfig{
{
While: &whileConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
},
Step: &stepConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
},
},
}

return chaincfg
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (c config) Validate() error {
if c.Interval <= 0 {
return errors.New("interval must be greater than 0")
}
for _, v := range c.Chain {
if v.Step == nil && v.While == nil {
return errors.New("both step & while blocks in a chain cannot be empty")
}
}
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func (c retryConfig) getWaitMin() time.Duration {
}

func (c retryConfig) getWaitMax() time.Duration {
if c.WaitMax == nil {
if c.WaitMax == nil && c.WaitMin == nil {
return 0
} else if c.WaitMax == nil && c.WaitMin != nil {
return *c.WaitMin
}
return *c.WaitMax
}
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func run(
return err
}

requestFactory := newRequestFactory(config, log)
requestFactory, err := newRequestFactory(stdCtx, config, log)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)
Expand Down
Loading