Skip to content

Commit

Permalink
feat: multithread: made necessary modules public for use in multithrea…
Browse files Browse the repository at this point in the history
…ded envs, implemented ChunkReader and provided example
  • Loading branch information
feefladder committed Sep 30, 2024
1 parent da8f183 commit a5c87b7
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 70 deletions.
15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ exclude = ["tests/images/*", "tests/fuzz_images/*"]

[features]
async_decoder = ["dep:futures", "dep:async-trait"]
ehttp = ["dep:ehttp", "async_decoder"]
multithread = ["async_decoder"]
# only for async example reading a COG
ehttp = ["async_decoder", "dep:ehttp"]

[dependencies]
weezl = "0.1.0"
Expand All @@ -44,4 +46,13 @@ description = "Example showing use of async features using async http requests"
[[example]]
name = "async_http"
path="examples/async_http.rs"
required-features=["ehttp"]
required-features=["ehttp", "async_decoder"]

[package.metadata.example.multithread]
name="multithread_http"
description = "Example showing multithreaded reading of a COG"

[[example]]
name = "multithread_http"
path="examples/multithread_http.rs"
required-features=["ehttp", "multithread"]
194 changes: 194 additions & 0 deletions examples/multithread_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6
use std::io::{Result, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use futures::{
future::BoxFuture,
io::{AsyncRead, AsyncSeek},
Future,
};
use tiff::decoder::Decoder;

// extern crate ehttp;

// Arc for sharing, see https://users.rust-lang.org/t/how-to-clone-a-boxed-closure/31035/9
// or https://stackoverflow.com/a/27883612/14681457
/// Shared, thread-safe factory for byte-range requests: given a start offset
/// and a length (both in bytes), returns a future resolving to the fetched
/// bytes as a [`SeekOutput`]. Wrapped in `Arc` so clones of the streamer can
/// share one callback.
pub type F = Arc<
    dyn Fn(u64, u64) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
>;
/// An async reader/seeker over a remote byte-range source (e.g. HTTP Range
/// requests). Caches the most recently fetched chunk and serves reads from
/// it, issuing a new range request when a read falls outside the cache.
pub struct RangedStreamer {
    pos: u64,    // current logical read position
    length: u64, // total size of the remote resource, in bytes
    state: State, // cached chunk, or a request currently in flight
    range_get: F, // callback that performs the actual range request
    min_request_size: usize, // requests have at least this size
}

/// This is a fake clone, that doesn't clone the currently pending task, but everything else
impl Clone for RangedStreamer {
fn clone(&self) -> Self {
RangedStreamer {
pos: self.pos,
length: self.length,
state: State::HasChunk(SeekOutput {
start: 0,
data: vec![],
}),
range_get: self.range_get.clone(),
min_request_size: self.min_request_size,
}
}
}

/// Internal state of the streamer.
enum State {
    /// A downloaded byte range reads can be copied out of.
    HasChunk(SeekOutput),
    /// A `range_get` future that has not resolved yet.
    Seeking(BoxFuture<'static, std::io::Result<SeekOutput>>),
}

/// Result of a single byte-range request.
#[derive(Debug, Clone)]
pub struct SeekOutput {
    /// Absolute offset of the first byte of `data` within the resource.
    pub start: u64,
    /// The fetched bytes (may exceed what a single read needed, since
    /// requests are padded up to `min_request_size`).
    pub data: Vec<u8>,
}



impl RangedStreamer {
pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self {
let length = length as u64;
Self {
pos: 0,
length,
state: State::HasChunk(SeekOutput {
start: 0,
data: vec![],
}),
range_get,
min_request_size,
}
}
}

/// Whether `test_interval` (start, length) lies entirely inside the
/// interval `a` (start, length).
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
    let (a_start, a_len) = a;
    let (t_start, t_len) = test_interval;
    // Short-circuits, so the end sums are only computed when the start
    // is already inside `a` — same evaluation order as the guard-clause form.
    t_start >= a_start && t_start + t_len <= a_start + a_len
}

impl AsyncRead for RangedStreamer {
    /// Serves the read from the cached chunk when it covers the requested
    /// span; otherwise starts a new range request (padded to at least
    /// `min_request_size` bytes) and polls it to completion.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<Result<usize>> {
        // Clamp the read to the end of the resource. Without this, a read
        // extending past EOF asks for a range the source can never fully
        // satisfy, `range_includes` keeps failing, and `poll_read`
        // re-requests forever.
        let remaining = self.length.saturating_sub(self.pos) as usize;
        let to_read = buf.len().min(remaining);
        if to_read == 0 {
            // EOF (or empty buffer): report 0 bytes read, per the
            // `AsyncRead` contract.
            return std::task::Poll::Ready(Ok(0));
        }
        let requested_range = (self.pos as usize, to_read);
        let min_request_size = self.min_request_size;
        match &mut self.state {
            State::HasChunk(output) => {
                let existing_range = (output.start as usize, output.data.len());
                if range_includes(existing_range, requested_range) {
                    let offset = requested_range.0 - existing_range.0;
                    // Copy only the clamped span; the original copied
                    // `buf.len()` bytes unconditionally, which panics in
                    // `copy_from_slice` on a short final read.
                    buf[..to_read].copy_from_slice(&output.data[offset..offset + to_read]);
                    self.pos += to_read as u64;
                    std::task::Poll::Ready(Ok(to_read))
                } else {
                    let start = requested_range.0 as u64;
                    let length = std::cmp::max(min_request_size, requested_range.1);
                    let future = (self.range_get)(start, length.try_into().unwrap());
                    self.state = State::Seeking(Box::pin(future));
                    // Re-enter to poll the freshly created future.
                    self.poll_read(cx, buf)
                }
            }
            State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
                std::task::Poll::Ready(v) => {
                    match v {
                        Ok(output) => self.state = State::HasChunk(output),
                        Err(e) => return std::task::Poll::Ready(Err(e)),
                    };
                    // Chunk arrived — serve the read from it.
                    self.poll_read(cx, buf)
                }
                std::task::Poll::Pending => std::task::Poll::Pending,
            },
        }
    }
}

