Skip to content

Commit

Permalink
feat: file purger (#1030)
Browse files Browse the repository at this point in the history
* wip

* wip

* feat: file purger

* chore: add tests

* feat: delete removed file on sst merge

* chore: move MockAccessLayer to test_util

* fix: some cr comments

* feat: add await termination for scheduler

* fix: some cr comments

* chore: rename max_file_in_level0 to max_files_in_level0
  • Loading branch information
v0y4g3r authored Feb 19, 2023
1 parent a9c8584 commit af1f8d6
Show file tree
Hide file tree
Showing 25 changed files with 917 additions and 391 deletions.
5 changes: 3 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ connect_timeout_millis = 5000
tcp_nodelay = false

[compaction]
max_inflight_task = 4
max_file_in_level0 = 16
max_inflight_tasks = 4
max_files_in_level0 = 16
max_purge_tasks = 32
5 changes: 3 additions & 2 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ mod tests {

assert_eq!(
CompactionConfig {
max_inflight_task: 4,
max_file_in_level0: 16,
max_inflight_tasks: 4,
max_files_in_level0: 16,
max_purge_tasks: 32,
},
options.compaction
);
Expand Down
16 changes: 10 additions & 6 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,32 +110,36 @@ impl Default for WalConfig {
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CompactionConfig {
    /// Max number of compaction tasks that can run concurrently.
    pub max_inflight_tasks: usize,
    /// Max files in level 0 to trigger compaction.
    pub max_files_in_level0: usize,
    /// Max task number for SST purge tasks after compaction.
    pub max_purge_tasks: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            max_inflight_tasks: 4,
            // NOTE(review): built-in default is 8, while config/datanode.example.toml
            // ships 16 — confirm the divergence is intentional.
            max_files_in_level0: 8,
            max_purge_tasks: 32,
        }
    }
}

/// Derives the compaction scheduler's config from the datanode options.
impl From<&DatanodeOptions> for SchedulerConfig {
    fn from(value: &DatanodeOptions) -> Self {
        Self {
            max_inflight_tasks: value.compaction.max_inflight_tasks,
        }
    }
}

/// Derives the storage engine's config from the datanode options.
impl From<&DatanodeOptions> for StorageEngineConfig {
    fn from(value: &DatanodeOptions) -> Self {
        Self {
            // Engine-side field keeps the shorter `l0` spelling.
            max_files_in_l0: value.compaction.max_files_in_level0,
            max_purge_tasks: value.compaction.max_purge_tasks,
        }
    }
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/compaction/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ where
{
type Request = R;

async fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
fn schedule(&self, _request: Self::Request) -> crate::error::Result<bool> {
Ok(true)
}

async fn stop(&self) -> crate::error::Result<()> {
async fn stop(&self, _await_termination: bool) -> crate::error::Result<()> {
Ok(())
}
}
250 changes: 0 additions & 250 deletions src/storage/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,253 +109,3 @@ where
Ok(())
}
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::scheduler::dedup_deque::DedupDeque;
    use crate::scheduler::rate_limit::{
        BoxedRateLimitToken, CascadeRateLimiter, MaxInflightTaskLimiter,
    };
    use crate::scheduler::{HandlerLoop, LocalScheduler, Scheduler, SchedulerConfig};

    /// Simple async countdown latch: `wait` resolves once `countdown` has
    /// been invoked `size` times.
    struct CountdownLatch {
        // Remaining countdowns before waiters are released.
        counter: std::sync::Mutex<usize>,
        // One `Notify` per waiter; all are notified when the counter hits 0.
        notifies: std::sync::RwLock<Vec<Arc<Notify>>>,
    }

    impl CountdownLatch {
        /// Creates a latch requiring `size` countdowns.
        fn new(size: usize) -> Self {
            Self {
                counter: std::sync::Mutex::new(size),
                notifies: std::sync::RwLock::new(vec![]),
            }
        }

        /// Decrements the counter; releases all registered waiters on
        /// reaching zero. Extra calls after zero are ignored.
        fn countdown(&self) {
            let mut counter = self.counter.lock().unwrap();
            if *counter >= 1 {
                *counter -= 1;
                if *counter == 0 {
                    let notifies = self.notifies.read().unwrap();
                    for waiter in notifies.iter() {
                        waiter.notify_one();
                    }
                }
            }
        }

        /// Registers a fresh `Notify` and awaits it.
        /// NOTE(review): a waiter registering after the counter already hit
        /// zero would block forever — tests only wait before completion.
        async fn wait(&self) {
            let notify = Arc::new(Notify::new());
            {
                let notify = notify.clone();
                let mut notifies = self.notifies.write().unwrap();
                notifies.push(notify);
            }
            notify.notified().await
        }
    }

    /// Drives `HandlerLoop` directly: pushes two requests into the queue,
    /// notifies the loop, and expects both callbacks within 1s.
    #[tokio::test]
    async fn test_schedule_handler() {
        common_telemetry::init_default_ut_logging();
        let queue = Arc::new(std::sync::RwLock::new(DedupDeque::default()));
        let latch = Arc::new(CountdownLatch::new(2));
        let latch_cloned = latch.clone();
        let handler = Arc::new(HandlerLoop {
            req_queue: queue.clone(),
            cancel_token: Default::default(),
            task_notifier: Arc::new(Default::default()),
            request_handler: MockHandler {
                cb: move || {
                    latch_cloned.countdown();
                },
            },
            // At most 3 in-flight tasks allowed by the rate limiter.
            limiter: Arc::new(CascadeRateLimiter::new(vec![Box::new(
                MaxInflightTaskLimiter::new(3),
            )])),
        });

        let handler_cloned = handler.clone();
        common_runtime::spawn_bg(async move { handler_cloned.run().await });

        queue.write().unwrap().push_back(1, MockRequest::default());
        handler.task_notifier.notify_one();
        queue.write().unwrap().push_back(2, MockRequest::default());
        handler.task_notifier.notify_one();

        tokio::time::timeout(Duration::from_secs(1), latch.wait())
            .await
            .unwrap();
    }

    /// Minimal request keyed by region id (used for dedup).
    #[derive(Default, Debug)]
    struct MockRequest {
        region_id: RegionId,
    }

    /// Handler that invokes a caller-supplied callback per handled request.
    struct MockHandler<F> {
        cb: F,
    }

    #[async_trait::async_trait]
    impl<F> Handler for MockHandler<F>
    where
        F: Fn() + Send + Sync,
    {
        type Request = MockRequest;

        /// Runs the callback, then releases the rate-limit token and wakes
        /// the scheduler so the next queued request can proceed.
        async fn handle_request(
            &self,
            _req: Self::Request,
            token: BoxedRateLimitToken,
            finish_notifier: Arc<Notify>,
        ) -> Result<()> {
            (self.cb)();
            token.try_release();
            finish_notifier.notify_one();
            Ok(())
        }
    }

    impl Request for MockRequest {
        type Key = RegionId;

        // Requests with equal region ids are deduplicated by the queue.
        fn key(&self) -> Self::Key {
            self.region_id
        }
    }

    /// Schedules two distinct-region requests and expects both handled.
    #[tokio::test]
    async fn test_scheduler() {
        let latch = Arc::new(CountdownLatch::new(2));
        let latch_cloned = latch.clone();

        let handler = MockHandler {
            cb: move || {
                latch_cloned.countdown();
            },
        };
        let scheduler: LocalScheduler<MockRequest> = LocalScheduler::new(
            SchedulerConfig {
                max_inflight_task: 3,
            },
            handler,
        );

        scheduler
            .schedule(MockRequest { region_id: 1 })
            .await
            .unwrap();

        scheduler
            .schedule(MockRequest { region_id: 2 })
            .await
            .unwrap();

        tokio::time::timeout(Duration::from_secs(1), latch.wait())
            .await
            .unwrap();
    }

    /// Floods the scheduler with 100 unique-region requests; all must be
    /// handled within 3s despite the 3-task in-flight limit.
    #[tokio::test]
    async fn test_scheduler_many() {
        common_telemetry::init_default_ut_logging();
        let task_size = 100;

        let latch = Arc::new(CountdownLatch::new(task_size));
        let latch_clone = latch.clone();

        let handler = MockHandler {
            cb: move || {
                latch_clone.countdown();
            },
        };

        let config = SchedulerConfig {
            max_inflight_task: 3,
        };
        let scheduler = LocalScheduler::new(config, handler);

        for i in 0..task_size {
            scheduler
                .schedule(MockRequest {
                    region_id: i as RegionId,
                })
                .await
                .unwrap();
        }

        tokio::time::timeout(Duration::from_secs(3), latch.wait())
            .await
            .unwrap();
    }

    /// Same as `test_scheduler_many` but submits in two batches separated by
    /// a pause, exercising the idle/wake path of the scheduler loop.
    #[tokio::test]
    async fn test_scheduler_interval() {
        common_telemetry::init_default_ut_logging();
        let task_size = 100;
        let latch = Arc::new(CountdownLatch::new(task_size));
        let latch_clone = latch.clone();

        let handler = MockHandler {
            cb: move || {
                latch_clone.countdown();
            },
        };

        let config = SchedulerConfig {
            max_inflight_task: 3,
        };
        let scheduler = LocalScheduler::new(config, handler);

        for i in 0..task_size / 2 {
            scheduler
                .schedule(MockRequest {
                    region_id: i as RegionId,
                })
                .await
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;
        for i in task_size / 2..task_size {
            scheduler
                .schedule(MockRequest {
                    region_id: i as RegionId,
                })
                .await
                .unwrap();
        }

        tokio::time::timeout(Duration::from_secs(6), latch.wait())
            .await
            .unwrap();
    }

    /// Submitting the same region id 10 times must dedup: at least one
    /// duplicate is rejected (schedule returns false), so fewer than 10
    /// are accepted.
    #[tokio::test]
    async fn test_schedule_duplicate_tasks() {
        common_telemetry::init_default_ut_logging();
        let handler = MockHandler { cb: || {} };
        let config = SchedulerConfig {
            max_inflight_task: 3,
        };
        let scheduler = LocalScheduler::new(config, handler);

        let mut scheduled_task = 0;
        for _ in 0..10 {
            if scheduler
                .schedule(MockRequest { region_id: 1 })
                .await
                .unwrap()
            {
                scheduled_task += 1;
            }
        }
        scheduler.stop().await.unwrap();
        debug!("Schedule tasks: {}", scheduled_task);
        assert!(scheduled_task < 10);
    }
}
26 changes: 17 additions & 9 deletions src/storage/src/compaction/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ fn fit_time_bucket(span_sec: i64) -> i64 {

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::FileMeta;

#[test]
Expand Down Expand Up @@ -227,14 +228,21 @@ mod tests {
}

/// Builds a level-0 `FileHandle` for region 0 covering
/// `[start_ts_millis, end_ts_millis]`, backed by a no-op file purger and a
/// mock access layer so tests need no real storage.
fn new_file_handle(name: &str, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle {
    let file_purger = new_noop_file_purger();
    let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
    FileHandle::new(
        FileMeta {
            region_id: 0,
            file_name: name.to_string(),
            time_range: Some((
                Timestamp::new_millisecond(start_ts_millis),
                Timestamp::new_millisecond(end_ts_millis),
            )),
            level: 0,
        },
        layer,
        file_purger,
    )
}

fn new_file_handles(input: &[(&str, i64, i64)]) -> Vec<FileHandle> {
Expand Down
Loading

0 comments on commit af1f8d6

Please sign in to comment.