Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiline: introduce static cri parser backend, providing 9mb/s performance gain over current regex cri parser #9418

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#define FLB_PARSER_JSON 2
#define FLB_PARSER_LTSV 3
#define FLB_PARSER_LOGFMT 4
#define FLB_PARSER_CRI 5

struct flb_parser_types {
char *key;
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ if(FLB_PARSER)
flb_parser_regex.c
flb_parser_json.c
flb_parser_decoder.c
flb_parser_cri.c
flb_parser_ltsv.c
flb_parser_logfmt.c
)
Expand Down
11 changes: 11 additions & 0 deletions src/flb_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ int flb_parser_logfmt_do(struct flb_parser *parser,
void **out_buf, size_t *out_size,
struct flb_time *out_time);

int flb_parser_cri_do(struct flb_parser *parser,
const char *buf, size_t length,
void **out_buf, size_t *out_size,
struct flb_time *out_time);
/*
* This function is used to free all aspects of a parser
* which is provided by the caller of flb_create_parser.
Expand Down Expand Up @@ -202,6 +206,9 @@ struct flb_parser *flb_parser_create(const char *name, const char *format,
else if (strcasecmp(format, "logfmt") == 0) {
p->type = FLB_PARSER_LOGFMT;
}
else if (strcasecmp(format, "cri") == 0) {
p->type = FLB_PARSER_CRI;
}
else {
flb_error("[parser:%s] Invalid format %s", name, format);
mk_list_del(&p->_head);
Expand Down Expand Up @@ -1061,6 +1068,10 @@ int flb_parser_do(struct flb_parser *parser, const char *buf, size_t length,
return flb_parser_logfmt_do(parser, buf, length,
out_buf, out_size, out_time);
}
else if (parser->type == FLB_PARSER_CRI) {
return flb_parser_cri_do(parser, buf, length,
out_buf, out_size, out_time);
}

return -1;
}
Expand Down
146 changes: 146 additions & 0 deletions src/flb_parser_cri.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#define _GNU_SOURCE
#include <time.h>

#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_parser_decoder.h>
#include <fluent-bit/flb_unescape.h>
#include <fluent-bit/flb_mem.h>

#define CRI_SPACE_DELIM ' '

int flb_parser_cri_do(struct flb_parser *parser,
const char *in_buf, size_t in_size,
void **out_buf, size_t *out_size,
struct flb_time *out_time)
{
int ret;
time_t time_lookup = 0;
double tmfrac = 0;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
char *dec_out_buf;
size_t dec_out_size;
size_t map_size = 4; /* always 4 fields for CRI */
char *time_key;
size_t time_key_len;
char* end_of_line = NULL;
char* token_end = NULL;

if (parser->time_key) {
time_key = parser->time_key;
}
else {
time_key = "time";
}
time_key_len = strlen(time_key);

/* Time */
token_end = strchr(in_buf, CRI_SPACE_DELIM);

/* after we find 'time' field (which is variable length),
* we also check that we have enough room for static size fields
* - 1 space + stream (6 chars) + 1 space
* - _p (1 char) + 1 space
* = 10 characters past 'time' field
*/
if (token_end == NULL || token_end-in_buf+10 > in_size) {
return -1;
}

struct flb_tm tm = {0};
ret = flb_parser_time_lookup(in_buf, token_end-in_buf,
0, parser, &tm, &tmfrac);
if (ret == -1) {
flb_error("[parser:%s] Invalid time format %s",
parser->name, parser->time_fmt_full);
return -1;
}

/* Prepare new outgoing buffer, then add time to it */
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
msgpack_pack_map(&tmp_pck, map_size);

msgpack_pack_str(&tmp_pck, time_key_len);
msgpack_pack_str_body(&tmp_pck, time_key, time_key_len);
msgpack_pack_str(&tmp_pck, token_end-in_buf);
msgpack_pack_str_body(&tmp_pck, in_buf, token_end-in_buf);
token_end = token_end + 1; /* time + a space */

