Skip to content

Commit

Permalink
Do not adjust the number of jobs if the number of blocks is not avail…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
flanglet committed Apr 5, 2024
1 parent 2c40a43 commit 7685f17
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 39 deletions.
2 changes: 1 addition & 1 deletion v2/app/BlockDecompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func (this *fileDecompressTask) call() (int, uint64) {
delta := after.Sub(before).Nanoseconds() / 1000000 // convert to ms

// If the whole input stream has been decoded and the original data size is present,
// check that the output size match the original data size.
// check that the output size matches the original data size.
_, hasTo := this.ctx["to"]
_, hasFrom := this.ctx["from"]

Expand Down
63 changes: 25 additions & 38 deletions v2/io/CompressedStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,12 @@ func createWriterWithCtx(obs kanzi.OutputBitStream, ctx map[string]any) (*Writer
nbBlocks := 0

// If input size has been provided, calculate the number of blocks
// in the input data else use 0. A value of 63 means '63 or more blocks'.
// This value is written to the bitstream header to let the decoder make
// better decisions about memory usage and job allocation in concurrent
// decompression scenario.
if val, hasKey := ctx["fileSize"]; hasKey {
this.inputSize = val.(int64)
nbBlocks = int((this.inputSize + int64(bSize-1)) / int64(bSize))
}

this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 1)
this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 0)

if checksum := ctx["checksum"].(bool); checksum == true {
var err error
Expand All @@ -248,12 +244,7 @@ func createWriterWithCtx(obs kanzi.OutputBitStream, ctx map[string]any) (*Writer
this.buffers = make([]blockBuffer, 2*this.jobs)

// Allocate first buffer and add padding for incompressible blocks
bufSize := this.blockSize + this.blockSize>>6

if bufSize < 65536 {
bufSize = 65536
}

bufSize := max(this.blockSize+this.blockSize>>6, 65536)
this.buffers[0] = blockBuffer{Buf: make([]byte, bufSize)}
this.buffers[this.jobs] = blockBuffer{Buf: make([]byte, 0)}

Expand Down Expand Up @@ -327,14 +318,19 @@ func (this *Writer) writeHeader() *IOError {
}

// this.inputSize not provided or >= 2^48 -> 0, <2^16 -> 1, <2^32 -> 2, <2^48 -> 3
szMask := uint(0)
var szMask uint

if this.inputSize != 0 && this.inputSize < (int64(1)<<48) {
if this.inputSize >= int64(1)<<32 {
szMask = 3
} else {
szMask = uint(internal.Log2NoCheck(uint32(this.inputSize))>>4) + 1
}
switch {
case this.inputSize == 0:
szMask = 0
case this.inputSize >= (int64(1)<<48):
szMask = 0
case this.inputSize >= (int64(1)<<32):
szMask = 3
case this.inputSize >= (int64(1)<<16):
szMask = 2
default:
szMask = 1
}

if this.obs.WriteBits(uint64(szMask), 2) != 2 {
Expand Down Expand Up @@ -400,12 +396,7 @@ func (this *Writer) Write(block []byte) (int, error) {
if bufID+1 < this.jobs {
// Current write buffer is full
if len(this.buffers[bufID+1].Buf) == 0 {
bufSize := this.blockSize + this.blockSize>>6

if bufSize < 65536 {
bufSize = 65536
}

bufSize := max(this.blockSize+this.blockSize>>6, 65536)
this.buffers[bufID+1].Buf = make([]byte, bufSize)
}
} else {
Expand Down Expand Up @@ -473,12 +464,12 @@ func (this *Writer) processBlock() error {
nbTasks := this.jobs
var jobsPerTask []uint

// Assign optimal number of tasks and jobs per task
// Assign optimal number of tasks and jobs per task (if the number of blocks is known)
if nbTasks > 1 {
// Limit the number of jobs if there are fewer blocks that this.jobs
// It allows more jobs per task and reduces memory usage.
if nbTasks > this.nbInputBlocks {
nbTasks = this.nbInputBlocks
if this.nbInputBlocks > 0 {
nbTasks = min(nbTasks, this.nbInputBlocks)
}

jobsPerTask, _ = internal.ComputeJobsPerTask(make([]uint, nbTasks), uint(this.jobs), uint(nbTasks))
Expand Down Expand Up @@ -913,7 +904,7 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader,
this.consumed = 0
this.available = 0
this.outputSize = 0
this.nbInputBlocks = 1
this.nbInputBlocks = 0
this.bufferThreshold = 0
this.buffers = make([]blockBuffer, 2*this.jobs)

Expand Down Expand Up @@ -1009,7 +1000,7 @@ func (this *Reader) validateHeaderless() error {
}

nbBlocks := int((this.outputSize + int64(this.blockSize-1)) / int64(this.blockSize))
this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 1)
this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 0)
}

return nil
Expand Down Expand Up @@ -1124,7 +1115,7 @@ func (this *Reader) readHeader() error {
}

nbBlocks := int((this.outputSize + int64(this.blockSize-1)) / int64(this.blockSize))
this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 1)
this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 0)
}

// Read and verify checksum
Expand All @@ -1151,10 +1142,6 @@ func (this *Reader) readHeader() error {
// Read number of blocks in input. 0 means 'unknown' and 63 means 63 or more.
this.nbInputBlocks = int(this.ibs.ReadBits(6))

if this.nbInputBlocks == 0 {
this.nbInputBlocks = 65536
}

// Read and verify checksum
cksum1 := uint32(this.ibs.ReadBits(4))
var cksum2 uint32
Expand All @@ -1172,7 +1159,7 @@ func (this *Reader) readHeader() error {
}
} else {
// Header prior to version 3
this.ibs.ReadBits(6) // nbInputs
this.nbInputBlocks = int(this.ibs.ReadBits(6))
this.ibs.ReadBits(4) // reserved
}

Expand Down Expand Up @@ -1319,12 +1306,12 @@ func (this *Reader) processBlock() (int, error) {
nbTasks := this.jobs
var jobsPerTask []uint

// Assign optimal number of tasks and jobs per task
// Assign optimal number of tasks and jobs per task (if the number of blocks is known)
if nbTasks > 1 {
// Limit the number of jobs if there are fewer blocks that this.jobs
// It allows more jobs per task and reduces memory usage.
if nbTasks > this.nbInputBlocks {
nbTasks = this.nbInputBlocks
if this.nbInputBlocks > 0 {
nbTasks = min(nbTasks, this.nbInputBlocks)
}

jobsPerTask, _ = internal.ComputeJobsPerTask(make([]uint, nbTasks), uint(this.jobs), uint(nbTasks))
Expand Down

0 comments on commit 7685f17

Please sign in to comment.