From 07c64678f80ff77be3dbd3d99fbe5558b4e72c97 Mon Sep 17 00:00:00 2001 From: koskja <87826472+koskja@users.noreply.github.com> Date: Thu, 19 Aug 2021 20:08:35 +0200 Subject: [PATCH] Chunk saving (#446) * First draft * Fix chunk loading * Make clippy happy * Revert "Make clippy happy" This reverts commit 6ea646869c3ae2700ecb5b26ce27a509b9a4f23d. * Implemented caching unloaded chunks * cache gets purged now * Added tests, fixed unload queue * cargo fmt * found it --- Cargo.lock | 1 + feather/base/src/anvil/region.rs | 10 +- feather/base/src/chunk.rs | 26 +-- feather/base/src/chunk_lock.rs | 134 +++++++++++++++ feather/base/src/lib.rs | 2 + feather/common/Cargo.toml | 1 + feather/common/src/chunk_cache.rs | 152 ++++++++++++++++++ feather/common/src/chunk_loading.rs | 9 +- feather/common/src/chunk_worker.rs | 115 +++++++++++++ feather/common/src/events.rs | 7 +- feather/common/src/lib.rs | 5 +- .../region.rs => region_worker.rs} | 97 ++++++----- feather/common/src/world.rs | 89 +++++----- feather/common/src/world_source.rs | 84 ---------- feather/common/src/world_source/generating.rs | 61 ------- feather/common/src/world_source/null.rs | 22 --- feather/old/core/network/src/packets.rs | 2 +- feather/old/server/lighting/src/lib.rs | 2 +- feather/old/server/player/src/view.rs | 2 +- .../src/packets/server/play/chunk_data.rs | 9 +- .../src/packets/server/play/update_light.rs | 9 +- feather/server/src/client.rs | 9 +- feather/server/src/main.rs | 34 ++-- 23 files changed, 569 insertions(+), 313 deletions(-) create mode 100644 feather/base/src/chunk_lock.rs create mode 100644 feather/common/src/chunk_cache.rs create mode 100644 feather/common/src/chunk_worker.rs rename feather/common/src/{world_source/region.rs => region_worker.rs} (61%) delete mode 100644 feather/common/src/world_source.rs delete mode 100644 feather/common/src/world_source/generating.rs delete mode 100644 feather/common/src/world_source/null.rs diff --git a/Cargo.lock b/Cargo.lock index e99ed3035..16f9b1c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -822,6 +822,7 @@ dependencies = [ "log", "parking_lot", "quill-common", + "rand 0.8.3", "rayon", "smartstring", "uuid", diff --git a/feather/base/src/anvil/region.rs b/feather/base/src/anvil/region.rs index cbc50bca4..1fa1fe7e3 100644 --- a/feather/base/src/anvil/region.rs +++ b/feather/base/src/anvil/region.rs @@ -243,6 +243,14 @@ impl RegionHandle { Ok((chunk, level.entities.clone(), level.block_entities.clone())) } + /// Checks if the specified chunk position is generated in this region. + /// # Panics + /// Panics if the specified chunk position is not within this + /// region file. + pub fn check_chunk_existence(&self, pos: ChunkPosition) -> bool { + self.header.location_for_chunk(pos).exists() + } + /// Saves the given chunk to this region file. The header will be updated /// accordingly and saved as well. /// @@ -387,7 +395,7 @@ fn read_section_into_chunk(section: &mut LevelSection, chunk: &mut Chunk) -> Res let chunk_section = ChunkSection::new(blocks, light); - chunk.set_section_at(section.y as isize, Some(chunk_section)); + chunk.set_section_at_raw(section.y as isize, Some(chunk_section)); Ok(()) } diff --git a/feather/base/src/chunk.rs b/feather/base/src/chunk.rs index 9d71b04cf..d790d4314 100644 --- a/feather/base/src/chunk.rs +++ b/feather/base/src/chunk.rs @@ -254,13 +254,13 @@ impl Chunk { } /// Gets the chunk section at index `y`. - pub fn section(&self, y: usize) -> Option<&ChunkSection> { - self.sections.get(y)?.as_ref() + pub fn section(&self, y: isize) -> Option<&ChunkSection> { + self.sections.get((y + 1) as usize)?.as_ref() } /// Mutably gets the chunk section at index `y`. - pub fn section_mut(&mut self, y: usize) -> Option<&mut ChunkSection> { - self.sections.get_mut(y)?.as_mut() + pub fn section_mut(&mut self, y: isize) -> Option<&mut ChunkSection> { + self.sections.get_mut((y + 1) as usize)?.as_mut() } /// Sets the section at index `y`. @@ -268,6 +268,11 @@ impl Chunk { self.sections[(y + 1) as usize] = section; } + /// Directly sets the section at index `y` without offseting it. Useful when loading from region files + pub fn set_section_at_raw(&mut self, y: isize, section: Option) { + self.sections[y as usize] = section; + } + /// Gets the sections of this chunk. pub fn sections(&self) -> &[Option] { &self.sections @@ -419,7 +424,7 @@ mod tests { chunk.set_block_at(0, 0, 0, BlockId::andesite()); assert_eq!(chunk.block_at(0, 0, 0).unwrap(), BlockId::andesite()); - assert!(chunk.section(1).is_some()); + assert!(chunk.section(0).is_some()); } #[test] @@ -472,7 +477,7 @@ mod tests { assert_eq!(chunk.block_at(x, (section * 16) + y, z), Some(block)); if counter != 0 { assert!( - chunk.section(section + 1).is_some(), + chunk.section(section as isize).is_some(), "Section {} failed", section ); @@ -485,14 +490,17 @@ mod tests { // Go through again to be sure for section in 0..16 { - assert!(chunk.section(section + 1).is_some()); + assert!(chunk.section(section).is_some()); let mut counter = 0; for x in 0..16 { for y in 0..16 { for z in 0..16 { let block = BlockId::from_vanilla_id(counter); - assert_eq!(chunk.block_at(x, (section * 16) + y, z), Some(block)); - assert!(chunk.section(section + 1).is_some()); + assert_eq!( + chunk.block_at(x, (section as usize * 16) + y, z), + Some(block) + ); + assert!(chunk.section(section).is_some()); counter += 1; } } diff --git a/feather/base/src/chunk_lock.rs b/feather/base/src/chunk_lock.rs new file mode 100644 index 000000000..033281d09 --- /dev/null +++ b/feather/base/src/chunk_lock.rs @@ -0,0 +1,134 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use crate::Chunk; +use anyhow::bail; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +pub type ChunkHandle = Arc; +/// A wrapper around a RwLock. Cannot be locked for writing when unloaded. +/// This structure exists so that a chunk can be read from even after being unloaded without accidentaly writing to it. +#[derive(Debug)] +pub struct ChunkLock { + loaded: AtomicBool, + lock: RwLock, +} +impl ChunkLock { + pub fn new(chunk: Chunk, loaded: bool) -> Self { + Self { + loaded: AtomicBool::new(loaded), + lock: RwLock::new(chunk), + } + } + /// Returns whether the chunk is loaded. + pub fn is_loaded(&self) -> bool { + self.loaded.load(Ordering::SeqCst) + } + /// Attempts to set the chunk as unloaded. Returns an error if the chunk is locked as writable. + pub fn set_unloaded(&self) -> anyhow::Result<()> { + if self.loaded.swap(false, Ordering::SeqCst) { + // FIXME: Decide what to do when unloading an unloaded chunk + } + if self.lock.try_read().is_none() { + // Locking fails when someone else already owns a write lock + bail!("Cannot unload chunk because it is locked as writable!") + } + Ok(()) + } + /// Sets the chunk as loaded and returns the previous state. + pub fn set_loaded(&self) -> bool { + self.loaded.swap(true, Ordering::SeqCst) + } + + /// Locks this chunk with read acccess. Doesn't block. + /// Returns None if the chunk is unloaded or locked for writing, Some otherwise. + pub fn try_read(&self) -> Option> { + self.lock.try_read() + } + + /// Locks this chunk with read acccess, blocking the current thread until it can be acquired. + /// Returns None if the chunk is unloaded, Some otherwise. + pub fn read(&self) -> RwLockReadGuard { + self.lock.read() + } + /// Locks this chunk with exclusive write acccess. Doesn't block. + /// Returns None if the chunk is unloaded or locked already, Some otherwise. + pub fn try_write(&self) -> Option> { + if self.is_loaded() { + self.lock.try_write() + } else { + None + } + } + /// Locks this chunk with exclusive write acccess, blocking the current thread until it can be acquired. + /// Returns None if the chunk is unloaded, Some otherwise. + pub fn write(&self) -> Option> { + if self.is_loaded() { + Some(self.lock.write()) + } else { + None + } + } + + pub fn is_locked(&self) -> bool { + self.lock.is_locked() + } +} + +#[cfg(test)] +mod tests { + use std::{ + thread::{sleep, spawn, JoinHandle}, + time::Duration, + }; + + use libcraft_core::ChunkPosition; + + use super::*; + fn empty_lock(x: i32, z: i32, loaded: bool) -> ChunkLock { + ChunkLock::new(Chunk::new(ChunkPosition::new(x, z)), loaded) + } + #[test] + fn normal_function() { + let lock = empty_lock(0, 0, true); + for _ in 0..100 { + // It should be possible to lock in any way + if rand::random::() { + let _guard = lock.try_read().unwrap(); + } else { + let _guard = lock.try_write().unwrap(); + } + } + } + #[test] + fn cannot_write_unloaded() { + let lock = empty_lock(0, 0, false); + assert!(lock.try_write().is_none()) + } + #[test] + fn can_read_unloaded() { + let lock = empty_lock(0, 0, false); + assert!(lock.try_read().is_some()) + } + #[test] + fn multithreaded() { + let lock = Arc::new(empty_lock(0, 0, true)); + let mut handles: Vec> = vec![]; + for _ in 0..20 { + let l = lock.clone(); + handles.push(spawn(move || { + while let Some(guard) = l.write() { + sleep(Duration::from_millis(10)); + drop(guard) + } + })) + } + sleep(Duration::from_millis(1000)); + lock.set_unloaded().unwrap_or(()); // Discard error + for h in handles { + h.join().unwrap() // Wait for all threads to stop + } + } +} diff --git a/feather/base/src/lib.rs b/feather/base/src/lib.rs index fe2337332..b4fc81c4d 100644 --- a/feather/base/src/lib.rs +++ b/feather/base/src/lib.rs @@ -11,11 +11,13 @@ use serde::{Deserialize, Serialize}; pub mod anvil; pub mod chunk; +pub mod chunk_lock; pub mod inventory; pub mod metadata; pub use blocks::*; pub use chunk::{Chunk, ChunkSection, CHUNK_HEIGHT, CHUNK_WIDTH}; +pub use chunk_lock::*; pub use generated::{Area, Biome, EntityKind, Inventory, Item, ItemStack}; pub use libcraft_blocks::{BlockKind, BlockState}; pub use libcraft_core::{position, vec3, BlockPosition, ChunkPosition, Gamemode, Position, Vec3d}; diff --git a/feather/common/Cargo.toml b/feather/common/Cargo.toml index a5c03589d..4560fb162 100644 --- a/feather/common/Cargo.toml +++ b/feather/common/Cargo.toml @@ -22,3 +22,4 @@ uuid = { version = "0.8", features = [ "v4" ] } libcraft-core = { path = "../../libcraft/core" } rayon = "1.5" worldgen = { path = "../worldgen", package = "feather-worldgen" } +rand = "0.8" \ No newline at end of file diff --git a/feather/common/src/chunk_cache.rs b/feather/common/src/chunk_cache.rs new file mode 100644 index 000000000..c07908968 --- /dev/null +++ b/feather/common/src/chunk_cache.rs @@ -0,0 +1,152 @@ +use std::{ + collections::VecDeque, + sync::Arc, + time::{Duration, Instant}, +}; + +use ahash::AHashMap; +use base::{ChunkHandle, ChunkPosition}; + +#[cfg(not(test))] +const CACHE_TIME: Duration = Duration::from_secs(30); +#[cfg(test)] +const CACHE_TIME: Duration = Duration::from_millis(500); + +/// This struct contains chunks that were unloaded but remain in memory in case they are needed. +#[derive(Default)] +pub struct ChunkCache { + map: AHashMap, // expire time + handle + unload_queue: VecDeque, +} +impl ChunkCache { + pub fn new() -> Self { + Self { + map: AHashMap::new(), + unload_queue: VecDeque::new(), + } + } + /// Purges all unused chunk handles. Handles that exist elswhere in the memory are not removed. + pub fn purge_unused(&mut self) { + let mut to_remove: Vec = vec![]; + for (pos, (_, arc)) in self.map.iter() { + if Arc::strong_count(arc) == 1 { + to_remove.push(*pos) + } + } + for i in to_remove { + self.map.remove(&i); + } + } + /// Purges all chunk handles in the cache, including those that exist elswhere. + pub fn purge_all(&mut self) { + self.map.clear(); + self.unload_queue.clear(); + } + fn ref_count(&self, pos: &ChunkPosition) -> Option { + self.map.get(pos).map(|(_, arc)| Arc::strong_count(arc)) + } + /// Purges all chunks that have been in unused the cache for longer than `CACHE_TIME`. Refreshes this timer for chunks that are in use at the moment. + pub fn purge_old_unused(&mut self) { + while let Some(&pos) = self.unload_queue.get(0) { + if !self.contains(&pos) { + // Might be caused by a manual purge + self.unload_queue.pop_front(); + continue; + } + if self.map.get(&pos).unwrap().0 > Instant::now() { + // Subsequent entries are 'scheduled' for later + break; + } + self.unload_queue.pop_front(); + if self.ref_count(&pos).unwrap() > 1 { + // Another copy of this handle already exists + self.unload_queue.push_back(pos); + self.map.entry(pos).and_modify(|(time, _)| { + *time = Instant::now() + CACHE_TIME; + }); + } else { + self.map.remove_entry(&pos); + } + } + } + /// Returns whether the chunk at the position is cached. + pub fn contains(&self, pos: &ChunkPosition) -> bool { + self.map.contains_key(pos) + } + /// Inserts a chunk handle into the cache, returning the previous handle if there was one. + pub fn insert(&mut self, pos: ChunkPosition, handle: ChunkHandle) -> Option { + self.unload_queue.push_back(pos); + self.map + .insert(pos, (Instant::now() + CACHE_TIME, handle)) + .map(|(_, handle)| handle) + } + /// Inserts a chunk handle into the cache. Reads the chunk's position by locking it. Blocks. + pub fn insert_read_pos(&mut self, handle: ChunkHandle) -> Option { + let pos = handle.read().position(); + self.insert(pos, handle) + } + /// Removes the chunk handle at the given position, returning the handle if it was cached. + pub fn remove(&mut self, pos: ChunkPosition) -> Option { + self.map.remove(&pos).map(|(_, handle)| handle) + } + /// Returns the chunk handle at the given position, if there was one. + pub fn get(&mut self, pos: ChunkPosition) -> Option { + self.map.get(&pos).map(|(_, handle)| handle.clone()) + } + pub fn len(&self) -> usize { + self.map.len() + } + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, thread::sleep}; + + use base::{Chunk, ChunkHandle, ChunkLock, ChunkPosition}; + + use super::{ChunkCache, CACHE_TIME}; + + #[test] + fn purge_unused() { + let mut cache = ChunkCache::new(); + let mut stored_handles: Vec = vec![]; + let mut used_count = 0; + for i in 0..100 { + let handle = Arc::new(ChunkLock::new(Chunk::new(ChunkPosition::new(i, 0)), false)); + if rand::random::() { + // clone this handle and pretend it is used + used_count += 1; + stored_handles.push(handle.clone()); + } + assert!(cache.insert_read_pos(handle).is_none()); + } + assert_eq!(cache.len(), 100); + cache.purge_unused(); + assert_eq!(cache.len(), used_count); + } + #[test] + fn purge_old_unused() { + let mut cache = ChunkCache::new(); + let mut stored_handles: Vec = vec![]; + let mut used_count = 0; + for i in 0..100 { + let handle = Arc::new(ChunkLock::new(Chunk::new(ChunkPosition::new(i, 0)), false)); + if rand::random::() { + // clone this handle and pretend it is used + used_count += 1; + stored_handles.push(handle.clone()); + } + assert!(cache.insert_read_pos(handle).is_none()); + } + cache.purge_old_unused(); + assert_eq!(cache.len(), 100); + sleep(CACHE_TIME); + sleep(CACHE_TIME); + assert_eq!(cache.len(), 100); + cache.purge_old_unused(); + assert_eq!(cache.len(), used_count); + } +} diff --git a/feather/common/src/chunk_loading.rs b/feather/common/src/chunk_loading.rs index e43463406..442d077f5 100644 --- a/feather/common/src/chunk_loading.rs +++ b/feather/common/src/chunk_loading.rs @@ -12,6 +12,7 @@ use ecs::{Entity, SysResult, SystemExecutor}; use utils::vec_remove_item; use crate::{ + chunk_worker::LoadRequest, events::{EntityRemoveEvent, ViewUpdateEvent}, Game, }; @@ -131,7 +132,7 @@ fn update_tickets_for_players(game: &mut Game, state: &mut ChunkLoadState) -> Sy // Load if needed if !game.world.is_chunk_loaded(new_chunk) && !game.world.is_chunk_loading(new_chunk) { - game.world.queue_chunk_load(new_chunk); + game.world.queue_chunk_load(LoadRequest { pos: new_chunk }); } } } @@ -155,8 +156,9 @@ fn unload_chunks(game: &mut Game, state: &mut ChunkLoadState) -> SysResult { continue; } - game.world.unload_chunk(unload.pos); + game.world.unload_chunk(unload.pos)?; } + game.world.cache.purge_unused(); Ok(()) } @@ -172,6 +174,5 @@ fn remove_dead_entities(game: &mut Game, state: &mut ChunkLoadState) -> SysResul /// System to call `World::load_chunks` each tick fn load_chunks(game: &mut Game, _state: &mut ChunkLoadState) -> SysResult { - game.world.load_chunks(&mut game.ecs); - Ok(()) + game.world.load_chunks(&mut game.ecs) } diff --git a/feather/common/src/chunk_worker.rs b/feather/common/src/chunk_worker.rs new file mode 100644 index 000000000..85f4d7229 --- /dev/null +++ b/feather/common/src/chunk_worker.rs @@ -0,0 +1,115 @@ +use std::{path::PathBuf, sync::Arc}; + +use anyhow::bail; +use base::{ + anvil::{block_entity::BlockEntityData, entity::EntityData}, + Chunk, ChunkHandle, ChunkPosition, +}; +use flume::{Receiver, Sender}; +use worldgen::WorldGenerator; + +use crate::region_worker::RegionWorker; + +#[derive(Debug)] +pub struct LoadRequest { + pub pos: ChunkPosition, +} +#[derive(Debug)] +pub struct LoadedChunk { + pub pos: ChunkPosition, + pub chunk: Chunk, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum ChunkLoadResult { + /// The chunk does not exist in this source. + Missing(ChunkPosition), + /// An error occurred while loading the chunk. + Error(anyhow::Error), + /// Successfully loaded the chunk. + Loaded(LoadedChunk), +} + +#[derive(Debug)] +pub struct SaveRequest { + pub pos: ChunkPosition, + pub chunk: ChunkHandle, + pub entities: Vec, + pub block_entities: Vec, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum WorkerRequest { + Load(LoadRequest), + Save(SaveRequest), +} +pub struct ChunkWorker { + generator: Arc, + send_req: Sender, + send_gen: Sender, + recv_gen: Receiver, // Chunk generation should be infallible. + recv_load: Receiver, +} + +impl ChunkWorker { + pub fn new(world_dir: impl Into, generator: Arc) -> Self { + let (send_req, recv_req) = flume::unbounded(); + let (send_gen, recv_gen) = flume::unbounded(); + let (region_worker, recv_load) = RegionWorker::new(world_dir.into(), recv_req); + region_worker.start(); + Self { + generator, + send_req, + send_gen, + recv_gen, + recv_load, + } + } + pub fn queue_load(&mut self, request: LoadRequest) { + self.send_req.send(WorkerRequest::Load(request)).unwrap() + } + + /// Helper function for poll_loaded_chunk. Attemts to receive a freshly generated chunk. + /// Function signature identical to that of poll_loaded_chunk for ease of use. + fn try_recv_gen(&mut self) -> Result, anyhow::Error> { + match self.recv_gen.try_recv() { + Ok(l) => Ok(Some(l)), + Err(e) => match e { + flume::TryRecvError::Empty => Ok(None), + flume::TryRecvError::Disconnected => bail!("chunkgen channel died"), + }, + } + } + pub fn poll_loaded_chunk(&mut self) -> Result, anyhow::Error> { + match self.recv_load.try_recv() { + Ok(answer) => { + match answer { + // RegionWorker answered + ChunkLoadResult::Missing(pos) => { + // chunk does not exist, queue it for generation + let send_gen = self.send_gen.clone(); + let gen = self.generator.clone(); + rayon::spawn(move || { + // spawn task to generate chunk + let chunk = gen.generate_chunk(pos); + send_gen.send(LoadedChunk { pos, chunk }).unwrap() + }); + self.try_recv_gen() // check for generated chunks + } + ChunkLoadResult::Error(e) => Err(e), + ChunkLoadResult::Loaded(l) => Ok(Some(l)), + } + } + Err(e) => match e { + flume::TryRecvError::Empty => self.try_recv_gen(), // check for generated chunks + flume::TryRecvError::Disconnected => bail!("RegionWorker died"), + }, + } + } + + pub fn queue_chunk_save(&mut self, req: SaveRequest) { + self.send_req.send(WorkerRequest::Save(req)).unwrap() + } +} diff --git a/feather/common/src/events.rs b/feather/common/src/events.rs index b1ee3422c..2426c2896 100644 --- a/feather/common/src/events.rs +++ b/feather/common/src/events.rs @@ -1,7 +1,4 @@ -use std::sync::Arc; - -use base::{Chunk, ChunkPosition}; -use parking_lot::RwLock; +use base::{ChunkHandle, ChunkPosition}; use crate::view::View; @@ -57,7 +54,7 @@ pub struct ChunkCrossEvent { #[derive(Debug)] pub struct ChunkLoadEvent { pub position: ChunkPosition, - pub chunk: Arc>, + pub chunk: ChunkHandle, } /// Triggered when an error occurs while loading a chunk. diff --git a/feather/common/src/lib.rs b/feather/common/src/lib.rs index ad70d0b70..94223229d 100644 --- a/feather/common/src/lib.rs +++ b/feather/common/src/lib.rs @@ -19,7 +19,10 @@ pub use window::Window; pub mod events; -pub mod world_source; +pub mod chunk_worker; +mod region_worker; + +pub mod chunk_cache; pub mod world; pub use world::World; diff --git a/feather/common/src/world_source/region.rs b/feather/common/src/region_worker.rs similarity index 61% rename from feather/common/src/world_source/region.rs rename to feather/common/src/region_worker.rs index 4f1ec64e6..836c252c5 100644 --- a/feather/common/src/world_source/region.rs +++ b/feather/common/src/region_worker.rs @@ -5,45 +5,13 @@ use std::{ }; use ahash::AHashMap; -use base::{ - anvil::region::{RegionHandle, RegionPosition}, - ChunkPosition, +use base::anvil::{ + self, + region::{RegionHandle, RegionPosition}, }; use flume::{Receiver, Sender}; -use super::{ChunkLoadResult, LoadedChunk, WorldSource}; - -/// World source loading from a vanilla (Anvil) world. -pub struct RegionWorldSource { - request_sender: Sender, - result_receiver: Receiver, -} - -impl RegionWorldSource { - pub fn new(world_dir: impl Into) -> Self { - let (request_sender, request_receiver) = flume::unbounded(); - let (worker, result_receiver) = Worker::new(world_dir.into(), request_receiver); - - worker.start(); - - Self { - request_sender, - result_receiver, - } - } -} - -impl WorldSource for RegionWorldSource { - fn queue_load(&mut self, pos: ChunkPosition) { - self.request_sender - .send(pos) - .expect("chunk worker panicked"); - } - - fn poll_loaded_chunk(&mut self) -> Option { - self.result_receiver.try_recv().ok() - } -} +use crate::chunk_worker::{ChunkLoadResult, LoadRequest, LoadedChunk, SaveRequest, WorkerRequest}; /// Duration to keep a region file open when not in use. const CACHE_TIME: Duration = Duration::from_secs(60); @@ -66,19 +34,19 @@ impl OpenRegionFile { } } -struct Worker { - request_receiver: Receiver, - result_sender: Sender, +pub struct RegionWorker { + request_receiver: Receiver, + result_sender: Sender, world_dir: PathBuf, region_files: AHashMap, last_cache_update: Instant, } -impl Worker { +impl RegionWorker { pub fn new( world_dir: PathBuf, - request_receiver: Receiver, - ) -> (Self, Receiver) { + request_receiver: Receiver, + ) -> (Self, Receiver) { let (result_sender, result_receiver) = flume::bounded(256); ( Self { @@ -102,8 +70,11 @@ impl Worker { fn run(mut self) { log::info!("Chunk worker started"); loop { - match self.request_receiver.recv_timeout(Duration::from_secs(30)) { - Ok(pos) => self.load_chunk(pos), + match self.request_receiver.recv_timeout(CACHE_TIME) { + Ok(req) => match req { + WorkerRequest::Load(load) => self.load_chunk(load), + WorkerRequest::Save(save) => self.save_chunk(save).unwrap(), + }, Err(flume::RecvTimeoutError::Timeout) => (), Err(flume::RecvTimeoutError::Disconnected) => { log::info!("Chunk worker shutting down"); @@ -114,26 +85,50 @@ impl Worker { } } - fn load_chunk(&mut self, pos: ChunkPosition) { - let result = self.get_chunk_load_result(pos); - let _ = self.result_sender.send(LoadedChunk { pos, result }); + fn save_chunk(&mut self, req: SaveRequest) -> anyhow::Result<()> { + let reg_pos = RegionPosition::from_chunk(req.pos); + let handle = &mut match self.region_file_handle(reg_pos) { + Some(h) => h, + None => { + let new_handle = anvil::region::create_region(&self.world_dir, reg_pos)?; + self.region_files + .insert(reg_pos, OpenRegionFile::new(new_handle)); + self.region_file_handle(reg_pos).unwrap() + } + } + .handle; + handle.save_chunk( + &req.chunk.read(), + &req.entities[..], + &req.block_entities[..], + )?; + Ok(()) } - fn get_chunk_load_result(&mut self, pos: ChunkPosition) -> ChunkLoadResult { + fn load_chunk(&mut self, req: LoadRequest) { + let result = self.get_chunk_load_result(req); + let _ = self.result_sender.send(result); + } + + fn get_chunk_load_result(&mut self, req: LoadRequest) -> ChunkLoadResult { + let pos = req.pos; let region = RegionPosition::from_chunk(pos); let file = match self.region_file_handle(region) { Some(file) => file, - None => return ChunkLoadResult::Missing, + None => return ChunkLoadResult::Missing(pos), }; let chunk = match file.handle.load_chunk(pos) { Ok((chunk, _, _)) => chunk, - Err(e) => return ChunkLoadResult::Error(e.into()), + Err(e) => match e { + anvil::region::Error::ChunkNotExist => return ChunkLoadResult::Missing(pos), + err => return ChunkLoadResult::Error(err.into()), + }, }; file.last_used = Instant::now(); - ChunkLoadResult::Loaded { chunk } + ChunkLoadResult::Loaded(LoadedChunk { pos, chunk }) } fn region_file_handle(&mut self, region: RegionPosition) -> Option<&mut OpenRegionFile> { diff --git a/feather/common/src/world.rs b/feather/common/src/world.rs index 72e5e64a3..d27070755 100644 --- a/feather/common/src/world.rs +++ b/feather/common/src/world.rs @@ -1,13 +1,15 @@ use ahash::{AHashMap, AHashSet}; -use base::{BlockPosition, Chunk, ChunkPosition, CHUNK_HEIGHT}; +use base::{BlockPosition, Chunk, ChunkHandle, ChunkLock, ChunkPosition, CHUNK_HEIGHT}; use blocks::BlockId; -use ecs::Ecs; -use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use std::sync::Arc; +use ecs::{Ecs, SysResult}; +use parking_lot::{RwLockReadGuard, RwLockWriteGuard}; +use std::{path::PathBuf, sync::Arc}; +use worldgen::{ComposableGenerator, WorldGenerator}; use crate::{ + chunk_cache::ChunkCache, + chunk_worker::{ChunkWorker, LoadRequest, SaveRequest}, events::ChunkLoadEvent, - world_source::{null::NullWorldSource, ChunkLoadResult, WorldSource}, }; /// Stores all blocks and chunks in a world, @@ -18,7 +20,8 @@ use crate::{ /// This does not store entities; it only contains blocks. pub struct World { chunk_map: ChunkMap, - world_source: Box, + pub cache: ChunkCache, + chunk_worker: ChunkWorker, loading_chunks: AHashSet, canceled_chunk_loads: AHashSet, } @@ -27,7 +30,11 @@ impl Default for World { fn default() -> Self { Self { chunk_map: ChunkMap::new(), - world_source: Box::new(NullWorldSource::default()), + chunk_worker: ChunkWorker::new( + "world", + Arc::new(ComposableGenerator::default_with_seed(0)), + ), + cache: ChunkCache::new(), loading_chunks: AHashSet::new(), canceled_chunk_loads: AHashSet::new(), } @@ -39,43 +46,41 @@ impl World { Self::default() } - /// Creates a `World` from a `WorldSource` for loading chunks. - pub fn with_source(world_source: impl WorldSource + 'static) -> Self { + pub fn with_gen_and_path( + generator: Arc, + world_dir: impl Into, + ) -> Self { Self { - world_source: Box::new(world_source), + chunk_worker: ChunkWorker::new(world_dir, generator), ..Default::default() } } - /// Queues the given chunk to be loaded. - pub fn queue_chunk_load(&mut self, pos: ChunkPosition) { - self.loading_chunks.insert(pos); - self.world_source.queue_load(pos); + /// Queues the given chunk to be loaded. If the chunk was cached, it is loaded immediately. + pub fn queue_chunk_load(&mut self, req: LoadRequest) { + let pos = req.pos; + if self.cache.contains(&pos) { + // Move the chunk from the cache to the map + self.chunk_map + .0 + .insert(pos, self.cache.remove(pos).unwrap()); + self.chunk_map.chunk_handle_at(pos).unwrap().set_loaded(); + } else { + self.loading_chunks.insert(req.pos); + self.chunk_worker.queue_load(req); + } } /// Loads any chunks that have been loaded asynchronously /// after a call to [`World::queue_chunk_load`]. - pub fn load_chunks(&mut self, ecs: &mut Ecs) { - while let Some(loaded) = self.world_source.poll_loaded_chunk() { + pub fn load_chunks(&mut self, ecs: &mut Ecs) -> SysResult { + while let Some(loaded) = self.chunk_worker.poll_loaded_chunk()? { self.loading_chunks.remove(&loaded.pos); if self.canceled_chunk_loads.remove(&loaded.pos) { continue; } + let chunk = loaded.chunk; - let chunk = match loaded.result { - ChunkLoadResult::Missing => { - log::debug!( - "Chunk {:?} is missing; using default empty chunk", - loaded.pos - ); - Chunk::new(loaded.pos) - } - ChunkLoadResult::Error(e) => { - log::error!("Failed to load chunk {:?}: {:?}", loaded.pos, e); - continue; - } - ChunkLoadResult::Loaded { chunk } => chunk, - }; self.chunk_map.insert_chunk(chunk); ecs.insert_event(ChunkLoadEvent { chunk: Arc::clone(&self.chunk_map.0[&loaded.pos]), @@ -83,16 +88,28 @@ impl World { }); log::trace!("Loaded chunk {:?}", loaded.pos); } + Ok(()) } /// Unloads the given chunk. - pub fn unload_chunk(&mut self, pos: ChunkPosition) { + pub fn unload_chunk(&mut self, pos: ChunkPosition) -> anyhow::Result<()> { + if let Some((pos, handle)) = self.chunk_map.0.remove_entry(&pos) { + handle.set_unloaded()?; + self.chunk_worker.queue_chunk_save(SaveRequest { + pos, + chunk: handle.clone(), + entities: vec![], + block_entities: vec![], + }); + self.cache.insert(pos, handle); + } self.chunk_map.remove_chunk(pos); if self.is_chunk_loading(pos) { self.canceled_chunk_loads.insert(pos); } log::trace!("Unloaded chunk {:?}", pos); + Ok(()) } /// Returns whether the given chunk is loaded. @@ -134,7 +151,7 @@ impl World { } } -pub type ChunkMapInner = AHashMap>>; +pub type ChunkMapInner = AHashMap; /// This struct stores all the chunks on the server, /// so it allows access to blocks and lighting data. @@ -162,11 +179,11 @@ impl ChunkMap { /// Retrieves a handle to the chunk at the given /// position, or `None` if it is not loaded. pub fn chunk_at_mut(&self, pos: ChunkPosition) -> Option> { - self.0.get(&pos).map(|lock| lock.write()) + self.0.get(&pos).map(|lock| lock.write()).flatten() } /// Returns an `Arc>` at the given position. - pub fn chunk_handle_at(&self, pos: ChunkPosition) -> Option>> { + pub fn chunk_handle_at(&self, pos: ChunkPosition) -> Option { self.0.get(&pos).map(Arc::clone) } @@ -190,14 +207,14 @@ impl ChunkMap { } /// Returns an iterator over chunks. - pub fn iter_chunks(&self) -> impl IntoIterator>> { + pub fn iter_chunks(&self) -> impl IntoIterator { self.0.values() } /// Inserts a new chunk into the chunk map. pub fn insert_chunk(&mut self, chunk: Chunk) { self.0 - .insert(chunk.position(), Arc::new(RwLock::new(chunk))); + .insert(chunk.position(), Arc::new(ChunkLock::new(chunk, true))); } /// Removes the chunk at the given position, returning `true` if it existed. diff --git a/feather/common/src/world_source.rs b/feather/common/src/world_source.rs deleted file mode 100644 index 2e158daeb..000000000 --- a/feather/common/src/world_source.rs +++ /dev/null @@ -1,84 +0,0 @@ -use base::{Chunk, ChunkPosition}; - -pub mod generating; -pub mod null; -pub mod region; - -/// A chunk loaded from a [`WorldSource`]. -pub struct LoadedChunk { - pub pos: ChunkPosition, - pub result: ChunkLoadResult, -} - -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -pub enum ChunkLoadResult { - /// The chunk does not exist in this source. - Missing, - /// An error occurred while loading the chunk. - Error(anyhow::Error), - /// Successfully loaded the chunk. - Loaded { chunk: Chunk }, -} - -/// Provides methods to load chunks, entities, and global world data. -pub trait WorldSource: 'static { - /// Enqueues the chunk at `pos` to be loaded. - /// A future call to `poll_loaded_chunk` should - /// return this chunk. - fn queue_load(&mut self, pos: ChunkPosition); - - /// Polls for the next loaded chunk. Should not block - /// - /// The order in which chunks are loaded is not defined. In other - /// words, this method does not need to yield chunks in the - /// same order they were queued for loading. - fn poll_loaded_chunk(&mut self) -> Option; - - /// Creates a `WorldSource` that falls back to `fallback` - /// if chunks in `self` are missing or corrupt. - fn with_fallback(self, fallback: impl WorldSource) -> FallbackWorldSource - where - Self: Sized, - { - FallbackWorldSource { - first: Box::new(self), - fallback: Box::new(fallback), - } - } -} - -/// `WorldSource` wrapping two world sources. Falls back -/// to the second source if the first one is missing a chunk. -pub struct FallbackWorldSource { - first: Box, - fallback: Box, -} - -impl WorldSource for FallbackWorldSource { - fn queue_load(&mut self, pos: ChunkPosition) { - self.first.queue_load(pos); - } - - fn poll_loaded_chunk(&mut self) -> Option { - self.first - .poll_loaded_chunk() - .map(|chunk| { - if matches!( - &chunk.result, - ChunkLoadResult::Error(_) | ChunkLoadResult::Missing - ) { - self.fallback.queue_load(chunk.pos); - log::trace!( - "Chunk load falling back (failure cause: {:?})", - chunk.result - ); - None - } else { - Some(chunk) - } - }) - .flatten() - .or_else(|| self.fallback.poll_loaded_chunk()) - } -} diff --git a/feather/common/src/world_source/generating.rs b/feather/common/src/world_source/generating.rs deleted file mode 100644 index 92e73ee97..000000000 --- a/feather/common/src/world_source/generating.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::sync::Arc; - -use base::ChunkPosition; -use flume::{unbounded, Receiver, Sender}; -use worldgen::{ComposableGenerator, WorldGenerator}; - -use super::{ChunkLoadResult, LoadedChunk, WorldSource}; - -pub struct GeneratingWorldSource { - send: Sender, - recv: Receiver, - generator: Arc, -} - -impl Default for GeneratingWorldSource { - fn default() -> Self { - Self::new(ComposableGenerator::default_with_seed(42)) - } -} - -impl GeneratingWorldSource { - pub fn from_arc(generator: Arc) -> Self - where - G: WorldGenerator + 'static, - { - let (send_gen, recv_gen) = unbounded::(); - Self { - send: send_gen, - recv: recv_gen, - generator, - } - } - pub fn new(generator: G) -> Self - where - G: WorldGenerator + 'static, - { - Self::from_arc(Arc::new(generator)) - } - pub fn default_with_seed(seed: u64) -> Self { - Self::new(ComposableGenerator::default_with_seed(seed)) - } -} - -impl WorldSource for GeneratingWorldSource { - fn queue_load(&mut self, pos: ChunkPosition) { - let send = self.send.clone(); - let gen = self.generator.clone(); - rayon::spawn(move || { - let chunk = gen.generate_chunk(pos); - let loaded = LoadedChunk { - pos, - result: ChunkLoadResult::Loaded { chunk }, - }; - send.send(loaded).unwrap(); - }); - } - - fn poll_loaded_chunk(&mut self) -> Option { - self.recv.try_recv().ok() - } -} diff --git a/feather/common/src/world_source/null.rs b/feather/common/src/world_source/null.rs deleted file mode 100644 index 8a3a929d6..000000000 --- a/feather/common/src/world_source/null.rs +++ /dev/null @@ -1,22 +0,0 @@ -use base::ChunkPosition; - -use super::{ChunkLoadResult, LoadedChunk, WorldSource}; - -/// A world source that always yields `ChunkLoadResult::Missing`. -#[derive(Default)] -pub struct NullWorldSource { - loaded: Vec, -} - -impl WorldSource for NullWorldSource { - fn queue_load(&mut self, pos: ChunkPosition) { - self.loaded.push(pos); - } - - fn poll_loaded_chunk(&mut self) -> Option { - self.loaded.pop().map(|pos| LoadedChunk { - pos, - result: ChunkLoadResult::Missing, - }) - } -} diff --git a/feather/old/core/network/src/packets.rs b/feather/old/core/network/src/packets.rs index 994a277cb..e5b26ddd1 100644 --- a/feather/old/core/network/src/packets.rs +++ b/feather/old/core/network/src/packets.rs @@ -1817,7 +1817,7 @@ pub struct KeepAliveClientbound { #[derive(Default, AsAny, Clone)] pub struct ChunkData { - pub chunk: Arc>, + pub chunk: ChunkHandle, } impl Packet for ChunkData { diff --git a/feather/old/server/lighting/src/lib.rs b/feather/old/server/lighting/src/lib.rs index e433a071a..9e165cd45 100644 --- a/feather/old/server/lighting/src/lib.rs +++ b/feather/old/server/lighting/src/lib.rs @@ -107,7 +107,7 @@ pub enum Request { /// Notifies the worker of a new loaded chunk. LoadChunk { pos: ChunkPosition, - handle: Arc>, + handle: ChunkHandle, }, /// Notifies the worker that a chunk was unloaded. UnloadChunk { pos: ChunkPosition }, diff --git a/feather/old/server/player/src/view.rs b/feather/old/server/player/src/view.rs index aadb08657..d39bac190 100644 --- a/feather/old/server/player/src/view.rs +++ b/feather/old/server/player/src/view.rs @@ -356,6 +356,6 @@ pub fn on_chunk_load_send_to_clients( } /// Creates a chunk data packet for the given chunk. -fn create_chunk_data(chunk: Arc>) -> ChunkData { +fn create_chunk_data(chunk: ChunkHandle) -> ChunkData { ChunkData { chunk } } diff --git a/feather/protocol/src/packets/server/play/chunk_data.rs b/feather/protocol/src/packets/server/play/chunk_data.rs index d6301c6d2..6eab56915 100644 --- a/feather/protocol/src/packets/server/play/chunk_data.rs +++ b/feather/protocol/src/packets/server/play/chunk_data.rs @@ -4,10 +4,9 @@ use std::{ sync::Arc, }; -use base::{Chunk, ChunkPosition, ChunkSection}; +use base::{Chunk, ChunkHandle, ChunkLock, ChunkPosition, ChunkSection}; use blocks::BlockId; use generated::Biome; -use parking_lot::RwLock; use serde::{ de, de::{SeqAccess, Visitor}, @@ -37,7 +36,7 @@ pub enum ChunkDataKind { #[derive(Clone)] pub struct ChunkData { /// The chunk to send. - pub chunk: Arc>, + pub chunk: ChunkHandle, /// Whether this packet will load a chunk on /// the client or overwrite an existing one. @@ -198,7 +197,7 @@ impl Readable for ChunkData { for i in 0..16 { if (primary_bit_mask & (1 << i)) != 0 { - if chunk.section(i + 1).is_none() { + if chunk.section(i).is_none() { chunk.set_section_at(i as isize, Some(ChunkSection::default())); } if let Some(section) = chunk.section_mut(i + 1) { @@ -232,7 +231,7 @@ impl Readable for ChunkData { VarInt::read(buffer, version)?; // Block entities length, redundant for feather right now Ok(Self { - chunk: Arc::new(RwLock::new(chunk)), + chunk: Arc::new(ChunkLock::new(chunk, true)), kind: chunk_data_kind, }) } diff --git a/feather/protocol/src/packets/server/play/update_light.rs b/feather/protocol/src/packets/server/play/update_light.rs index 2bb045537..27e76c9f5 100644 --- a/feather/protocol/src/packets/server/play/update_light.rs +++ b/feather/protocol/src/packets/server/play/update_light.rs @@ -1,13 +1,12 @@ use std::{fmt::Debug, sync::Arc}; -use base::{chunk::PackedArray, Chunk, ChunkPosition, ChunkSection}; -use parking_lot::RwLock; +use base::{chunk::PackedArray, Chunk, ChunkHandle, ChunkLock, ChunkPosition, ChunkSection}; use crate::{io::VarInt, ProtocolVersion, Readable, Writeable}; #[derive(Clone)] pub struct UpdateLight { - pub chunk: Arc>, + pub chunk: ChunkHandle, } impl Debug for UpdateLight { @@ -88,7 +87,7 @@ impl Readable for UpdateLight { bytes.push(u8::read(buffer, version)?); } let mut bytes = bytes.iter(); - if chunk.section(i + 1).is_none() { + if chunk.section(i).is_none() { chunk.set_section_at(i as isize, Some(ChunkSection::default())); } if let Some(section) = chunk.section_mut(i + 1) { @@ -125,7 +124,7 @@ impl Readable for UpdateLight { } Ok(Self { - chunk: Arc::new(RwLock::new(chunk)), + chunk: Arc::new(ChunkLock::new(chunk, true)), }) } } diff --git a/feather/server/src/client.rs b/feather/server/src/client.rs index 37cb253fa..96cdee503 100644 --- a/feather/server/src/client.rs +++ b/feather/server/src/client.rs @@ -7,8 +7,8 @@ use std::{ use ahash::AHashSet; use base::{ - BlockId, BlockPosition, Chunk, ChunkPosition, EntityKind, EntityMetadata, Gamemode, ItemStack, - Position, ProfileProperty, Text, + BlockId, BlockPosition, ChunkHandle, ChunkPosition, EntityKind, EntityMetadata, Gamemode, + ItemStack, Position, ProfileProperty, Text, }; use common::{ chat::{ChatKind, ChatMessage}, @@ -16,7 +16,6 @@ use common::{ }; use flume::{Receiver, Sender}; use packets::server::{Particle, SetSlot, SpawnLivingEntity, UpdateLight, WindowConfirmation}; -use parking_lot::RwLock; use protocol::{ packets::{ self, @@ -257,7 +256,7 @@ impl Client { }); } - pub fn send_chunk(&self, chunk: &Arc>) { + pub fn send_chunk(&self, chunk: &ChunkHandle) { self.chunk_send_queue.borrow_mut().push_back(ChunkData { chunk: Arc::clone(chunk), kind: ChunkDataKind::LoadChunk, @@ -267,7 +266,7 @@ impl Client { .insert(chunk.read().position()); } - pub fn overwrite_chunk_sections(&self, chunk: &Arc>, sections: Vec) { + pub fn overwrite_chunk_sections(&self, chunk: &ChunkHandle, sections: Vec) { self.send_packet(ChunkData { chunk: Arc::clone(chunk), kind: ChunkDataKind::OverwriteChunk { sections }, diff --git a/feather/server/src/main.rs b/feather/server/src/main.rs index 698349c5c..615857028 100644 --- a/feather/server/src/main.rs +++ b/feather/server/src/main.rs @@ -1,18 +1,12 @@ -use std::{cell::RefCell, rc::Rc}; +use std::{cell::RefCell, rc::Rc, sync::Arc}; use anyhow::Context; use base::anvil::level::SuperflatGeneratorOptions; -use common::{ - world_source::{ - generating::GeneratingWorldSource, null::NullWorldSource, region::RegionWorldSource, - WorldSource, - }, - Game, TickLoop, World, -}; +use common::{Game, TickLoop, World}; use ecs::SystemExecutor; -use feather_server::{config, Server}; +use feather_server::{config::Config, Server}; use plugin_host::PluginManager; -use worldgen::SuperflatWorldGenerator; +use worldgen::{ComposableGenerator, SuperflatWorldGenerator, WorldGenerator}; mod logging; @@ -35,17 +29,17 @@ async fn main() -> anyhow::Result<()> { let options = config.to_options(); let server = Server::bind(options).await?; - let game = init_game(server, config.world)?; + let game = init_game(server, &config)?; run(game); Ok(()) } -fn init_game(server: Server, world_options: config::World) -> anyhow::Result { +fn init_game(server: Server, config: &Config) -> anyhow::Result { let mut game = Game::new(); init_systems(&mut game, server); - init_world_source(&mut game, world_options); + init_world_source(&mut game, config); init_plugin_manager(&mut game)?; Ok(game) } @@ -64,23 +58,21 @@ fn init_systems(game: &mut Game, server: Server) { game.system_executor = Rc::new(RefCell::new(systems)); } -fn init_world_source(game: &mut Game, options: config::World) { +fn init_world_source(game: &mut Game, config: &Config) { // Load chunks from the world save first, // and fall back to generating a superflat // world otherwise. This is a placeholder: // we don't have proper world generation yet. - let region_source = RegionWorldSource::new(options.name); let seed = 42; // FIXME: load from the level file - let world_source = match &options.generator[..] { - "default" => region_source.with_fallback(GeneratingWorldSource::default_with_seed(seed)), - "flat" => region_source.with_fallback(GeneratingWorldSource::new( - SuperflatWorldGenerator::new(SuperflatGeneratorOptions::default()), + let generator: Arc = match &config.world.generator[..] { + "flat" => Arc::new(SuperflatWorldGenerator::new( + SuperflatGeneratorOptions::default(), )), - _ => region_source.with_fallback(NullWorldSource::default()), + _ => Arc::new(ComposableGenerator::default_with_seed(seed)), }; - game.world = World::with_source(world_source); + game.world = World::with_gen_and_path(generator, config.world.name.clone()); } fn init_plugin_manager(game: &mut Game) -> anyhow::Result<()> {