Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn main() -> Result<()> {
});

// Receive an incoming FUSE request from the kernel.
let mut buf = session.new_fallback_buffer();
let mut buf = session.new_request_buffer()?;
while session.recv_request(&device, &mut buf)? {
let (req, op, _remains) = session.decode(&device, &mut buf)?;
match op {
Expand Down
4 changes: 2 additions & 2 deletions examples/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ fn main() -> Result<()> {
}
});

let mut buf = session.new_splice_buffer()?;
let mut buf = session.new_request_buffer()?;
while session.recv_request(device, &mut buf)? {
let (req, op, remains) = session.decode(device, &mut buf)?;
let (req, op, mut remains) = session.decode(device, &mut buf)?;
match op {
Operation::Getattr(op) => {
if op.ino == NodeID::ROOT {
Expand Down
2 changes: 1 addition & 1 deletion examples/heartbeat_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn main() -> Result<()> {
}
});

let mut buf = session.new_splice_buffer()?;
let mut buf = session.new_request_buffer()?;
while session.recv_request(device, &mut buf)? {
let (req, op, _remains) = session.decode(device, &mut buf)?;
match op {
Expand Down
2 changes: 1 addition & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn main() -> Result<()> {
}
});

let mut buf = session.new_splice_buffer()?;
let mut buf = session.new_request_buffer()?;
while session.recv_request(&device, &mut buf)? {
let (req, op, _remains) = session.decode(&device, &mut buf)?;
match op {
Expand Down
2 changes: 1 addition & 1 deletion examples/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn main() -> Result<()> {
}
});

let mut buf = session.new_splice_buffer()?;
let mut buf = session.new_request_buffer()?;
while session.recv_request(device, &mut buf)? {
let (req, op, _remains) = session.decode(device, &mut buf)?;
match op {
Expand Down
175 changes: 97 additions & 78 deletions src/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,75 @@ impl InHeader {
}
}

/// The trait that represents the receiving process of an incoming FUSE request from the kernel.
pub trait TryReceive<T: ?Sized> {
fn try_receive(&mut self, conn: &mut T) -> io::Result<&InHeader>;
pub struct RequestBuf {
header: InHeader,
kind: BufKind,
}

enum BufKind {
Pipe(PipeBuf),
Vec(VecBuf),
}

pub trait ToParts {
/// The type of object for reading the remaining part of received request.
type Data<'a>
impl RequestBuf {
pub(crate) fn new_pipe(bufsize: usize) -> io::Result<Self> {
Ok(Self {
header: InHeader {
raw: fuse_in_header::new_zeroed(),
},
kind: BufKind::Pipe(PipeBuf {
arg: {
let capacity = FUSE_MIN_READ_BUFFER as usize - mem::size_of::<fuse_in_header>();
let mut vec = vec![0; capacity]; // ensure that the underlying buffer is zeroed.
vec.truncate(0);
vec
},
pipe: Pipe::new(PipeFlags::NONBLOCK)?,
bufsize,
}),
})
}

pub(crate) fn new_vec(bufsize: usize) -> Self {
Self {
header: InHeader {
raw: fuse_in_header::new_zeroed(),
},
kind: BufKind::Vec(VecBuf {
arg: vec![0u8; bufsize - mem::size_of::<fuse_in_header>()].into_boxed_slice(),
pos: 0,
}),
}
}

pub(crate) fn try_receive<T: ?Sized>(&mut self, conn: &mut T) -> io::Result<&InHeader>
where
Self: 'a;
T: SpliceRead,
{
match &mut self.kind {
BufKind::Pipe(buf) => buf.try_receive(&mut self.header, conn)?,
BufKind::Vec(buf) => buf.try_receive(&mut self.header, conn)?,
}

fn to_parts(&mut self) -> (&InHeader, &[u8], Self::Data<'_>);
Ok(&self.header)
}

pub(crate) fn to_parts(&mut self) -> (&InHeader, &[u8], RemainingData<'_>) {
match &mut self.kind {
BufKind::Pipe(buf) => (
&self.header,
&buf.arg[..],
RemainingData::Pipe(&mut buf.pipe),
),
BufKind::Vec(buf) => {
let (arg, remains) = buf.arg.split_at(buf.pos);
(&self.header, arg, RemainingData::Vec(remains))
}
}
}
}

pub struct SpliceBuf {
header: InHeader,
struct PipeBuf {
// MEMO:
// * 再アロケートされる可能性があるので Vec<u8> で持つ
// * デフォルトの system allocator を使用している限りは alignment の心配をする必要は基本的はないはず (malloc依存)
Expand All @@ -151,23 +204,7 @@ pub struct SpliceBuf {
bufsize: usize,
}

impl SpliceBuf {
pub fn new(bufsize: usize) -> io::Result<Self> {
Ok(Self {
header: InHeader {
raw: fuse_in_header::new_zeroed(),
},
arg: {
let capacity = FUSE_MIN_READ_BUFFER as usize - mem::size_of::<fuse_in_header>();
let mut vec = vec![0; capacity]; // ensure that the underlying buffer is zeroed.
vec.truncate(0);
vec
},
pipe: Pipe::new(PipeFlags::NONBLOCK)?,
bufsize,
})
}

impl PipeBuf {
fn reset(&mut self) -> io::Result<()> {
self.arg.truncate(0);
if !self.pipe.is_empty() {
Expand All @@ -176,92 +213,60 @@ impl SpliceBuf {
}
Ok(())
}
}

impl ToParts for SpliceBuf {
type Data<'a> = &'a mut Pipe;

fn to_parts(&mut self) -> (&InHeader, &[u8], Self::Data<'_>) {
(&self.header, &self.arg[..], &mut self.pipe)
}
}

impl<T: ?Sized> TryReceive<T> for SpliceBuf
where
T: SpliceRead,
{
fn try_receive(&mut self, conn: &mut T) -> io::Result<&InHeader> {
fn try_receive<T: ?Sized>(&mut self, header: &mut InHeader, conn: &mut T) -> io::Result<()>
where
T: SpliceRead,
{
self.reset()?;

let len = conn.splice_read(&mut self.pipe, self.bufsize, SpliceFlags::NONBLOCK)?;

if len < mem::size_of_val(&self.header.raw) {
if len < mem::size_of_val(&header.raw) {
Err(invalid_data("dequeued request message is too short"))?
}
self.pipe.read_exact(self.header.raw.as_mut_bytes())?;
self.pipe.read_exact(header.raw.as_mut_bytes())?;

if len != self.header.raw.len as usize {
if len != header.raw.len as usize {
Err(invalid_data(
"The value in_header.len is mismatched to the result of splice(2)",
))?
}

self.arg.resize(self.header.arg_len(), 0);
self.arg.resize(header.arg_len(), 0);
self.pipe.read_exact(&mut self.arg[..])?;

Ok(&self.header)
Ok(())
}
}

pub struct FallbackBuf {
header: InHeader,
struct VecBuf {
// どうせ再アロケートすることはないので、最初に確保した分で固定してしまう
arg: Box<[u8]>,
pos: usize,
}

impl FallbackBuf {
pub fn new(bufsize: usize) -> Self {
Self {
header: InHeader {
raw: fuse_in_header::new_zeroed(),
},
arg: vec![0u8; bufsize - mem::size_of::<fuse_in_header>()].into_boxed_slice(),
pos: 0,
}
}
}

impl ToParts for FallbackBuf {
type Data<'a> = &'a [u8];

fn to_parts(&mut self) -> (&InHeader, &[u8], Self::Data<'_>) {
let (arg, remains) = self.arg.split_at(self.pos);
(&self.header, arg, remains)
}
}

impl<T: ?Sized> TryReceive<T> for FallbackBuf
where
T: io::Read,
{
fn try_receive(&mut self, conn: &mut T) -> io::Result<&InHeader> {
impl VecBuf {
fn try_receive<T: ?Sized>(&mut self, header: &mut InHeader, conn: &mut T) -> io::Result<()>
where
T: io::Read,
{
self.pos = 0;

let len = conn.read_vectored(&mut [
io::IoSliceMut::new(self.header.raw.as_mut_bytes()),
io::IoSliceMut::new(header.raw.as_mut_bytes()),
io::IoSliceMut::new(&mut self.arg[..]),
])?;

if len != self.header.raw.len as usize {
if len != header.raw.len as usize {
Err(invalid_data(
"The value in_header.len is mismatched to the result of readv(2)",
))?
}

self.pos = self.header.arg_len();
self.pos = header.arg_len();

Ok(&self.header)
Ok(())
}
}

Expand All @@ -271,3 +276,17 @@ where
{
io::Error::new(io::ErrorKind::InvalidData, source)
}

pub enum RemainingData<'buf> {
Pipe(&'buf mut Pipe),
Vec(&'buf [u8]),
}

impl io::Read for RemainingData<'_> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
Self::Pipe(remains) => remains.read(buf),
Self::Vec(remains) => remains.read(buf),
}
}
}
4 changes: 2 additions & 2 deletions src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
buf::{FallbackBuf, ToParts as _, TryReceive as _},
buf::RequestBuf,
device::Device,
init::{InitIn, KernelConfig, NegotiationError},
mount::{Mount, MountOptions},
Expand All @@ -25,7 +25,7 @@ where
let (fd, mount) = crate::mount::mount(mountpoint.into(), mountopts)?;
let device = Device::from_fd(fd);

let mut buf = FallbackBuf::new(FUSE_MIN_READ_BUFFER as usize);
let mut buf = RequestBuf::new_vec(FUSE_MIN_READ_BUFFER as usize);
loop {
buf.try_receive(&mut &device)?;
let (header, arg, _remains) = buf.to_parts();
Expand Down
30 changes: 11 additions & 19 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
buf::{FallbackBuf, InHeader, SpliceBuf, ToParts, TryReceive},
buf::{InHeader, RemainingData, RequestBuf},
bytes::Bytes,
init::{KernelConfig, KernelFlags},
io::SpliceRead,
msg::{send_msg, MessageKind},
op::{DecodeError, Operation},
reply::ReplySender,
Expand Down Expand Up @@ -51,18 +52,14 @@ impl Session {
+ self.config.max_write as usize
}

pub fn new_splice_buffer(&self) -> io::Result<SpliceBuf> {
pub fn new_request_buffer(&self) -> io::Result<RequestBuf> {
if self.config.flags.contains(KernelFlags::SPLICE_READ) {
SpliceBuf::new(self.request_buffer_size())
RequestBuf::new_pipe(self.request_buffer_size())
} else {
Err(Errno::NOTSUP.into())
Ok(RequestBuf::new_vec(self.request_buffer_size()))
}
}

pub fn new_fallback_buffer(&self) -> FallbackBuf {
FallbackBuf::new(self.request_buffer_size())
}

#[inline]
pub fn exited(&self) -> bool {
self.exited.load(Ordering::Acquire)
Expand All @@ -74,9 +71,9 @@ impl Session {
}

/// Receive an incoming FUSE request from the kernel.
pub fn recv_request<T, B>(&self, mut conn: T, buf: &mut B) -> io::Result<bool>
pub fn recv_request<T>(&self, mut conn: T, buf: &mut RequestBuf) -> io::Result<bool>
where
B: TryReceive<T>,
T: SpliceRead,
{
if self.exited() {
return Ok(false);
Expand Down Expand Up @@ -119,14 +116,13 @@ impl Session {
/// If anything else (including cloning with `FUSE_IOC_CLONE`) is specified,
/// the corresponding kernel processing will be isolated, and the process
/// that issued the associated syscall may enter a deadlock state.
pub fn decode<'req, T, B>(
pub fn decode<'req, T>(
&'req self,
conn: T,
buf: &'req mut B,
) -> Result<RequestParts<'req, T, B>, DecodeError>
buf: &'req mut RequestBuf,
) -> Result<RequestParts<'req, T>, DecodeError>
where
T: io::Write,
B: ToParts,
{
let (header, arg, remains) = buf.to_parts();
let op = Operation::decode(&self.config, header, arg)?;
Expand Down Expand Up @@ -164,11 +160,7 @@ impl Session {
}
}

pub type RequestParts<'req, T, B> = (
Request<'req, T>,
Operation<'req>,
<B as ToParts>::Data<'req>,
);
pub type RequestParts<'req, T> = (Request<'req, T>, Operation<'req>, RemainingData<'req>);

pub struct Request<'req, T> {
session: &'req Session,
Expand Down