Skip to content

Commit

Permalink
in_systemd: tests: Provide restoring way the previuous behavior
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Dec 24, 2024
1 parent 4a59b02 commit e35f861
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 41 deletions.
196 changes: 155 additions & 41 deletions plugins/in_systemd/systemd.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,71 @@ static int systemd_enumerate_data_store(struct flb_config *config,
return -1;
}

static int systemd_process_simple(struct flb_config *config,
struct flb_input_instance *ins,
void *plugin_context,
void *format_context,
const void *data, size_t data_size)
{
int i;
int ret;
int len;
size_t length = data_size;
char *buf = NULL;
const char *sep;
const char *key;
const char *val;
struct flb_systemd_config *ctx = plugin_context;

key = (const char *) data;
sep = strchr(key, '=');
if (sep == NULL) {
return -2;
}

len = (sep - key);

ret = flb_log_event_encoder_append_body_string_length(
ctx->log_encoder, len);

if (ctx->lowercase == FLB_TRUE) {
/*
* Ensure buf to have enough space for the key because the libsystemd
* might return larger data than the threshold.
*/
if (buf == NULL) {
buf = flb_sds_create_len(NULL, ctx->threshold);
}
if (flb_sds_alloc(buf) < len) {
buf = flb_sds_increase(buf, len - flb_sds_alloc(buf));
}
for (i = 0; i < len; i++) {
buf[i] = tolower(key[i]);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, buf, len);
}
}
else {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string_body(
ctx->log_encoder, (char *) key, len);
}
}

val = sep + 1;
len = length - (sep - key) - 1;

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_string(
ctx->log_encoder, (char *) val, len);
}

return 0;
}

static int in_systemd_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
Expand Down Expand Up @@ -397,34 +462,64 @@ static int in_systemd_collect(struct flb_input_instance *ins,
/* Pack every field in the entry */
entries = 0;
skip_entries = 0;
while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 &&
entries < ctx->max_fields) {
key = (const char *) data;
if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') {
key++;
length--;
}
if (ctx->compact_key == FLB_TRUE) {
while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 &&
entries < ctx->max_fields) {
key = (const char *) data;
if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') {
key++;
length--;
}

ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
key, length);
if (ret == -2) {
skip_entries++;
continue;
}
else if (ret == -1) {
continue;
ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
key, length);
if (ret == -2) {
skip_entries++;
continue;
}
else if (ret == -1) {
continue;
}

entries++;
}
rows++;

entries++;
/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);

if (kvlist) {
cfl_kvlist_destroy(kvlist);
}
}
rows++;
else {
/* Pack every field in the entry */
while (sd_journal_enumerate_data(ctx->j, &data, &length) > 0 &&
entries < ctx->max_fields) {
key = (const char *) data;
if (ctx->strip_underscores == FLB_TRUE && key[0] == '_') {
key++;
length--;
}

/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = systemd_process_simple(config, ctx->ins,
(void *)ctx, NULL,
key, length);
}

if (kvlist) {
cfl_kvlist_destroy(kvlist);
if (ret == -2) {
skip_entries++;
continue;
}
else if (ret == -1) {
continue;
}

entries++;
}
rows++;
}

if (skip_entries > 0) {
Expand Down Expand Up @@ -668,35 +763,49 @@ static int cb_systemd_format_test(struct flb_config *config,
ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm);
}

/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
return -1;
}

keys = (const char *) data;
kvs = cfl_utils_split(keys, '\n', -1 );
if (kvs == NULL) {
goto split_error;
}

cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
cur->value, cur->len);
if (ctx->compact_key == FLB_TRUE) {
/* create an empty kvlist as the labels */
kvlist = cfl_kvlist_create();
if (!kvlist) {
flb_plg_error(ctx->ins, "error allocating kvlist");
return -1;
}

cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
ret = systemd_enumerate_data_store(config, ctx->ins,
(void *)ctx, (void *)kvlist,
cur->value, cur->len);

if (ret == -2 || ret == -1) {
continue;
if (ret == -2 || ret == -1) {
continue;
}
}
}