impl AsyncSeek for RangedStreamer {
    /// Updates the logical position only; no request is issued until the
    /// next read touches data outside the cached chunk.
    fn poll_seek(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
        pos: std::io::SeekFrom,
    ) -> std::task::Poll<Result<u64>> {
        // Checked signed offset. The original `(base as i64 + delta) as u64`
        // silently wrapped on a seek before byte 0 (or past i64::MAX),
        // producing a huge bogus position instead of an error.
        fn offset_by(base: u64, delta: i64) -> Option<u64> {
            if delta >= 0 {
                base.checked_add(delta as u64)
            } else {
                base.checked_sub(delta.unsigned_abs())
            }
        }
        let new_pos = match pos {
            SeekFrom::Start(pos) => Some(pos),
            SeekFrom::End(delta) => offset_by(self.length, delta),
            SeekFrom::Current(delta) => offset_by(self.pos, delta),
        };
        match new_pos {
            Some(p) => {
                self.pos = p;
                std::task::Poll::Ready(Ok(p))
            }
            None => std::task::Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            ))),
        }
    }
}



#[tokio::main]
async fn main() {
    let url = "https://isdasoil.s3.amazonaws.com/covariates/dem_30m/dem_30m.tif";
    // HEAD request to learn the total size of the remote file.
    let Ok(url_head) = ehttp::fetch_async(ehttp::Request::head(url)).await else {
        println!("EPIC FAIL!");
        return;
    };
    let length: usize = url_head
        .headers
        .get("content-length")
        .expect("no content-length header")
        .parse()
        .expect("Could not parse content length");
    println!("head: {:?}", url_head);

    // Range-request callback shared by every cloned decoder/reader.
    let range_get = Arc::new(move |start: u64, length: u64| {
        Box::pin(async move {
            println!("requested: {} kb", length / 1024);
            let mut request = ehttp::Request::get(url);
            // HTTP byte ranges are *inclusive* of the end offset, so a span
            // of `length` bytes ends at start+length-1. The original
            // `start+length` end requested one surplus byte per fetch.
            request.headers.insert(
                "Range".to_string(),
                format!("bytes={}-{}", start, start + length - 1),
            );
            let resp = ehttp::fetch_async(request)
                .await
                .map_err(std::io::Error::other)?;
            if !resp.ok {
                Err(std::io::Error::other(format!(
                    "Received invalid response: {:?}",
                    resp.status
                )))
            } else {
                println!("received: {} kb", resp.bytes.len() / 1024);
                Ok(SeekOutput {
                    start,
                    data: resp.bytes,
                })
            }
        }) as BoxFuture<'static, std::io::Result<SeekOutput>>
    });
    let reader = RangedStreamer::new(length, 30 * 1024, range_get);

    // this decoder will read all necessary tags
    let decoder = Decoder::new_overview_async(reader, 0)
        .await
        .expect("oh noes!");
    println!("initialized decoder");
    let cloneable_decoder = tiff::decoder::ChunkDecoder::from_decoder(decoder);

    // Fan the chunk reads out across tokio tasks; each task gets its own
    // cheaply cloned decoder (and thus its own reader state).
    let mut handles = Vec::new();
    for chunk in 42..69 {
        let mut cloned_decoder = cloneable_decoder.clone();
        let handle = tokio::spawn(async move {
            let result = cloned_decoder.read_chunk_async(chunk).await;
            match result {
                Ok(data) => {
                    println!("Successfully read chunk {}", chunk);
                    Ok(data) // Return the data for collection
                }
                Err(e) => {
                    eprintln!("Error reading chunk {}: {:?}", chunk, e);
                    Err(e) // Return the error for handling
                }
            }
        });
        handles.push(handle);
    }

    // First `expect` unwraps the JoinHandle result, second the decode result.
    let results = futures::future::join_all(handles).await;
    for r in results {
        println!("result: {:?}", r.expect("idk").expect("idk²").len())
    }
}
8 changes: 4 additions & 4 deletions src/decoder/async_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::{
};
use std::collections::{HashMap, HashSet};

