Skip to content

Commit

Permalink
journal-remote,journal-upload: added compression support (systemd#34822)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuwata authored Feb 9, 2025
2 parents 88a0a54 + cfaf780 commit 52e9bc2
Show file tree
Hide file tree
Showing 20 changed files with 477 additions and 50 deletions.
13 changes: 13 additions & 0 deletions man/journal-remote.conf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@
[Remote] section:</para>

<variablelist class='config-directives'>
<varlistentry>
<term><varname>Compression=</varname></term>

<listitem><para>Acceptable compression algorithms to be used by <command>systemd-journal-upload</command>. Compression algorithms are
used for <literal>Accept-Encoding</literal> header contruction with priorities set according to an order in configuration.
This parameter takes space separated list of compression algorithms. Example:
<programlisting>Compression=zstd lz4</programlisting>
This option can be specified multiple times. If an empty string is assigned, then all the previous assignments are cleared.
</para>

<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>

<varlistentry>
<term><varname>Seal=</varname></term>

Expand Down
31 changes: 31 additions & 0 deletions man/journal-upload.conf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,37 @@
<xi:include href="version-info.xml" xpointer="v232"/></listitem>
</varlistentry>

<varlistentry>
<term><varname>Compression=</varname></term>

<listitem><para>Takes a space separated list of compression algorithms to be applied to logs data before sending.
Supported algorithms are <literal>none</literal>, <literal>zstd</literal>, <literal>xz</literal>,
or <literal>lz4</literal>. Optionally, each algorithm (except for <literal>none</literal>)
followed by a colon (<literal>:</literal>) and its compression level, for example <literal>zstd:4</literal>.
The compression level is expected to be a positive integer. This option can be specified multiple times.
If an empty string is assigned, then all previous assignments are cleared.
Defaults to unset, and data will not be compressed.</para>

<para>Example:
<programlisting>Compression=zstd:4 lz4:2</programlisting></para>

<para>Even when compression is enabled, the initial requests are sent without compression.
It becomes effective either if <literal>ForceCompression=</literal> is enabled,
or the server response contains <literal>Accept-Encoding</literal> headers with a list of
compression algorithms that contains one of the algorithms specified in this option.</para>

<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>

<varlistentry>
<term><varname>ForceCompression=</varname></term>

<listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
Defaults to <literal>false</literal>.</para>

<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>

<varlistentry>
<term><varname>ServerKeyFile=</varname></term>

Expand Down
43 changes: 34 additions & 9 deletions src/basic/compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include <sys/types.h>
#include <unistd.h>

#if HAVE_LZ4
#include <lz4hc.h>
#endif

#if HAVE_XZ
#include <lzma.h>
#endif
Expand Down Expand Up @@ -43,6 +47,7 @@ static DLSYM_PROTOTYPE(LZ4F_freeCompressionContext) = NULL;
static DLSYM_PROTOTYPE(LZ4F_freeDecompressionContext) = NULL;
static DLSYM_PROTOTYPE(LZ4F_isError) = NULL;
DLSYM_PROTOTYPE(LZ4_compress_default) = NULL;
DLSYM_PROTOTYPE(LZ4_compress_HC) = NULL;
DLSYM_PROTOTYPE(LZ4_decompress_safe) = NULL;
DLSYM_PROTOTYPE(LZ4_decompress_safe_partial) = NULL;
DLSYM_PROTOTYPE(LZ4_versionNumber) = NULL;
Expand Down Expand Up @@ -94,6 +99,7 @@ static DLSYM_PROTOTYPE(lzma_easy_encoder) = NULL;
static DLSYM_PROTOTYPE(lzma_end) = NULL;
static DLSYM_PROTOTYPE(lzma_stream_buffer_encode) = NULL;
static DLSYM_PROTOTYPE(lzma_stream_decoder) = NULL;
static DLSYM_PROTOTYPE(lzma_lzma_preset) = NULL;

/* We can't just do _cleanup_(sym_lzma_end) because a compiler bug makes
* this fail with:
Expand All @@ -116,7 +122,15 @@ static const char* const compression_table[_COMPRESSION_MAX] = {
[COMPRESSION_ZSTD] = "ZSTD",
};

static const char* const compression_lowercase_table[_COMPRESSION_MAX] = {
[COMPRESSION_NONE] = "none",
[COMPRESSION_XZ] = "xz",
[COMPRESSION_LZ4] = "lz4",
[COMPRESSION_ZSTD] = "zstd",
};

DEFINE_STRING_TABLE_LOOKUP(compression, Compression);
DEFINE_STRING_TABLE_LOOKUP(compression_lowercase, Compression);

bool compression_supported(Compression c) {
static const unsigned supported =
Expand Down Expand Up @@ -145,12 +159,13 @@ int dlopen_lzma(void) {
DLSYM_ARG(lzma_easy_encoder),
DLSYM_ARG(lzma_end),
DLSYM_ARG(lzma_stream_buffer_encode),
DLSYM_ARG(lzma_lzma_preset),
DLSYM_ARG(lzma_stream_decoder));
}
#endif

int compress_blob_xz(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {

assert(src);
assert(src_size > 0);
Expand All @@ -159,12 +174,12 @@ int compress_blob_xz(const void *src, uint64_t src_size,
assert(dst_size);

#if HAVE_XZ
static const lzma_options_lzma opt = {
lzma_options_lzma opt = {
1u << 20u, NULL, 0, LZMA_LC_DEFAULT, LZMA_LP_DEFAULT,
LZMA_PB_DEFAULT, LZMA_MODE_FAST, 128, LZMA_MF_HC3, 4
};
static const lzma_filter filters[] = {
{ LZMA_FILTER_LZMA2, (lzma_options_lzma*) &opt },
lzma_filter filters[] = {
{ LZMA_FILTER_LZMA2, &opt },
{ LZMA_VLI_UNKNOWN, NULL }
};
lzma_ret ret;
Expand All @@ -175,13 +190,19 @@ int compress_blob_xz(const void *src, uint64_t src_size,
if (r < 0)
return r;

if (level >= 0) {
r = sym_lzma_lzma_preset(&opt, (uint32_t) level);
if (r < 0)
return r;
}

/* Returns < 0 if we couldn't compress the data or the
* compressed result is longer than the original */

if (src_size < 80)
return -ENOBUFS;

ret = sym_lzma_stream_buffer_encode((lzma_filter*) filters, LZMA_CHECK_NONE, NULL,
ret = sym_lzma_stream_buffer_encode(filters, LZMA_CHECK_NONE, NULL,
src, src_size, dst, &out_pos, dst_alloc_size);
if (ret != LZMA_OK)
return -ENOBUFS;
Expand Down Expand Up @@ -214,14 +235,15 @@ int dlopen_lz4(void) {
DLSYM_ARG(LZ4F_freeDecompressionContext),
DLSYM_ARG(LZ4F_isError),
DLSYM_ARG(LZ4_compress_default),
DLSYM_ARG(LZ4_compress_HC),
DLSYM_ARG(LZ4_decompress_safe),
DLSYM_ARG(LZ4_decompress_safe_partial),
DLSYM_ARG(LZ4_versionNumber));
}
#endif

int compress_blob_lz4(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {

assert(src);
assert(src_size > 0);
Expand All @@ -241,7 +263,10 @@ int compress_blob_lz4(const void *src, uint64_t src_size,
if (src_size < 9)
return -ENOBUFS;

r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8);
if (level <= 0)
r = sym_LZ4_compress_default(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8);
else
r = sym_LZ4_compress_HC(src, (char*)dst + 8, src_size, (int) dst_alloc_size - 8, level);
if (r <= 0)
return -ENOBUFS;

Expand Down Expand Up @@ -285,7 +310,7 @@ int dlopen_zstd(void) {

int compress_blob_zstd(
const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {

assert(src);
assert(src_size > 0);
Expand All @@ -301,7 +326,7 @@ int compress_blob_zstd(
if (r < 0)
return r;

k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, 0);
k = sym_ZSTD_compress(dst, dst_alloc_size, src, src_size, level < 0 ? 0 : level);
if (sym_ZSTD_isError(k))
return zstd_ret_to_errno(k);

Expand Down
16 changes: 9 additions & 7 deletions src/basic/compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ typedef enum Compression {

const char* compression_to_string(Compression compression);
Compression compression_from_string(const char *compression);
const char* compression_lowercase_to_string(Compression compression);
Compression compression_lowercase_from_string(const char *compression);

bool compression_supported(Compression c);

int compress_blob_xz(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);
int compress_blob_lz4(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);
int compress_blob_zstd(const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size);
void *dst, size_t dst_alloc_size, size_t *dst_size, int level);

int decompress_blob_xz(const void *src, uint64_t src_size,
void **dst, size_t* dst_size, size_t dst_max);
Expand Down Expand Up @@ -90,15 +92,15 @@ int dlopen_lzma(void);
static inline int compress_blob(
Compression compression,
const void *src, uint64_t src_size,
void *dst, size_t dst_alloc_size, size_t *dst_size) {
void *dst, size_t dst_alloc_size, size_t *dst_size, int level) {

switch (compression) {
case COMPRESSION_ZSTD:
return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size);
return compress_blob_zstd(src, src_size, dst, dst_alloc_size, dst_size, level);
case COMPRESSION_LZ4:
return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size);
return compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size, level);
case COMPRESSION_XZ:
return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size);
return compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size, level);
default:
return -EOPNOTSUPP;
}
Expand Down
2 changes: 1 addition & 1 deletion src/fuzz/fuzz-compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
}

size_t csize;
r = compress_blob(alg, h->data, data_len, buf, size, &csize);
r = compress_blob(alg, h->data, data_len, buf, size, &csize, /* level = */ -1);
if (r < 0) {
log_error_errno(r, "Compression failed: %m");
return 0;
Expand Down
91 changes: 91 additions & 0 deletions src/journal-remote/journal-compression-util.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */

#include "extract-word.h"
#include "journal-compression-util.h"
#include "parse-util.h"

void compression_args_clear(CompressionArgs *args) {
assert(args);
args->size = 0;
args->opts = mfree(args->opts);
}

int config_parse_compression(
const char *unit,
const char *filename,
unsigned line,
const char *section,
unsigned section_line,
const char *lvalue,
int ltype,
const char *rvalue,
void *data,
void *userdata) {

CompressionArgs *args = ASSERT_PTR(data);
bool parse_level = ltype;
int r;

assert(filename);
assert(lvalue);
assert(rvalue);

if (isempty(rvalue)) {
compression_args_clear(args);
return 1;
}

for (const char *p = rvalue;;) {
_cleanup_free_ char *algorithm = NULL, *word = NULL;
int level = -1;

r = extract_first_word(&p, &word, NULL, 0);
if (r < 0)
return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
if (r == 0)
return 1;

if (parse_level) {
const char *q = word;
r = extract_first_word(&q, &algorithm, ":", 0);
if (r < 0)
return log_syntax_parse_error(unit, filename, line, r, lvalue, rvalue);
if (!isempty(q)) {
r = safe_atoi(q, &level);
if (r < 0) {
log_syntax(unit, LOG_WARNING, filename, line, r,
"Compression level %s should be positive, ignoring.", q);
continue;
}
}
} else
algorithm = TAKE_PTR(word);

Compression c = compression_lowercase_from_string(algorithm);
if (c < 0 || !compression_supported(c)) {
log_syntax(unit, LOG_WARNING, filename, line, c,
"Compression=%s is not supported on a system, ignoring.", algorithm);
continue;
}

bool found = false;
FOREACH_ARRAY(opt, args->opts, args->size)
if (opt->algorithm == c) {
found = true;
if (parse_level)
opt->level = level;
break;
}

if (found)
continue;

if (!GREEDY_REALLOC(args->opts, args->size + 1))
return log_oom();

args->opts[args->size++] = (CompressionOpts) {
.algorithm = c,
.level = level,
};
}
}
19 changes: 19 additions & 0 deletions src/journal-remote/journal-compression-util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once

#include "compress.h"
#include "conf-parser.h"

typedef struct CompressionOpts {
Compression algorithm;
int level;
} CompressionOpts;

typedef struct CompressionArgs {
CompressionOpts *opts;
size_t size;
} CompressionArgs;

CONFIG_PARSER_PROTOTYPE(config_parse_compression);

void compression_args_clear(CompressionArgs *args);
Loading

0 comments on commit 52e9bc2

Please sign in to comment.