Skip to content

Commit

Permalink
[elastic/beats] [HttpJson] - Improves request chaining with new 'whil…
Browse files Browse the repository at this point in the history
…e' step & client creation per step (#32222)

* initial commit

* While block introduced in chain config, per step creation of httpclient added as an improvement.

* removed debug code

* added auth inheriting from parent

* updated docs

* made linter recommended fixes

* made recommended changes as per PR suggetions

* added tests

* fixed errors for linting checks

* eliminated specifying nill values

* made till , a mandatory parameter for whileconfig

* Update x-pack/filebeat/input/httpjson/policy.go

Co-authored-by: Adrian Serrano <[email protected]>

* Update x-pack/filebeat/docs/inputs/input-httpjson.asciidoc

Co-authored-by: Adrian Serrano <[email protected]>

* updated docs , made PR fixes

* updated the docs as per suggetions

Co-authored-by: Adrian Serrano <[email protected]>
  • Loading branch information
2 people authored and chrisberkhout committed Jun 1, 2023
1 parent 76b958e commit 1912a03
Show file tree
Hide file tree
Showing 12 changed files with 820 additions and 55 deletions.
182 changes: 170 additions & 12 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ The state has the following elements:
- `first_event`: A map representing the first event sent to the output (result from applying transforms to `last_response.body`).
- `last_event`: A map representing the last event of the current request in the requests chain (result from applying transforms to `last_response.body`).
- `url`: The last requested URL as a raw https://pkg.go.dev/net/url#URL[`url.URL`] Go type.
- `header`: A map containing the headers. References the next request headers when used in <<request-transforms>> or <<response-pagination>> configuration sections, and to the last response headers when used in <<response-transforms>>, <<response-split>>, or <<request-rate-limit>> configuration sections.
- `body`: A map containing the body. References the next request body when used in <<request-transforms>> or <<response-pagination>> configuration sections, and to the last response body when used in <<response-transforms>> or <<response-split>> configuration sections.
- `header`: A map containing the headers. References the next request headers when used in <<request-transforms-headers>> or <<response-pagination>> configuration sections, and to the last response headers when used in <<response-transforms>>, <<response-split>>, or <<request-rate-limit>> configuration sections.
- `body`: A map containing the body. References the next request body when used in <<request-transforms-headers>> or <<response-pagination>> configuration sections, and to the last response body when used in <<response-transforms>> or <<response-split>> configuration sections.
- `cursor`: A map containing any data the user configured to be stored between restarts (See <<cursor>>).

All of the mentioned objects are only stored at runtime, except `cursor`, which has values that are persisted between restarts.
Expand Down Expand Up @@ -501,7 +501,7 @@ If the `remaining` header is missing from the Response, no rate-limiting will oc
The value of the response that specifies the epoch time when the rate limit will reset.
It is defined with a Go template value. Can read state from: [`.last_response.header`]

[[request-transforms]]
[[request-transforms-headers]]
[float]
==== `request.rate_limit.early_limit`

Expand All @@ -521,6 +521,7 @@ e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occu

It is not set by default (by default the rate-limiting as specified in the Response is followed).

[[request-transforms]]
[float]
==== `request.transforms`

Expand Down Expand Up @@ -607,7 +608,7 @@ Defines the field type of the target. Allowed values: `array`, `map`, `string`.
[float]
==== `response.split[].transforms`

A set of transforms can be defined. This list will be applied after `response.transforms` and after the object has been modified based on `response.split[].keep_parent` and `response.split[].key_field`.
A set of transforms can be defined. This list will be applied after <<response-transforms, response.transforms>> and after the object has been modified based on `response.split[].keep_parent` and `response.split[].key_field`.

Available transforms for response: [`append`, `delete`, `set`].

Expand Down Expand Up @@ -651,7 +652,7 @@ If set to true, the values in `request.body` are sent for pagination requests. D
[float]
==== `response.pagination`

List of transforms that will be applied to the response to every new page request. All the transforms from `request.transform` will be executed and then `response.pagination` will be added to modify the next request as needed. For subsequent responses, the usual `response.transforms` and `response.split` will be executed normally.
List of transforms that will be applied to the response to every new page request. All the transforms from `request.transform` will be executed and then `response.pagination` will be added to modify the next request as needed. For subsequent responses, the usual <<response-transforms, response.transforms>> and <<response-split, response.split>> will be executed normally.

