Skip to content

Commit

Permalink
[O11y][MongoDB Atlas] Fix error handling mechanism in disk, hardware and process data streams (elastic#11708)
Browse files Browse the repository at this point in the history

* Fix error handling mechanism in disk, hardware and process data streams

* update changelog.yml

* minor change in indentation!

* address review comments

* address review comments
  • Loading branch information
harnish-elastic authored Jan 3, 2025
1 parent a877eff commit 15418df
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 387 deletions.
5 changes: 5 additions & 0 deletions packages/mongodb_atlas/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# newer versions go on top
- version: "0.1.1"
changes:
- description: Fix error handling mechanism in disk, hardware and process data streams.
type: bugfix
link: https://github.com/elastic/integrations/pull/11708
- version: "0.1.0"
changes:
- description: Add observability category.
Expand Down
234 changes: 140 additions & 94 deletions packages/mongodb_atlas/data_stream/disk/agent/stream/input.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,132 +35,178 @@ state:
query: /measurements?granularity=PT{{period}}&period=PT{{period}}
redact:
fields: ~
program: |
program: |
(
(has(state.host_list) && size(state.host_list) > 0) ?
state
:
:
state.with(
request(
"GET",
state.url.trim_right("/") + "/api/atlas/v2/groups/" + state.group_id + "/processes?" + {
"pageNum": [string(state.page_num)],
"itemsPerPage": ["100"],
}.format_query()
).with({
}.format_query()
).with(
{
"Header": {
"Accept": ["application/vnd.atlas." + string(timestamp(now).getFullYear()) + "-01-01+json"],
},
}).do_request().as(resp, (resp.StatusCode == 200) ?
}
).do_request().as(resp, (resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
{
"host_list": body.results.map(e, state.url.trim_right("/") + "/api/atlas/v2/groups/" + state.group_id + "/processes/" + e.id + "/disks/"),
"next": 0,
"page_num": body.links.exists_one(res, res.rel == "next") ? (int(state.page_num) + 1) : 1,
})
:
{
"events": {
}
)
:
bytes(resp.Body).decode_json().as(body, (body != null) ?
{
"events": [
{
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET:" +
(
(size(resp.Body) != 0) ?
string(resp.Body)
:
string(resp.Status) + " (" + string(resp.StatusCode) + ")"
),
"code": body.error,
"message": body.detail,
},
},
],
"want_more": false,
}
)
)
).as(state, (state.next >= size(state.host_list)) ? {} :
(
(has(state.disk_list) && size(state.disk_list) > 0) ?
state
:
state.with(
request("GET", string(state.host_list[state.next] + "?pageNum=" + string(state.disk_page_num) + "&itemsPerPage=100"))
.with({
"Header": {
"Accept": ["application/vnd.atlas." + string(timestamp(now).getFullYear()) + "-01-01+json"],
},
}).do_request().as(resp, (resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
{
"disk_list": body.results.map(e, e.partitionName),
"disk_next": 0,
"disk_page_num": body.links.exists_one(res, res.rel == "next") ? (int(state.disk_page_num) + 1) : 1,
}
)
:
:
{
"events": [
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET:" +
(
(size(resp.Body) != 0) ?
string(resp.Body)
:
string(resp.Status) + " (" + string(resp.StatusCode) + ")"
),
},
"error": {
"code": resp.StatusCode,
"message": resp.Status,
},
"want_more": false,
}
)
},
],
"want_more": false,
}
)
).as(state, (state.disk_next >= size(state.disk_list)) ? {} :
request("GET", string(state.host_list[state.next] + state.disk_list[state.disk_next] + state.query))
.with({
"Header": {
"Accept": ["application/vnd.atlas." + string(timestamp(now).getFullYear()) + "-01-01+json"],
},
}).do_request().as(res, (res.StatusCode == 200) ?
)
)
).as(state, !has(state.next) ?
state
: (state.next >= size(state.host_list)) ?
{}
:
state.with(
request("GET", string(state.host_list[state.next] + "?pageNum=" + string(state.disk_page_num) + "&itemsPerPage=100")).with(
{
"Header": {
"Accept": ["application/vnd.atlas." + string(timestamp(now).getFullYear()) + "-01-01+json"],
},
}
).do_request().as(resp, (resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
{
"events": [bytes(res.Body).decode_json().as(f,
f.with(
{
"response": zip(
// Combining measurement names and actual values of measurement to generate `key : value` pairs.
f.measurements.map(m, m.name),
f.measurements.map(m, m.dataPoints.map(d, d.value).as(v, (size(v) == 0) ? null : (v[0])))
),
}
).drop(["measurements", "links"])
)],
"disk_list": (int(state.disk_next) + 1 < size(state.disk_list)) ? state.disk_list : [],
"disk_next": (int(state.disk_next) + 1 < size(state.disk_list)) ? (int(state.disk_next) + 1) : 0,
"disk_page_num": state.disk_page_num,
"host_list": (int(state.next) + 1 >= size(state.host_list) && int(state.disk_page_num) == 1 && int(state.disk_next) + 1 >= size(state.disk_list)) ? [] : state.host_list,
"next": (int(state.disk_next) + 1 >= size(state.disk_list) && int(state.disk_page_num) == 1 && int(state.next) + 1 < size(state.host_list)) ? (int(state.next) + 1) : int(state.next),
"want_more": int(state.next) + 1 < size(state.host_list) || int(state.page_num) != 1 || int(state.disk_next) + 1 < size(state.disk_list) || int(state.disk_page_num) != 1,
"page_num": state.page_num,
"group_id": state.group_id,
"query": state.query,
"disk_list": body.results.map(e, e.partitionName),
"disk_next": 0,
"disk_page_num": body.links.exists_one(res, res.rel == "next") ? (int(state.disk_page_num) + 1) : 1,
}
)
:
bytes(resp.Body).decode_json().as(body, (body != null) ?
{
"events": [
{
"error": {
"code": body.error,
"message": body.detail,
},
},
],
"want_more": false,
}
:
{
"events": {
"error": {
"code": string(res.StatusCode),
"id": string(res.Status),
"message": "GET:" +
(
(size(res.Body) != 0) ?
string(res.Body)
:
string(res.Status) + " (" + string(res.StatusCode) + ")"
),
"events": [
{
"error": {
"code": resp.StatusCode,
"message": resp.Status,
},
},
},
],
"want_more": false,
}
)
)
)
).as(state, !has(state.disk_next) ?
state
: (state.disk_next >= size(state.disk_list)) ?
{}
:
request("GET", string(state.host_list[state.next] + state.disk_list[state.disk_next] + state.query)).with(
{
"Header": {
"Accept": ["application/vnd.atlas." + string(timestamp(now).getFullYear()) + "-01-01+json"],
},
}
).do_request().as(resp, (resp.StatusCode == 200) ?
bytes(resp.Body).decode_json().as(body,
{
"events": [
body.with(
{
"response": zip(
// Combining measurement names and actual values of measurement to generate `key : value` pairs.
body.measurements.map(m, m.name),
body.measurements.map(m, m.dataPoints.map(d, d.value).as(v, (size(v) == 0) ? null : (v[0])))
),
}
).drop(["measurements", "links"]),
],
"disk_list": (int(state.disk_next) + 1 < size(state.disk_list)) ? state.disk_list : [],
"disk_next": (int(state.disk_next) + 1 < size(state.disk_list)) ? (int(state.disk_next) + 1) : 0,
"disk_page_num": state.disk_page_num,
"host_list":
(
int(state.next) + 1 >= size(state.host_list) && int(state.disk_page_num) == 1 &&
int(state.disk_next) + 1 >= size(state.disk_list)
) ? [] : state.host_list,
"next":
(
int(state.disk_next) + 1 >= size(state.disk_list) && int(state.disk_page_num) == 1 &&
int(state.next) + 1 < size(state.host_list)
) ? (int(state.next) + 1) : int(state.next),
"want_more": int(state.next) + 1 < size(state.host_list) || int(state.page_num) != 1 ||
int(state.disk_next) + 1 < size(state.disk_list) || int(state.disk_page_num) != 1,
"page_num": state.page_num,
"group_id": state.group_id,
"query": state.query,
}
)
:
bytes(resp.Body).decode_json().as(body, (body != null) ?
{
"events": [
{
"error": {
"code": body.error,
"message": body.detail,
},
},
],
"want_more": false,
}
:
{
"events": [
{
"error": {
"code": resp.StatusCode,
"message": resp.Status,
},
},
],
"want_more": false,
}
)
)
)
Loading

0 comments on commit 15418df

Please sign in to comment.