Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to skip forward when decoding IR streams #41

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clp_ffi_py/ir/native.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,11 @@ class Decoder:
query: Optional[Query] = None,
allow_incomplete_stream: bool = False,
) -> Optional[LogEvent]: ...
@staticmethod
def skip_forward(
decoder_buffer: DecoderBuffer,
num_events_to_skip: int,
allow_incomplete_stream: bool = False,
) -> None: ...

class IncompleteStreamError(Exception): ...
20 changes: 16 additions & 4 deletions clp_ffi_py/ir/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class ClpIrStreamReader(Iterator[LogEvent]):
generator with a customized search query.

:param istream: Input stream that contains encoded CLP IR.
:param starting_index: Starting index for reading log events. Log events
preceding this index will be sequentially decoded and then disregarded.
:param decoder_buffer_size: Initial size of the decoder buffer.
:param enable_compression: A flag indicating whether the istream is
compressed using `zstd`.
Expand All @@ -28,6 +30,7 @@ class ClpIrStreamReader(Iterator[LogEvent]):
def __init__(
self,
istream: IO[bytes],
starting_index: int = 0,
decoder_buffer_size: int = DEFAULT_DECODER_BUFFER_SIZE,
enable_compression: bool = True,
allow_incomplete_stream: bool = False,
Expand All @@ -38,6 +41,9 @@ def __init__(
self.__istream = dctx.stream_reader(istream, read_across_frames=True)
else:
self.__istream = istream
self._starting_index: int = starting_index
if 0 > self._starting_index:
raise IndexError(f"Negative starting index are not supported: {self._starting_index}")
self._decoder_buffer: DecoderBuffer = DecoderBuffer(self.__istream, decoder_buffer_size)
self._metadata: Optional[Metadata] = None
self._allow_incomplete_stream: bool = allow_incomplete_stream
Expand All @@ -59,17 +65,23 @@ def read_next_log_event(self) -> Optional[LogEvent]:

def read_preamble(self) -> None:
"""
Try to decode the preamble and set `metadata`. If `metadata` has been
set already, it will instantly return. It is separated from `__init__`
so that the input stream does not need to be readable on a reader's
construction, but until the user starts to iterate logs.
Try to decode the preamble and set `metadata`, and set the stream to
read from the position specified `starting_index`. If `metadata` has
been set already, it will instantly return. It is separated from
`__init__` so that the input stream does not need to be readable on a
reader's construction, but until the user starts to iterate logs.

:raise Exception:
If :meth:`~clp_ffi_py.ir.native.Decoder.decode_preamble` fails.
"""
if self.has_metadata():
return
self._metadata = Decoder.decode_preamble(self._decoder_buffer)
Decoder.skip_forward(
self._decoder_buffer,
num_events_to_skip=self._starting_index,
allow_incomplete_stream=self._allow_incomplete_stream,
)

def get_metadata(self) -> Metadata:
if None is self._metadata:
Expand Down
23 changes: 23 additions & 0 deletions src/clp_ffi_py/ir/native/PyDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ PyDoc_STRVAR(
" - None when the end of IR stream is reached or the query search terminates.\n"
);

// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays)
PyDoc_STRVAR(
cSkipForwardDoc,
"skip_forward( "
"decoder_buffer, num_events_to_skip, allow_incomplete_stream=False)\n"
"--\n\n"
"Decodes and discards the given amount of log events from the IR stream buffered in the "
"given decoder buffer. `decoder_buffer` must have been returned by a successfully "
"invocation of `decode_preamble`. It will stop whenever EOF.\n\n"
":param decoder_buffer: The decoder buffer of the encoded CLP IR stream.\n"
":param num_events_to_skip: Number of events to skip forward.\n"
":param allow_incomplete_stream: If set to `True`, an incomplete CLP IR stream is not "
"treated as an error. Instead, encountering such a stream is seen as reaching its end, and "
"the function will return None without raising any exceptions.\n"
":raises: Appropriate exceptions with detailed information on any encountered failure.\n"
":return: None\n"
);

// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays)
PyMethodDef PyDecoder_method_table[]{
{"decode_preamble",
Expand All @@ -55,6 +73,11 @@ PyMethodDef PyDecoder_method_table[]{
METH_VARARGS | METH_KEYWORDS | METH_STATIC,
static_cast<char const*>(cDecodeNextLogEventDoc)},

{"skip_forward",
py_c_function_cast(skip_forward),
METH_VARARGS | METH_KEYWORDS | METH_STATIC,
static_cast<char const*>(cSkipForwardDoc)},

{nullptr, nullptr, 0, nullptr}
};

Expand Down
2 changes: 1 addition & 1 deletion src/clp_ffi_py/ir/native/PyLogEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ auto PyLogEvent::module_level_init(PyObject* py_module) -> bool {
}

auto PyLogEvent::create_new_log_event(
std::string const& log_message,
std::string_view log_message,
ffi::epoch_time_ms_t timestamp,
size_t index,
PyMetadata* metadata
Expand Down
2 changes: 1 addition & 1 deletion src/clp_ffi_py/ir/native/PyLogEvent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class PyLogEvent {
* set.
*/
[[nodiscard]] static auto create_new_log_event(
std::string const& log_message,
std::string_view log_message,
ffi::epoch_time_ms_t timestamp,
size_t index,
PyMetadata* metadata
Expand Down
Loading
Loading