Skip to content

Commit

Permalink
Add CSV enrichment example with schema
Browse files Browse the repository at this point in the history
Signed-off-by: Luc Perkins <[email protected]>
  • Loading branch information
Luc Perkins committed Oct 18, 2021
1 parent b2d7c25 commit d7101fd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
4 changes: 4 additions & 0 deletions csv-enrichment/data/codes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
code,tag,message
1,"EPERM","Operation not permitted"
2,"ENOENT","No such file or directory"
3,"ESRCH","No such process"
53 changes: 46 additions & 7 deletions csv-enrichment/vector.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
# User info table (all fields are string, thus no schema)
[enrichment_tables.users]
type = "file"
file.path = "/var/lib/vector/data/users.csv"
file.encoding.type = "csv"

[sources.random]
[enrichment_tables.users.file]
path = "/var/lib/vector/data/users.csv"
encoding = { type = "csv" }

# Error codes table (with specified schema)
[enrichment_tables.codes]
type = "file"

[enrichment_tables.codes.file]
path = "/var/lib/vector/data/codes.csv"
encoding = { type = "csv" }

[enrichment_tables.codes.schema]
code = "integer"
tag = "string"
message = "string"

# Generate user info messages
[sources.random_user_info]
type = "generator"
format = "shuffle"
lines = [
Expand All @@ -15,15 +32,37 @@ lines = [
]
interval = 2

[transforms.remap]
# Generate coded error messages
[sources.coded_error_messages]
type = "generator"
format = "shuffle"
lines = [
'{"code":1,"device_id":"e5ad503d","timestamp":"2021-10-18T15:35:09.158139Z"}',
'{"code":2,"device_id":"a5b2401e","timestamp":"2021-10-18T15:35:28.517210Z"}',
'{"code":3,"device_id":"b48f41aa","timestamp":"2021-10-18T15:35:37.846783Z"}'
]
interval = 2

[transforms.remap_random_user_info]
type = "remap"
inputs = ["random"]
inputs = ["random_user_info"]
source = """
. = parse_json(.message) ?? {}
. |= get_enrichment_table_record("users", { "last_name": .last_name, "first_name": .first_name }) ?? {}
"""

[transforms.remap_coded_errors]
type = "remap"
inputs = ["coded_error_messages"]
source = """
. = parse_json(.message) ?? {}
row = get_enrichment_table_record!("codes", { "code": del(.code) })
.message = row.message
.tag = row.tag
"""

[sinks.console]
type = "console"
inputs = ["remap"]
encoding.codec = "json"
inputs = ["remap_*"]
encoding = { codec = "json" }

0 comments on commit d7101fd

Please sign in to comment.