Source code for clp_ffi_py.ir.native_deprecated
from typing import IO, Optional
from deprecated.sphinx import deprecated
from clp_ffi_py.ir.native import (
DeserializerBuffer,
FourByteDeserializer,
FourByteSerializer,
LogEvent,
Metadata,
Query,
)
[docs]
@deprecated(
version="0.0.13",
reason=":class:`FourByteEncoder` is deprecated and has been renamed to"
" :class:`~clp_ffi_py.ir.native.FourByteSerializer`.",
)
class FourByteEncoder:
[docs]
@staticmethod
def encode_preamble(ref_timestamp: int, timestamp_format: str, timezone: str) -> bytearray:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteSerializer.serialize_preamble`.
"""
return FourByteSerializer.serialize_preamble(ref_timestamp, timestamp_format, timezone)
[docs]
@staticmethod
def encode_message_and_timestamp_delta(timestamp_delta: int, msg: bytes) -> bytearray:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteSerializer.serialize_message_and_timestamp_delta`.
"""
return FourByteSerializer.serialize_message_and_timestamp_delta(
timestamp_delta,
msg,
)
[docs]
@staticmethod
def encode_message(msg: bytes) -> bytearray:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteSerializer.serialize_message`.
"""
return FourByteSerializer.serialize_message(msg)
[docs]
@staticmethod
def encode_timestamp_delta(timestamp_delta: int) -> bytearray:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteSerializer.serialize_timestamp_delta`.
"""
return FourByteSerializer.serialize_timestamp_delta(timestamp_delta)
[docs]
@staticmethod
def encode_end_of_ir() -> bytearray:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteSerializer.serialize_end_of_ir`.
"""
return FourByteSerializer.serialize_end_of_ir()
[docs]
@deprecated(
version="0.0.13",
reason=":class:`DecoderBuffer` is deprecated and has been renamed to"
" :class:`~clp_ffi_py.ir.native.DeserializerBuffer`.",
)
class DecoderBuffer:
[docs]
def __init__(self, input_stream: IO[bytes], initial_buffer_capacity: int = 4096):
self._deserializer_buffer = DeserializerBuffer(
input_stream=input_stream, initial_buffer_capacity=initial_buffer_capacity
)
[docs]
def get_num_decoded_log_messages(self) -> int:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.DeserializerBuffer.get_num_deserialized_log_messages`.
"""
return self._deserializer_buffer.get_num_deserialized_log_messages()
def _test_streaming(self, seed: int) -> bytearray:
"""
See :meth:`~clp_ffi_py.ir.native.DeserializerBuffer._test_streaming`.
"""
return self._deserializer_buffer._test_streaming(seed)
[docs]
@deprecated(
version="0.0.13",
reason=":class:`Decoder` is deprecated and has been renamed to"
" :class:`~clp_ffi_py.ir.native.FourByteDeserializer`.",
)
class Decoder:
[docs]
@staticmethod
def decode_preamble(decoder_buffer: DecoderBuffer) -> Metadata:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteDeserializer.deserialize_preamble`.
"""
return FourByteDeserializer.deserialize_preamble(decoder_buffer._deserializer_buffer)
[docs]
@staticmethod
def decode_next_log_event(
decoder_buffer: DecoderBuffer,
query: Optional[Query] = None,
allow_incomplete_stream: bool = False,
) -> Optional[LogEvent]:
"""
This method is deprecated and has been renamed to
:meth:`~clp_ffi_py.ir.native.FourByteDeserializer.deserialize_next_log_event`.
"""
return FourByteDeserializer.deserialize_next_log_event(
deserializer_buffer=decoder_buffer._deserializer_buffer,
query=query,
allow_incomplete_stream=allow_incomplete_stream,
)
# Delete `__new__` so that it will be ignored by Sphinx.
del Decoder.__new__
del FourByteEncoder.__new__