Improved Python module architecture and added algorithms.
* Split core and algorithms into separate modules
* Introduced a new API
* Added zstd, brotli and bz2
* Added parameters for all algorithms
Showing 17 changed files with 367 additions and 45 deletions.
.gitignore (new file)
@@ -0,0 +1,6 @@
__pycache__
*.pyc
*.pyo
./build
./dist
./*.egg-info
__init__.py (modified)
@@ -1,5 +1,7 @@
-from .lz4 import Lz4
-from .zlib import Zlib
-from .lzma_raw import LzmaRaw
-from .lzma_lzma import LzmaLzma
-from .lzma_xz import LzmaXz
+from .core import *
+from .algorithms.zlib import Zlib
+from .algorithms.lzma import Lzma
+from .algorithms.lz4 import Lz4
+from .algorithms.brotli import Brotli
+from .algorithms.zstd import Zstd
+from .algorithms.bz2 import Bz2
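Taken together, these exports are the new API: process() and unprocess() return a context object that is called with a slice to obtain bytes, and the legacy decode() remains as a compatibility shim. A minimal round-trip sketch (the top-level package name is not visible in this diff, so kaitai_compress below is only a placeholder):

# Hypothetical usage; "kaitai_compress" stands in for whatever name the package is installed under.
from kaitai_compress import Zlib

proc = Zlib()                                           # zlib container, default parameters
packed = proc.unprocess(b"hello " * 64)(slice(None))    # compress, then slice the context to get bytes
restored = proc.process(packed)(slice(None))            # decompress
assert restored == b"hello " * 64
assert proc.decode(packed) == b"hello " * 64            # legacy one-shot API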
algorithms/brotli.py (new file)
@@ -0,0 +1,39 @@
import typing

from ..core import KaitaiProcessor, ProcessorContextStub


class Brotli(KaitaiProcessor):
	__slots__ = ("compressorParams", "decompressorParams")
	brotli = None  # the module is imported lazily on first use

	def __init__(self, level:typing.Optional[int]=None, mode:typing.Optional[str]="generic", log_window_size:typing.Optional[int]=None, log_block_size:typing.Optional[int]=None, dictionary:typing.Optional[bytes]=None):
		if self.__class__.brotli is None:
			import brotli
			self.__class__.brotli = brotli
		self.compressorParams = {}
		self.decompressorParams = {}

		if mode is not None:
			if isinstance(mode, str):
				mode = getattr(self.__class__.brotli, "MODE_" + mode.upper())
			self.compressorParams["mode"] = mode

		if level is not None:
			self.compressorParams["quality"] = level

		if log_window_size is not None:
			self.compressorParams["lgwin"] = log_window_size

		if log_block_size is not None:
			self.compressorParams["lgblock"] = log_block_size

		if dictionary is not None:
			self.decompressorParams["dictionary"] = self.compressorParams["dictionary"] = dictionary

	# new API
	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.__class__.brotli.decompress(data, **self.decompressorParams))

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.__class__.brotli.compress(data, **self.compressorParams))
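A short sketch of how the constructor arguments map onto brotli's compress() keywords (quality, mode, lgwin), assuming the class above is importable and Google's brotli bindings are installed:

# Sketch: text mode at quality 7 with a 22-bit window.
proc = Brotli(level=7, mode="text", log_window_size=22)
packed = proc.unprocess(b"<p>example</p>" * 64)(slice(None))
assert proc.process(packed)(slice(None)) == b"<p>example</p>" * 64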
algorithms/bz2.py (new file)
@@ -0,0 +1,15 @@
import typing
from ..core import KaitaiProcessor, ProcessorContextStub
import bz2


class Bz2(KaitaiProcessor):
	__slots__ = ("decompressor", "compressor")

	def __init__(self, level:int=9, *args, **kwargs):  # valid bz2 levels are 1-9
		self.decompressor = bz2.BZ2Decompressor()
		self.compressor = bz2.BZ2Compressor(level)

	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.decompressor.decompress(data))

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		# flush so that a single call yields a complete bzip2 stream
		return ProcessorContextStub(self.compressor.compress(data) + self.compressor.flush())
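BZ2Compressor and BZ2Decompressor are single-stream objects, so each Bz2 instance is good for one compression pass and one decompression pass. A sketch:

# Sketch: fresh instances per direction, since the underlying bz2 objects are single-use.
packed = Bz2(level=9).unprocess(b"data" * 256)(slice(None))
assert Bz2().process(packed)(slice(None)) == b"data" * 256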
algorithms/lz4.py (new file)
@@ -0,0 +1,19 @@
import typing
from ..core import KaitaiProcessor, ProcessorContextStub


class Lz4(KaitaiProcessor):
	__slots__ = ("compressor", "decompressor")

	def __init__(self, block_size:int=0, linked:bool=True, compression_level:int=16, frame_checksum:bool=False, block_checksum:bool=False, *args, **kwargs):
		import lz4.frame
		self.compressor = lz4.frame.LZ4FrameCompressor(block_size=block_size, block_linked=linked, compression_level=compression_level, content_checksum=frame_checksum, block_checksum=block_checksum, return_bytearray=False)
		self.decompressor = lz4.frame.LZ4FrameDecompressor(return_bytearray=False)

	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.decompressor.decompress(data))

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		# begin() emits the frame header, flush() finishes the frame
		return ProcessorContextStub(self.compressor.begin() + self.compressor.compress(data) + self.compressor.flush())

	def getArgs(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		import lz4.frame
		res = lz4.frame.get_frame_info(data)
		# the compression level is not stored in an LZ4 frame, so the default (0) is reported
		return (res["block_size"], res["block_linked"], 0, res["content_checksum"], res["block_checksum"])
algorithms/lzma.py (new file)
@@ -0,0 +1,81 @@
import typing
import lzma

from ..core import KaitaiProcessor, ProcessorContextStub

modifiersMapping = {
	"e": lzma.PRESET_EXTREME
}


class Lzma(KaitaiProcessor):
	__slots__ = ("decompressor", "compressor")

	def __init__(self, algo:int=2, level:int=9, format:typing.Optional[typing.Union[str, int]]=lzma.FORMAT_AUTO, check:typing.Optional[typing.Union[str, int]]=-1, modifiers:str="e", dict_size:typing.Optional[int]=None, literal_context_bits:typing.Optional[int]=3, literal_position_bits:typing.Optional[int]=0, position_bits:typing.Optional[int]=2, match_finder:typing.Optional[str]="bt4", mode:typing.Optional[str]="normal", additional_filters:typing.Iterable[typing.Dict[str, typing.Any]]=(), *args, **kwargs):
		if isinstance(format, str):
			format = getattr(lzma, "FORMAT_" + format.upper())

		if isinstance(check, str):
			check = getattr(lzma, "CHECK_" + check.upper())

		filters = list(additional_filters)
		if algo > -1:
			if isinstance(modifiers, str):
				modifiersNum = 0
				for m in modifiers:
					modifiersNum |= modifiersMapping[m]
				modifiers = modifiersNum
				del modifiersNum

			lzmaFilter = {
				"id": "lzma" + str(algo),
				"preset": level | modifiers,
			}

			if dict_size is not None:
				lzmaFilter["dict_size"] = dict_size
			if literal_context_bits is not None:
				lzmaFilter["lc"] = literal_context_bits
			if literal_position_bits is not None:
				lzmaFilter["lp"] = literal_position_bits
			if position_bits is not None:
				lzmaFilter["pb"] = position_bits
			if match_finder is not None:
				if isinstance(match_finder, str):
					match_finder = getattr(lzma, "MF_" + match_finder.upper())
				lzmaFilter["mf"] = match_finder
			if mode is not None:
				if isinstance(mode, str):
					mode = getattr(lzma, "MODE_" + mode.upper())
				lzmaFilter["mode"] = mode
			filters.append(lzmaFilter)

		for f in filters:
			if isinstance(f["id"], str):
				f["id"] = getattr(lzma, "FILTER_" + f["id"].upper())

		compressorParams = {
			"format": format,
			"check": check,
			"preset": None,  # the preset is carried by the filters chain
			"filters": filters
		}
		decompressorParams = {
			"format": format,
			"memlimit": None,
		}

		if format == lzma.FORMAT_RAW:
			decompressorParams["filters"] = filters

		self.decompressor = lzma.LZMADecompressor(**decompressorParams)

		if compressorParams["format"] == lzma.FORMAT_AUTO:
			compressorParams["format"] = lzma.FORMAT_XZ  # TODO: detect from stream
		self.compressor = lzma.LZMACompressor(**compressorParams)

	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.decompressor.decompress(data))

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		# flush so that a single call yields a complete stream
		return ProcessorContextStub(self.compressor.compress(data) + self.compressor.flush())
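This single Lzma class covers what the removed LzmaRaw, LzmaLzma and LzmaXz classes did, selected via the format argument. A sketch of the headerless (raw) case; the compressor and decompressor objects are single-use, hence the two instances:

# Sketch: raw LZMA2 stream with an explicit dictionary size.
params = dict(algo=2, level=6, format="raw", dict_size=1 << 20)
packed = Lzma(**params).unprocess(b"sample " * 200)(slice(None))
assert Lzma(**params).process(packed)(slice(None)) == b"sample " * 200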
algorithms/zlib.py (new file)
@@ -0,0 +1,52 @@
import typing
import zlib
from enum import IntEnum

from ..core import KaitaiProcessor, ProcessorContextStub


class Container(IntEnum):
	raw = -1
	zlib = 1
	gzip = 16


containerWLenTransformers = {
	Container.raw: lambda x: -x,
	Container.zlib: lambda x: x,
	Container.gzip: lambda x: Container.gzip.value + x
}


class Zlib(KaitaiProcessor):
	__slots__ = ("compressorParams", "decompressorParams")

	def __init__(self, containerType:Container=Container.zlib, log_window_size:int=15, zdict:typing.Optional[bytes]=None, level:int=-1, mem_level:typing.Union[str, int]="DEF_MEM_LEVEL", strategy:typing.Union[str, int]="DEFAULT_STRATEGY", method:typing.Optional[typing.Union[str, int]]="deflated", *args, **kwargs):
		#containerType = Container(containerType)
		self.compressorParams = {}
		self.decompressorParams = {}
		if method is not None:
			if isinstance(method, str):
				method = getattr(zlib, method.upper())
			self.compressorParams["method"] = method

		if mem_level is not None:
			if isinstance(mem_level, str):
				mem_level = getattr(zlib, mem_level)
			self.compressorParams["memLevel"] = mem_level

		if strategy is not None:
			if isinstance(strategy, str):
				strategy = getattr(zlib, "Z_" + strategy.upper())
			self.compressorParams["strategy"] = strategy

		self.compressorParams["level"] = level
		self.decompressorParams["wbits"] = self.compressorParams["wbits"] = containerWLenTransformers[containerType](log_window_size)

		if zdict is not None:
			self.decompressorParams["zdict"] = self.compressorParams["zdict"] = zdict

	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		dO = zlib.decompressobj(**self.decompressorParams)
		return ProcessorContextStub(dO.decompress(data))

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		cO = zlib.compressobj(**self.compressorParams)
		# flush so that a single call yields a complete stream
		return ProcessorContextStub(cO.compress(data) + cO.flush())
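The Container enum only changes how log_window_size is translated into zlib's wbits convention (negated for raw deflate, +16 for gzip). A sketch:

# Sketch: gzip-wrapped deflate stream.
proc = Zlib(containerType=Container.gzip, level=9)
packed = proc.unprocess(b"0123456789" * 100)(slice(None))
assert packed[:2] == b"\x1f\x8b"   # gzip magic
assert proc.process(packed)(slice(None)) == b"0123456789" * 100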
algorithms/zstd.py (new file)
@@ -0,0 +1,74 @@
import typing

from ..core import KaitaiProcessor, ProcessorContextStub


class Zstd(KaitaiProcessor):
	__slots__ = ("decompressor", "compressor")
	zstd = None  # the module is imported lazily on first use

	def __init__(self, format:typing.Union[int, str]="zstd1_magicless", log_window_size:typing.Optional[int]=None, dictionary:typing.Optional[bytes]=None, level:int=22, should_write_checksum:bool=True, should_write_uncompressed_size:bool=True, should_write_dict_id:bool=True, strategy:typing.Optional[typing.Union[int, str]]=None, hash_log_size:typing.Optional[int]=None, match_min_size:typing.Optional[int]=None, chain_log_size:typing.Optional[int]=None, search_log_size:typing.Optional[int]=None, overlap_log_size:typing.Optional[int]=None, target_length:typing.Optional[int]=None, ldm:typing.Optional[bool]=None, ldm_hash_log_size:typing.Optional[int]=None, ldm_match_min_size:typing.Optional[int]=None, ldm_bucket_size_log:typing.Optional[int]=None, ldm_hash_rate_log:typing.Optional[int]=None, job_size:typing.Optional[int]=None, force_max_window:typing.Optional[int]=None):
		if self.__class__.zstd is None:
			import zstd  # expects bindings exposing the zstandard-style API (ZstdCompressor / ZstdDecompressor)
			self.__class__.zstd = zstd
		if isinstance(format, str):
			format = getattr(self.__class__.zstd, "FORMAT_" + format.upper())

		decompressorParams = {"format": format}
		compressorParamsDict = {"threads": -1, "format": format}
		compressorParams = {}

		if dictionary is not None:
			decompressorParams["dict_data"] = compressorParams["dict_data"] = dictionary

		if log_window_size is not None:
			decompressorParams["max_window_size"] = 2 ** log_window_size
			compressorParamsDict["window_log"] = log_window_size

		self.decompressor = self.__class__.zstd.ZstdDecompressor(**decompressorParams)

		compressorParamsDict["write_checksum"] = should_write_checksum
		compressorParamsDict["write_content_size"] = should_write_uncompressed_size
		compressorParamsDict["write_dict_id"] = should_write_dict_id

		if strategy is not None:
			if isinstance(strategy, str):
				strategy = getattr(self.__class__.zstd, "STRATEGY_" + strategy.upper())
			compressorParamsDict["strategy"] = strategy

		if hash_log_size is not None:
			compressorParamsDict["hash_log"] = hash_log_size
		if match_min_size is not None:
			compressorParamsDict["min_match"] = match_min_size

		if chain_log_size is not None:
			compressorParamsDict["chain_log"] = chain_log_size
		if search_log_size is not None:
			compressorParamsDict["search_log"] = search_log_size
		if overlap_log_size is not None:
			compressorParamsDict["overlap_log"] = overlap_log_size
		if target_length is not None:
			compressorParamsDict["target_length"] = target_length
		if ldm is not None:
			compressorParamsDict["enable_ldm"] = ldm
			if ldm:
				if ldm_hash_log_size is not None:
					compressorParamsDict["ldm_hash_log"] = ldm_hash_log_size
				if ldm_match_min_size is not None:
					compressorParamsDict["ldm_min_match"] = ldm_match_min_size
				if ldm_bucket_size_log is not None:
					compressorParamsDict["ldm_bucket_size_log"] = ldm_bucket_size_log
				if ldm_hash_rate_log is not None:
					compressorParamsDict["ldm_hash_rate_log"] = ldm_hash_rate_log
		if job_size is not None:
			compressorParamsDict["job_size"] = job_size
		if force_max_window is not None:
			compressorParamsDict["force_max_window"] = force_max_window

		compressorParams["compression_params"] = self.__class__.zstd.ZstdCompressionParameters.from_level(level, **compressorParamsDict)
		self.compressor = self.__class__.zstd.ZstdCompressor(**compressorParams)

	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.decompressor.decompress(data))

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		return ProcessorContextStub(self.compressor.compress(data))
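The magicless default expects frames whose 4-byte magic number has been stripped; pass format="zstd1" for ordinary zstd frames. A sketch; whether it runs depends on the installed bindings exposing the zstandard-style API assumed above:

# Sketch: ordinary zstd frames at a moderate level.
proc = Zstd(format="zstd1", level=19)
packed = proc.unprocess(b"kaitai " * 500)(slice(None))
assert proc.process(packed)(slice(None)) == b"kaitai " * 500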
core.py (new file)
@@ -0,0 +1,32 @@
import typing


class KaitaiProcessorContext:
	def __call__(self, slce:slice, *args, **kwargs) -> bytes:
		raise NotImplementedError("Please implement __call__")


class ProcessorContextStub(KaitaiProcessorContext):
	"""A dummy implementation for non-seekable streams. Just decompresses all the data and saves it."""
	__slots__ = ("data",)

	def __init__(self, data:typing.Union[bytes, bytearray], *args, **kwargs):
		self.data = data

	def __call__(self, slc:slice, *args, **kwargs) -> bytes:
		return self.data[slc]


class KaitaiProcessor:
	"""The base processor class"""
	def __init__(self, *args, **kwargs):
		raise NotImplementedError("Please implement __init__")

	def decode(self, data:typing.Union[bytes, bytearray], *args, **kwargs) -> bytes:
		"""Compatibility shim for the legacy API. Going to be removed at some point."""
		return self.process(data, *args, **kwargs)(slice(None, None, None))

	def process(self, data:typing.Union[bytes, bytearray], *args, **kwargs) -> KaitaiProcessorContext:
		raise NotImplementedError("Please implement process")

	def unprocess(self, data:typing.Union[bytes, bytearray], *args, **kwargs) -> KaitaiProcessorContext:
		raise NotImplementedError(self.__class__.__name__ + " processing is not invertible")

	def getArgs(self, data:typing.Union[bytes, bytearray], *args, **kwargs) -> tuple:
		raise NotImplementedError("Cannot get args of " + self.__class__.__name__)
Five files were deleted (contents not shown). Judging by the imports dropped from __init__.py, these are the old per-algorithm modules: lz4, zlib, lzma_raw, lzma_lzma and lzma_xz.