From eb02406a8fe3afaaa32862c9a9101b481a50a03d Mon Sep 17 00:00:00 2001 From: Frederic Langlet Date: Fri, 19 Jan 2024 15:06:00 -0800 Subject: [PATCH] Build transforms once per job instead of once per block. It saves many transform creations but also the memory allocations in the transforms. It works because transforms do not retain any state between calls to forward()/inverse(). --- v2/entropy/RiceGolombCodec.go | 2 +- v2/io/CompressedStream.go | 321 +++++++++++++++++++-------------- v2/io/CompressedStream_test.go | 2 +- 3 files changed, 192 insertions(+), 133 deletions(-) diff --git a/v2/entropy/RiceGolombCodec.go b/v2/entropy/RiceGolombCodec.go index 2ee7e51f..9300aaca 100644 --- a/v2/entropy/RiceGolombCodec.go +++ b/v2/entropy/RiceGolombCodec.go @@ -111,7 +111,7 @@ type RiceGolombDecoder struct { bitstream kanzi.InputBitStream } -// NewRiceGolombDecoder creates a new instance of ExpGolombDecoder +// NewRiceGolombDecoder creates a new instance of RiceGolombDecoder // If sgn is true, values from the bitstream will be decoded as signed (int8) func NewRiceGolombDecoder(bs kanzi.InputBitStream, sgn bool, logBase uint) (*RiceGolombDecoder, error) { if bs == nil { diff --git a/v2/io/CompressedStream.go b/v2/io/CompressedStream.go index e610ff64..1a79edb8 100644 --- a/v2/io/CompressedStream.go +++ b/v2/io/CompressedStream.go @@ -101,27 +101,33 @@ type Writer struct { listeners []kanzi.Listener ctx map[string]any headless bool + taskInfos []encodingTaskInfo } type encodingTask struct { - iBuffer *blockBuffer - oBuffer *blockBuffer - hasher *hash.XXHash32 - blockLength uint - blockTransformType uint64 - blockEntropyType uint32 - currentBlockID int32 - processedBlockID *int32 - wg *sync.WaitGroup - listeners []kanzi.Listener - obs kanzi.OutputBitStream - ctx map[string]any + info *encodingTaskInfo + blockLength uint + currentBlockID int32 + processedBlockID *int32 + wg *sync.WaitGroup + obs kanzi.OutputBitStream + ctx map[string]any } type encodingTaskResult struct { err 
*IOError } +type encodingTaskInfo struct { + iBuffer *blockBuffer + oBuffer *blockBuffer + hasher *hash.XXHash32 + transform *transform.ByteTransformSequence + listeners []kanzi.Listener + transformType uint64 + entropyType uint32 +} + // NewWriter creates a new instance of Writer. // The writer writes compressed data blocks to the provided os. func NewWriter(os io.WriteCloser, entropy, transform string, blockSize, jobs uint, checksum bool) (*Writer, error) { @@ -251,6 +257,7 @@ func createWriterWithCtx(obs kanzi.OutputBitStream, ctx map[string]any) (*Writer ctx["bsVersion"] = uint(_BITSTREAM_FORMAT_VERSION) this.jobs = int(tasks) + this.taskInfos = make([]encodingTaskInfo, this.jobs) this.buffers = make([]blockBuffer, 2*this.jobs) // Allocate first buffer and add padding for incompressible blocks @@ -499,19 +506,27 @@ func (this *Writer) processBlock() error { off += dataLength this.available -= dataLength + if firstID == 0 { + // Create the task static infos + this.taskInfos[taskID] = encodingTaskInfo{ + hasher: this.hasher, + transform: nil, + listeners: listeners, + iBuffer: &this.buffers[taskID], + oBuffer: &this.buffers[this.jobs+taskID], + transformType: this.transformType, + entropyType: this.entropyType} + + } + task := encodingTask{ - iBuffer: &this.buffers[taskID], - oBuffer: &this.buffers[this.jobs+taskID], - hasher: this.hasher, - blockLength: uint(dataLength), - blockTransformType: this.transformType, - blockEntropyType: this.entropyType, - currentBlockID: firstID + int32(taskID) + 1, - processedBlockID: &this.blockID, - wg: &wg, - obs: this.obs, - listeners: listeners, - ctx: copyCtx} + info: &this.taskInfos[taskID], + blockLength: uint(dataLength), + currentBlockID: firstID + int32(taskID) + 1, + processedBlockID: &this.blockID, + wg: &wg, + obs: this.obs, + ctx: copyCtx} // Invoke the tasks concurrently go task.encode(&results[taskID]) @@ -547,10 +562,12 @@ func (this *Writer) GetWritten() uint64 { // // then 0byyyyyyyy => transform sequence skip 
flags (1 means skip) func (this *encodingTask) encode(res *encodingTaskResult) { - data := this.iBuffer.Buf - buffer := this.oBuffer.Buf + data := this.info.iBuffer.Buf + buffer := this.info.oBuffer.Buf mode := byte(0) checksum := uint32(0) + blockTransformType := this.info.transformType + blockEntropyType := this.info.entropyType defer func() { if r := recover(); r != nil { @@ -568,29 +585,25 @@ func (this *encodingTask) encode(res *encodingTaskResult) { }() // Compute block checksum - if this.hasher != nil { - checksum = this.hasher.Hash(data[0:this.blockLength]) + if this.info.hasher != nil { + checksum = this.info.hasher.Hash(data[0:this.blockLength]) } - if len(this.listeners) > 0 { + if len(this.info.listeners) > 0 { // Notify before transform evt := kanzi.NewEvent(kanzi.EVT_BEFORE_TRANSFORM, int(this.currentBlockID), - int64(this.blockLength), checksum, this.hasher != nil, time.Now()) - notifyListeners(this.listeners, evt) + int64(this.blockLength), checksum, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } if this.blockLength <= _SMALL_BLOCK_SIZE { - this.blockTransformType = transform.NONE_TYPE - this.blockEntropyType = entropy.NONE_TYPE - mode |= byte(_COPY_BLOCK_MASK) + blockTransformType = transform.NONE_TYPE + blockEntropyType = entropy.NONE_TYPE + mode |= _COPY_BLOCK_MASK } else { if skipOpt, hasKey := this.ctx["skipBlocks"]; hasKey == true { if skipOpt.(bool) == true { - skip := false - - if this.blockLength >= 8 { - skip = internal.IsDataCompressed(internal.GetMagicType(data)) - } + skip := internal.IsDataCompressed(internal.GetMagicType(data)) if skip == false { histo := [256]int{} @@ -601,8 +614,8 @@ func (this *encodingTask) encode(res *encodingTaskResult) { } if skip == true { - this.blockTransformType = transform.NONE_TYPE - this.blockEntropyType = entropy.NONE_TYPE + blockTransformType = transform.NONE_TYPE + blockEntropyType = entropy.NONE_TYPE mode |= _COPY_BLOCK_MASK } } @@ -610,41 +623,54 @@ func (this 
*encodingTask) encode(res *encodingTaskResult) { } this.ctx["size"] = this.blockLength - t, err := transform.New(&this.ctx, this.blockTransformType) + t := this.info.transform - if err != nil { - res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC} - return - } + if t == nil { + var err error - requiredSize := t.MaxEncodedLen(int(this.blockLength)) + if t, err = transform.New(&this.ctx, this.info.transformType); err != nil { + res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC} + return + } - if this.blockLength >= 4 { - magic := internal.GetMagicType(data) + this.info.transform = t + } - if internal.IsDataCompressed(magic) == true { - this.ctx["dataType"] = internal.DT_BIN - } else if internal.IsDataMultimedia(magic) == true { - this.ctx["dataType"] = internal.DT_MULTIMEDIA - } else if internal.IsDataExecutable(magic) == true { - this.ctx["dataType"] = internal.DT_EXE + if blockTransformType == transform.NONE_TYPE && this.info.transformType != transform.NONE_TYPE { + // Null transform for small blocks + var err error + + if t, err = transform.New(&this.ctx, transform.NONE_TYPE); err != nil { + res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC} + return + } } } - if len(this.iBuffer.Buf) < requiredSize { - extraBuf := make([]byte, requiredSize-len(this.iBuffer.Buf)) + requiredSize := t.MaxEncodedLen(int(this.blockLength)) + magic := internal.GetMagicType(data) + + if internal.IsDataCompressed(magic) == true { + this.ctx["dataType"] = internal.DT_BIN + } else if internal.IsDataMultimedia(magic) == true { + this.ctx["dataType"] = internal.DT_MULTIMEDIA + } else if internal.IsDataExecutable(magic) == true { + this.ctx["dataType"] = internal.DT_EXE + } + + if len(this.info.iBuffer.Buf) < requiredSize { + extraBuf := make([]byte, requiredSize-len(this.info.iBuffer.Buf)) data = append(data, extraBuf...) 
- this.iBuffer.Buf = data + this.info.iBuffer.Buf = data } - if len(this.oBuffer.Buf) < requiredSize { - extraBuf := make([]byte, requiredSize-len(this.oBuffer.Buf)) + if len(this.info.oBuffer.Buf) < requiredSize { + extraBuf := make([]byte, requiredSize-len(this.info.oBuffer.Buf)) buffer = append(buffer, extraBuf...) - this.oBuffer.Buf = buffer + this.info.oBuffer.Buf = buffer } // Forward transform (ignore error, encode skipFlags) - _, postTransformLength, _ := t.Forward(data[0:this.blockLength], buffer) + _, postTransformLength, _ := this.info.transform.Forward(data[0:this.blockLength], buffer) this.ctx["size"] = postTransformLength dataSize := uint(1) @@ -660,11 +686,11 @@ func (this *encodingTask) encode(res *encodingTaskResult) { // Record size of 'block size' - 1 in bytes mode |= byte(((dataSize - 1) & 0x03) << 5) - if len(this.listeners) > 0 { + if len(this.info.listeners) > 0 { // Notify after transform evt := kanzi.NewEvent(kanzi.EVT_AFTER_TRANSFORM, int(this.currentBlockID), - int64(postTransformLength), checksum, this.hasher != nil, time.Now()) - notifyListeners(this.listeners, evt) + int64(postTransformLength), checksum, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } bufSize := postTransformLength @@ -700,20 +726,20 @@ func (this *encodingTask) encode(res *encodingTaskResult) { obs.WriteBits(uint64(postTransformLength), 8*dataSize) // Write checksum - if this.hasher != nil { + if this.info.hasher != nil { obs.WriteBits(uint64(checksum), 32) } - if len(this.listeners) > 0 { + if len(this.info.listeners) > 0 { // Notify before entropy evt := kanzi.NewEvent(kanzi.EVT_BEFORE_ENTROPY, int(this.currentBlockID), - int64(postTransformLength), checksum, this.hasher != nil, time.Now()) - notifyListeners(this.listeners, evt) + int64(postTransformLength), checksum, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } // Each block is encoded separately // Rebuild the entropy encoder to reset block 
statistics - ee, err := entropy.NewEntropyEncoder(obs, this.ctx, this.blockEntropyType) + ee, err := entropy.NewEntropyEncoder(obs, this.ctx, blockEntropyType) if err != nil { res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC} @@ -743,14 +769,16 @@ func (this *encodingTask) encode(res *encodingTaskResult) { break } - runtime.Gosched() + if n&0x1F == 0 { + runtime.Gosched() + } } - if len(this.listeners) > 0 { + if len(this.info.listeners) > 0 { // Notify after entropy evt := kanzi.NewEvent(kanzi.EVT_AFTER_ENTROPY, int(this.currentBlockID), - int64((written+7)>>3), checksum, this.hasher != nil, time.Now()) - notifyListeners(this.listeners, evt) + int64((written+7)>>3), checksum, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } // Emit block size in bits (max size pre-entropy is 1 GB = 1 << 30 bytes) @@ -825,21 +853,27 @@ type Reader struct { listeners []kanzi.Listener ctx map[string]any headless bool + taskInfos []decodingTaskInfo } type decodingTask struct { - iBuffer *blockBuffer - oBuffer *blockBuffer - hasher *hash.XXHash32 - blockLength uint - blockTransformType uint64 - blockEntropyType uint32 - currentBlockID int32 - processedBlockID *int32 - wg *sync.WaitGroup - listeners []kanzi.Listener - ibs kanzi.InputBitStream - ctx map[string]any + info *decodingTaskInfo + blockLength uint + currentBlockID int32 + processedBlockID *int32 + wg *sync.WaitGroup + ibs kanzi.InputBitStream + ctx map[string]any +} + +type decodingTaskInfo struct { + iBuffer *blockBuffer + oBuffer *blockBuffer + hasher *hash.XXHash32 + transform *transform.ByteTransformSequence + listeners []kanzi.Listener + transformType uint64 + entropyType uint32 } // NewReader creates a new instance of Reader. 
@@ -896,6 +930,7 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader, this.consumed = 0 this.available = 0 this.bufferThreshold = 0 + this.taskInfos = make([]decodingTaskInfo, this.jobs) this.buffers = make([]blockBuffer, 2*this.jobs) for i := range this.buffers { @@ -907,6 +942,7 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader, this.blockSize = 0 this.entropyType = entropy.NONE_TYPE this.transformType = transform.NONE_TYPE + this.headless = false if hdl, hasKey := ctx["headerless"]; hasKey == true { this.headless = hdl.(bool) @@ -915,8 +951,6 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader, if err := this.validateHeaderless(); err != nil { return nil, err } - } else { - this.headless = false } return this, nil @@ -1290,19 +1324,26 @@ func (this *Reader) processBlock() (int, error) { results[taskID] = decodingTaskResult{} wg.Add(1) + if firstID == 0 { + // Create the task static infos + this.taskInfos[taskID] = decodingTaskInfo{ + hasher: this.hasher, + transform: nil, + listeners: listeners, + iBuffer: &this.buffers[taskID], + oBuffer: &this.buffers[this.jobs+taskID], + transformType: this.transformType, + entropyType: this.entropyType} + } + task := decodingTask{ - iBuffer: &this.buffers[taskID], - oBuffer: &this.buffers[this.jobs+taskID], - hasher: this.hasher, - blockLength: uint(blkSize), - blockTransformType: this.transformType, - blockEntropyType: this.entropyType, - currentBlockID: firstID + int32(taskID) + 1, - processedBlockID: &this.blockID, - wg: &wg, - listeners: listeners, - ibs: this.ibs, - ctx: copyCtx} + info: &this.taskInfos[taskID], + blockLength: uint(blkSize), + currentBlockID: firstID + int32(taskID) + 1, + processedBlockID: &this.blockID, + wg: &wg, + ibs: this.ibs, + ctx: copyCtx} // Invoke the tasks concurrently go task.decode(&results[taskID]) @@ -1369,14 +1410,16 @@ func (this *Reader) GetRead() uint64 { // // then 0byyyyyyyy => transform 
sequence skip flags (1 means skip) func (this *decodingTask) decode(res *decodingTaskResult) { - data := this.iBuffer.Buf - buffer := this.oBuffer.Buf + data := this.info.iBuffer.Buf + buffer := this.info.oBuffer.Buf decoded := 0 checksum1 := uint32(0) skipped := false + blockTransformType := this.info.transformType + blockEntropyType := this.info.entropyType defer func() { - res.data = this.iBuffer.Buf + res.data = this.info.iBuffer.Buf res.decoded = decoded res.blockID = int(this.currentBlockID) res.completionTime = time.Now() @@ -1409,7 +1452,9 @@ func (this *decodingTask) decode(res *decodingTaskResult) { break } - runtime.Gosched() + if n&0x1F == 0 { + runtime.Gosched() + } } // Read shared bitstream sequentially @@ -1435,7 +1480,7 @@ func (this *decodingTask) decode(res *decodingTaskResult) { if len(data) < maxL { extraBuf := make([]byte, maxL-len(data)) buffer = append(data, extraBuf...) - this.iBuffer.Buf = data + this.info.iBuffer.Buf = data } // Read data from shared bitstream @@ -1479,8 +1524,8 @@ func (this *decodingTask) decode(res *decodingTaskResult) { skipFlags := byte(0) if mode&_COPY_BLOCK_MASK != 0 { - this.blockTransformType = transform.NONE_TYPE - this.blockEntropyType = entropy.NONE_TYPE + blockTransformType = transform.NONE_TYPE + blockEntropyType = entropy.NONE_TYPE } else { if mode&_TRANSFORMS_MASK != 0 { skipFlags = byte(ibs.ReadBits(8)) @@ -1501,21 +1546,21 @@ func (this *decodingTask) decode(res *decodingTaskResult) { if preTransformLength > _MAX_BITSTREAM_BLOCK_SIZE { // Error => cancel concurrent decoding tasks - errMsg := fmt.Sprintf("Invalid compressed block length: %d", preTransformLength) + errMsg := fmt.Sprintf("Invalid compressed block size: %d", preTransformLength) res.err = &IOError{msg: errMsg, code: kanzi.ERR_BLOCK_SIZE} return } // Extract checksum from bit stream (if any) - if this.hasher != nil { + if this.info.hasher != nil { checksum1 = uint32(ibs.ReadBits(32)) } - if len(this.listeners) > 0 { + if 
len(this.info.listeners) > 0 { // Notify before entropy (block size in bitstream is unknown) evt := kanzi.NewEvent(kanzi.EVT_BEFORE_ENTROPY, int(this.currentBlockID), - int64(-1), checksum1, this.hasher != nil, time.Now()) - notifyListeners(this.listeners, evt) + int64(-1), checksum1, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } bufferSize := this.blockLength @@ -1527,14 +1572,14 @@ func (this *decodingTask) decode(res *decodingTaskResult) { if len(buffer) < int(bufferSize) { extraBuf := make([]byte, int(bufferSize)-len(buffer)) buffer = append(buffer, extraBuf...) - this.oBuffer.Buf = buffer + this.info.oBuffer.Buf = buffer } this.ctx["size"] = preTransformLength // Each block is decoded separately // Rebuild the entropy decoder to reset block statistics - ed, err := entropy.NewEntropyDecoder(ibs, this.ctx, this.blockEntropyType) + ed, err := entropy.NewEntropyDecoder(ibs, this.ctx, blockEntropyType) if err != nil { // Error => cancel concurrent decoding tasks @@ -1542,8 +1587,6 @@ func (this *decodingTask) decode(res *decodingTaskResult) { return } - defer ed.Dispose() - // Block entropy decode if _, err = ed.Read(buffer[0:preTransformLength]); err != nil { // Error => cancel concurrent decoding tasks @@ -1551,36 +1594,52 @@ func (this *decodingTask) decode(res *decodingTaskResult) { return } + ed.Dispose() ibs.Close() - if len(this.listeners) > 0 { + if len(this.info.listeners) > 0 { // Notify after entropy evt := kanzi.NewEvent(kanzi.EVT_AFTER_ENTROPY, int(this.currentBlockID), - int64(ibs.Read())/8, checksum1, this.hasher != nil, time.Now()) - notifyListeners(this.listeners, evt) + int64(ibs.Read())/8, checksum1, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } - if len(this.listeners) > 0 { + if len(this.info.listeners) > 0 { // Notify before transform evt := kanzi.NewEvent(kanzi.EVT_BEFORE_TRANSFORM, int(this.currentBlockID), - int64(preTransformLength), checksum1, this.hasher != nil, 
time.Now()) - notifyListeners(this.listeners, evt) + int64(preTransformLength), checksum1, this.info.hasher != nil, time.Now()) + notifyListeners(this.info.listeners, evt) } this.ctx["size"] = preTransformLength - transform, err := transform.New(&this.ctx, this.blockTransformType) + t := this.info.transform - if err != nil { - // Error => return - res.err = &IOError{msg: err.Error(), code: kanzi.ERR_INVALID_CODEC} - return + if t == nil { + var err error + + if t, err = transform.New(&this.ctx, this.info.transformType); err != nil { + res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC} + return + } + + this.info.transform = t } - transform.SetSkipFlags(skipFlags) + if blockTransformType == transform.NONE_TYPE && this.info.transformType != transform.NONE_TYPE { + // Null transform for small blocks + var err error + + if t, err = transform.New(&this.ctx, transform.NONE_TYPE); err != nil { + res.err = &IOError{msg: err.Error(), code: kanzi.ERR_CREATE_CODEC} + return + } + } + + t.SetSkipFlags(skipFlags) var oIdx uint // Inverse transform - if _, oIdx, err = transform.Inverse(buffer[0:preTransformLength], data); err != nil { + if _, oIdx, err = t.Inverse(buffer[0:preTransformLength], data); err != nil { // Error => return res.err = &IOError{msg: err.Error(), code: kanzi.ERR_PROCESS_BLOCK} return @@ -1589,8 +1648,8 @@ func (this *decodingTask) decode(res *decodingTaskResult) { decoded = int(oIdx) // Verify checksum - if this.hasher != nil { - checksum2 := this.hasher.Hash(data[0:decoded]) + if this.info.hasher != nil { + checksum2 := this.info.hasher.Hash(data[0:decoded]) if checksum2 != checksum1 { errMsg := fmt.Sprintf("Corrupted bitstream: expected checksum %x, found %x", checksum1, checksum2) diff --git a/v2/io/CompressedStream_test.go b/v2/io/CompressedStream_test.go index 7127ccfc..651fa9ae 100644 --- a/v2/io/CompressedStream_test.go +++ b/v2/io/CompressedStream_test.go @@ -102,7 +102,7 @@ func compress(block []byte, entropy, transform string) int 
{ blockSize = uint((len(block) / (n + 1)) & -16) } - fmt.Printf("Block size: %v, jobs: %v \n", blockSize, jobs) + fmt.Printf("Block size: %v, jobs: %v \n", blockSize, jobs) { // Create an io.WriteCloser