[elastic/beats] [HttpJson] - Improves request chaining with new 'while' step & client creation per step #32222

Merged
merged 15 commits into from
Jul 14, 2022
157 changes: 156 additions & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -1091,7 +1091,162 @@ request_url using file_name as 'file_2': https://example.com/services/data/v1.0/
+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: `request.url`, `request.method`, `request.transforms`, `request.body`, `response.transforms` and `response.split`.
[float]
==== `chain[].while`

Contains basic request and response configuration for chained while calls. A chained while call repeats its request until a condition is met
or the maximum number of attempts is exhausted. The `while` block has an `until` attribute, which holds the expression to be evaluated. Ideally, `until` should always be used
together with `request.retry.max_attempts` and `request.retry.wait_min`, which specify the maximum number of attempts to evaluate `until` before giving up and the
wait time between those requests. If `request.retry.max_attempts` is not specified, the expression is evaluated only once, and the call gives up if it fails. If
`request.retry.wait_min` is not specified, the wait time defaults to `0`, so successive calls are made immediately.

[float]
==== `chain[].while.request`

Please refer to <<request-parameters,request parameters>> for more information. Required.

Example:

First call: http://example.com/services/data/v1.0/exports

Second call: http://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

Third call: http://example.com/services/data/v1.0/export_ids/1/info

[float]
==== `chain[].while.response.split`

Please refer <<response-split,response split parameter>>.

[float]
==== `chain[].while.replace`

Please refer `chain[].step.replace`.
Contributor

This PR still adds a few "Please refer X" expressions. My understanding is that those are not correct, please correct me if I'm wrong.

I lean towards using "See X" instead of "Please refer to X for more information" as a replacement.

Also, this one uses a `literal` instead of a <<link>>, can it be turned into a link?

Contributor Author

@adriansr updated the docs as per the suggestions. Please review once.


Example:

- First call: http://example.com/services/data/v1.0/exports
+
<<response-json1-while,response>>

- Second call: http://example.com/services/data/v1.0/`$.exportId`/export_ids/status
+
<<response-json2-while,response>>

- Third call: http://example.com/services/data/v1.0/export_ids/`$.files[:].id`/info
+
<<response-json3-while,response 1>> , <<response-json4-while,response 2>>

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
  enabled: true
  # first call
  id: my-httpjson-id
  request.url: http://example.com/services/data/v1.0/exports
  interval: 1h
  chain:
    # second call
    - while:
        request.url: http://example.com/services/data/v1.0/$.exportId/export_ids/status
        request.method: GET
        replace: $.exportId
        until: '[[ eq .last_response.body.status "completed" ]]'
        request.retry.max_attempts: 5
        request.retry.wait_min: 5s
    # third call
    - step:
        request.url: http://example.com/services/data/v1.0/export_ids/$.files[:].id/info
        request.method: GET
        replace: $.files[:].id
----
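
How a `replace` placeholder fans out into one request per collected value can be sketched as below. This is only an illustration under stated assumptions: `expandURLs` is a hypothetical helper, the values are passed in as plain strings, and the real input resolves the placeholder from the previous response rather than from a hard-coded slice.

```go
package main

import (
	"fmt"
	"strings"
)

// expandURLs is a hypothetical helper, not part of the httpjson input.
// It substitutes each collected value for the replace placeholder in the
// URL template, producing one request URL per value.
func expandURLs(template, placeholder string, values []string) []string {
	urls := make([]string, 0, len(values))
	for _, v := range values {
		urls = append(urls, strings.Replace(template, placeholder, v, 1))
	}
	return urls
}

func main() {
	template := "http://example.com/services/data/v1.0/export_ids/$.files[:].id/info"
	// File ids as they might be collected from the second call's response.
	ids := []string{"1", "2", "3"}
	for _, u := range expandURLs(template, "$.files[:].id", ids) {
		fmt.Println(u)
	}
}
```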

Example:

- First call to collect export ids

+
request_url: https://example.com/services/data/v1.0/exports

+
response_json:

+
[[response-json1-while]]
["source","json",subs="attributes"]
----
{
    "exportId": "9ef0e6a5"
}
----

- Second call to collect `file_ids` using collected id from first call when `response.body.status == "completed"`. This call continues until the condition is satisfied or the maximum number of attempts gets exhausted.

+
request_url using id as '9ef0e6a5': https://example.com/services/data/v1.0/9ef0e6a5/export_ids/status

+
response_json using id as '9ef0e6a5':

+
[[response-json2-while]]
["source","json",subs="attributes"]
----
{
    "status": "completed",
    "files": [
        {
            "id": 1
        },
        {
            "id": 2
        },
        {
            "id": 3
        }
    ]
}
----

- Third call to collect `files` using collected `file_id` from second call.

+
request_url using file_id as '1': https://example.com/services/data/v1.0/export_ids/1/info

+
request_url using file_id as '2': https://example.com/services/data/v1.0/export_ids/2/info

+
response_json using id as '1':

+
[[response-json3-while]]
["source","json",subs="attributes"]
----
{
    "file_name": "file_1",
    "file_data": "some data"
}
----

+
response_json using id as '2':

+
[[response-json4-while]]
["source","json",subs="attributes"]
----
{
    "file_name": "file_2",
    "file_data": "some data"
}
----

+
Collect and make events from response in any format supported by httpjson for all calls.

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <<request-parameters, request parameters>>, `response.transforms` and `response.split`.

[[cursor]]
[float]
50 changes: 36 additions & 14 deletions x-pack/filebeat/input/httpjson/chain.go
@@ -56,38 +56,60 @@

 package httpjson

-import (
-	"github.com/elastic/elastic-agent-libs/mapstr"
-	"github.com/elastic/elastic-agent-libs/transport/httpcommon"
-)
-
 // chainConfig for chain request.
 // Following contains basic call structure for each call after normal httpjson
 // call.
 type chainConfig struct {
-	Step stepConfig `config:"step" validate:"required"`
+	Step  *stepConfig  `config:"step,omitempty"`
+	While *whileConfig `config:"while,omitempty"`
 }

 // stepConfig will contain basic properties like, request.url,
 // request.method and replace parameter. Each step: request.url
 // will contain replace string with original URL to make a skeleton for the
 // call request.
 type stepConfig struct {
-	Request  requestChainConfig  `config:"request"`
+	Auth     *authConfig         `config:"auth"`
+	Request  requestConfig       `config:"request" validate:"required"`
 	Response responseChainConfig `config:"response,omitempty"`
 	Replace  string              `config:"replace,omitempty"`
 }

-type requestChainConfig struct {
-	URL        *urlConfig       `config:"url" validate:"required"`
-	Method     string           `config:"method" validate:"required"`
-	Body       *mapstr.M        `config:"body"`
-	Transforms transformsConfig `config:"transforms"`
-
-	Transport httpcommon.HTTPTransportSettings `config:",inline"`
+// whileConfig will contain basic properties like auth parameters, request parameters,
+// response parameters , a replace parameter and an expression parameter called 'until'.
+// While is similar to stepConfig with the addition of 'until'. 'until' holds an expression
+// and with the combination of "request.retry.max_attempts" retries a request 'until' the
+// expression is evaluated to "true" or request.retry.max_attempts is exhausted. If
+// request.retry.max_attempts is not specified , the max_attempts is always 1.
+type whileConfig struct {
+	Auth     *authConfig         `config:"auth"`
+	Request  requestConfig       `config:"request" validate:"required"`
+	Response responseChainConfig `config:"response,omitempty"`
+	Replace  string              `config:"replace,omitempty"`
+	Until    *valueTpl           `config:"until" validate:"required"`
 }

 type responseChainConfig struct {
 	Transforms transformsConfig `config:"transforms"`
 	Split      *splitConfig     `config:"split"`
 }
+
+func defaultChainConfig() config {
+	chaincfg := defaultConfig()
+	chaincfg.Chain = []chainConfig{
+		{
+			While: &whileConfig{
+				Auth:     chaincfg.Auth,
+				Request:  *chaincfg.Request,
+				Response: responseChainConfig{},
+			},
+			Step: &stepConfig{
+				Auth:     chaincfg.Auth,
+				Request:  *chaincfg.Request,
+				Response: responseChainConfig{},
+			},
+		},
+	}
+
+	return chaincfg
+}
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
@@ -36,6 +36,11 @@ func (c config) Validate() error {
 	if c.Interval <= 0 {
 		return errors.New("interval must be greater than 0")
 	}
+	for _, v := range c.Chain {
+		if v.Step == nil && v.While == nil {
+			return errors.New("both step & while blocks in a chain cannot be empty")
+		}
+	}
 	return nil
 }
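
The effect of the added check can be tried in isolation with a stripped-down sketch. The struct fields below are simplified stand-ins (assumptions), not the real `stepConfig` and `whileConfig` definitions, and `validateChain` is a hypothetical free function standing in for the loop inside `Validate`.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real config types; the fields are assumptions.
type stepConfig struct{ URL string }
type whileConfig struct{ URL, Until string }

// chainConfig holds pointers so that an absent block is distinguishable (nil).
type chainConfig struct {
	Step  *stepConfig
	While *whileConfig
}

// validateChain rejects any chain entry that defines neither step nor while.
func validateChain(chain []chainConfig) error {
	for _, v := range chain {
		if v.Step == nil && v.While == nil {
			return errors.New("both step & while blocks in a chain cannot be empty")
		}
	}
	return nil
}

func main() {
	valid := []chainConfig{
		{While: &whileConfig{URL: "http://example.com/status", Until: "completed"}},
		{Step: &stepConfig{URL: "http://example.com/info"}},
	}
	invalid := []chainConfig{{}}

	fmt.Println(validateChain(valid))
	fmt.Println(validateChain(invalid))
}
```

Switching both fields to pointers is what makes this check possible: with a value-typed `Step`, an omitted block could not be told apart from an empty one.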

4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
@@ -49,8 +49,10 @@ func (c retryConfig) getWaitMin() time.Duration {
 }

 func (c retryConfig) getWaitMax() time.Duration {
-	if c.WaitMax == nil {
+	if c.WaitMax == nil && c.WaitMin == nil {
 		return 0
+	} else if c.WaitMax == nil && c.WaitMin != nil {
+		return *c.WaitMin
 	}
 	return *c.WaitMax
 }
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/httpjson/input.go
@@ -112,7 +112,11 @@ func run(
 		return err
 	}

-	requestFactory := newRequestFactory(config, log)
+	requestFactory, err := newRequestFactory(stdCtx, config, log)
+	if err != nil {
+		log.Errorf("Error while creating requestFactory: %v", err)
+		return err
+	}
 	pagination := newPagination(config, httpClient, log)
 	responseProcessor := newResponseProcessor(config, pagination, log)
 	requester := newRequester(httpClient, requestFactory, responseProcessor, log)