Skip to content

Commit

Permalink
Support for resuming decoding after UnexpectedEof.
Browse files Browse the repository at this point in the history
This commit supports resuming decoding after `UnexpectedEof` in two
ways:

1. Support for detecting an unexpected EOF using the public API
   of `DecodingError`.  Before this commit `UnexpectedEof`,
   `UnexpectedEndOfChunk`, and `NoMoreImageData` errors were represented
   as a crate-internal `FormatErrorInner` type.  After this commit,
   these errors have a representation that can be detected using the
   public API:
   `DecodingError::IoError(std::io::ErrorKind::UnexpectedEof.into())`.

2. Support for resuming decoding after an unexpected EOF.  Before this
   commit `fn next_interlaced_row` would unconditionally call
   `next_pass` - advancing to the next row.  After this commit this
   will only happen if the previous row has been successfully decoded.
  • Loading branch information
anforowicz committed Sep 5, 2024
1 parent 790994e commit d9b6e57
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 53 deletions.
75 changes: 36 additions & 39 deletions src/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use self::stream::{DecodeOptions, Decoded, DecodingError, StreamingDecoder};
use self::stream::{FormatErrorInner, CHUNK_BUFFER_SIZE};
use self::transform::{create_transform_fn, TransformFn};

use std::io::{BufRead, BufReader, Read};
use std::io::{BufRead, BufReader, ErrorKind, Read};
use std::mem;
use std::ops::Range;

