diff --git a/v2/entropy/RiceGolombCodec.go b/v2/entropy/RiceGolombCodec.go
index 2ee7e51f..9300aaca 100644
--- a/v2/entropy/RiceGolombCodec.go
+++ b/v2/entropy/RiceGolombCodec.go
@@ -111,7 +111,7 @@ type RiceGolombDecoder struct {
 	bitstream kanzi.InputBitStream
 }
 
-// NewRiceGolombDecoder creates a new instance of ExpGolombDecoder
+// NewRiceGolombDecoder creates a new instance of RiceGolombDecoder
 // If sgn is true, values from the bitstream will be decoded as signed (int8)
 func NewRiceGolombDecoder(bs kanzi.InputBitStream, sgn bool, logBase uint) (*RiceGolombDecoder, error) {
 	if bs == nil {
diff --git a/v2/io/CompressedStream.go b/v2/io/CompressedStream.go
index e610ff64..1a79edb8 100644
--- a/v2/io/CompressedStream.go
+++ b/v2/io/CompressedStream.go
@@ -101,27 +101,33 @@ type Writer struct {
 	listeners []kanzi.Listener
 	ctx       map[string]any
 	headless  bool
+	taskInfos []encodingTaskInfo
 }
 
 type encodingTask struct {
-	iBuffer            *blockBuffer
-	oBuffer            *blockBuffer
-	hasher             *hash.XXHash32
-	blockLength        uint
-	blockTransformType uint64
-	blockEntropyType   uint32
-	currentBlockID     int32
-	processedBlockID   *int32
-	wg                 *sync.WaitGroup
-	listeners          []kanzi.Listener
-	obs                kanzi.OutputBitStream
-	ctx                map[string]any
+	info             *encodingTaskInfo
+	blockLength      uint
+	currentBlockID   int32
+	processedBlockID *int32
+	wg               *sync.WaitGroup
+	obs              kanzi.OutputBitStream
+	ctx              map[string]any
 }
 
 type encodingTaskResult struct {
 	err *IOError
 }
 
+type encodingTaskInfo struct {
+	iBuffer       *blockBuffer
+	oBuffer       *blockBuffer
+	hasher        *hash.XXHash32
+	transform     *transform.ByteTransformSequence
+	listeners     []kanzi.Listener
+	transformType uint64
+	entropyType   uint32
+}
+
 // NewWriter creates a new instance of Writer.
 // The writer writes compressed data blocks to the provided os.
 func NewWriter(os io.WriteCloser, entropy, transform string, blockSize, jobs uint, checksum bool) (*Writer, error) {
@@ -251,6 +257,7 @@ func createWriterWithCtx(obs kanzi.OutputBitStream, ctx map[string]any) (*Writer
 	ctx["bsVersion"] = uint(_BITSTREAM_FORMAT_VERSION)
 
 	this.jobs = int(tasks)
+	this.taskInfos = make([]encodingTaskInfo, this.jobs)
 	this.buffers = make([]blockBuffer, 2*this.jobs)
 
 	// Allocate first buffer and add padding for incompressible blocks
@@ -499,19 +506,27 @@ func (this *Writer) processBlock() error {
 		off += dataLength
 		this.available -= dataLength
 
+		if firstID == 0 {
+			// Create the task static infos
+			this.taskInfos[taskID] = encodingTaskInfo{
+				hasher:        this.hasher,
+				transform:     nil,
+				listeners:     listeners,
+				iBuffer:       &this.buffers[taskID],
+				oBuffer:       &this.buffers[this.jobs+taskID],
+				transformType: this.transformType,
+				entropyType:   this.entropyType}
+
+		}
+
 		task := encodingTask{
-			iBuffer:            &this.buffers[taskID],
-			oBuffer:            &this.buffers[this.jobs+taskID],
-			hasher:             this.hasher,
-			blockLength:        uint(dataLength),
-			blockTransformType: this.transformType,
-			blockEntropyType:   this.entropyType,
-			currentBlockID:     firstID + int32(taskID) + 1,
-			processedBlockID:   &this.blockID,
-			wg:                 &wg,
-			obs:                this.obs,
-			listeners:          listeners,
-			ctx:                copyCtx}
+			info:             &this.taskInfos[taskID],
+			blockLength:      uint(dataLength),
+			currentBlockID:   firstID + int32(taskID) + 1,
+			processedBlockID: &this.blockID,
+			wg:               &wg,
+			obs:              this.obs,
+			ctx:              copyCtx}
 
 		// Invoke the tasks concurrently
 		go task.encode(&results[taskID])
@@ -547,10 +562,12 @@ func (this *Writer) GetWritten() uint64 {
 //
 // then 0byyyyyyyy => transform sequence skip flags (1 means skip)
 func (this *encodingTask) encode(res *encodingTaskResult) {
-	data := this.iBuffer.Buf
-	buffer := this.oBuffer.Buf
+	data := this.info.iBuffer.Buf
+	buffer := this.info.oBuffer.Buf
 	mode := byte(0)
 	checksum := uint32(0)
+	blockTransformType := this.info.transformType
+	blockEntropyType := this.info.entropyType
 
 	defer func() {
 		if r := recover(); r != nil {
@@ -568,29 +585,25 @@ func (this *encodingTask) encode(res *encodingTaskResult) {
 	}()
 
 	// Compute block checksum
-	if this.hasher != nil {
-		checksum = this.hasher.Hash(data[0:this.blockLength])
+	if this.info.hasher != nil {
+		checksum = this.info.hasher.Hash(data[0:this.blockLength])
 	}
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify before transform
 		evt := kanzi.NewEvent(kanzi.EVT_BEFORE_TRANSFORM, int(this.currentBlockID),
-			int64(this.blockLength), checksum, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64(this.blockLength), checksum, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
 	if this.blockLength <= _SMALL_BLOCK_SIZE {
-		this.blockTransformType = transform.NONE_TYPE
-		this.blockEntropyType = entropy.NONE_TYPE
-		mode |= byte(_COPY_BLOCK_MASK)
+		blockTransformType = transform.NONE_TYPE
+		blockEntropyType = entropy.NONE_TYPE
+		mode |= _COPY_BLOCK_MASK
 	} else {
 		if skipOpt, hasKey := this.ctx["skipBlocks"]; hasKey == true {
 			if skipOpt.(bool) == true {
-				skip := false
-
-				if this.blockLength >= 8 {
-					skip = internal.IsDataCompressed(internal.GetMagicType(data))
-				}
+				skip := internal.IsDataCompressed(internal.GetMagicType(data))
 
 				if skip == false {
 					histo := [256]int{}
@@ -601,8 +614,8 @@ func (this *encodingTask) encode(res *encodingTaskResult) {
 				}
 
 				if skip == true {
-					this.blockTransformType = transform.NONE_TYPE
-					this.blockEntropyType = entropy.NONE_TYPE
+					blockTransformType = transform.NONE_TYPE
+					blockEntropyType = entropy.NONE_TYPE
 					mode |= _COPY_BLOCK_MASK
 				}
 			}
@@ -610,41 +623,54 @@ func (this *encodingTask) encode(res *encodingTaskResult) {
 	}
 
 	this.ctx["size"] = this.blockLength
-	t, err := transform.New(&this.ctx, this.blockTransformType)
+	t := this.info.transform
 
-	if err != nil {
-		res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC}
-		return
-	}
+	if t == nil {
+		var err error
 
-	requiredSize := t.MaxEncodedLen(int(this.blockLength))
+		if t, err = transform.New(&this.ctx, this.info.transformType); err != nil {
+			res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC}
+			return
+		}
 
-	if this.blockLength >= 4 {
-		magic := internal.GetMagicType(data)
+		this.info.transform = t
+	}
 
-		if internal.IsDataCompressed(magic) == true {
-			this.ctx["dataType"] = internal.DT_BIN
-		} else if internal.IsDataMultimedia(magic) == true {
-			this.ctx["dataType"] = internal.DT_MULTIMEDIA
-		} else if internal.IsDataExecutable(magic) == true {
-			this.ctx["dataType"] = internal.DT_EXE
+	if blockTransformType == transform.NONE_TYPE && this.info.transformType != transform.NONE_TYPE {
+		// Null transform for small blocks
+		var err error
+
+		if t, err = transform.New(&this.ctx, transform.NONE_TYPE); err != nil {
+			res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC}
+			return
 		}
 	}
 
-	if len(this.iBuffer.Buf) < requiredSize {
-		extraBuf := make([]byte, requiredSize-len(this.iBuffer.Buf))
+	requiredSize := t.MaxEncodedLen(int(this.blockLength))
+	magic := internal.GetMagicType(data)
+
+	if internal.IsDataCompressed(magic) == true {
+		this.ctx["dataType"] = internal.DT_BIN
+	} else if internal.IsDataMultimedia(magic) == true {
+		this.ctx["dataType"] = internal.DT_MULTIMEDIA
+	} else if internal.IsDataExecutable(magic) == true {
+		this.ctx["dataType"] = internal.DT_EXE
+	}
+
+	if len(this.info.iBuffer.Buf) < requiredSize {
+		extraBuf := make([]byte, requiredSize-len(this.info.iBuffer.Buf))
 		data = append(data, extraBuf...)
-		this.iBuffer.Buf = data
+		this.info.iBuffer.Buf = data
 	}
 
-	if len(this.oBuffer.Buf) < requiredSize {
-		extraBuf := make([]byte, requiredSize-len(this.oBuffer.Buf))
+	if len(this.info.oBuffer.Buf) < requiredSize {
+		extraBuf := make([]byte, requiredSize-len(this.info.oBuffer.Buf))
 		buffer = append(buffer, extraBuf...)
-		this.oBuffer.Buf = buffer
+		this.info.oBuffer.Buf = buffer
 	}
 
 	// Forward transform (ignore error, encode skipFlags)
-	_, postTransformLength, _ := t.Forward(data[0:this.blockLength], buffer)
+	_, postTransformLength, _ := this.info.transform.Forward(data[0:this.blockLength], buffer)
 	this.ctx["size"] = postTransformLength
 
 	dataSize := uint(1)
@@ -660,11 +686,11 @@ func (this *encodingTask) encode(res *encodingTaskResult) {
 	// Record size of 'block size' - 1 in bytes
 	mode |= byte(((dataSize - 1) & 0x03) << 5)
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify after transform
 		evt := kanzi.NewEvent(kanzi.EVT_AFTER_TRANSFORM, int(this.currentBlockID),
-			int64(postTransformLength), checksum, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64(postTransformLength), checksum, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
 	bufSize := postTransformLength
@@ -700,20 +726,20 @@ func (this *encodingTask) encode(res *encodingTaskResult) {
 	obs.WriteBits(uint64(postTransformLength), 8*dataSize)
 
 	// Write checksum
-	if this.hasher != nil {
+	if this.info.hasher != nil {
 		obs.WriteBits(uint64(checksum), 32)
 	}
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify before entropy
 		evt := kanzi.NewEvent(kanzi.EVT_BEFORE_ENTROPY, int(this.currentBlockID),
-			int64(postTransformLength), checksum, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64(postTransformLength), checksum, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
 	// Each block is encoded separately
 	// Rebuild the entropy encoder to reset block statistics
-	ee, err := entropy.NewEntropyEncoder(obs, this.ctx, this.blockEntropyType)
+	ee, err := entropy.NewEntropyEncoder(obs, this.ctx, blockEntropyType)
 	if err != nil {
 		res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC}
 		return
@@ -743,14 +769,16 @@ func (this *encodingTask) encode(res *encodingTaskResult) {
 			break
 		}
 
-		runtime.Gosched()
+		if n&0x1F == 0 {
+			runtime.Gosched()
+		}
 	}
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify after entropy
 		evt := kanzi.NewEvent(kanzi.EVT_AFTER_ENTROPY, int(this.currentBlockID),
-			int64((written+7)>>3), checksum, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64((written+7)>>3), checksum, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
 	// Emit block size in bits (max size pre-entropy is 1 GB = 1 << 30 bytes)
@@ -825,21 +853,27 @@ type Reader struct {
 	listeners []kanzi.Listener
 	ctx       map[string]any
 	headless  bool
+	taskInfos []decodingTaskInfo
 }
 
 type decodingTask struct {
-	iBuffer            *blockBuffer
-	oBuffer            *blockBuffer
-	hasher             *hash.XXHash32
-	blockLength        uint
-	blockTransformType uint64
-	blockEntropyType   uint32
-	currentBlockID     int32
-	processedBlockID   *int32
-	wg                 *sync.WaitGroup
-	listeners          []kanzi.Listener
-	ibs                kanzi.InputBitStream
-	ctx                map[string]any
+	info             *decodingTaskInfo
+	blockLength      uint
+	currentBlockID   int32
+	processedBlockID *int32
+	wg               *sync.WaitGroup
+	ibs              kanzi.InputBitStream
+	ctx              map[string]any
+}
+
+type decodingTaskInfo struct {
+	iBuffer       *blockBuffer
+	oBuffer       *blockBuffer
+	hasher        *hash.XXHash32
+	transform     *transform.ByteTransformSequence
+	listeners     []kanzi.Listener
+	transformType uint64
+	entropyType   uint32
 }
 
 // NewReader creates a new instance of Reader.
@@ -896,6 +930,7 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader,
 	this.consumed = 0
 	this.available = 0
 	this.bufferThreshold = 0
+	this.taskInfos = make([]decodingTaskInfo, this.jobs)
 	this.buffers = make([]blockBuffer, 2*this.jobs)
 
 	for i := range this.buffers {
@@ -907,6 +942,7 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader,
 	this.blockSize = 0
 	this.entropyType = entropy.NONE_TYPE
 	this.transformType = transform.NONE_TYPE
+	this.headless = false
 
 	if hdl, hasKey := ctx["headerless"]; hasKey == true {
 		this.headless = hdl.(bool)
@@ -915,8 +951,6 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader,
 		if err := this.validateHeaderless(); err != nil {
 			return nil, err
 		}
-	} else {
-		this.headless = false
 	}
 
 	return this, nil
@@ -1290,19 +1324,26 @@ func (this *Reader) processBlock() (int, error) {
 		results[taskID] = decodingTaskResult{}
 		wg.Add(1)
 
+		if firstID == 0 {
+			// Create the task static infos
+			this.taskInfos[taskID] = decodingTaskInfo{
+				hasher:        this.hasher,
+				transform:     nil,
+				listeners:     listeners,
+				iBuffer:       &this.buffers[taskID],
+				oBuffer:       &this.buffers[this.jobs+taskID],
+				transformType: this.transformType,
+				entropyType:   this.entropyType}
+		}
+
 		task := decodingTask{
-			iBuffer:            &this.buffers[taskID],
-			oBuffer:            &this.buffers[this.jobs+taskID],
-			hasher:             this.hasher,
-			blockLength:        uint(blkSize),
-			blockTransformType: this.transformType,
-			blockEntropyType:   this.entropyType,
-			currentBlockID:     firstID + int32(taskID) + 1,
-			processedBlockID:   &this.blockID,
-			wg:                 &wg,
-			listeners:          listeners,
-			ibs:                this.ibs,
-			ctx:                copyCtx}
+			info:             &this.taskInfos[taskID],
+			blockLength:      uint(blkSize),
+			currentBlockID:   firstID + int32(taskID) + 1,
+			processedBlockID: &this.blockID,
+			wg:               &wg,
+			ibs:              this.ibs,
+			ctx:              copyCtx}
 
 		// Invoke the tasks concurrently
 		go task.decode(&results[taskID])
@@ -1369,14 +1410,16 @@ func (this *Reader) GetRead() uint64 {
 //
 // then 0byyyyyyyy => transform sequence skip flags (1 means skip)
 func (this *decodingTask) decode(res *decodingTaskResult) {
-	data := this.iBuffer.Buf
-	buffer := this.oBuffer.Buf
+	data := this.info.iBuffer.Buf
+	buffer := this.info.oBuffer.Buf
 	decoded := 0
 	checksum1 := uint32(0)
 	skipped := false
+	blockTransformType := this.info.transformType
+	blockEntropyType := this.info.entropyType
 
 	defer func() {
-		res.data = this.iBuffer.Buf
+		res.data = this.info.iBuffer.Buf
 		res.decoded = decoded
 		res.blockID = int(this.currentBlockID)
 		res.completionTime = time.Now()
@@ -1409,7 +1452,9 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 			break
 		}
 
-		runtime.Gosched()
+		if n&0x1F == 0 {
+			runtime.Gosched()
+		}
 	}
 
 	// Read shared bitstream sequentially
@@ -1435,7 +1480,7 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 	if len(data) < maxL {
 		extraBuf := make([]byte, maxL-len(data))
 		buffer = append(data, extraBuf...)
-		this.iBuffer.Buf = data
+		this.info.iBuffer.Buf = data
 	}
 
 	// Read data from shared bitstream
@@ -1479,8 +1524,8 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 	skipFlags := byte(0)
 
 	if mode&_COPY_BLOCK_MASK != 0 {
-		this.blockTransformType = transform.NONE_TYPE
-		this.blockEntropyType = entropy.NONE_TYPE
+		blockTransformType = transform.NONE_TYPE
+		blockEntropyType = entropy.NONE_TYPE
 	} else {
 		if mode&_TRANSFORMS_MASK != 0 {
 			skipFlags = byte(ibs.ReadBits(8))
@@ -1501,21 +1546,21 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 
 	if preTransformLength > _MAX_BITSTREAM_BLOCK_SIZE {
 		// Error => cancel concurrent decoding tasks
-		errMsg := fmt.Sprintf("Invalid compressed block length: %d", preTransformLength)
+		errMsg := fmt.Sprintf("Invalid compressed block size: %d", preTransformLength)
 		res.err = &IOError{msg: errMsg, code: kanzi.ERR_BLOCK_SIZE}
 		return
 	}
 
 	// Extract checksum from bit stream (if any)
-	if this.hasher != nil {
+	if this.info.hasher != nil {
 		checksum1 = uint32(ibs.ReadBits(32))
 	}
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify before entropy (block size in bitstream is unknown)
 		evt := kanzi.NewEvent(kanzi.EVT_BEFORE_ENTROPY, int(this.currentBlockID),
-			int64(-1), checksum1, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64(-1), checksum1, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
 	bufferSize := this.blockLength
@@ -1527,14 +1572,14 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 	if len(buffer) < int(bufferSize) {
 		extraBuf := make([]byte, int(bufferSize)-len(buffer))
 		buffer = append(buffer, extraBuf...)
-		this.oBuffer.Buf = buffer
+		this.info.oBuffer.Buf = buffer
 	}
 
 	this.ctx["size"] = preTransformLength
 
 	// Each block is decoded separately
 	// Rebuild the entropy decoder to reset block statistics
-	ed, err := entropy.NewEntropyDecoder(ibs, this.ctx, this.blockEntropyType)
+	ed, err := entropy.NewEntropyDecoder(ibs, this.ctx, blockEntropyType)
 
 	if err != nil {
 		// Error => cancel concurrent decoding tasks
@@ -1542,8 +1587,6 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 		return
 	}
 
-	defer ed.Dispose()
-
 	// Block entropy decode
 	if _, err = ed.Read(buffer[0:preTransformLength]); err != nil {
 		// Error => cancel concurrent decoding tasks
@@ -1551,36 +1594,52 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 		return
 	}
 
+	ed.Dispose()
 	ibs.Close()
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify after entropy
 		evt := kanzi.NewEvent(kanzi.EVT_AFTER_ENTROPY, int(this.currentBlockID),
-			int64(ibs.Read())/8, checksum1, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64(ibs.Read())/8, checksum1, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
-	if len(this.listeners) > 0 {
+	if len(this.info.listeners) > 0 {
 		// Notify before transform
 		evt := kanzi.NewEvent(kanzi.EVT_BEFORE_TRANSFORM, int(this.currentBlockID),
-			int64(preTransformLength), checksum1, this.hasher != nil, time.Now())
-		notifyListeners(this.listeners, evt)
+			int64(preTransformLength), checksum1, this.info.hasher != nil, time.Now())
+		notifyListeners(this.info.listeners, evt)
 	}
 
 	this.ctx["size"] = preTransformLength
-	transform, err := transform.New(&this.ctx, this.blockTransformType)
+	t := this.info.transform
 
-	if err != nil {
-		// Error => return
-		res.err = &IOError{msg: err.Error(), code: kanzi.ERR_INVALID_CODEC}
-		return
+	if t == nil {
+		var err error
+
+		if t, err = transform.New(&this.ctx, this.info.transformType); err != nil {
+			res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC}
+			return
+		}
+
+		this.info.transform = t
 	}
 
-	transform.SetSkipFlags(skipFlags)
+	if blockTransformType == transform.NONE_TYPE && this.info.transformType != transform.NONE_TYPE {
+		// Null transform for small blocks
+		var err error
+
+		if t, err = transform.New(&this.ctx, transform.NONE_TYPE); err != nil {
+			res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC}
+			return
+		}
+	}
+
+	t.SetSkipFlags(skipFlags)
 	var oIdx uint
 
 	// Inverse transform
-	if _, oIdx, err = transform.Inverse(buffer[0:preTransformLength], data); err != nil {
+	if _, oIdx, err = t.Inverse(buffer[0:preTransformLength], data); err != nil {
 		// Error => return
 		res.err = &IOError{msg: err.Error(), code: kanzi.ERR_PROCESS_BLOCK}
 		return
@@ -1589,8 +1648,8 @@ func (this *decodingTask) decode(res *decodingTaskResult) {
 	decoded = int(oIdx)
 
 	// Verify checksum
-	if this.hasher != nil {
-		checksum2 := this.hasher.Hash(data[0:decoded])
+	if this.info.hasher != nil {
+		checksum2 := this.info.hasher.Hash(data[0:decoded])
 
 		if checksum2 != checksum1 {
 			errMsg := fmt.Sprintf("Corrupted bitstream: expected checksum %x, found %x", checksum1, checksum2)
diff --git a/v2/io/CompressedStream_test.go b/v2/io/CompressedStream_test.go
index 7127ccfc..651fa9ae 100644
--- a/v2/io/CompressedStream_test.go
+++ b/v2/io/CompressedStream_test.go
@@ -102,7 +102,7 @@ func compress(block []byte, entropy, transform string) int {
 		blockSize = uint((len(block) / (n + 1)) & -16)
 	}
 
-	fmt.Printf("Block size: %v, jobs: %v \n", blockSize, jobs)
+	fmt.Printf("Block size: %v, jobs: %v \n", blockSize, jobs)
 
 	{
 		// Create an io.WriteCloser
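
For context, the refactoring above splits each encoding/decoding task into two parts: the heavy, reusable state (buffers, hasher, listeners, transform and codec types) moves into a per-worker taskInfos slot built once on the first block, while the per-block task keeps only a pointer to that slot; the transform sequence is also created lazily and cached, and runtime.Gosched() is now called only every 32nd polling iteration. The standalone Go sketch below illustrates the same caching pattern under hypothetical names (workerState, blockTask); it is not kanzi code, only an approximation of the idea.

package main

import (
	"fmt"
	"runtime"
)

// workerState plays the role of encodingTaskInfo/decodingTaskInfo:
// reusable per-worker objects that survive across blocks.
type workerState struct {
	buf []byte
}

// blockTask plays the role of encodingTask/decodingTask: cheap, rebuilt per
// block, holding only a pointer to its worker's cached state.
type blockTask struct {
	state   *workerState
	blockID int
	length  int
}

func (t *blockTask) process() int {
	// Grow the cached buffer only when needed, as the patch does for
	// info.iBuffer.Buf and info.oBuffer.Buf.
	if len(t.state.buf) < t.length {
		t.state.buf = append(t.state.buf, make([]byte, t.length-len(t.state.buf))...)
	}

	for n := 0; n < t.length; n++ {
		t.state.buf[n] = byte(n)

		// Yield to the scheduler only every 32 iterations, mirroring the
		// n&0x1F == 0 throttle added around runtime.Gosched().
		if n&0x1F == 0 {
			runtime.Gosched()
		}
	}

	return len(t.state.buf)
}

func main() {
	const jobs = 4
	states := make([]workerState, jobs) // built once, like taskInfos

	for block := 0; block < 3; block++ {
		for id := 0; id < jobs; id++ {
			task := blockTask{state: &states[id], blockID: block*jobs + id, length: 64 * (block + 1)}
			fmt.Printf("block %2d -> buffer size %d\n", task.blockID, task.process())
		}
	}
}

The point of the split is allocation reuse: because each worker index always maps to the same state slot, buffers grown for one block are still available for the next one, and no synchronization is needed beyond what the existing block-ordering logic already provides.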