From d7101fdc077ddd2f938f81f196bc90c7d6a8b7f0 Mon Sep 17 00:00:00 2001 From: Luc Perkins Date: Mon, 18 Oct 2021 08:37:14 -0700 Subject: [PATCH] Add CSV enrichment example with schema Signed-off-by: Luc Perkins --- csv-enrichment/data/codes.csv | 4 +++ csv-enrichment/vector.toml | 53 ++++++++++++++++++++++++++++++----- 2 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 csv-enrichment/data/codes.csv diff --git a/csv-enrichment/data/codes.csv b/csv-enrichment/data/codes.csv new file mode 100644 index 0000000..6b4d588 --- /dev/null +++ b/csv-enrichment/data/codes.csv @@ -0,0 +1,4 @@ +code,tag,message +1,"EPERM","Operation not permitted" +2,"ENOENT","No such file or directory" +3,"ESRCH","No such process" diff --git a/csv-enrichment/vector.toml b/csv-enrichment/vector.toml index 0171b5d..03a3cfc 100644 --- a/csv-enrichment/vector.toml +++ b/csv-enrichment/vector.toml @@ -1,9 +1,26 @@ +# User info table (all fields are string, thus no schema) [enrichment_tables.users] type = "file" -file.path = "/var/lib/vector/data/users.csv" -file.encoding.type = "csv" -[sources.random] +[enrichment_tables.users.file] +path = "/var/lib/vector/data/users.csv" +encoding = { type = "csv" } + +# Error codes table (with specified schema) +[enrichment_tables.codes] +type = "file" + +[enrichment_tables.codes.file] +path = "/var/lib/vector/data/codes.csv" +encoding = { type = "csv" } + +[enrichment_tables.codes.schema] +code = "integer" +tag = "string" +message = "string" + +# Generate user info messages +[sources.random_user_info] type = "generator" format = "shuffle" lines = [ @@ -15,15 +32,37 @@ lines = [ ] interval = 2 -[transforms.remap] +# Generate coded error messages +[sources.coded_error_messages] +type = "generator" +format = "shuffle" +lines = [ + '{"code":1,"device_id":"e5ad503d","timestamp":"2021-10-18T15:35:09.158139Z"}', + '{"code":2,"device_id":"a5b2401e","timestamp":"2021-10-18T15:35:28.517210Z"}', + '{"code":3,"device_id":"b48f41aa","timestamp":"2021-10-18T15:35:37.846783Z"}' +] +interval = 2 + +[transforms.remap_random_user_info] type = "remap" -inputs = ["random"] +inputs = ["random_user_info"] source = """ . = parse_json(.message) ?? {} . |= get_enrichment_table_record("users", { "last_name": .last_name, "first_name": .first_name }) ?? {} """ +[transforms.remap_coded_errors] +type = "remap" +inputs = ["coded_error_messages"] +source = """ +. = parse_json(.message) ?? {} + +row = get_enrichment_table_record!("codes", { "code": del(.code) }) +.message = row.message +.tag = row.tag +""" + [sinks.console] type = "console" -inputs = ["remap"] -encoding.codec = "json" +inputs = ["remap_*"] +encoding = { codec = "json" }