Expand Down Expand Up @@ -207,9 +207,7 @@ impl<R: Read> Decoder<R> {
while self.read_decoder.info().is_none() {
buf.clear();
if self.read_decoder.decode_next(&mut buf)?.is_none() {
return Err(DecodingError::Format(
FormatErrorInner::UnexpectedEof.into(),
));
return Err(DecodingError::IoError(ErrorKind::UnexpectedEof.into()));
}
}
Ok(self.read_decoder.info().unwrap())
Expand All @@ -228,6 +226,7 @@ impl<R: Read> Decoder<R> {
data_stream: Vec::new(),
prev_start: 0,
current_start: 0,
interlace_info_of_unfinished_row: None,
transform: self.transform,
transform_fn: None,
scratch_buffer: Vec::new(),
Expand Down Expand Up @@ -319,9 +318,7 @@ impl<R: Read> ReadDecoder<R> {
let (consumed, result) = {
let buf = self.reader.fill_buf()?;
if buf.is_empty() {
return Err(DecodingError::Format(
FormatErrorInner::UnexpectedEof.into(),
));
return Err(DecodingError::IoError(ErrorKind::UnexpectedEof.into()));
}
self.decoder.update(buf, image_data)?
};
Expand All @@ -339,9 +336,7 @@ impl<R: Read> ReadDecoder<R> {
while !self.at_eof {
let buf = self.reader.fill_buf()?;
if buf.is_empty() {
return Err(DecodingError::Format(
FormatErrorInner::UnexpectedEof.into(),
));
return Err(DecodingError::IoError(ErrorKind::UnexpectedEof.into()));
}
let (consumed, event) = self.decoder.update(buf, &mut vec![])?;
self.reader.consume(consumed);
Expand All @@ -356,9 +351,7 @@ impl<R: Read> ReadDecoder<R> {
}
}

Err(DecodingError::Format(
FormatErrorInner::UnexpectedEof.into(),
))
Err(DecodingError::IoError(ErrorKind::UnexpectedEof.into()))
}

fn info(&self) -> Option<&Info<'static>> {
Expand All @@ -384,6 +377,8 @@ pub struct Reader<R: Read> {
prev_start: usize,
/// Index in `data_stream` where the current row starts.
current_start: usize,
/// Size and interlace info for the row that is currently being decoded.
interlace_info_of_unfinished_row: Option<InterlaceInfo>,
/// Output transformations
transform: Transformations,
/// Function that can transform decompressed, unfiltered rows into final output.
Expand Down Expand Up @@ -594,15 +589,22 @@ impl<R: Read> Reader<R> {

/// Returns the next processed row of the image
pub fn next_interlaced_row(&mut self) -> Result<Option<InterlacedRow>, DecodingError> {
let (rowlen, interlace) = match self.next_pass() {
Some((rowlen, interlace)) => (rowlen, interlace),
None => return Ok(None),
// Take `interlace_info_of_unfinished_row` of a previous, non-`Ok` attempt (if available).
// This supports resuming when calling `next_interlaced_row` after `UnexpectedEof` was
// encountered earlier.
let interlace = match self.interlace_info_of_unfinished_row.take() {
Some(interlace) => interlace,
None => match self.next_pass() {
Some(interlace) => interlace,
None => return Ok(None),
},
};

let width = if let InterlaceInfo::Adam7 { width, .. } = interlace {
width
let (rowlen, width) = if let InterlaceInfo::Adam7 { width, .. } = interlace {
let rowlen = self.info().raw_row_length_from_width(width);
(rowlen, width)
} else {
self.subframe.width
(self.subframe.rowlen, self.subframe.width)
};
let output_line_size = self.output_line_size(width);

Expand All @@ -612,12 +614,17 @@ impl<R: Read> Reader<R> {
output_buffer.resize(output_line_size, 0u8);
let ret = self.next_interlaced_row_impl(rowlen, &mut output_buffer);
self.scratch_buffer = output_buffer;
ret?;

Ok(Some(InterlacedRow {
data: &self.scratch_buffer[..output_line_size],
interlace,
}))
match ret {
Err(e) => {
self.interlace_info_of_unfinished_row = Some(interlace);
Err(e)
}
Ok(_) => Ok(Some(InterlacedRow {
data: &self.scratch_buffer[..output_line_size],
interlace,
})),
}
}

/// Read the rest of the image and chunks and finish up, including text chunks or others
Expand Down Expand Up @@ -711,20 +718,19 @@ impl<R: Read> Reader<R> {
color.raw_row_length_from_width(depth, width) - 1
}

fn next_pass(&mut self) -> Option<(usize, InterlaceInfo)> {
fn next_pass(&mut self) -> Option<InterlaceInfo> {
match self.subframe.interlace {
InterlaceIter::Adam7(ref mut adam7) => {
let last_pass = adam7.current_pass();
let (pass, line, width) = adam7.next()?;
let rowlen = self.info().raw_row_length_from_width(width);
if last_pass != pass {
self.prev_start = self.current_start;
}
Some((rowlen, InterlaceInfo::Adam7 { pass, line, width }))
Some(InterlaceInfo::Adam7 { pass, line, width })
}
InterlaceIter::None(ref mut height) => {
let _ = height.next()?;
Some((self.subframe.rowlen, InterlaceInfo::Null))
Some(InterlaceInfo::Null)
}
}
}
Expand All @@ -736,9 +742,7 @@ impl<R: Read> Reader<R> {
// Read image data until we have at least one full row (but possibly more than one).
while self.data_stream.len() - self.current_start < rowlen {
if self.subframe.consumed_and_flushed {
return Err(DecodingError::Format(
FormatErrorInner::NoMoreImageData.into(),
));
return Err(DecodingError::IoError(ErrorKind::UnexpectedEof.into()));
}

// Clear the current buffer before appending more data.
Expand All @@ -756,14 +760,7 @@ impl<R: Read> Reader<R> {
self.subframe.consumed_and_flushed = true;
}
None => {
return Err(DecodingError::Format(
if self.data_stream.is_empty() {
FormatErrorInner::NoMoreImageData
} else {
FormatErrorInner::UnexpectedEndOfChunk
}
.into(),
));
return Err(DecodingError::IoError(ErrorKind::UnexpectedEof.into()));
}
_ => (),
}
Expand Down
171 changes: 157 additions & 14 deletions src/decoder/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ pub enum Decoded {
#[derive(Debug)]
pub enum DecodingError {
/// An error in IO of the underlying reader.
///
/// Note that some IO errors may be recoverable - decoding may be retried after the
/// error is resolved. For example, decoding from a slow stream of data (e.g. a network
/// stream) may occasionally fail with an error of kind [`std::io::ErrorKind::UnexpectedEof`],
/// but decoding can resume once more data becomes available.
IoError(io::Error),
/// The input image was not a valid PNG.
///
Expand Down Expand Up @@ -160,10 +165,6 @@ pub(crate) enum FormatErrorInner {
},
/// Not a PNG, the magic signature is missing.
InvalidSignature,
/// End of file, within a chunk event.
UnexpectedEof,
/// End of file, while expecting more image data.
UnexpectedEndOfChunk,
// Errors of chunk level ordering, missing etc.
/// Ihdr must occur.
MissingIhdr,
Expand Down Expand Up @@ -232,8 +233,6 @@ pub(crate) enum FormatErrorInner {
CorruptFlateStream {
err: fdeflate::DecompressionError,
},
/// The image data chunk was too short for the expected pixel count.
NoMoreImageData,
/// Bad text encoding
BadTextEncoding(TextDecodingError),
/// fdAT shorter than 4 bytes
Expand Down Expand Up @@ -323,12 +322,6 @@ impl fmt::Display for FormatError {
UnknownInterlaceMethod(nr) => write!(fmt, "Unknown interlace method {}.", nr),
BadSubFrameBounds {} => write!(fmt, "Sub frame is out-of-bounds."),
InvalidSignature => write!(fmt, "Invalid PNG signature."),
UnexpectedEof => write!(fmt, "Unexpected end of data before image end."),
UnexpectedEndOfChunk => write!(fmt, "Unexpected end of data within a chunk."),
NoMoreImageData => write!(
fmt,
"IDAT or fDAT chunk does not have enough data for image."
),
CorruptFlateStream { err } => {
write!(fmt, "Corrupt deflate stream. ")?;
write!(fmt, "{:?}", err)
Expand Down Expand Up @@ -1510,10 +1503,12 @@ mod tests {
use super::ScaledFloat;
use super::SourceChromaticities;
use crate::test_utils::*;
use crate::{Decoder, DecodingError};
use crate::{Decoder, DecodingError, Reader};
use byteorder::WriteBytesExt;
use std::cell::RefCell;
use std::fs::File;
use std::io::Write;
use std::io::{ErrorKind, Read, Write};
use std::rc::Rc;

#[test]
fn image_gamma() -> Result<(), ()> {
Expand Down Expand Up @@ -1976,4 +1971,152 @@ mod tests {
reader.next_frame(&mut buf).unwrap();
assert_eq!(3093270825, crc32fast::hash(&buf));
}

/// `StreamingInput` can be used by tests to simulate a streaming input
/// (e.g. a slow http response, where all bytes are not immediately available).
///
/// The state lives behind an `Rc<RefCell<..>>`, so a clone is another handle to the
/// same input: a test can hand one clone to a decoder and keep another one to expose
/// more bytes later.
#[derive(Clone)]
struct StreamingInput(Rc<RefCell<StreamingInputState>>);

/// Shared state behind every clone of a `StreamingInput`.
struct StreamingInputState {
/// The complete input; only a prefix of it is readable at any given time.
full_input: Vec<u8>,
/// Offset into `full_input` of the next byte that `read` will return.
current_pos: usize,
/// Number of leading bytes of `full_input` that have been exposed to `read` so far.
available_len: usize,
}

impl StreamingInput {
    /// Wraps `full_input` in shared state with nothing read and nothing exposed yet.
    fn new(full_input: Vec<u8>) -> Self {
        let initial_state = StreamingInputState {
            full_input,
            current_pos: 0,
            available_len: 0,
        };
        Self(Rc::new(RefCell::new(initial_state)))
    }

    /// Builds the input from the bytes produced by `write_noncompressed_png` for the
    /// given `width` and `idat_size`.
    fn with_noncompressed_png(width: u32, idat_size: usize) -> Self {
        let mut encoded = Vec::new();
        write_noncompressed_png(&mut encoded, width, idat_size);
        Self::new(encoded)
    }

    /// Makes one more byte of the full input readable.
    ///
    /// Panics if the whole input has already been exposed.
    fn expose_next_byte(&self) {
        let mut guard = self.0.borrow_mut();
        assert!(guard.available_len < guard.full_input.len());
        guard.available_len += 1;
    }

    /// Moves the read position back to the start; the exposed length is left untouched.
    fn rewind(&self) {
        self.0.borrow_mut().current_pos = 0;
    }

    /// Runs `f` on a `Reader` created over the *entire* input, ignoring how many bytes
    /// have been exposed so far.
    ///
    /// Panics if `read_info` fails on the full input.
    fn decode_full_input<F, R>(&self, f: F) -> R
    where
        F: FnOnce(Reader<&[u8]>) -> R,
    {
        let guard = self.0.borrow();
        let reader = Decoder::new(guard.full_input.as_slice())
            .read_info()
            .unwrap();
        f(reader)
    }
}

impl Read for StreamingInput {
    /// Copies as many of the exposed-but-unread bytes as fit into `buf` and advances the
    /// read position past them.
    ///
    /// Returns `Ok(0)` when every exposed byte has already been read (or `buf` is empty).
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut state = self.0.borrow_mut();
        let (start, end) = (state.current_pos, state.available_len);

        // Only the window between the read position and the exposed length is readable.
        let exposed = &state.full_input[start..end];
        let count = exposed.len().min(buf.len());
        buf[..count].copy_from_slice(&exposed[..count]);

        state.current_pos = start + count;
        assert!(state.current_pos <= state.available_len);
        Ok(count)
    }
}

/// Test resuming/retrying `Reader.next_row` after `UnexpectedEof`.
///
/// The PNG is exposed to the decoder one byte at a time. The rows collected this way
/// must match the output of a single `Reader.next_frame` call over the whole input.
#[test]
fn test_stream_input_and_decode_single_next_frame() {
    const WIDTH: u32 = 16;
    const IDAT_SIZE: usize = 512;
    let streaming_input = StreamingInput::with_noncompressed_png(WIDTH, IDAT_SIZE);

    // Reference output: decode the frame in one go with all bytes available.
    let decoded_from_whole_input = streaming_input.decode_full_input(|mut r| {
        let mut buf = vec![0; r.output_buffer_size()];
        r.next_frame(&mut buf).unwrap();
        buf
    });

    let mut png_reader = None;
    let mut decoded_from_streaming_input = Vec::new();
    loop {
        if png_reader.is_none() {
            // A fresh `Decoder` is created for every attempt, so the input is rewound
            // to let it re-read from the start.
            streaming_input.rewind();
            png_reader = match Decoder::new(streaming_input.clone()).read_info() {
                Ok(reader) => {
                    assert_eq!(reader.info().width, WIDTH);
                    Some(reader)
                }
                Err(DecodingError::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => {
                    streaming_input.expose_next_byte();
                    None
                }
                // Report the actual error instead of discarding it.
                Err(e) => panic!("Unexpected error: {:?}", e),
            };
        }
        if let Some(png_reader) = png_reader.as_mut() {
            // Once a reader exists it is kept across `UnexpectedEof` errors; each retry
            // of `next_row` happens after one more byte has been exposed.
            match png_reader.next_row() {
                Ok(None) => break,
                Ok(Some(row)) => decoded_from_streaming_input.extend_from_slice(row.data()),
                Err(DecodingError::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => {
                    streaming_input.expose_next_byte()
                }
                e => panic!("Unexpected error: {:?}", e),
            }
        }
    }
    assert_eq!(
        decoded_from_whole_input.len(),
        decoded_from_streaming_input.len()
    );
    assert_eq!(
        crc32fast::hash(&decoded_from_whole_input),
        crc32fast::hash(&decoded_from_streaming_input)
    );
}

/// Checks that `Decoder.read_header_info` can be retried on the same decoder after
/// `UnexpectedEof`, and that the header it eventually returns matches the one parsed
/// from the whole input.
#[test]
fn test_stream_input_and_decode_image_info() {
    const WIDTH: u32 = 16;
    const IDAT_SIZE: usize = 512;
    let input = StreamingInput::with_noncompressed_png(WIDTH, IDAT_SIZE);

    // Reference header, parsed with the entire PNG available up front.
    let expected = input.decode_full_input(|reader| reader.info().clone());

    // Keep calling `read_header_info`, exposing one more byte after each `UnexpectedEof`.
    let mut decoder = Decoder::new(input.clone());
    let actual = loop {
        match decoder.read_header_info() {
            Ok(info) => break info.clone(),
            Err(DecodingError::IoError(err)) if err.kind() == ErrorKind::UnexpectedEof => {
                input.expose_next_byte()
            }
            other => panic!("Unexpected error: {:?}", other),
        }
    };

    assert_eq!(expected.width, actual.width);
    assert_eq!(expected.height, actual.height);
    assert_eq!(expected.bit_depth, actual.bit_depth);
    assert_eq!(expected.color_type, actual.color_type);
    assert_eq!(expected.interlaced, actual.interlaced);
}
}

0 comments on commit d9b6e57

Please sign in to comment.