diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index 6d5e3f5fcd56b..9350dabbba93f 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -31,7 +31,8 @@ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } async-broadcast = "0.5" async-fs = "2.0" async-lock = "3.0" -crossbeam-channel = "0.5" +async-channel = "2.2" +concurrent-queue = "2.4" downcast-rs = "1.2" futures-io = "0.3" futures-lite = "2.0.1" diff --git a/crates/bevy_asset/src/assets.rs b/crates/bevy_asset/src/assets.rs index b162f3f21b9ea..65f6fca125fb4 100644 --- a/crates/bevy_asset/src/assets.rs +++ b/crates/bevy_asset/src/assets.rs @@ -8,7 +8,7 @@ use bevy_ecs::{ }; use bevy_reflect::{Reflect, TypePath}; use bevy_utils::HashMap; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use serde::{Deserialize, Serialize}; use std::{ any::TypeId, @@ -50,22 +50,16 @@ impl AssetIndex { pub(crate) struct AssetIndexAllocator { /// A monotonically increasing index. next_index: AtomicU32, - recycled_queue_sender: Sender, + recycled_queue: ConcurrentQueue, /// This receives every recycled [`AssetIndex`]. It serves as a buffer/queue to store indices ready for reuse. - recycled_queue_receiver: Receiver, - recycled_sender: Sender, - recycled_receiver: Receiver, + recycled: ConcurrentQueue, } impl Default for AssetIndexAllocator { fn default() -> Self { - let (recycled_queue_sender, recycled_queue_receiver) = crossbeam_channel::unbounded(); - let (recycled_sender, recycled_receiver) = crossbeam_channel::unbounded(); Self { - recycled_queue_sender, - recycled_queue_receiver, - recycled_sender, - recycled_receiver, + recycled_queue: ConcurrentQueue::unbounded(), + recycled: ConcurrentQueue::unbounded(), next_index: Default::default(), } } @@ -75,9 +69,9 @@ impl AssetIndexAllocator { /// Reserves a new [`AssetIndex`], either by reusing a recycled index (with an incremented generation), or by creating a new index /// by incrementing the index counter for a given asset type `A`. pub fn reserve(&self) -> AssetIndex { - if let Ok(mut recycled) = self.recycled_queue_receiver.try_recv() { + if let Ok(mut recycled) = self.recycled_queue.pop() { recycled.generation += 1; - self.recycled_sender.send(recycled).unwrap(); + self.recycled.push(recycled).unwrap(); recycled } else { AssetIndex { @@ -91,7 +85,7 @@ impl AssetIndexAllocator { /// Queues the given `index` for reuse. This should only be done if the `index` is no longer being used. pub fn recycle(&self, index: AssetIndex) { - self.recycled_queue_sender.send(index).unwrap(); + self.recycled_queue.push(index).unwrap(); } } @@ -246,7 +240,7 @@ impl DenseAssetStorage { value: None, generation: 0, }); - while let Ok(recycled) = self.allocator.recycled_receiver.try_recv() { + for recycled in self.allocator.recycled.try_iter() { let entry = &mut self.storage[recycled.index as usize]; *entry = Entry::Some { value: None, @@ -546,7 +540,7 @@ impl Assets { // to other asset info operations let mut infos = asset_server.data.infos.write(); let mut not_ready = Vec::new(); - while let Ok(drop_event) = assets.handle_provider.drop_receiver.try_recv() { + while let Ok(drop_event) = assets.handle_provider.drop_queue.try_recv() { let id = drop_event.id.typed(); if drop_event.asset_server_managed { @@ -572,7 +566,7 @@ impl Assets { // TODO: this is _extremely_ inefficient find a better fix // This will also loop failed assets indefinitely. Is that ok? for event in not_ready { - assets.handle_provider.drop_sender.send(event).unwrap(); + assets.handle_provider.drop_queue.send(event); } } diff --git a/crates/bevy_asset/src/handle.rs b/crates/bevy_asset/src/handle.rs index b2ebea2af9024..3ecd0f0dae2bb 100644 --- a/crates/bevy_asset/src/handle.rs +++ b/crates/bevy_asset/src/handle.rs @@ -1,11 +1,11 @@ use crate::{ - meta::MetaTransform, Asset, AssetId, AssetIndexAllocator, AssetPath, InternalAssetId, - UntypedAssetId, + meta::MetaTransform, + utils::{EventQueue, EventSender}, + Asset, AssetId, AssetIndexAllocator, AssetPath, InternalAssetId, UntypedAssetId, }; use bevy_ecs::prelude::*; use bevy_reflect::{std_traits::ReflectDefault, Reflect, TypePath}; use bevy_utils::get_short_name; -use crossbeam_channel::{Receiver, Sender}; use std::{ any::TypeId, hash::{Hash, Hasher}, @@ -19,8 +19,7 @@ use uuid::Uuid; #[derive(Clone)] pub struct AssetHandleProvider { pub(crate) allocator: Arc, - pub(crate) drop_sender: Sender, - pub(crate) drop_receiver: Receiver, + pub(crate) drop_queue: EventQueue, pub(crate) type_id: TypeId, } @@ -32,12 +31,10 @@ pub(crate) struct DropEvent { impl AssetHandleProvider { pub(crate) fn new(type_id: TypeId, allocator: Arc) -> Self { - let (drop_sender, drop_receiver) = crossbeam_channel::unbounded(); Self { type_id, allocator, - drop_sender, - drop_receiver, + drop_queue: EventQueue::new(), } } @@ -57,7 +54,7 @@ impl AssetHandleProvider { ) -> Arc { Arc::new(StrongHandle { id: id.untyped(self.type_id), - drop_sender: self.drop_sender.clone(), + drop: self.drop_queue.sender(), meta_transform, path, asset_server_managed, @@ -91,12 +88,12 @@ pub struct StrongHandle { /// 1. configuration tied to the lifetime of a specific asset load /// 2. configuration that must be repeatable when the asset is hot-reloaded pub(crate) meta_transform: Option, - pub(crate) drop_sender: Sender, + pub(crate) drop: EventSender, } impl Drop for StrongHandle { fn drop(&mut self) { - let _ = self.drop_sender.send(DropEvent { + self.drop.send(DropEvent { id: self.id.internal(), asset_server_managed: self.asset_server_managed, }); @@ -109,7 +106,7 @@ impl std::fmt::Debug for StrongHandle { .field("id", &self.id) .field("asset_server_managed", &self.asset_server_managed) .field("path", &self.path) - .field("drop_sender", &self.drop_sender) + .field("drop_sender", &self.drop) .finish() } } diff --git a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs index 485593599c489..61e7152833e41 100644 --- a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs +++ b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs @@ -1,7 +1,10 @@ -use crate::io::{ - file::{get_asset_path, get_base_path, new_asset_event_debouncer, FilesystemEventHandler}, - memory::Dir, - AssetSourceEvent, AssetWatcher, +use crate::{ + io::{ + file::{get_asset_path, get_base_path, new_asset_event_debouncer, FilesystemEventHandler}, + memory::Dir, + AssetSourceEvent, AssetWatcher, + }, + EventSender, }; use bevy_utils::tracing::warn; use bevy_utils::{Duration, HashMap}; @@ -26,7 +29,7 @@ impl EmbeddedWatcher { pub fn new( dir: Dir, root_paths: Arc, PathBuf>>>, - sender: crossbeam_channel::Sender, + sender: EventSender, debounce_wait_time: Duration, ) -> Self { let root = get_base_path(); @@ -48,7 +51,7 @@ impl AssetWatcher for EmbeddedWatcher {} /// binary-embedded Rust source files. This will read the contents of changed files from the file system and overwrite /// the initial static bytes from the file embedded in the binary. pub(crate) struct EmbeddedEventHandler { - sender: crossbeam_channel::Sender, + sender: EventSender, root_paths: Arc, PathBuf>>>, root: PathBuf, dir: Dir, @@ -82,7 +85,7 @@ impl FilesystemEventHandler for EmbeddedEventHandler { } } self.last_event = Some(event.clone()); - self.sender.send(event).unwrap(); + self.sender.send(event); } } } diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index fbed3f8a0ecbe..c43df82835c84 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -1,8 +1,8 @@ use crate::io::{AssetSourceEvent, AssetWatcher}; use crate::path::normalize_path; +use crate::EventSender; use bevy_utils::tracing::error; use bevy_utils::Duration; -use crossbeam_channel::Sender; use notify_debouncer_full::{ new_debouncer, notify::{ @@ -26,7 +26,7 @@ pub struct FileWatcher { impl FileWatcher { pub fn new( root: PathBuf, - sender: Sender, + sender: EventSender, debounce_wait_time: Duration, ) -> Result { let root = normalize_path(super::get_base_path().join(root).as_path()); @@ -244,7 +244,7 @@ pub(crate) fn new_asset_event_debouncer( } pub(crate) struct FileEventHandler { - sender: Sender, + sender: EventSender, root: PathBuf, last_event: Option, } @@ -260,7 +260,7 @@ impl FilesystemEventHandler for FileEventHandler { fn handle(&mut self, _absolute_paths: &[PathBuf], event: AssetSourceEvent) { if self.last_event.as_ref() != Some(&event) { self.last_event = Some(event.clone()); - self.sender.send(event).unwrap(); + self.sender.send(event); } } } diff --git a/crates/bevy_asset/src/io/gated.rs b/crates/bevy_asset/src/io/gated.rs index d3d2b35f1f066..b03c9d4f4b6ca 100644 --- a/crates/bevy_asset/src/io/gated.rs +++ b/crates/bevy_asset/src/io/gated.rs @@ -1,6 +1,6 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; +use async_channel::{Receiver, Sender}; use bevy_utils::HashMap; -use crossbeam_channel::{Receiver, Sender}; use parking_lot::RwLock; use std::{path::Path, sync::Arc}; @@ -34,8 +34,9 @@ impl GateOpener { let mut gates = self.gates.write(); let gates = gates .entry_ref(path.as_ref()) - .or_insert_with(crossbeam_channel::unbounded); - gates.0.send(()).unwrap(); + .or_insert_with(async_channel::unbounded); + // Should never fail as these channels are always initialized as unbounded. + gates.0.try_send(()).unwrap(); } } @@ -60,10 +61,11 @@ impl AssetReader for GatedReader { let mut gates = self.gates.write(); let gates = gates .entry_ref(path.as_ref()) - .or_insert_with(crossbeam_channel::unbounded); + .or_insert_with(async_channel::unbounded); gates.1.clone() }; - receiver.recv().unwrap(); + // Should never fail as these channels are always initialized as unbounded and never closed. + receiver.recv().await.unwrap(); let result = self.reader.read(path).await?; Ok(result) } diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index cd42d31f01de1..eea19b4079776 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -1,6 +1,7 @@ use crate::{ io::{processor_gated::ProcessorGatedReader, AssetSourceEvent, AssetWatcher}, processor::AssetProcessorData, + utils::{EventQueue, EventReceiver, EventSender}, }; use bevy_ecs::system::Resource; use bevy_utils::tracing::{error, warn}; @@ -117,9 +118,7 @@ pub struct AssetSourceBuilder { pub writer: Option Option> + Send + Sync>>, pub watcher: Option< Box< - dyn FnMut(crossbeam_channel::Sender) -> Option> - + Send - + Sync, + dyn FnMut(EventSender) -> Option> + Send + Sync, >, >, pub processed_reader: Option Box + Send + Sync>>, @@ -127,9 +126,7 @@ pub struct AssetSourceBuilder { Option Option> + Send + Sync>>, pub processed_watcher: Option< Box< - dyn FnMut(crossbeam_channel::Sender) -> Option> - + Send - + Sync, + dyn FnMut(EventSender) -> Option> + Send + Sync, >, >, pub watch_warning: Option<&'static str>, @@ -161,11 +158,11 @@ impl AssetSourceBuilder { }; if watch { - let (sender, receiver) = crossbeam_channel::unbounded(); - match self.watcher.as_mut().and_then(|w| w(sender)) { + let queue = EventQueue::new(); + match self.watcher.as_mut().and_then(|w| w(queue.sender())) { Some(w) => { source.watcher = Some(w); - source.event_receiver = Some(receiver); + source.event_receiver = Some(queue.receiver()); } None => { if let Some(warning) = self.watch_warning { @@ -176,11 +173,15 @@ impl AssetSourceBuilder { } if watch_processed { - let (sender, receiver) = crossbeam_channel::unbounded(); - match self.processed_watcher.as_mut().and_then(|w| w(sender)) { + let queue = EventQueue::new(); + match self + .processed_watcher + .as_mut() + .and_then(|w| w(queue.sender())) + { Some(w) => { source.processed_watcher = Some(w); - source.processed_event_receiver = Some(receiver); + source.processed_event_receiver = Some(queue.receiver()); } None => { if let Some(warning) = self.processed_watch_warning { @@ -213,7 +214,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct unprocessed [`AssetWatcher`] instances. pub fn with_watcher( mut self, - watcher: impl FnMut(crossbeam_channel::Sender) -> Option> + watcher: impl FnMut(EventSender) -> Option> + Send + Sync + 'static, @@ -243,7 +244,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct processed [`AssetWatcher`] instances. pub fn with_processed_watcher( mut self, - watcher: impl FnMut(crossbeam_channel::Sender) -> Option> + watcher: impl FnMut(EventSender) -> Option> + Send + Sync + 'static, @@ -364,8 +365,8 @@ pub struct AssetSource { processed_writer: Option>, watcher: Option>, processed_watcher: Option>, - event_receiver: Option>, - processed_event_receiver: Option>, + event_receiver: Option>, + processed_event_receiver: Option>, } impl AssetSource { @@ -416,15 +417,13 @@ impl AssetSource { /// Return's this source's unprocessed event receiver, if the source is currently watching for changes. #[inline] - pub fn event_receiver(&self) -> Option<&crossbeam_channel::Receiver> { + pub(crate) fn event_receiver(&self) -> Option<&EventReceiver> { self.event_receiver.as_ref() } /// Return's this source's processed event receiver, if the source is currently watching for changes. #[inline] - pub fn processed_event_receiver( - &self, - ) -> Option<&crossbeam_channel::Receiver> { + pub(crate) fn processed_event_receiver(&self) -> Option<&EventReceiver> { self.processed_event_receiver.as_ref() } @@ -484,10 +483,9 @@ impl AssetSource { pub fn get_default_watcher( path: String, file_debounce_wait_time: Duration, - ) -> impl FnMut(crossbeam_channel::Sender) -> Option> - + Send - + Sync { - move |sender: crossbeam_channel::Sender| { + ) -> impl FnMut(EventSender) -> Option> + Send + Sync + { + move |sender: EventSender| { #[cfg(all( feature = "file_watcher", not(target_arch = "wasm32"), diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 4b3be75ecd80c..62b9d9e3d8a11 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -30,6 +30,7 @@ mod loader; mod path; mod reflect; mod server; +mod utils; pub use assets::*; pub use bevy_asset_macros::Asset; @@ -43,6 +44,7 @@ pub use loader::*; pub use path::*; pub use reflect::*; pub use server::*; +pub use utils::*; /// Rusty Object Notation, a crate used to serialize and deserialize bevy assets. pub use ron; diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index 79cdce9721f10..f63a72654b6f3 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -7,7 +7,7 @@ use crate::{ use bevy_ecs::world::World; use bevy_utils::tracing::warn; use bevy_utils::{Entry, HashMap, HashSet, TypeIdMap}; -use crossbeam_channel::Sender; +use concurrent_queue::ConcurrentQueue; use std::{ any::TypeId, sync::{Arc, Weak}, @@ -375,7 +375,7 @@ impl AssetInfos { loaded_asset_id: UntypedAssetId, loaded_asset: ErasedLoadedAsset, world: &mut World, - sender: &Sender, + sender: &ConcurrentQueue, ) { loaded_asset.value.insert(loaded_asset_id, world); let mut loading_deps = loaded_asset.dependencies; @@ -435,10 +435,10 @@ impl AssetInfos { let rec_dep_load_state = match (loading_rec_deps.len(), failed_rec_deps.len()) { (0, 0) => { sender - .send(InternalAssetEvent::LoadedWithDependencies { + .push(InternalAssetEvent::LoadedWithDependencies { id: loaded_asset_id, }) - .unwrap(); + .unwrap_or_else(|_| panic!("Failed to push internal asset event.")); RecursiveDependencyLoadState::Loaded } (_loading, 0) => RecursiveDependencyLoadState::Loading, @@ -529,7 +529,7 @@ impl AssetInfos { infos: &mut AssetInfos, loaded_id: UntypedAssetId, waiting_id: UntypedAssetId, - sender: &Sender, + sender: &ConcurrentQueue, ) { let dependants_waiting_on_rec_load = if let Some(info) = infos.get_mut(waiting_id) { info.loading_rec_dependencies.remove(&loaded_id); @@ -537,8 +537,8 @@ impl AssetInfos { info.rec_dep_load_state = RecursiveDependencyLoadState::Loaded; if info.load_state == LoadState::Loaded { sender - .send(InternalAssetEvent::LoadedWithDependencies { id: waiting_id }) - .unwrap(); + .push(InternalAssetEvent::LoadedWithDependencies { id: waiting_id }) + .unwrap_or_else(|_| panic!("Failed to push internal asset event.")); } Some(std::mem::take( &mut info.dependants_waiting_on_recursive_dep_load, @@ -690,7 +690,7 @@ impl AssetInfos { /// [`Assets`]: crate::Assets pub(crate) fn consume_handle_drop_events(&mut self) { for provider in self.handle_providers.values() { - while let Ok(drop_event) = provider.drop_receiver.try_recv() { + for drop_event in provider.drop_queue.try_iter() { let id = drop_event.id; if drop_event.asset_server_managed { Self::process_handle_drop_internal( diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 0e4ae6aab1f78..eeb664bbc1ee6 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -21,7 +21,7 @@ use bevy_ecs::prelude::*; use bevy_tasks::IoTaskPool; use bevy_utils::tracing::{error, info}; use bevy_utils::{CowArc, HashSet}; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use futures_lite::StreamExt; use info::*; use loaders::*; @@ -57,8 +57,7 @@ pub struct AssetServer { pub(crate) struct AssetServerData { pub(crate) infos: RwLock, pub(crate) loaders: Arc>, - asset_event_sender: Sender, - asset_event_receiver: Receiver, + asset_event_queue: ConcurrentQueue, sources: AssetSources, mode: AssetServerMode, meta_check: AssetMetaCheck, @@ -110,7 +109,6 @@ impl AssetServer { meta_check: AssetMetaCheck, watching_for_changes: bool, ) -> Self { - let (asset_event_sender, asset_event_receiver) = crossbeam_channel::unbounded(); let mut infos = AssetInfos::default(); infos.watching_for_changes = watching_for_changes; Self { @@ -118,8 +116,7 @@ impl AssetServer { sources, mode, meta_check, - asset_event_sender, - asset_event_receiver, + asset_event_queue: ConcurrentQueue::unbounded(), loaders, infos: RwLock::new(infos), }), @@ -742,7 +739,10 @@ impl AssetServer { } fn send_asset_event(&self, event: InternalAssetEvent) { - self.data.asset_event_sender.send(event).unwrap(); + self.data + .asset_event_queue + .push(event) + .unwrap_or_else(|_| panic!("Failed to push internal asset event.")); } /// Retrieves all loads states for the given asset id. @@ -1059,14 +1059,14 @@ pub fn handle_internal_asset_events(world: &mut World) { world.resource_scope(|world, server: Mut| { let mut infos = server.data.infos.write(); let mut untyped_failures = vec![]; - for event in server.data.asset_event_receiver.try_iter() { + for event in server.data.asset_event_queue.try_iter() { match event { InternalAssetEvent::Loaded { id, loaded_asset } => { infos.process_asset_load( id, loaded_asset, world, - &server.data.asset_event_sender, + &server.data.asset_event_queue, ); } InternalAssetEvent::LoadedWithDependencies { id } => { diff --git a/crates/bevy_asset/src/utils.rs b/crates/bevy_asset/src/utils.rs new file mode 100644 index 0000000000000..676c1f251ef96 --- /dev/null +++ b/crates/bevy_asset/src/utils.rs @@ -0,0 +1,72 @@ +use concurrent_queue::ConcurrentQueue; +use std::sync::Arc; + +pub(crate) struct EventQueue(Arc>); + +impl EventQueue { + pub fn new() -> Self { + Self(Arc::new(ConcurrentQueue::unbounded())) + } + + pub fn sender(&self) -> EventSender { + EventSender(self.0.clone()) + } + + pub fn receiver(&self) -> EventReceiver { + EventReceiver(self.0.clone()) + } + + pub fn send(&self, value: T) { + self.0 + .push(value) + .unwrap_or_else(|_| panic!("Failed to send value.")); + } + + pub fn try_recv(&self) -> Result { + self.0.pop() + } + + pub fn try_iter(&self) -> concurrent_queue::TryIter { + self.0.try_iter() + } +} + +impl Clone for EventQueue { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +/// A strictly non-blocking sender for a multi-producer multi-consumer channel. +#[derive(Debug)] +pub struct EventSender(Arc>); + +impl EventSender { + pub fn send(&self, value: T) { + self.0 + .push(value) + .unwrap_or_else(|_| panic!("Failed to send value.")); + } +} + +impl Clone for EventSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +/// A strictly non-blocking receiver for a multi-producer multi-consumer channel. +#[derive(Debug)] +pub(crate) struct EventReceiver(Arc>); + +impl EventReceiver { + pub fn try_iter(&self) -> concurrent_queue::TryIter { + self.0.try_iter() + } +} + +impl Clone for EventReceiver { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index 822a77b2e6278..ddbc8cb4d4406 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -468,9 +468,9 @@ unsafe fn initialize_render_app(app: &mut App) { extract(main_world, render_world); }); - let (sender, receiver) = bevy_time::create_time_channels(); - render_app.insert_resource(sender); - app.insert_resource(receiver); + let event_queue = bevy_time::create_time_event_queue(); + render_app.insert_resource(event_queue.clone()); + app.insert_resource(event_queue); app.insert_sub_app(RenderApp, render_app); } diff --git a/crates/bevy_render/src/renderer/mod.rs b/crates/bevy_render/src/renderer/mod.rs index 23ad14c136df5..322704dda2f80 100644 --- a/crates/bevy_render/src/renderer/mod.rs +++ b/crates/bevy_render/src/renderer/mod.rs @@ -16,7 +16,7 @@ use crate::{ view::{ExtractedWindows, ViewTarget}, }; use bevy_ecs::{prelude::*, system::SystemState}; -use bevy_time::TimeSender; +use bevy_time::TimeEventQueue; use bevy_utils::Instant; use std::sync::Arc; use wgpu::{ @@ -104,16 +104,10 @@ pub fn render_system(world: &mut World, state: &mut SystemState(); - if let Err(error) = time_sender.0.try_send(Instant::now()) { - match error { - bevy_time::TrySendError::Full(_) => { - panic!("The TimeSender channel should always be empty during render. You might need to add the bevy::core::time_system to your app.",); - } - bevy_time::TrySendError::Disconnected(_) => { - // ignore disconnected errors, the main world probably just got dropped during shutdown - } - } + let time_sender = world.resource::(); + // ignore disconnected errors, the main world probably just got dropped during shutdown + if let Err(bevy_time::PushError::Full(_)) = time_sender.0.push(Instant::now()) { + panic!("The TimeEventQueue should always be empty during render. You might need to add the bevy::core::time_system to your app.",); } } diff --git a/crates/bevy_time/Cargo.toml b/crates/bevy_time/Cargo.toml index a2ea432d86e1f..74c6398adf718 100644 --- a/crates/bevy_time/Cargo.toml +++ b/crates/bevy_time/Cargo.toml @@ -24,7 +24,7 @@ bevy_reflect = { path = "../bevy_reflect", version = "0.14.0-dev", features = [ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } # other -crossbeam-channel = "0.5.0" +concurrent-queue = "2.4" serde = { version = "1", features = ["derive"], optional = true } thiserror = "1.0" diff --git a/crates/bevy_time/src/lib.rs b/crates/bevy_time/src/lib.rs index 912a600bb237e..7c721d5ff30cf 100644 --- a/crates/bevy_time/src/lib.rs +++ b/crates/bevy_time/src/lib.rs @@ -33,8 +33,9 @@ use bevy_app::{prelude::*, RunFixedMainLoop}; use bevy_ecs::event::signal_event_update_system; use bevy_ecs::prelude::*; use bevy_utils::{tracing::warn, Duration, Instant}; -pub use crossbeam_channel::TrySendError; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; +pub use concurrent_queue::PushError; +use std::sync::Arc; /// Adds time functionality to Apps. #[derive(Default)] @@ -84,20 +85,15 @@ pub enum TimeUpdateStrategy { ManualDuration(Duration), } -/// Channel resource used to receive time from the render world. -#[derive(Resource)] -pub struct TimeReceiver(pub Receiver); - -/// Channel resource used to send time from the render world. -#[derive(Resource)] -pub struct TimeSender(pub Sender); +/// Queue resource used to receive time from the render world. +#[derive(Resource, Clone)] +pub struct TimeEventQueue(pub Arc>); /// Creates channels used for sending time between the render world and the main world. -pub fn create_time_channels() -> (TimeSender, TimeReceiver) { +pub fn create_time_event_queue() -> TimeEventQueue { // bound the channel to 2 since when pipelined the render phase can finish before // the time system runs. - let (s, r) = crossbeam_channel::bounded::(2); - (TimeSender(s), TimeReceiver(r)) + TimeEventQueue(Arc::new(ConcurrentQueue::bounded(2))) } /// The system used to update the [`Time`] used by app logic. If there is a render world the time is @@ -107,12 +103,12 @@ fn time_system( mut virtual_time: ResMut>, mut time: ResMut