Skip to content

Commit

Permalink
Use mesh::Sender as mpsc sender everywhere (#856)
Browse files Browse the repository at this point in the history
Replace uses of `MpscSender` and `Arc<Sender>` with just `Sender`, since
`Sender` now implements `Clone` directly.
  • Loading branch information
jstarks authored Feb 14, 2025
1 parent fabee0a commit f98aebb
Show file tree
Hide file tree
Showing 41 changed files with 88 additions and 114 deletions.
4 changes: 2 additions & 2 deletions hyperv/tools/hypestv/src/windows/completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use rustyline::Validator;

#[derive(Helper, Highlighter, Hinter, Validator)]
pub(crate) struct OpenvmmRustylineEditor {
pub(crate) req: std::sync::Arc<mesh::Sender<super::Request>>,
pub(crate) req: mesh::Sender<super::Request>,
}

impl rustyline::completion::Completer for OpenvmmRustylineEditor {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl clap_dyn_complete::CustomCompleterFactory for &OpenvmmRustylineEditor {
}

pub struct OpenvmmComplete {
req: std::sync::Arc<mesh::Sender<super::Request>>,
req: mesh::Sender<super::Request>,
}

impl clap_dyn_complete::CustomCompleter for OpenvmmComplete {
Expand Down
2 changes: 0 additions & 2 deletions hyperv/tools/hypestv/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use pal_async::DefaultDriver;
use rustyline_printer::Printer;
use std::fmt::Display;
use std::path::PathBuf;
use std::sync::Arc;
use vm::Vm;

#[derive(Parser)]
Expand Down Expand Up @@ -209,7 +208,6 @@ pub async fn main(driver: DefaultDriver) -> anyhow::Result<()> {
};

let (send, mut recv) = mesh::channel();
let send = Arc::new(send);

rl.set_helper(Some(completions::OpenvmmRustylineEditor {
req: send.clone(),
Expand Down
3 changes: 1 addition & 2 deletions openhcl/underhill_core/src/emuplat/framebuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

use std::convert::Infallible;
use std::sync::Arc;
use video_core::FramebufferControl;
use video_core::FramebufferFormat;
use video_core::ResolvedFramebuffer;
Expand All @@ -13,7 +12,7 @@ use vm_resource::ResolveResource;
#[derive(Clone)]
pub struct FramebufferRemoteControl {
pub get: guest_emulation_transport::GuestEmulationTransportClient,
pub format_send: Arc<mesh::Sender<FramebufferFormat>>,
pub format_send: mesh::Sender<FramebufferFormat>,
}

#[async_trait::async_trait]
Expand Down
12 changes: 3 additions & 9 deletions openhcl/underhill_core/src/inspect_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ use inspect::SensitivityLevel;
use mesh::Sender;
use pal_async::task::Spawn;
use pal_async::DefaultDriver;
use std::sync::Arc;

pub(crate) fn inspect_internal_diagnostics(
req: Request<'_>,
reinspect: Arc<Sender<Deferred>>,
reinspect: Sender<Deferred>,
driver: DefaultDriver,
) {
req.respond()
Expand All @@ -54,7 +53,7 @@ pub(crate) fn inspect_internal_diagnostics(
});
}

fn net(req: Request<'_>, reinspect: Arc<Sender<Deferred>>, driver: DefaultDriver) {
fn net(req: Request<'_>, reinspect: Sender<Deferred>, driver: DefaultDriver) {
let defer = req.defer();
let driver2 = driver.clone();
driver
Expand Down Expand Up @@ -94,12 +93,7 @@ fn net(req: Request<'_>, reinspect: Arc<Sender<Deferred>>, driver: DefaultDriver

// net/mac_address
// Format for mac address is no separators, lowercase letters, e.g. 00155d121212.
fn net_nic(
req: Request<'_>,
name: String,
reinspect: Arc<Sender<Deferred>>,
driver: DefaultDriver,
) {
fn net_nic(req: Request<'_>, name: String, reinspect: Sender<Deferred>, driver: DefaultDriver) {
let defer = req.defer();
driver
.spawn("inspect-diagnostics-net-nic", async move {
Expand Down
6 changes: 2 additions & 4 deletions openhcl/underhill_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ use pal_async::DefaultPool;
use profiler_worker::ProfilerWorker;
#[cfg(feature = "profiler")]
use profiler_worker::ProfilerWorkerParameters;
use std::sync::Arc;
use std::time::Duration;
use vmsocket::VmAddress;
use vmsocket::VmListener;
Expand Down Expand Up @@ -107,7 +106,7 @@ fn new_underhill_remote_console_cfg(
synth_keyboard: true,
synth_mouse: true,
synth_video: true,
input: mesh::MpscReceiver::new(),
input: mesh::Receiver::new(),
framebuffer: Some(fb),
},
Some(fba),
Expand All @@ -118,7 +117,7 @@ fn new_underhill_remote_console_cfg(
synth_keyboard: false,
synth_mouse: false,
synth_video: false,
input: mesh::MpscReceiver::new(),
input: mesh::Receiver::new(),
framebuffer: None,
},
None,
Expand Down Expand Up @@ -450,7 +449,6 @@ async fn run_control(
let mut diag = DiagState::new().await?;

let (diag_reinspect_send, mut diag_reinspect_recv) = mesh::channel();
let diag_reinspect_send = Arc::new(diag_reinspect_send);
#[cfg(feature = "profiler")]
let mut profiler_host = None;
let mut state;
Expand Down
7 changes: 2 additions & 5 deletions openhcl/underhill_core/src/nvme_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use pal_async::task::Spawn;
use pal_async::task::Task;
use std::collections::hash_map;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tracing::Instrument;
use user_driver::vfio::VfioDevice;
Expand Down Expand Up @@ -113,9 +112,7 @@ impl NvmeManager {
});
Self {
task,
client: NvmeManagerClient {
sender: Arc::new(send),
},
client: NvmeManagerClient { sender: send },
save_restore_supported,
}
}
Expand Down Expand Up @@ -177,7 +174,7 @@ enum Request {

#[derive(Debug, Clone)]
pub struct NvmeManagerClient {
sender: Arc<mesh::Sender<Request>>,
sender: mesh::Sender<Request>,
}

impl NvmeManagerClient {
Expand Down
6 changes: 3 additions & 3 deletions openhcl/underhill_core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub struct UnderhillRemoteConsoleCfg {
pub synth_keyboard: bool,
pub synth_mouse: bool,
pub synth_video: bool,
pub input: mesh::MpscReceiver<InputData>,
pub input: mesh::Receiver<InputData>,
pub framebuffer: Option<framebuffer::Framebuffer>,
}

Expand Down Expand Up @@ -427,7 +427,7 @@ impl Worker for UnderhillVmWorker {
synth_keyboard: false,
synth_mouse: false,
synth_video: false,
input: mesh::MpscReceiver::new(),
input: mesh::Receiver::new(),
framebuffer: None,
},
debugger_rpc: None,
Expand Down Expand Up @@ -2863,7 +2863,7 @@ async fn new_underhill_vm(
if let Some(framebuffer) = remote_console_cfg.framebuffer {
resolver.add_resolver(FramebufferRemoteControl {
get: get_client.clone(),
format_send: Arc::new(framebuffer.format_send()),
format_send: framebuffer.format_send(),
});

vmbus_device_handles.push(
Expand Down
6 changes: 3 additions & 3 deletions openvmm/hvlite_core/src/emuplat/firmware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use firmware_uefi::platform::logger::UefiEvent;
use firmware_uefi::platform::logger::UefiLogger;
use get_resources::ged::FirmwareEvent;

/// Forwards UEFI and PCAT events to via the provided [`mesh::MpscSender`].
/// Forwards UEFI and PCAT events to via the provided [`mesh::Sender`].
#[derive(Debug)]
pub struct MeshLogger {
sender: Option<mesh::MpscSender<FirmwareEvent>>,
sender: Option<mesh::Sender<FirmwareEvent>>,
}

impl MeshLogger {
pub fn new(sender: Option<mesh::MpscSender<FirmwareEvent>>) -> Self {
pub fn new(sender: Option<mesh::Sender<FirmwareEvent>>) -> Self {
Self { sender }
}

Expand Down
6 changes: 3 additions & 3 deletions openvmm/hvlite_core/src/worker/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub struct Manifest {
chipset: BaseChipsetManifest,
#[cfg(windows)]
kernel_vmnics: Vec<hvlite_defs::config::KernelVmNicConfig>,
input: mesh::MpscReceiver<InputData>,
input: mesh::Receiver<InputData>,
framebuffer: Option<framebuffer::Framebuffer>,
vga_firmware: Option<RomFileLocation>,
vtl2_gfx: bool,
Expand All @@ -226,7 +226,7 @@ pub struct Manifest {
vmgs_disk: Option<Resource<DiskHandleKind>>,
secure_boot_enabled: bool,
custom_uefi_vars: firmware_uefi_custom_vars::CustomVars,
firmware_event_send: Option<mesh::MpscSender<get_resources::ged::FirmwareEvent>>,
firmware_event_send: Option<mesh::Sender<get_resources::ged::FirmwareEvent>>,
debugger_rpc: Option<mesh::Receiver<vmm_core_defs::debug_rpc::DebugRequest>>,
vmbus_devices: Vec<(DeviceVtl, Resource<VmbusDeviceHandleKind>)>,
chipset_devices: Vec<ChipsetDeviceHandle>,
Expand Down Expand Up @@ -530,7 +530,7 @@ struct LoadedVmInner {
/// ((device, function), interrupt)
#[cfg_attr(not(guest_arch = "x86_64"), allow(dead_code))]
pci_legacy_interrupts: Vec<((u8, Option<u8>), u32)>,
firmware_event_send: Option<mesh::MpscSender<get_resources::ged::FirmwareEvent>>,
firmware_event_send: Option<mesh::Sender<get_resources::ged::FirmwareEvent>>,

load_mode: LoadMode,
igvm_file: Option<IgvmFile>,
Expand Down
4 changes: 2 additions & 2 deletions openvmm/hvlite_defs/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct Config {
pub vtl2_vmbus: Option<VmbusConfig>,
#[cfg(windows)]
pub kernel_vmnics: Vec<KernelVmNicConfig>,
pub input: mesh::MpscReceiver<InputData>,
pub input: mesh::Receiver<InputData>,
pub framebuffer: Option<framebuffer::Framebuffer>,
pub vga_firmware: Option<RomFileLocation>,
pub vtl2_gfx: bool,
Expand All @@ -48,7 +48,7 @@ pub struct Config {
pub secure_boot_enabled: bool,
pub custom_uefi_vars: firmware_uefi_custom_vars::CustomVars,
// TODO: move FirmwareEvent somewhere not GED-specific.
pub firmware_event_send: Option<mesh::MpscSender<get_resources::ged::FirmwareEvent>>,
pub firmware_event_send: Option<mesh::Sender<get_resources::ged::FirmwareEvent>>,
pub debugger_rpc: Option<mesh::Receiver<vmm_core_defs::debug_rpc::DebugRequest>>,
pub vmbus_devices: Vec<(DeviceVtl, Resource<VmbusDeviceHandleKind>)>,
pub chipset_devices: Vec<ChipsetDeviceHandle>,
Expand Down
4 changes: 2 additions & 2 deletions openvmm/membacking/src/mapping_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl MappingManager {
/// Provides access to the mapping manager.
#[derive(Debug, MeshPayload, Clone)]
pub struct MappingManagerClient {
req_send: mesh::MpscSender<MappingRequest>,
req_send: mesh::Sender<MappingRequest>,
id: ObjectId,
max_addr: u64,
}
Expand Down Expand Up @@ -224,7 +224,7 @@ impl MappingManagerTask {
}
}

async fn run(&mut self, req_recv: &mut mesh::MpscReceiver<MappingRequest>) {
async fn run(&mut self, req_recv: &mut mesh::Receiver<MappingRequest>) {
while let Some(req) = req_recv.next().await {
match req {
MappingRequest::AddMapper(rpc) => rpc.handle_sync(|send| self.add_mapper(send)),
Expand Down
4 changes: 2 additions & 2 deletions openvmm/membacking/src/mapping_manager/va_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl std::fmt::Debug for VaMapper {
struct MapperInner {
mapping: SparseMapping,
waiters: Mutex<Option<Vec<MapWaiter>>>,
req_send: mesh::MpscSender<MappingRequest>,
req_send: mesh::Sender<MappingRequest>,
id: MapperId,
}

Expand Down Expand Up @@ -191,7 +191,7 @@ impl MapperInner {

impl VaMapper {
pub(crate) async fn new(
req_send: mesh::MpscSender<MappingRequest>,
req_send: mesh::Sender<MappingRequest>,
len: u64,
remote_process: Option<RemoteProcess>,
) -> Result<Self, VaMapperError> {
Expand Down
6 changes: 3 additions & 3 deletions openvmm/membacking/src/region_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Inspect for RegionManager {
/// Provides access to the region manager.
#[derive(Debug, MeshPayload, Clone)]
pub struct RegionManagerClient {
req_send: mesh::MpscSender<RegionRequest>,
req_send: mesh::Sender<RegionRequest>,
}

struct Region {
Expand Down Expand Up @@ -182,7 +182,7 @@ impl RegionManagerTask {
}
}

async fn run(&mut self, req_recv: &mut mesh::MpscReceiver<RegionRequest>) {
async fn run(&mut self, req_recv: &mut mesh::Receiver<RegionRequest>) {
while let Some(req) = req_recv.next().await {
match req {
RegionRequest::AddMapping(rpc) => {
Expand Down Expand Up @@ -559,7 +559,7 @@ impl RegionManagerClient {
#[must_use]
pub struct RegionHandle {
id: Option<RegionId>,
req_send: mesh::MpscSender<RegionRequest>,
req_send: mesh::Sender<RegionRequest>,
}

impl RegionHandle {
Expand Down
5 changes: 2 additions & 3 deletions openvmm/openvmm_entry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ fn vm_config_from_command_line(
},
#[cfg(windows)]
kernel_vmnics,
input: mesh::MpscReceiver::new(),
input: mesh::Receiver::new(),
framebuffer,
vga_firmware,
vtl2_gfx: opt.vtl2_gfx,
Expand Down Expand Up @@ -1959,7 +1959,6 @@ async fn run_control(driver: &DefaultDriver, mesh: &VmmMesh, opt: Options) -> an

// spin up the VM
let (vm_rpc, rpc_recv) = mesh::channel();
let vm_rpc = Arc::new(vm_rpc);
let (notify_send, notify_recv) = mesh::channel();
let mut vm_worker = {
let vm_host = mesh.make_host("vm", opt.log_file.clone()).await?;
Expand Down Expand Up @@ -2831,7 +2830,7 @@ async fn run_control(driver: &DefaultDriver, mesh: &VmmMesh, opt: Options) -> an

struct DiagDialer {
driver: DefaultDriver,
vm_rpc: Arc<mesh::Sender<VmRpc>>,
vm_rpc: mesh::Sender<VmRpc>,
openhcl_vtl: DeviceVtl,
}

Expand Down
2 changes: 1 addition & 1 deletion openvmm/openvmm_entry/src/ttrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ impl VmService {
},
#[cfg(windows)]
kernel_vmnics: vec![],
input: mesh::MpscReceiver::new(),
input: mesh::Receiver::new(),
framebuffer: None,
vga_firmware: None,
vtl2_gfx: false,
Expand Down
5 changes: 2 additions & 3 deletions petri/pipette/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use pipette_protocol::DiagnosticFile;
use pipette_protocol::PipetteBootstrap;
use pipette_protocol::PipetteRequest;
use socket2::Socket;
use std::sync::Arc;
use std::time::Duration;
use unicycle::FuturesUnordered;
use vmsocket::VmAddress;
Expand All @@ -33,7 +32,7 @@ pub struct Agent {

#[allow(dead_code)] // Not used on all platforms yet
#[derive(Clone)]
pub struct DiagnosticSender(Arc<mesh::Sender<DiagnosticFile>>);
pub struct DiagnosticSender(mesh::Sender<DiagnosticFile>);

impl Agent {
pub async fn new(driver: DefaultDriver) -> anyhow::Result<Self> {
Expand Down Expand Up @@ -74,7 +73,7 @@ impl Agent {
driver,
mesh,
request_recv,
diag_file_send: DiagnosticSender(Arc::new(diag_file_send)),
diag_file_send: DiagnosticSender(diag_file_send),
watch_send,
})
}
Expand Down
6 changes: 2 additions & 4 deletions petri/src/vm/openvmm/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ use serial_socket::net::OpenSocketSerialConfig;
use sparse_mmap::alloc_shared_memory;
use std::fmt::Write as _;
use std::path::PathBuf;
use std::sync::Arc;
use storvsp_resources::ScsiControllerHandle;
use storvsp_resources::ScsiDeviceAndPath;
use storvsp_resources::ScsiPath;
Expand Down Expand Up @@ -170,7 +169,6 @@ impl PetriVmConfigOpenVmm {
framebuffer.is_some(),
)?;
let (vtl2_vsock_listener, vtl2_vsock_path) = make_vsock_listener()?;
let ged_send = Arc::new(ged_send);
(
Some(Vtl2Config {
vtl0_alias_map: false, // TODO: enable when OpenVMM supports it for DMA
Expand Down Expand Up @@ -326,7 +324,7 @@ impl PetriVmConfigOpenVmm {
// Disabled for VMM tests by default
#[cfg(windows)]
kernel_vmnics: vec![],
input: mesh::MpscReceiver::new(),
input: mesh::Receiver::new(),
vtl2_gfx: false,
virtio_console_pci: false,
virtio_serial: None,
Expand Down Expand Up @@ -793,7 +791,7 @@ impl PetriVmConfigSetupCore<'_> {
&self,
serial: &mut [Option<Resource<SerialBackendHandle>>],
devices: &mut impl Extend<Device>,
firmware_event_send: &mesh::MpscSender<FirmwareEvent>,
firmware_event_send: &mesh::Sender<FirmwareEvent>,
framebuffer: bool,
) -> anyhow::Result<(
get_resources::ged::GuestEmulationDeviceHandle,
Expand Down
Loading

0 comments on commit f98aebb

Please sign in to comment.