From a2426285f1b39184b543042c3000012ec156a426 Mon Sep 17 00:00:00 2001 From: ryanohnemus Date: Mon, 23 Sep 2024 14:25:53 -0500 Subject: [PATCH 1/5] multiline: cri parser backend, do not use regex Signed-off-by: ryanohnemus --- include/fluent-bit/flb_parser.h | 1 + src/CMakeLists.txt | 1 + src/flb_parser.c | 11 +++ src/flb_parser_cri.c | 139 ++++++++++++++++++++++++++++++ src/multiline/flb_ml_parser.c | 1 - src/multiline/flb_ml_parser_cri.c | 11 +-- 6 files changed, 156 insertions(+), 8 deletions(-) create mode 100644 src/flb_parser_cri.c diff --git a/include/fluent-bit/flb_parser.h b/include/fluent-bit/flb_parser.h index afc8159b100..44ab2122479 100644 --- a/include/fluent-bit/flb_parser.h +++ b/include/fluent-bit/flb_parser.h @@ -31,6 +31,7 @@ #define FLB_PARSER_JSON 2 #define FLB_PARSER_LTSV 3 #define FLB_PARSER_LOGFMT 4 +#define FLB_PARSER_CRI 5 struct flb_parser_types { char *key; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b3df3ebbb91..9826c8ed527 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -126,6 +126,7 @@ if(FLB_PARSER) flb_parser_regex.c flb_parser_json.c flb_parser_decoder.c + flb_parser_cri.c flb_parser_ltsv.c flb_parser_logfmt.c ) diff --git a/src/flb_parser.c b/src/flb_parser.c index 149ab7f1b91..f035a66cf4c 100644 --- a/src/flb_parser.c +++ b/src/flb_parser.c @@ -112,6 +112,10 @@ int flb_parser_logfmt_do(struct flb_parser *parser, void **out_buf, size_t *out_size, struct flb_time *out_time); +int flb_parser_cri_do(struct flb_parser *parser, + const char *buf, size_t length, + void **out_buf, size_t *out_size, + struct flb_time *out_time); /* * This function is used to free all aspects of a parser * which is provided by the caller of flb_create_parser. @@ -202,6 +206,9 @@ struct flb_parser *flb_parser_create(const char *name, const char *format, else if (strcasecmp(format, "logfmt") == 0) { p->type = FLB_PARSER_LOGFMT; } + else if (strcasecmp(format, "cri") == 0) { + p->type = FLB_PARSER_CRI; + } else { flb_error("[parser:%s] Invalid format %s", name, format); mk_list_del(&p->_head); @@ -1061,6 +1068,10 @@ int flb_parser_do(struct flb_parser *parser, const char *buf, size_t length, return flb_parser_logfmt_do(parser, buf, length, out_buf, out_size, out_time); } + else if (parser->type == FLB_PARSER_CRI) { + return flb_parser_cri_do(parser, buf, length, + out_buf, out_size, out_time); + } return -1; } diff --git a/src/flb_parser_cri.c b/src/flb_parser_cri.c new file mode 100644 index 00000000000..1be879094f9 --- /dev/null +++ b/src/flb_parser_cri.c @@ -0,0 +1,139 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define _GNU_SOURCE +#include + +#include +#include +#include +#include +#include +#include + +#define CRI_SPACE_DELIM " " + +int flb_parser_cri_do(struct flb_parser *parser, + const char *in_buf, size_t in_size, + void **out_buf, size_t *out_size, + struct flb_time *out_time) +{ + int ret; + time_t time_lookup = 0; + double tmfrac = 0; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + char *dec_out_buf; + size_t dec_out_size; + size_t map_size = 4; /* always 4 fields for CRI */ + char *time_key; + size_t time_key_len; + char* end_of_line = NULL; + char* token_end = NULL; + + if (parser->time_key) { + time_key = parser->time_key; + } + else { + time_key = "time"; + } + time_key_len = strlen(time_key); + + /* Prepare new outgoing buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + msgpack_pack_map(&tmp_pck, map_size); + + /* Time */ + token_end = strpbrk(in_buf, CRI_SPACE_DELIM); + if (token_end == NULL) { + msgpack_sbuffer_destroy(&tmp_sbuf); + return -1; + } + token_end = token_end + 1; /* time + a space */ + + struct flb_tm tm = {0}; + ret = flb_parser_time_lookup(in_buf, token_end-in_buf-1, + 0, parser, &tm, &tmfrac); + if (ret == -1) { + flb_error("[parser:%s] Invalid time format %s", + parser->name, parser->time_fmt_full); + return -1; + } + + msgpack_pack_str(&tmp_pck, time_key_len); + msgpack_pack_str_body(&tmp_pck, time_key, time_key_len); + msgpack_pack_str(&tmp_pck, token_end-in_buf-1); + msgpack_pack_str_body(&tmp_pck, in_buf, token_end-in_buf-1); + + /* Stream */ + if (!(!strncmp(token_end, "stdout ", 7) || !strncmp(token_end, "stderr ", 7))) { + msgpack_sbuffer_destroy(&tmp_sbuf); + return -1; + } + + msgpack_pack_str(&tmp_pck, 6); + msgpack_pack_str_body(&tmp_pck, "stream", 6); + msgpack_pack_str(&tmp_pck, 6); + msgpack_pack_str_body(&tmp_pck, token_end, 6); + token_end = token_end + 7; /* stream + a space */ + + /* Partial/Full Indicator (P|F) */ + if (!(!strncmp(token_end, "F ", 2) || !strncmp(token_end, "P ", 2))) { + msgpack_sbuffer_destroy(&tmp_sbuf); + return -1; + } + msgpack_pack_str(&tmp_pck, 2); + msgpack_pack_str_body(&tmp_pck, "_p", 2); + msgpack_pack_str(&tmp_pck, 1); + msgpack_pack_str_body(&tmp_pck, token_end, 1); + token_end = token_end + 2; /* indicator + a space */ + + /* Log */ + end_of_line = strpbrk(token_end, "\n"); + if (end_of_line == NULL ) { + end_of_line = in_buf+in_size; + } + msgpack_pack_str(&tmp_pck, 3); + msgpack_pack_str_body(&tmp_pck, "log", 3); + msgpack_pack_str(&tmp_pck, end_of_line-token_end); + msgpack_pack_str_body(&tmp_pck, token_end, end_of_line-token_end); + + /* Export results */ + time_lookup = flb_parser_tm2time(&tm, parser->time_system_timezone); + out_time->tm.tv_sec = time_lookup; + out_time->tm.tv_nsec = (tmfrac * 1000000000); + + *out_buf = tmp_sbuf.data; + *out_size = tmp_sbuf.size; + + /* Check if some decoder was specified */ + if (parser->decoders) { + ret = flb_parser_decoder_do(parser->decoders, + tmp_sbuf.data, tmp_sbuf.size, + &dec_out_buf, &dec_out_size); + if (ret == 0) { + *out_buf = dec_out_buf; + *out_size = dec_out_size; + msgpack_sbuffer_destroy(&tmp_sbuf); + } + } + + return end_of_line-in_buf; +} diff --git a/src/multiline/flb_ml_parser.c b/src/multiline/flb_ml_parser.c index db0c8f2f983..57ef1258de1 100644 --- a/src/multiline/flb_ml_parser.c +++ b/src/multiline/flb_ml_parser.c @@ -21,7 +21,6 @@ #include #include #include -#include #include int flb_ml_parser_init(struct flb_ml_parser *ml_parser) diff --git a/src/multiline/flb_ml_parser_cri.c b/src/multiline/flb_ml_parser_cri.c index e4bd6a0e0f3..60fd73ac27c 100644 --- a/src/multiline/flb_ml_parser_cri.c +++ b/src/multiline/flb_ml_parser_cri.c @@ -21,19 +21,17 @@ #include #include -#define FLB_ML_CRI_REGEX \ - "^(?