Skip to content

Commit

Permalink
Add original size to bitstream header
Browse files Browse the repository at this point in the history
  • Loading branch information
flanglet committed Apr 4, 2024
1 parent cc555fc commit e71e268
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 71 deletions.
26 changes: 15 additions & 11 deletions v2/app/BlockCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,25 +798,29 @@ func (this *fileCompressTask) call() (int, uint64, uint64) {
before := time.Now()

for {
if length, err := input.Read(buffer); err != nil {
var length int

if length, err = input.Read(buffer); err != nil {
fmt.Printf("Failed to read block from file '%s': %v\n", inputName, err)
return kanzi.ERR_READ_FILE, read, cos.GetWritten()
}

if length <= 0 {
break
}
if length > 0 {
read += uint64(length)

read += uint64(length)
if _, err = cos.Write(buffer[0:length]); err != nil {
if ioerr, isIOErr := err.(kio.IOError); isIOErr == true {
fmt.Printf("%s\n", ioerr.Error())
return ioerr.ErrorCode(), read, cos.GetWritten()
}

if _, err = cos.Write(buffer[0:length]); err != nil {
if ioerr, isIOErr := err.(kio.IOError); isIOErr == true {
fmt.Printf("%s\n", ioerr.Error())
return ioerr.ErrorCode(), read, cos.GetWritten()
fmt.Printf("An unexpected condition happened. Exiting ...\n%v\n", err.Error())
return kanzi.ERR_PROCESS_BLOCK, read, cos.GetWritten()
}
}

fmt.Printf("An unexpected condition happened. Exiting ...\n%v\n", err.Error())
return kanzi.ERR_PROCESS_BLOCK, read, cos.GetWritten()
if length < len(buffer) {
break
}
}

Expand Down
50 changes: 25 additions & 25 deletions v2/app/BlockDecompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ func (this *fileDecompressTask) call() (int, uint64) {
}()

// Decode
read := int64(0)
log.Println("\nDecompressing "+inputName+" ...", verbosity > 1)
log.Println("", verbosity > 3)
var input io.ReadCloser
Expand All @@ -557,7 +556,7 @@ func (this *fileDecompressTask) call() (int, uint64) {

if input, err = os.Open(inputName); err != nil {
fmt.Printf("Cannot open input file '%s': %v\n", inputName, err)
return kanzi.ERR_OPEN_FILE, uint64(read)
return kanzi.ERR_OPEN_FILE, 0
}

defer func() {
Expand All @@ -570,51 +569,52 @@ func (this *fileDecompressTask) call() (int, uint64) {
if err != nil {
if err.(*kio.IOError) != nil {
fmt.Printf("%s\n", err.(*kio.IOError).Message())
return err.(*kio.IOError).ErrorCode(), uint64(read)
return err.(*kio.IOError).ErrorCode(), 0
}

fmt.Printf("Cannot create compressed stream: %v\n", err)
return kanzi.ERR_CREATE_DECOMPRESSOR, uint64(read)
return kanzi.ERR_CREATE_DECOMPRESSOR, 0
}

for _, bl := range this.listeners {
cis.AddListener(bl)
}

buffer := make([]byte, _DECOMP_DEFAULT_BUFFER_SIZE)
written := uint64(0)
decoded := int64(0)
before := time.Now()

// Decode next block
for {
if decoded, err := cis.Read(buffer); err != nil {
var decodedBlock int

if decodedBlock, err = cis.Read(buffer); err != nil {
if ioerr, isIOErr := err.(*kio.IOError); isIOErr == true {
fmt.Printf("%s\n", ioerr.Message())
return ioerr.ErrorCode(), uint64(read)
return ioerr.ErrorCode(), uint64(decoded)
}

if errors.Is(err, io.EOF) == false {
// Ignore EOF (see comment in io.Copy:
// Because Copy is defined to read from src until EOF, it does not
// treat EOF from Read an an error to be reported)
fmt.Printf("An unexpected condition happened. Exiting ...\n%v\n", err)
return kanzi.ERR_PROCESS_BLOCK, uint64(read)
return kanzi.ERR_PROCESS_BLOCK, uint64(decoded)
}
}

if decoded > 0 {
w, err = output.Write(buffer[0:decoded])
if decodedBlock > 0 {
_, err := output.Write(buffer[0:decodedBlock])

if err != nil {
fmt.Printf("Failed to write decompressed block to file '%s': %v\n", outputName, err)
return kanzi.ERR_WRITE_FILE, uint64(read)
return kanzi.ERR_WRITE_FILE, uint64(decoded)
}

read += int64(decoded)
written += uint64(w)
decoded += int64(decodedBlock)
}

if decoded != len(buffer) {
if decodedBlock != len(buffer) {
break
}
}
Expand All @@ -623,24 +623,24 @@ func (this *fileDecompressTask) call() (int, uint64) {
// Deferred close is fallback for error paths
if err := cis.Close(); err != nil {
fmt.Printf("%v\n", err)
return kanzi.ERR_PROCESS_BLOCK, uint64(read)
return kanzi.ERR_PROCESS_BLOCK, uint64(decoded)
}

after := time.Now()
delta := after.Sub(before).Nanoseconds() / 1000000 // convert to ms

// If the whole input stream has been decoded and the original data size is present,
// check that the output size match the original data size.
to, hasTo := this.ctx["to"]
from, hasFrom := this.ctx["from"]
_, hasTo := this.ctx["to"]
_, hasFrom := this.ctx["from"]

if hasTo == false && hasFrom == false {
if osz, hasKey := this.ctx["outputSize"]; hasKey == true {
outputSize := osz.(uint64)
outputSize := osz.(int64)

if outputSize != 0 && written != outputSize {
fmt.Printf("Corrupted bitstream: invalid output size (expected %d, got %d)\n", written, outputSize)
return kanzi.ERR_INVALID_FILE, uint64(read)
if outputSize != 0 && decoded != outputSize {
fmt.Printf("Corrupted bitstream: invalid output size (expected %d, got %d)\n", decoded, outputSize)
return kanzi.ERR_INVALID_FILE, uint64(decoded)
}
}
}
Expand All @@ -659,17 +659,17 @@ func (this *fileDecompressTask) call() (int, uint64) {
log.Println(msg, true)
msg = fmt.Sprintf("Input size: %d", cis.GetRead())
log.Println(msg, true)
msg = fmt.Sprintf("Output size: %d", read)
msg = fmt.Sprintf("Output size: %d", decoded)
log.Println(msg, true)
}

if verbosity == 1 {
msg = fmt.Sprintf("Decompressing %s: %d => %d in %s", inputName, cis.GetRead(), read, msg)
msg = fmt.Sprintf("Decompressing %s: %d => %d in %s", inputName, cis.GetRead(), decoded, msg)
log.Println(msg, true)
}

if verbosity > 1 && delta > 0 {
msg = fmt.Sprintf("Throughput (KB/s): %d", ((read*int64(1000))>>10)/delta)
msg = fmt.Sprintf("Throughput (KB/s): %d", ((decoded*int64(1000))>>10)/delta)
log.Println(msg, true)
}

Expand All @@ -681,5 +681,5 @@ func (this *fileDecompressTask) call() (int, uint64) {
notifyBDListeners(this.listeners, evt)
}

return 0, uint64(read)
return 0, uint64(decoded)
}
Loading

0 comments on commit e71e268

Please sign in to comment.