Skip to content

Commit

Permalink
net: add support for anonymous unix pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
satakuma committed Nov 3, 2023
1 parent 65f861f commit e5507a0
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 27 deletions.
222 changes: 195 additions & 27 deletions tokio/src/net/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
use mio::unix::pipe as mio_pipe;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
use std::os::fd::OwnedFd;
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::path::Path;
use std::pin::Pin;
Expand All @@ -16,6 +17,33 @@ cfg_io_util! {
use bytes::BufMut;
}

/// Creates a new anonymous Unix pipe.
///
/// This function will open a new pipe and associate both pipe ends with the default
/// event loop.
///
/// If you need to create a pipe for communicating with a spawned process, you can
/// also see how to use `Stdio::piped()` with [`tokio::process`].
///
/// [`tokio::process`]: crate::process
///
/// # Errors
///
/// If creating a pipe fails, this function will return with the related OS error.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new() -> io::Result<(Sender, Receiver)> {
let (tx, rx) = mio_pipe::new()?;
Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
}

/// Options and flags which can be used to configure how a FIFO file is opened.
///
/// This builder allows configuring how to create a pipe end from a FIFO file.
Expand Down Expand Up @@ -218,7 +246,7 @@ impl OpenOptions {

let file = options.open(path)?;

if !self.unchecked && !is_fifo(&file)? {
if !self.unchecked && !is_pipe(file.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}

Expand Down Expand Up @@ -338,15 +366,40 @@ impl Sender {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file(mut file: File) -> io::Result<Sender> {
if !is_fifo(&file)? {
pub fn from_file(file: File) -> io::Result<Sender> {
Sender::from_owned_fd(file.into())
}

/// Creates a new `Sender` from an [`OwnedFd`].
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
/// is a pipe and has write access, set it in non-blocking mode and perform the
/// conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
/// or it does not have write access. Also fails with any standard OS error if it
/// occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
if !is_pipe(owned_fd.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}

let flags = get_file_flags(&file)?;
let flags = get_file_flags(owned_fd.as_fd())?;
if has_write_access(flags) {
set_nonblocking(&mut file, flags)?;
Sender::from_file_unchecked(file)
set_nonblocking(owned_fd.as_fd(), flags)?;
Sender::from_owned_fd_unchecked(owned_fd)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -394,8 +447,28 @@ impl Sender {
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
let raw_fd = file.into_raw_fd();
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
Sender::from_owned_fd_unchecked(file.into())
}

/// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
/// the underlying pipe; it is left up to the user to make sure that the file
/// descriptor represents the writing end of a pipe and the pipe is set in
/// non-blocking mode.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
// Safety: OwnedFd represents a valid, open file descriptor.
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
Sender::from_mio(mio_tx)
}

Expand Down Expand Up @@ -623,6 +696,21 @@ impl Sender {
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Converts the pipe into an [`OwnedFd`].
///
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_owned_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;
set_blocking(&mio_pipe)?;

// Safety: the pipe is now deregistered from the event loop
// and we are the only owner of this pipe end.
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };

Ok(owned_fd)
}
}

impl AsyncWrite for Sender {
Expand Down Expand Up @@ -764,15 +852,40 @@ impl Receiver {
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file(mut file: File) -> io::Result<Receiver> {
if !is_fifo(&file)? {
pub fn from_file(file: File) -> io::Result<Receiver> {
Receiver::from_owned_fd(file.into())
}

/// Creates a new `Receiver` from an [`OwnedFd`].
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
/// is a pipe and has read access, set it in non-blocking mode and perform the
/// conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
/// or it does not have read access. Also fails with any standard OS error if it
/// occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
if !is_pipe(owned_fd.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}

let flags = get_file_flags(&file)?;
let flags = get_file_flags(owned_fd.as_fd())?;
if has_read_access(flags) {
set_nonblocking(&mut file, flags)?;
Receiver::from_file_unchecked(file)
set_nonblocking(owned_fd.as_fd(), flags)?;
Receiver::from_owned_fd_unchecked(owned_fd)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -820,8 +933,28 @@ impl Receiver {
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
let raw_fd = file.into_raw_fd();
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
Receiver::from_owned_fd_unchecked(file.into())
}

/// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
/// the underlying pipe; it is left up to the user to make sure that the file
/// descriptor represents the reading end of a pipe and the pipe is set in
/// non-blocking mode.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
// Safety: OwnedFd represents a valid, open file descriptor.
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
Receiver::from_mio(mio_rx)
}

Expand Down Expand Up @@ -1146,6 +1279,21 @@ impl Receiver {
})
}
}

/// Converts the pipe into an [`OwnedFd`].
///
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_owned_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;
set_blocking(&mio_pipe)?;

// Safety: the pipe is now deregistered from the event loop
// and we are the only owner of this pipe end.
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };

Ok(owned_fd)
}
}

impl AsyncRead for Receiver {
Expand All @@ -1172,15 +1320,20 @@ impl AsFd for Receiver {
}
}

/// Checks if file is a FIFO
fn is_fifo(file: &File) -> io::Result<bool> {
Ok(file.metadata()?.file_type().is_fifo())
/// Checks if the file descriptor is a pipe or a FIFO.
fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
let mut stat: libc::stat = unsafe { std::mem::zeroed() };
let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok((stat.st_mode & libc::S_IFMT) == libc::S_IFIFO)
}
}

/// Gets file descriptor's flags by fcntl.
fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
let fd = file.as_raw_fd();
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
if flags < 0 {
Err(io::Error::last_os_error())
} else {
Expand All @@ -1200,18 +1353,33 @@ fn has_write_access(flags: libc::c_int) -> bool {
mode == libc::O_WRONLY || mode == libc::O_RDWR
}

/// Sets file's flags with `O_NONBLOCK` by fcntl.
fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
let fd = file.as_raw_fd();

/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
let flags = current_flags | libc::O_NONBLOCK;

if flags != current_flags {
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
if ret < 0 {
return Err(io::Error::last_os_error());
}
}

Ok(())
}

/// Removes `O_NONBLOCK` from fd's flags.
fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
if previous == -1 {
return Err(io::Error::last_os_error());
}

let new = previous & !libc::O_NONBLOCK;

let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
Loading

0 comments on commit e5507a0

Please sign in to comment.