use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedError, UsageError};
use crate::{TiffError, TiffFormatError, TiffResult, TiffUnsupportedError};

// use self::ifd::Directory;
// use self::image::Image;
Expand All @@ -18,7 +18,7 @@ use crate::decoder::{
Decoder,
ifd::{Value, Directory}, Image, stream::{
ByteOrder, SmartReader,
}, ChunkType, DecodingBuffer, DecodingResult, Limits,
}, ChunkType, DecodingBuffer, DecodingResult,
};

use stream::AsyncEndianReader;
Expand Down Expand Up @@ -453,7 +453,7 @@ impl<R: AsyncRead + AsyncSeek + RangeReader + Unpin + Send> Decoder<R> {
pub async fn read_chunk_async(&mut self, chunk_index: u32) -> TiffResult<DecodingResult> {
let data_dims = self.image().chunk_data_dimensions(chunk_index)?;

let mut result = self.result_buffer(data_dims.0 as usize, data_dims.1 as usize)?;
let mut result = Self::result_buffer(data_dims.0 as usize, data_dims.1 as usize, self.image(), &self.limits)?;

self.read_chunk_to_buffer_async(result.as_buffer(0), chunk_index, data_dims.0 as usize)
.await?;
Expand All @@ -465,7 +465,7 @@ impl<R: AsyncRead + AsyncSeek + RangeReader + Unpin + Send> Decoder<R> {
pub async fn read_image_async(&mut self) -> TiffResult<DecodingResult> {
let width = self.image().width;
let height = self.image().height;
let mut result = self.result_buffer(width as usize, height as usize)?;
let mut result = Self::result_buffer(usize::try_from(width)?, usize::try_from(height)?, self.image(), &self.limits )?;
if width == 0 || height == 0 {
return Ok(result);
}
Expand Down
35 changes: 0 additions & 35 deletions src/decoder/ifd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::collections::HashMap;
use std::io::{self, Read, Seek};
use std::mem;
use std::str;

use super::stream::{ByteOrder, EndianReader, SmartReader};
Expand Down Expand Up @@ -679,40 +678,6 @@ impl Entry {
}
Ok(List(v))
}

#[inline]
fn decode_offset<R, F>(
&self,
value_count: u64,
bo: ByteOrder,
bigtiff: bool,
limits: &super::Limits,
reader: &mut SmartReader<R>,
decode_fn: F,
) -> TiffResult<Value>
where
R: Read + Seek,
F: Fn(&mut SmartReader<R>) -> TiffResult<Value>,
{
let value_count = usize::try_from(value_count)?;
if value_count > limits.decoding_buffer_size / mem::size_of::<Value>() {
return Err(TiffError::LimitsExceeded);
}

let mut v = Vec::with_capacity(value_count);

let offset = if bigtiff {
self.r(bo).read_u64()?
} else {
self.r(bo).read_u32()?.into()
};
reader.goto_offset(offset)?;

for _ in 0..value_count {
v.push(decode_fn(reader)?)
}
Ok(List(v))
}
}

/// Extracts a list of BYTE tags stored in an offset
Expand Down
16 changes: 7 additions & 9 deletions src/decoder/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedEr
use std::io::{self, Cursor, Read, Seek};
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct StripDecodeState {
#[derive(Debug, Clone)]
pub struct StripDecodeState {
pub rows_per_strip: u32,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// Computed values useful for tile decoding
pub(crate) struct TileAttributes {
pub struct TileAttributes {
pub image_width: usize,
pub image_height: usize,

Expand Down Expand Up @@ -58,8 +58,8 @@ impl TileAttributes {
}
}

#[derive(Debug)]
pub(crate) struct Image {
#[derive(Debug, Clone)]
pub struct Image {
pub ifd: Option<Directory>,
pub width: u32,
pub height: u32,
Expand Down Expand Up @@ -679,16 +679,14 @@ impl Image {
}
}
} else {
for (i, row) in buf
for (_, row) in buf
.chunks_mut(output_row_stride)
.take(data_dims.1 as usize)
.enumerate()
{
let row = &mut row[..data_row_bytes];
reader.read_exact(row)?;

println!("chunk={chunk_index}, index={i}");

// Skip horizontal padding
if chunk_row_bytes > data_row_bytes {
let len = u64::try_from(chunk_row_bytes - data_row_bytes)?;
Expand Down
Loading

0 comments on commit a5c87b7

Please sign in to comment.