Skip to content

Commit

Permalink
wiz: fix state retention between iterations and refactor cel programs (
Browse files Browse the repository at this point in the history
…#10278)

* fix whitespace and correct state retention behaviour
* simplify timestamp formatting
* fix timestamp use
* use optional types to simplify expressions
* simplify condition
* be more robust with url handling
* handle http errors in cel and cel errors in ingest pipeline
  • Loading branch information
efd6 authored Jun 30, 2024
1 parent ed0c201 commit 3936e82
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 223 deletions.
8 changes: 8 additions & 0 deletions packages/wiz/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# newer versions go on top
- version: "1.3.0"
changes:
- description: Improve HTTP error handling and code clarity.
type: enhancement
link: https://github.com/elastic/integrations/pull/10278
- description: Fix state retention between iterations.
type: bugfix
link: https://github.com/elastic/integrations/pull/10278
- version: "1.2.0"
changes:
- description: Removed import_mappings. Update the kibana constraint to ^8.13.0. Modified the field definitions to remove ECS fields made redundant by the ecs@mappings component template.
Expand Down
131 changes: 57 additions & 74 deletions packages/wiz/data_stream/audit/agent/stream/cel.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -53,85 +53,68 @@ state:
}
}
program: |
post_request(
state.url + "/graphql",
"application/json",
{
"query": state.query,
"variables": {
state.with(
post_request(
state.url.trim_right("/") + "/graphql",
"application/json",
{
"query": state.query,
"variables": {
"first": state.batch_size,
"after": (has(state.end_cursor) && has(state.end_cursor.value) && state.end_cursor.value != null ? state.end_cursor.value : null),
"after": state.?end_cursor.value.orValue(null),
"filterBy": {
"timestamp": {
"after":
(
has(state.want_more) && !state.want_more
?
(
has(state.cursor) && has(state.cursor.last_timestamp) && state.cursor.last_timestamp != null
?
state.cursor.last_timestamp
:
(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
)
:
(
has(state.cursor) && has(state.cursor.first_timestamp) && state.cursor.first_timestamp != null
?
state.cursor.first_timestamp
:
null
)
) }
}
"after": state.want_more ?
state.?cursor.first_timestamp.orValue(null)
:
state.?cursor.last_timestamp.orValue(string(now() - duration(state.initial_interval)))
}
}
}
}.encode_json()
).do_request().as(resp, bytes(resp.Body).decode_json().as(body, {
"events": body.data.auditLogEntries.nodes.map(e, {
"message": e.encode_json(),
}),
"cursor": {
"last_timestamp": (
has(body.data.auditLogEntries.nodes) && body.data.auditLogEntries.nodes.size() > 0
?
(
has(state.cursor) && has(state.cursor.last_timestamp) && body.data.auditLogEntries.nodes.map(e, e.timestamp).max() < state.cursor.last_timestamp
?
state.cursor.last_timestamp
:
body.data.auditLogEntries.nodes.map(e, e.timestamp).max()
)
:
(
has(state.cursor) && has(state.cursor.last_timestamp)
?
state.cursor.last_timestamp
:
null
)
),
"first_timestamp": (
has(state.cursor) && has(state.cursor.first_timestamp) && has(body.data) && state.cursor.first_timestamp != null
?
( body.data.auditLogEntries.pageInfo.hasNextPage ? state.cursor.first_timestamp : state.cursor.last_timestamp )
:
(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
),
},
"end_cursor": {
"value": (
has(body.data) && has(body.data.auditLogEntries) && has(body.data.auditLogEntries.pageInfo) && has(body.data.auditLogEntries.pageInfo.hasNextPage) && body.data.auditLogEntries.pageInfo.hasNextPage
?
body.data.auditLogEntries.pageInfo.endCursor
:
null
)
},
"query": state.query,
"url": state.url,
"want_more": body.data.auditLogEntries.pageInfo.hasNextPage,
"batch_size": state.batch_size,
}))
).do_request().as(resp, resp.StatusCode == 200 ?
bytes(resp.Body).decode_json().as(body, {
"events": body.data.auditLogEntries.nodes.map(e, {
"message": e.encode_json(),
}),
"cursor": {
?"last_timestamp": has(body.data.auditLogEntries.nodes) && body.data.auditLogEntries.nodes.size() > 0 ?
optional.of(body.data.auditLogEntries.nodes.map(e, timestamp(e.timestamp)).max().as(last,
has(state.?cursor.last_timestamp) && last < timestamp(state.cursor.last_timestamp) ?
state.cursor.last_timestamp
:
string(last)
))
:
state.?cursor.last_timestamp,
?"first_timestamp": !has(body.data) || state.?cursor.first_timestamp.orValue(null) == null ?
optional.of(string(now() - duration(state.initial_interval)))
: body.data.auditLogEntries.pageInfo.hasNextPage ?
state.?cursor.first_timestamp
:
state.?cursor.last_timestamp,
},
"end_cursor": {
?"value": body.?data.auditLogEntries.pageInfo.hasNextPage.orValue(false) ?
body.?data.auditLogEntries.pageInfo.endCursor
:
optional.none()
},
"want_more": body.?data.auditLogEntries.pageInfo.hasNextPage.orValue(false),
})
:
{
"events": [{
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST:"+string(resp.Body)
},
}],
"want_more": false,
}
)
)
tags:
{{#if preserve_original_event}}
- preserve_original_event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ processors:
field: ecs.version
tag: set_ecs_version
value: '8.11.0'
- fail:
if: ctx.error?.message != null && ctx.message == null && ctx.event?.original == null
message: error message set and no data to process
- rename:
field: message
tag: rename_message_to_event_original
Expand Down
131 changes: 57 additions & 74 deletions packages/wiz/data_stream/issue/agent/stream/cel.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -120,85 +120,68 @@ state:
}
}
program: |
post_request(
state.url + "/graphql",
"application/json",
{
"query": state.query,
"variables": {
state.with(
post_request(
state.url.trim_right("/") + "/graphql",
"application/json",
{
"query": state.query,
"variables": {
"first": state.batch_size,
"after": (has(state.end_cursor) && has(state.end_cursor.value) && state.end_cursor.value != null ? state.end_cursor.value : null),
"after": state.?end_cursor.value.orValue(null),
"filterBy": {
"statusChangedAt": {
"after":
(
has(state.want_more) && !state.want_more
?
(
has(state.cursor) && has(state.cursor.last_timestamp) && state.cursor.last_timestamp != null
?
state.cursor.last_timestamp
:
(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
)
:
(
has(state.cursor) && has(state.cursor.first_timestamp) && state.cursor.first_timestamp != null
?
state.cursor.first_timestamp
:
null
)
) }
}
"after": state.want_more ?
state.?cursor.first_timestamp.orValue(null)
:
state.?cursor.last_timestamp.orValue(string(now() - duration(state.initial_interval)))
}
}
}
}.encode_json()
).do_request().as(resp, bytes(resp.Body).decode_json().as(body, {
"events": body.data.issues.nodes.map(e, {
"message": e.encode_json(),
}),
"cursor": {
"last_timestamp": (
has(body.data.issues.nodes) && body.data.issues.nodes.size() > 0
?
(
has(state.cursor) && has(state.cursor.last_timestamp) && body.data.issues.nodes.map(e, e.statusChangedAt).max() < state.cursor.last_timestamp
?
state.cursor.last_timestamp
:
body.data.issues.nodes.map(e, e.statusChangedAt).max()
)
:
(
has(state.cursor) && has(state.cursor.last_timestamp)
?
state.cursor.last_timestamp
:
null
)
),
"first_timestamp": (
has(state.cursor) && has(state.cursor.first_timestamp) && has(body.data) && state.cursor.first_timestamp != null
?
( body.data.issues.pageInfo.hasNextPage ? state.cursor.first_timestamp : state.cursor.last_timestamp )
:
(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
),
},
"end_cursor": {
"value": (
has(body.data) && has(body.data.issues) && has(body.data.issues.pageInfo) && has(body.data.issues.pageInfo.hasNextPage) && body.data.issues.pageInfo.hasNextPage
?
body.data.issues.pageInfo.endCursor
:
null
)
},
"query": state.query,
"url": state.url,
"want_more": body.data.issues.pageInfo.hasNextPage,
"batch_size": state.batch_size,
}))
).do_request().as(resp, resp.StatusCode == 200 ?
bytes(resp.Body).decode_json().as(body, {
"events": body.data.issues.nodes.map(e, {
"message": e.encode_json(),
}),
"cursor": {
?"last_timestamp": has(body.data.issues.nodes) && body.data.issues.nodes.size() > 0 ?
optional.of(body.data.issues.nodes.map(e, timestamp(e.statusChangedAt)).max().as(last,
has(state.?cursor.last_timestamp) && last < timestamp(state.cursor.last_timestamp) ?
state.cursor.last_timestamp
:
string(last)
))
:
state.?cursor.last_timestamp,
?"first_timestamp": !has(body.data) || state.?cursor.first_timestamp.orValue(null) == null ?
optional.of(string(now() - duration(state.initial_interval)))
: body.data.issues.pageInfo.hasNextPage ?
state.?cursor.first_timestamp
:
state.?cursor.last_timestamp,
},
"end_cursor": {
?"value": body.?data.issues.pageInfo.hasNextPage.orValue(false) ?
body.?data.issues.pageInfo.endCursor
:
optional.none()
},
"want_more": body.?data.issues.pageInfo.hasNextPage.orValue(false),
})
:
{
"events": [{
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "POST:"+string(resp.Body)
},
}],
"want_more": false,
}
)
)
tags:
{{#if preserve_original_event}}
- preserve_original_event
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
---
description: Pipeline for processing Issue logs.
processors:
- fail:
if: ctx.error?.message != null && ctx.message == null && ctx.event?.original == null
message: error message set and no data to process
- set:
field: ecs.version
tag: set_ecs_version
Expand Down
Loading

0 comments on commit 3936e82

Please sign in to comment.