/* Stream */
if (!(!strncmp(token_end, "stdout ", 7) || !strncmp(token_end, "stderr ", 7))) {
msgpack_sbuffer_destroy(&tmp_sbuf);
return -1;
}

msgpack_pack_str(&tmp_pck, 6);
msgpack_pack_str_body(&tmp_pck, "stream", 6);
msgpack_pack_str(&tmp_pck, 6);
msgpack_pack_str_body(&tmp_pck, token_end, 6);
token_end = token_end + 7; /* stream + a space */

/* Partial/Full Indicator (P|F) */
if (!(!strncmp(token_end, "F ", 2) || !strncmp(token_end, "P ", 2))) {
msgpack_sbuffer_destroy(&tmp_sbuf);
return -1;
}
msgpack_pack_str(&tmp_pck, 2);
msgpack_pack_str_body(&tmp_pck, "_p", 2);
msgpack_pack_str(&tmp_pck, 1);
msgpack_pack_str_body(&tmp_pck, token_end, 1);
token_end = token_end + 2; /* indicator + a space */

/* Log */
end_of_line = strpbrk(token_end, "\r\n");
if (end_of_line == NULL || end_of_line-token_end > in_size) {
end_of_line = (char *)in_buf+in_size;
}

msgpack_pack_str(&tmp_pck, 3);
msgpack_pack_str_body(&tmp_pck, "log", 3);
msgpack_pack_str(&tmp_pck, end_of_line-token_end);
msgpack_pack_str_body(&tmp_pck, token_end, end_of_line-token_end);

/* Export results */
time_lookup = flb_parser_tm2time(&tm, parser->time_system_timezone);
out_time->tm.tv_sec = time_lookup;
out_time->tm.tv_nsec = (tmfrac * 1000000000);

*out_buf = tmp_sbuf.data;
*out_size = tmp_sbuf.size;

/* Check if some decoder was specified */
if (parser->decoders) {
ret = flb_parser_decoder_do(parser->decoders,
tmp_sbuf.data, tmp_sbuf.size,
&dec_out_buf, &dec_out_size);
if (ret == 0) {
*out_buf = dec_out_buf;
*out_size = dec_out_size;
msgpack_sbuffer_destroy(&tmp_sbuf);
}
}

return end_of_line-in_buf;
}
1 change: 0 additions & 1 deletion src/multiline/flb_ml_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <fluent-bit/flb_log.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_rule.h>
#include <fluent-bit/multiline/flb_ml_mode.h>
#include <fluent-bit/multiline/flb_ml_group.h>

int flb_ml_parser_init(struct flb_ml_parser *ml_parser)
Expand Down
11 changes: 4 additions & 7 deletions src/multiline/flb_ml_parser_cri.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_parser.h>

#define FLB_ML_CRI_REGEX \
"^(?<time>.+?) (?<stream>stdout|stderr) (?<_p>F|P) (?<log>.*)$"
#define FLB_ML_CRI_TIME \
"%Y-%m-%dT%H:%M:%S.%L%z"

/* Creates a parser for Docker */
/* Creates a parser for CRI */
static struct flb_parser *cri_parser_create(struct flb_config *config)
{
struct flb_parser *p;

p = flb_parser_create("_ml_cri", /* parser name */
"regex", /* backend type */
FLB_ML_CRI_REGEX, /* regex */
"cri", /* backend type */
NULL, /* regex */
FLB_FALSE, /* skip_empty */
FLB_ML_CRI_TIME, /* time format */
"time", /* time key */
Expand All @@ -49,13 +47,12 @@ static struct flb_parser *cri_parser_create(struct flb_config *config)
return p;
}

/* Our first multiline mode: 'docker' */
struct flb_ml_parser *flb_ml_parser_cri(struct flb_config *config)
{
struct flb_parser *parser;
struct flb_ml_parser *mlp;

/* Create a Docker parser */
/* Create a CRI parser */
parser = cri_parser_create(config);
if (!parser) {
return NULL;
Expand Down
Loading