Available transforms for pagination: [`append`, `delete`, `set`].

Expand Down Expand Up @@ -967,7 +968,7 @@ Contains basic request and response configuration for chained calls.
[float]
==== `chain[].step.request`

Please refer <<request-parameters,request parameters>>. Required.
See <<request-parameters,request parameters>>. Required.

Example:

Expand All @@ -980,12 +981,14 @@ Third call: https://example.com/services/data/v1.0/export_ids/file_1/info
[float]
==== `chain[].step.response.split`

Please refer <<response-split,response split parameter>>.
See <<response-split,response split parameter>>.

+
[[chain-step-replace]]
[float]
==== `chain[].step.replace`

A [JSONPath](https://goessner.net/articles/JsonPath/index.html#e2[JSONPath]) string to parse values from responses JSON, collected from previous chain steps. Place same replace string in url where collected values from previous call should be placed. Required.
A link:https://goessner.net/articles/JsonPath/index.html#e2[JSONPath] string to parse values from responses JSON, collected from previous chain steps. Place same replace string in url where collected values from previous call should be placed. Required.

Example:

Expand Down Expand Up @@ -1091,7 +1094,162 @@ request_url using file_name as 'file_2': https://example.com/services/data/v1.0/
+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: `request.url`, `request.method`, `request.transforms`, `request.body`, `response.transforms` and `response.split`.
[float]
==== `chain[].while`

Contains basic request and response configuration for chained while calls. Chained while calls will keep making the requests for a given number of times until a condition is met
or the maximum number of attempts gets exhausted. While chain has an attribute `until` which holds the expression to be evaluated. Ideally the `until` field should always be used
together with the attributes `request.retry.max_attempts` and `request.retry.wait_min` which specifies the maximum number of attempts to evaluate `until` before giving up and the
maximum wait time in between such requests. If `request.retry.max_attempts` is not specified, it will only try to evaluate the expression once and give up if it fails. If
`request.retry.wait_min` is not specified the default wait time will always be `0` as in successive calls will be made immediately.

[float]
==== `chain[].while.request`

See <<request-parameters,request parameters>> .

Example:

First call: http://example.com/services/data/v1.0/exports

Second call: http://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

Third call: http://example.com/services/data/v1.0/export_ids/1/info

[float]
==== `chain[].while.response.split`

See <<response-split,response split parameter>> .

[float]
==== `chain[].while.replace`

See <<chain-step-replace, chain[].step.replace>> .

Example:

- First call: http://example.com/services/data/v1.0/exports
+
<<response-json1-while,response>>

- Second call: http://example.com/services/data/v1.0/`$.exportId`/export_ids/status
+
<<response-json2-while,response>>

- Third call: http://example.com/services/data/v1.0/export_ids/`$.files[:].id`/info
+
<<response-json3-while,response 1>> , <<response-json4-while,response 2>>

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
enabled: true
# first call
id: my-httpjson-id
request.url: http://example.com/services/data/v1.0/exports
interval: 1h
chain:
# second call
- while:
request.url: http://example.com/services/data/v1.0/$.exportId/export_ids/status
request.method: GET
replace: $.exportId
until: '[[ eq .last_response.body.status "completed" ]]'
request.retry.max_attempts: 5
request.retry.wait_min: 5s
# third call
- step:
request.url: http://example.com/services/data/v1.0/export_ids/$.files[:].id/info
request.method: GET
replace: $.files[:].id
----

Example:

- First call to collect export ids

+
request_url: https://example.com/services/data/v1.0/exports

+
response_json:

+
[[response-json1-while]]
["source","json",subs="attributes"]
----
{
"exportId": "9ef0e6a5"
}
----

- Second call to collect `file_ids` using collected id from first call when `response.body.sataus == "completed"`. This call continues until the condition is satisfied or the maximum number of attempts gets exhausted.

+
request_url using id as '9ef0e6a5': https://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

+
response_json using id as '9ef0e6a5':

+
[[response-json2-while]]
["source","json",subs="attributes"]
----
{
"status": "completed",
"files": [
{
"id": 1
},
{
"id": 2
},
{
"id": 3
}
]
}
----

- Third call to collect `files` using collected `file_id` from second call.

+
request_url using file_id as '1': https://example.com/services/data/v1.0/export_ids/1/info

+
request_url using file_id as '2': https://example.com/services/data/v1.0/export_ids/2/info

+
response_json using id as '1':

+
[[response-json3-while]]
["source","json",subs="attributes"]
----
{
"file_name": "file_1",
"file_data": "some data"
}
----

+
response_json using id as '2':

+
[[response-json4-while]]
["source","json",subs="attributes"]
----
{
"file_name": "file_2",
"file_data": "some data"
}
----

+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <<request-parameters, request parameters>>, <<response-transforms, response.transforms>> and <<response-split, response.split>>.

[[cursor]]
[float]
Expand Down Expand Up @@ -1134,11 +1292,11 @@ filebeat.inputs:
image:images/input-httpjson-lifecycle.png[Request lifecycle]

. At every defined interval a new request is created.
. The request is transformed using the configured `request.transforms`.
. The request is transformed using the configured <<request-transforms, request.transforms>>.
. The resulting transformed request is executed.
. The server responds (here is where any retry or rate limit policy takes place when configured).
. The response is transformed using the configured `response.transforms` and `response.split`.
. If a chain step is configured. Each step will generate new requests based on collected IDs from responses. The requests will be transformed using configured `request.transforms` and the resulting generated transformed requests will be executed. This process will happen for all the steps mentioned in the chain.
. The response is transformed using the configured <<response-transforms, response.transforms>> and <<response-split, response.split>>.
. If a chain step is configured. Each step will generate new requests based on collected IDs from responses. The requests will be transformed using configured <<request-transforms, request.transforms>> and the resulting generated transformed requests will be executed. This process will happen for all the steps mentioned in the chain.
. Each resulting event is published to the output.
. If a `response.pagination` is configured and there are more pages, a new request is created using it, otherwise the process ends until the next interval.

Expand Down
50 changes: 36 additions & 14 deletions x-pack/filebeat/input/httpjson/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,60 @@

package httpjson

import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

// chainConfig for chain request.
// Following contains basic call structure for each call after normal httpjson
// call.
type chainConfig struct {
Step stepConfig `config:"step" validate:"required"`
Step *stepConfig `config:"step,omitempty"`
While *whileConfig `config:"while,omitempty"`
}

// stepConfig will contain basic properties like, request.url,
// request.method and replace parameter. Each step: request.url
// will contain replace string with original URL to make a skeleton for the
// call request.
type stepConfig struct {
Request requestChainConfig `config:"request"`
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
}

type requestChainConfig struct {
URL *urlConfig `config:"url" validate:"required"`
Method string `config:"method" validate:"required"`
Body *mapstr.M `config:"body"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
// whileConfig will contain basic properties like auth parameters, request parameters,
// response parameters , a replace parameter and an expression parameter called 'until'.
// While is similar to stepConfig with the addition of 'until'. 'until' holds an expression
// and with the combination of "request.retry.max_attempts" retries a request 'until' the
// expression is evaluated to "true" or request.retry.max_attempts is exhausted. If
// request.retry.max_attempts is not specified , the max_attempts is always 1.
type whileConfig struct {
Auth *authConfig `config:"auth"`
Request requestConfig `config:"request" validate:"required"`
Response responseChainConfig `config:"response,omitempty"`
Replace string `config:"replace,omitempty"`
Until *valueTpl `config:"until" validate:"required"`
}

type responseChainConfig struct {
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
}

func defaultChainConfig() config {
chaincfg := defaultConfig()
chaincfg.Chain = []chainConfig{
{
While: &whileConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
},
Step: &stepConfig{
Auth: chaincfg.Auth,
Request: *chaincfg.Request,
Response: responseChainConfig{},
},
},
}

return chaincfg
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (c config) Validate() error {
if c.Interval <= 0 {
return errors.New("interval must be greater than 0")
}
for _, v := range c.Chain {
if v.Step == nil && v.While == nil {
return errors.New("both step & while blocks in a chain cannot be empty")
}
}
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func (c retryConfig) getWaitMin() time.Duration {
}

func (c retryConfig) getWaitMax() time.Duration {
if c.WaitMax == nil {
if c.WaitMax == nil && c.WaitMin == nil {
return 0
} else if c.WaitMax == nil && c.WaitMin != nil {
return *c.WaitMin
}
return *c.WaitMax
}
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func run(
return err
}

requestFactory := newRequestFactory(config, log)
requestFactory, err := newRequestFactory(stdCtx, config, log)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)
Expand Down
Loading

0 comments on commit 1912a03

Please sign in to comment.