External sorting not working (maybe only for string columns?) #12136
Comments
This is probably unrelated, but there's something that puzzles me about the fair spill pool logic. Consider a single spillable consumer that allocates the entire pool. It shouldn't be able to grow the resulting reservation. But if it splits the reservation into two, it can now grow it - even when the other reservation remains allocated. Here's a failing test showing this:
Let me know if I'm misunderstanding something here.
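A minimal sketch of that scenario (the failing test itself isn't preserved in this thread, so the names and sizes below are illustrative; it assumes the `MemoryConsumer`/`MemoryReservation` API in `datafusion::execution::memory_pool`):

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{FairSpillPool, MemoryConsumer, MemoryPool};

fn main() {
    let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(100));

    // A single spillable consumer takes the entire 100-byte pool.
    let mut r1 = MemoryConsumer::new("spiller")
        .with_can_spill(true)
        .register(&pool);
    r1.try_grow(100).unwrap();

    // Growing further is rejected, as expected.
    assert!(r1.try_grow(1).is_err());

    // Split the reservation in two; the other half stays allocated...
    let mut r2 = r1.split(50);

    // ...yet the split half can grow again, pushing the pool past its limit.
    r2.try_grow(40).unwrap();
    assert!(pool.reserved() > 100);
}
```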
I could reproduce the pod freezing up at 1 TB disk usage. This happened within a minute of spawning 500 threads. All threads were blocked on a mutex.
It actually makes sense, because spill files use the Arrow IPC format without compression, while the partitions use Parquet files with Snappy compression.
This change replaces `try_resize` with `resize` in three sites, allowing memory to overshoot the configured pool size. These are sites where we don't fall back to spilling to disk when the allocation fails. Fixes: apache#12136
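As a rough illustration of the difference the change relies on (a sketch assuming the current `MemoryReservation` API, using a `GreedyMemoryPool` for brevity): `try_resize` fails when the pool cannot satisfy the request, while `resize` records the new size unconditionally and lets usage overshoot.

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A 1 KiB pool and a single consumer.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut reservation = MemoryConsumer::new("example").register(&pool);

    // `try_resize` checks the pool and fails: 4096 bytes do not fit in 1024.
    assert!(reservation.try_resize(4096).is_err());

    // `resize` records the new size unconditionally, so the reservation (and
    // the pool's reported usage) now exceeds the configured limit.
    reservation.resize(4096);
    assert_eq!(reservation.size(), 4096);
    assert!(pool.reserved() > 1024);
}
```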
Also hitting this bug, is there any update on a fix?
I don't know of anyone explicitly working to make external sorting better. Some recent work may make it better.
But realistically I think there is significant additional room for improvement. Any help welcome.
I don't believe that adding an option for compression in DiskManager would be all that difficult - the IPC writer and reader already have support for LZ4. It would of course incur the cost of compression/decompression, but would likely result in 50+% space savings on disk. It's a nice-to-have though - I think of it more as a band-aid than a solution.
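For illustration only (this is not an existing DiskManager option): writing an IPC file with LZ4 compression through arrow's `IpcWriteOptions` might look like the sketch below, assuming arrow is compiled with the `ipc_compression` feature. The file path is hypothetical.

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::ipc::CompressionType;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Request LZ4 frame compression for the IPC file; spill files written by
    // DataFusion currently use uncompressed IPC instead.
    let options = IpcWriteOptions::default()
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;

    let file = File::create("/tmp/compressed_spill_example.arrow")?;
    let mut writer = FileWriter::try_new_with_options(file, &schema, options)?;
    writer.write(&batch)?;
    writer.finish()?;
    Ok(())
}
```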
For the given reproducer, I got the error
Configuring `datafusion.execution.sort_spill_reservation_bytes` to 1 MB lets it run successfully. (I don't know whether the Parquet-related error message is caused by the same issue.)

```rust
// Reproducer: place in datafusion/core/tests/memory_limit/mod.rs
#[tokio::test]
async fn test_sort_with_memory_limit() -> Result<()> {
    // initialize logging to see DataFusion's internal logging
    let _ = env_logger::try_init();

    // how much data to sort
    let row_limit = 10 * 1000;
    let mem_limit = 10 * 1024 * 1024; // 10 MB
    let sort_spill_reservation_bytes = 1024 * 1024; // 1 MB

    let generator = AccessLogGenerator::new()
        .with_row_limit(row_limit)
        .with_max_batch_size(100); // 100 rows per batch

    let pool = Arc::new(GreedyMemoryPool::new(mem_limit));
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(pool)
        .with_disk_manager(DiskManagerConfig::new())
        .build()?;

    let session_config = SessionConfig::new()
        .with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);
    let state = SessionStateBuilder::new()
        .with_config(session_config)
        .with_runtime_env(Arc::new(runtime))
        .build();
    let ctx = SessionContext::new_with_state(state);

    // create a plan that simply sorts on the hostname
    let df = ctx
        .read_batches(generator)?
        .sort(vec![col("host").sort(true, true)])?;

    // execute the plan (it should succeed)
    let _results: Vec<RecordBatch> = df.collect().await?;
    Ok(())
}
```

Reasons:

Thoughts:
@2010YOUY01 Your solution does not work for me. I did play around with …
We have also encountered this issue. After some debugging (adding debug logs before the relevant calls and adjusting the configuration), here is what we observed. After inserting some batches, the pool is full and the in-memory sort starts (datafusion/datafusion/physical-plan/src/sorts/sort.rs, lines 440 to 443 in 0228bee); it finished without error, and after the sort all memory is released. Then the following code counts the sorted batches' memory (datafusion/datafusion/physical-plan/src/sorts/sort.rs, lines 445 to 449 in 0228bee). This size is 698840964 bytes, which exceeds the memory limit, so the subsequent `self.reservation.try_resize(size)?;` failed.
Not sure why the sorted batches' memory is over 2.6x that of the batches before the sort.
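For context, a standalone sketch (not the exact code at those lines) of how such a size can be computed with arrow's `get_array_memory_size`: it counts buffer capacities rather than logical lengths, and a buffer shared by several arrays is counted once per array that references it, which is one way the post-sort figure can exceed the pre-sort one. The helper name here is hypothetical.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};

// Hypothetical helper: sum the memory each column reports, the same kind of
// figure that is then passed to `try_resize(size)`.
fn batch_memory_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum()
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from_iter_values(0..1024));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    // `get_array_memory_size` reports buffer capacities, so the total can be
    // larger than the bytes the batch logically contains.
    println!("reported size: {} bytes", batch_memory_size(&batch));
}
```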
Some findings so far:
This reproducer can also run successfully by disabling string view via `datafusion.execution.parquet.schema_force_view_types = false`.
Thank you @xuchen-plus, so if we disable StringView it will succeed when we increase the sort_spill_reservation_bytes. If so, we need to investigate how to fix the StringView case?
It still failed for me with that setting. Here is the reproducer code:

```rust
use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
    build_parquet();

    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(16 * 1024 * 1024);
    config.options_mut().execution.parquet.schema_force_view_types = false;

    let ctx = SessionContext::new_with_config_rt(config, env);
    ctx.register_parquet(
        "big_strings",
        "/tmp/big_strings.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();

    let sql = "SELECT * FROM big_strings ORDER BY strings";
    println!("Sorting strings");
    ctx.sql(sql)
        .await
        .unwrap()
        .execute_stream()
        .await
        .unwrap()
        .try_for_each(|_| std::future::ready(Ok(())))
        .await
        .unwrap();
}

fn build_parquet() {
    if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
        println!("Using existing file at /tmp/big_strings.parquet");
        return;
    }

    println!("Generating test file at /tmp/big_strings.parquet");
    let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strings",
        DataType::Utf8,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();
    for batch_idx in 0..100 {
        println!("Generating batch {} of 100", batch_idx);
        let mut string_array_builder =
            StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
        for i in 0..(1024 * 1024) {
            string_array_builder
                .append_value(format!("string-{}string-{}string-{}", i, i, i));
        }
        let array = Arc::new(string_array_builder.finish());
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}
```

And when StringView is disabled, a new finding is that the sorted data is smaller than the original data:

```
Before sorting we have 6233344 bytes of unsorted data
After sorting we now have 4932480 bytes of sorted data
Spilling
Before sorting we have 6233344 bytes of unsorted data
After sorting we now have 4932480 bytes of sorted data
Spilling
Before sorting we have 6233344 bytes of unsorted data
After sorting we now have 4932480 bytes of sorted data
Spilling
Before sorting we have 6233344 bytes of unsorted data
After sorting we now have 4932480 bytes of sorted data
Spilling
Before sorting we have 6233344 bytes of unsorted data
After sorting we now have 4932480 bytes of sorted data
```

It failed with this error:

```
Before sorting we have 6233344 bytes of unsorted data
After sorting we now have 4932480 bytes of sorted data
Spilling
Before sorting we have 6233344 bytes of unsorted data
thread 'main' panicked at datafusion-examples/examples/large_string_sort.rs:62:10:
called `Result::unwrap()` on an `Err` value: ResourcesExhausted("Failed to allocate additional 353536 bytes for ExternalSorter[1] with 7138944 bytes already allocated for this reservation - 7489828 bytes remain available for the total pool")
```
@zhuqi-lucas Sorry. I actually adjusted `sort_spill_reservation_bytes` to 32 MB.
Thanks @xuchen-plus. Unluckily, after changing sort_spill_reservation_bytes to 32 MB it still failed for me; I am not sure what I am missing. Here is the reproducer code:

```rust
use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
    build_parquet();

    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(32 * 1024 * 1024);
    config.options_mut().execution.parquet.schema_force_view_types = false;

    let ctx = SessionContext::new_with_config_rt(config, env);
    ctx.register_parquet(
        "big_strings",
        "/tmp/big_strings.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();

    let sql = "SELECT * FROM big_strings ORDER BY strings";
    println!("Sorting strings");
    ctx.sql(sql)
        .await
        .unwrap()
        .execute_stream()
        .await
        .unwrap()
        .try_for_each(|_| std::future::ready(Ok(())))
        .await
        .unwrap();
}

fn build_parquet() {
    if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
        println!("Using existing file at /tmp/big_strings.parquet");
        return;
    }

    println!("Generating test file at /tmp/big_strings.parquet");
    let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strings",
        DataType::Utf8,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();
    for batch_idx in 0..100 {
        println!("Generating batch {} of 100", batch_idx);
        let mut string_array_builder =
            StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
        for i in 0..(1024 * 1024) {
            string_array_builder
                .append_value(format!("string-{}string-{}string-{}", i, i, i));
        }
        let array = Arc::new(string_array_builder.finish());
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}
```

It failed with:

```
called `Result::unwrap()` on an `Err` value: ResourcesExhausted("Failed to allocate additional 3875520 bytes for ExternalSorter[2] with 0 bytes already allocated for this reservation - 2696338 bytes remain available for the total pool")
```
@zhuqi-lucas I was using the main branch at commit id …
I haven't followed the discussion yet, but to make sure the external execution results are reproducible, |
I have also encountered the same problem with string views. DataFusion uses arrow's `interleave` kernel to assemble the merged output batches. When spilling happens, these interleaved arrays are serialized using Arrow IPC and produce very large binaries. When we read them back in the spill-read phase, we have to allocate super large buffers for these arrays, which makes things much worse.
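A minimal sketch of the effect described here (illustrative sizes; it assumes arrow's `interleave` kernel and `get_array_memory_size` accounting): a tiny interleaved result can report a very large memory size because it keeps references to every source data buffer.

```rust
use arrow::array::{Array, ArrayRef, StringViewArray};
use arrow::compute::interleave;

fn main() -> arrow::error::Result<()> {
    // Two source arrays, each backed by its own large data buffer
    // (the strings are longer than 12 bytes, so they are not inlined).
    let a = StringViewArray::from_iter_values(["a".repeat(100_000)]);
    let b = StringViewArray::from_iter_values(["b".repeat(100_000)]);
    let sources: Vec<&dyn Array> = vec![&a, &b];

    // Build a tiny result: one value taken from each source.
    let merged: ArrayRef = interleave(&sources, &[(0, 0), (1, 0)])?;

    // The merged array can report a size on the order of both source buffers,
    // not of the two strings it logically contains.
    println!("source a:        {} bytes", a.get_array_memory_size());
    println!("merged (2 rows): {} bytes", merged.get_array_memory_size());
    Ok(())
}
```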
I think the fix for apache/arrow-rs#6779 is in DataFusion 45 -- does this still happen?
I am using the latest main branch of DataFusion (commit 28856e1) and the problem still happens. Edit: this can be easily reproduced using sort-tpch Q3. The following error message is obtained from a
We see 100 MB of in-memory batches get sorted and become 2.1 GB. Backtrace:
Describe the bug
Filing a ticket based on a conversation in discord: https://discord.com/channels/885562378132000778/1166447479609376850/1275728622224932959
Basically, I expect that when properly configured, DataFusion would be able to sort data that doesn't fit in RAM, but instead it results in an error like
To Reproduce
Here is a reproducer: rust_playground.tar.gz
```
tar xf rust_playground.tar.gz
cd rust_playground/
cargo run
```
The code looks like this
Expected behavior
I expect the query to succeed (by spilling data to disk, etc)
Additional context
@westonpace notes #10073 may be related
Here is some of the commentary from discord:
datafusion/datafusion/physical-plan/src/sorts/sort.rs, line 303 in a50aeef
datafusion/datafusion/physical-plan/src/sorts/sort.rs, line 434 in a50aeef
datafusion/datafusion/physical-plan/src/sorts/sort.rs, line 513 in a50aeef
datafusion/datafusion/physical-plan/src/sorts/builder.rs, line 72 in a50aeef
datafusion/datafusion/physical-plan/src/sorts/sort.rs, line 572 in a50aeef