/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);
/* Interpret cfl_kvlist as logs type of events later. */
ret = append_enumerate_data(ctx, kvlist);

if (kvlist) {
cfl_kvlist_destroy(kvlist);
if (kvlist) {
cfl_kvlist_destroy(kvlist);
}
}
else {
cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
ret = systemd_process_simple(config, ctx->ins,
(void *)ctx, NULL,
cur->value, cur->len);

if (ret == -2 || ret == -1) {
continue;
}
}
}

if (kvs != NULL) {
Expand Down Expand Up @@ -760,6 +869,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_systemd_config, strip_underscores),
"Strip undersecores from fields"
},
{
FLB_CONFIG_MAP_BOOL, "compact_key", "true",
0, FLB_TRUE, offsetof(struct flb_systemd_config, compact_key),
"Do compaction for dupliucated keys into an array"
},
#ifdef FLB_HAVE_SQLDB
{
FLB_CONFIG_MAP_STR, "db.sync", (char *)NULL,
Expand Down
1 change: 1 addition & 0 deletions plugins/in_systemd/systemd_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct flb_systemd_config {
int max_fields; /* max number of fields per record */
int max_entries; /* max number of records per iteration */
size_t threshold; /* threshold for retriveing journal */
int compact_key; /* Unify deprecated keys into an array */

#ifdef FLB_HAVE_SQLDB
flb_sds_t db_path;
Expand Down
85 changes: 85 additions & 0 deletions tests/runtime/in_systemd.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,40 @@ static void cb_check_cfl_variant_properties(void *ctx, int ffd,
flb_sds_destroy(output);
}

static void cb_check_simply_processed_properties(void *ctx, int ffd,
int res_ret, void *res_data, size_t res_size,
void *data)
{
flb_sds_t output;
char *result = NULL;

/* Convert from msgpack to JSON */
output = flb_msgpack_raw_to_json_sds(res_data, res_size);
TEST_CHECK(output != NULL);

result = strstr(output, "\"MESSAGE\":\"test native message with multiple values\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

result = strstr(output, "\"KEY\":\"another\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

result = strstr(output, "\"KEY2\":\"final_field\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

result = strstr(output, "\"KEY3\":\"wow\"");
if (TEST_CHECK(result != NULL)) {
TEST_MSG("output:%s\n", output);
}

flb_sds_destroy(output);
}

void flb_test_duplicated_keys()
{
int ret;
Expand Down Expand Up @@ -107,8 +141,59 @@ void flb_test_duplicated_keys()
flb_destroy(ctx);
}

void flb_test_dont_compact_keys()
{
int ret;
int in_ffd;
int out_ffd;
flb_ctx_t *ctx;
char *message = "MESSAGE=test native message with multiple values\nKEY=value1\nKEY=value4\n"
"KEY2=value2\nKEY=another\nKEY2=value3\nKEY2=value5\nKEY3=howdy\nKEY3=prettygood\nKEY2=value10\n"
"KEY3=wow\nKEY2=final_field\n";

/* Create context, flush every second (some checks omitted here) */
ctx = flb_create();
flb_service_set(ctx,
"flush", "2",
"grace", "1",
"Log_Level", "error",
NULL);

/* Systemd */
in_ffd = flb_input(ctx, (char *) "systemd", NULL);
flb_input_set(ctx, in_ffd,
"tag", "test",
"Read_From_Tail", "On",
"Compact_Key", "Off",
NULL);


out_ffd = flb_output(ctx, (char *) "null", NULL);
flb_output_set(ctx, out_ffd,
"match", "test",
NULL);

/* Enable test mode */
ret = flb_input_set_test(ctx, in_ffd, "formatter",
cb_check_simply_processed_properties,
NULL);

/* Start */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data sample to run test formatter */
ret = flb_lib_push(ctx, in_ffd, message, strlen(message));
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/* Test list */
TEST_LIST = {
{ "duplicated_keys", flb_test_duplicated_keys },
{ "dont_compact_keys", flb_test_dont_compact_keys },
{ NULL, NULL}
};

0 comments on commit e35f861

Please sign in to comment.