Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove crossbeam-channel as a runtime dependency #13048

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion crates/bevy_asset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" }
async-broadcast = "0.5"
async-fs = "2.0"
async-lock = "3.0"
crossbeam-channel = "0.5"
async-channel = "2.2"
concurrent-queue = "2.4"
downcast-rs = "1.2"
futures-io = "0.3"
futures-lite = "2.0.1"
Expand Down
28 changes: 11 additions & 17 deletions crates/bevy_asset/src/assets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bevy_ecs::{
};
use bevy_reflect::{Reflect, TypePath};
use bevy_utils::HashMap;
use crossbeam_channel::{Receiver, Sender};
use concurrent_queue::ConcurrentQueue;
use serde::{Deserialize, Serialize};
use std::{
any::TypeId,
Expand Down Expand Up @@ -50,22 +50,16 @@ impl AssetIndex {
pub(crate) struct AssetIndexAllocator {
/// A monotonically increasing index.
next_index: AtomicU32,
recycled_queue_sender: Sender<AssetIndex>,
recycled_queue: ConcurrentQueue<AssetIndex>,
/// This receives every recycled [`AssetIndex`]. It serves as a buffer/queue to store indices ready for reuse.
recycled_queue_receiver: Receiver<AssetIndex>,
recycled_sender: Sender<AssetIndex>,
recycled_receiver: Receiver<AssetIndex>,
recycled: ConcurrentQueue<AssetIndex>,
Comment on lines 50 to +55
Copy link
Member

@joseph-gio joseph-gio Jul 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this refactor is 100% worth doing, regardless of whether we end up removing the other channels. Right here the queues have clear a clear owner so it only makes sense to remove the indirection incurred by the channels

}

impl Default for AssetIndexAllocator {
fn default() -> Self {
let (recycled_queue_sender, recycled_queue_receiver) = crossbeam_channel::unbounded();
let (recycled_sender, recycled_receiver) = crossbeam_channel::unbounded();
Self {
recycled_queue_sender,
recycled_queue_receiver,
recycled_sender,
recycled_receiver,
recycled_queue: ConcurrentQueue::unbounded(),
recycled: ConcurrentQueue::unbounded(),
next_index: Default::default(),
}
}
Expand All @@ -75,9 +69,9 @@ impl AssetIndexAllocator {
/// Reserves a new [`AssetIndex`], either by reusing a recycled index (with an incremented generation), or by creating a new index
/// by incrementing the index counter for a given asset type `A`.
pub fn reserve(&self) -> AssetIndex {
if let Ok(mut recycled) = self.recycled_queue_receiver.try_recv() {
if let Ok(mut recycled) = self.recycled_queue.pop() {
recycled.generation += 1;
self.recycled_sender.send(recycled).unwrap();
self.recycled.push(recycled).unwrap();
recycled
} else {
AssetIndex {
Expand All @@ -91,7 +85,7 @@ impl AssetIndexAllocator {

/// Queues the given `index` for reuse. This should only be done if the `index` is no longer being used.
pub fn recycle(&self, index: AssetIndex) {
self.recycled_queue_sender.send(index).unwrap();
self.recycled_queue.push(index).unwrap();
}
}

Expand Down Expand Up @@ -246,7 +240,7 @@ impl<A: Asset> DenseAssetStorage<A> {
value: None,
generation: 0,
});
while let Ok(recycled) = self.allocator.recycled_receiver.try_recv() {
for recycled in self.allocator.recycled.try_iter() {
let entry = &mut self.storage[recycled.index as usize];
*entry = Entry::Some {
value: None,
Expand Down Expand Up @@ -546,7 +540,7 @@ impl<A: Asset> Assets<A> {
// to other asset info operations
let mut infos = asset_server.data.infos.write();
let mut not_ready = Vec::new();
while let Ok(drop_event) = assets.handle_provider.drop_receiver.try_recv() {
while let Ok(drop_event) = assets.handle_provider.drop_queue.try_recv() {
let id = drop_event.id.typed();

if drop_event.asset_server_managed {
Expand All @@ -572,7 +566,7 @@ impl<A: Asset> Assets<A> {
// TODO: this is _extremely_ inefficient find a better fix
// This will also loop failed assets indefinitely. Is that ok?
for event in not_ready {
assets.handle_provider.drop_sender.send(event).unwrap();
assets.handle_provider.drop_queue.send(event);
}
}

Expand Down
21 changes: 9 additions & 12 deletions crates/bevy_asset/src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{
meta::MetaTransform, Asset, AssetId, AssetIndexAllocator, AssetPath, InternalAssetId,
UntypedAssetId,
meta::MetaTransform,
utils::{EventQueue, EventSender},
Asset, AssetId, AssetIndexAllocator, AssetPath, InternalAssetId, UntypedAssetId,
};
use bevy_ecs::prelude::*;
use bevy_reflect::{std_traits::ReflectDefault, Reflect, TypePath};
use bevy_utils::get_short_name;
use crossbeam_channel::{Receiver, Sender};
use std::{
any::TypeId,
hash::{Hash, Hasher},
Expand All @@ -19,8 +19,7 @@ use uuid::Uuid;
#[derive(Clone)]
pub struct AssetHandleProvider {
pub(crate) allocator: Arc<AssetIndexAllocator>,
pub(crate) drop_sender: Sender<DropEvent>,
pub(crate) drop_receiver: Receiver<DropEvent>,
pub(crate) drop_queue: EventQueue<DropEvent>,
pub(crate) type_id: TypeId,
}

Expand All @@ -32,12 +31,10 @@ pub(crate) struct DropEvent {

impl AssetHandleProvider {
pub(crate) fn new(type_id: TypeId, allocator: Arc<AssetIndexAllocator>) -> Self {
let (drop_sender, drop_receiver) = crossbeam_channel::unbounded();
Self {
type_id,
allocator,
drop_sender,
drop_receiver,
drop_queue: EventQueue::new(),
}
}

Expand All @@ -57,7 +54,7 @@ impl AssetHandleProvider {
) -> Arc<StrongHandle> {
Arc::new(StrongHandle {
id: id.untyped(self.type_id),
drop_sender: self.drop_sender.clone(),
drop: self.drop_queue.sender(),
meta_transform,
path,
asset_server_managed,
Expand Down Expand Up @@ -91,12 +88,12 @@ pub struct StrongHandle {
/// 1. configuration tied to the lifetime of a specific asset load
/// 2. configuration that must be repeatable when the asset is hot-reloaded
pub(crate) meta_transform: Option<MetaTransform>,
pub(crate) drop_sender: Sender<DropEvent>,
pub(crate) drop: EventSender<DropEvent>,
}

impl Drop for StrongHandle {
fn drop(&mut self) {
let _ = self.drop_sender.send(DropEvent {
self.drop.send(DropEvent {
id: self.id.internal(),
asset_server_managed: self.asset_server_managed,
});
Expand All @@ -109,7 +106,7 @@ impl std::fmt::Debug for StrongHandle {
.field("id", &self.id)
.field("asset_server_managed", &self.asset_server_managed)
.field("path", &self.path)
.field("drop_sender", &self.drop_sender)
.field("drop_sender", &self.drop)
.finish()
}
}
Expand Down
17 changes: 10 additions & 7 deletions crates/bevy_asset/src/io/embedded/embedded_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::io::{
file::{get_asset_path, get_base_path, new_asset_event_debouncer, FilesystemEventHandler},
memory::Dir,
AssetSourceEvent, AssetWatcher,
use crate::{
io::{
file::{get_asset_path, get_base_path, new_asset_event_debouncer, FilesystemEventHandler},
memory::Dir,
AssetSourceEvent, AssetWatcher,
},
EventSender,
};
use bevy_utils::tracing::warn;
use bevy_utils::{Duration, HashMap};
Expand All @@ -26,7 +29,7 @@ impl EmbeddedWatcher {
pub fn new(
dir: Dir,
root_paths: Arc<RwLock<HashMap<Box<Path>, PathBuf>>>,
sender: crossbeam_channel::Sender<AssetSourceEvent>,
sender: EventSender<AssetSourceEvent>,
debounce_wait_time: Duration,
) -> Self {
let root = get_base_path();
Expand All @@ -48,7 +51,7 @@ impl AssetWatcher for EmbeddedWatcher {}
/// binary-embedded Rust source files. This will read the contents of changed files from the file system and overwrite
/// the initial static bytes from the file embedded in the binary.
pub(crate) struct EmbeddedEventHandler {
sender: crossbeam_channel::Sender<AssetSourceEvent>,
sender: EventSender<AssetSourceEvent>,
root_paths: Arc<RwLock<HashMap<Box<Path>, PathBuf>>>,
root: PathBuf,
dir: Dir,
Expand Down Expand Up @@ -82,7 +85,7 @@ impl FilesystemEventHandler for EmbeddedEventHandler {
}
}
self.last_event = Some(event.clone());
self.sender.send(event).unwrap();
self.sender.send(event);
}
}
}
8 changes: 4 additions & 4 deletions crates/bevy_asset/src/io/file/file_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::io::{AssetSourceEvent, AssetWatcher};
use crate::path::normalize_path;
use crate::EventSender;
use bevy_utils::tracing::error;
use bevy_utils::Duration;
use crossbeam_channel::Sender;
use notify_debouncer_full::{
new_debouncer,
notify::{
Expand All @@ -26,7 +26,7 @@ pub struct FileWatcher {
impl FileWatcher {
pub fn new(
root: PathBuf,
sender: Sender<AssetSourceEvent>,
sender: EventSender<AssetSourceEvent>,
debounce_wait_time: Duration,
) -> Result<Self, notify::Error> {
let root = normalize_path(super::get_base_path().join(root).as_path());
Expand Down Expand Up @@ -244,7 +244,7 @@ pub(crate) fn new_asset_event_debouncer(
}

pub(crate) struct FileEventHandler {
sender: Sender<AssetSourceEvent>,
sender: EventSender<AssetSourceEvent>,
root: PathBuf,
last_event: Option<AssetSourceEvent>,
}
Expand All @@ -260,7 +260,7 @@ impl FilesystemEventHandler for FileEventHandler {
fn handle(&mut self, _absolute_paths: &[PathBuf], event: AssetSourceEvent) {
if self.last_event.as_ref() != Some(&event) {
self.last_event = Some(event.clone());
self.sender.send(event).unwrap();
self.sender.send(event);
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions crates/bevy_asset/src/io/gated.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
use async_channel::{Receiver, Sender};
use bevy_utils::HashMap;
use crossbeam_channel::{Receiver, Sender};
use parking_lot::RwLock;
use std::{path::Path, sync::Arc};

Expand Down Expand Up @@ -34,8 +34,9 @@ impl GateOpener {
let mut gates = self.gates.write();
let gates = gates
.entry_ref(path.as_ref())
.or_insert_with(crossbeam_channel::unbounded);
gates.0.send(()).unwrap();
.or_insert_with(async_channel::unbounded);
// Should never fail as these channels are always initialized as unbounded.
gates.0.try_send(()).unwrap();
}
}

Expand All @@ -60,10 +61,11 @@ impl<R: AssetReader> AssetReader for GatedReader<R> {
let mut gates = self.gates.write();
let gates = gates
.entry_ref(path.as_ref())
.or_insert_with(crossbeam_channel::unbounded);
.or_insert_with(async_channel::unbounded);
gates.1.clone()
};
receiver.recv().unwrap();
// Should never fail as these channels are always initialized as unbounded and never closed.
receiver.recv().await.unwrap();
let result = self.reader.read(path).await?;
Ok(result)
}
Expand Down
Loading