1 //! Asynchronous fuse implementation.
3 use std
::collections
::BTreeMap
;
4 use std
::convert
::TryFrom
;
5 use std
::ffi
::{OsStr, OsString}
;
6 use std
::future
::Future
;
10 use std
::os
::unix
::ffi
::OsStrExt
;
13 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
14 use std
::sync
::{Arc, RwLock}
;
15 use std
::task
::{Context, Poll}
;
17 use anyhow
::{format_err, Error}
;
18 use futures
::channel
::mpsc
::UnboundedSender
;
20 use futures
::sink
::SinkExt
;
21 use futures
::stream
::{StreamExt, TryStreamExt}
;
23 use proxmox
::tools
::vec
;
24 use pxar
::accessor
::{self, EntryRangeInfo, ReadAt}
;
26 use proxmox_fuse
::requests
::{self, FuseRequest}
;
27 use proxmox_fuse
::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID}
;
29 use crate::tools
::xattr
;
31 /// We mark inodes for regular files this way so we know how to access them.
32 const NON_DIRECTORY_INODE
: u64 = 1u64 << 63;
35 fn is_dir_inode(inode
: u64) -> bool
{
36 0 == (inode
& NON_DIRECTORY_INODE
)
39 /// Our reader type instance used for accessors.
40 pub type Reader
= Arc
<dyn ReadAt
+ Send
+ Sync
+ '
static>;
42 /// Our Accessor type instance.
43 pub type Accessor
= accessor
::aio
::Accessor
<Reader
>;
45 /// Our Directory type instance.
46 pub type Directory
= accessor
::aio
::Directory
<Reader
>;
48 /// Our FileEntry type instance.
49 pub type FileEntry
= accessor
::aio
::FileEntry
<Reader
>;
51 /// Our FileContents type instance.
52 pub type FileContents
= accessor
::aio
::FileContents
<Reader
>;
55 fut
: Pin
<Box
<dyn Future
<Output
= Result
<(), Error
>> + Send
+ Sync
+ '
static>>,
59 /// Create a fuse session for an archive.
60 pub async
fn mount_path(
65 ) -> Result
<Self, Error
> {
66 // TODO: Add a buffered/caching ReadAt layer?
67 let file
= std
::fs
::File
::open(archive_path
)?
;
68 let file_size
= file
.metadata()?
.len();
69 let reader
: Reader
= Arc
::new(accessor
::sync
::FileReader
::new(file
));
70 let accessor
= Accessor
::new(reader
, file_size
).await?
;
71 Self::mount(accessor
, options
, verbose
, mountpoint
)
74 /// Create a new fuse session for the given pxar `Accessor`.
80 ) -> Result
<Self, Error
> {
81 let fuse
= Fuse
::builder("pxar-mount")?
91 let session
= SessionImpl
::new(accessor
, verbose
);
94 fut
: Box
::pin(session
.main(fuse
)),
99 impl Future
for Session
{
100 type Output
= Result
<(), Error
>;
102 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
103 Pin
::new(&mut self.fut
).poll(cx
)
107 /// We use this to return an errno value back to the kernel.
108 macro_rules
! io_return
{
110 return Err(::std
::io
::Error
::from_raw_os_error($errno
).into());
114 /// Format an "other" error, see `io_bail` below for details.
115 macro_rules
! io_format_err
{
117 ::std
::io
::Error
::new(::std
::io
::ErrorKind
::Other
, format
!($
($fmt
)*))
121 /// We use this to bail out of a functionin an unexpected error case. This will cause the fuse
122 /// request to be answered with a generic `EIO` error code. The error message contained in here
123 /// will be printed to stdout if the verbose flag is used, otherwise silently dropped.
124 macro_rules
! io_bail
{
125 ($
($fmt
:tt
)*) => { return Err(io_format_err!($($fmt)*).into()); }
128 /// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
135 entry_range_info
: EntryRangeInfo
,
136 content_range
: Option
<Range
<u64>>,
143 entry_range_info
: EntryRangeInfo
,
144 content_range
: Option
<Range
<u64>>,
147 refs
: AtomicUsize
::new(1),
155 /// Decrease the reference count by `count`. Note that this must not include the reference held
156 /// by `self` itself, so this must not decrease the count below 2.
157 fn forget(&self, count
: usize) -> Result
<(), Error
> {
159 let old
= self.refs
.load(Ordering
::Acquire
);
161 io_bail
!("reference count underflow");
163 let new
= old
- count
;
166 .compare_exchange(old
, new
, Ordering
::SeqCst
, Ordering
::SeqCst
)
168 Ok(_
) => break Ok(()),
174 fn get_ref
<'a
>(&self, session
: &'a SessionImpl
) -> LookupRef
<'a
> {
175 if self.refs
.fetch_add(1, Ordering
::AcqRel
) == 0 {
176 panic
!("atomic refcount increased from 0 to 1");
181 lookup
: self as *const Lookup
,
186 struct LookupRef
<'a
> {
187 session
: &'a SessionImpl
,
188 lookup
: *const Lookup
,
191 unsafe impl<'a
> Send
for LookupRef
<'a
> {}
192 unsafe impl<'a
> Sync
for LookupRef
<'a
> {}
194 impl<'a
> Clone
for LookupRef
<'a
> {
195 fn clone(&self) -> Self {
196 self.get_ref(self.session
)
200 impl<'a
> std
::ops
::Deref
for LookupRef
<'a
> {
201 type Target
= Lookup
;
203 fn deref(&self) -> &Self::Target
{
204 unsafe { &*self.lookup }
208 impl<'a
> Drop
for LookupRef
<'a
> {
210 if self.lookup
.is_null() {
214 if self.refs
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
215 let inode
= self.inode
;
216 drop(self.session
.lookups
.write().unwrap().remove(&inode
));
221 impl<'a
> LookupRef
<'a
> {
222 fn leak(mut self) -> &'a Lookup
{
223 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
230 lookups
: RwLock
<BTreeMap
<u64, Box
<Lookup
>>>,
234 fn new(accessor
: Accessor
, verbose
: bool
) -> Self {
235 let root
= Lookup
::new(
238 EntryRangeInfo
::toplevel(0..accessor
.size()),
242 let mut tree
= BTreeMap
::new();
243 tree
.insert(ROOT_ID
, root
);
248 lookups
: RwLock
::new(tree
),
252 /// Here's how we deal with errors:
254 /// Any error will be printed if the verbose flag was set, otherwise the message will be
255 /// silently dropped.
257 /// Opaque errors will cause the fuse main loop to bail out with that error.
259 /// `io::Error`s will cause the fuse request to responded to with the given `io::Error`. An
260 /// `io::ErrorKind::Other` translates to a generic `EIO`.
263 request
: impl FuseRequest
,
265 mut sender
: UnboundedSender
<Error
>,
267 let final_result
= match err
.downcast
::<io
::Error
>() {
269 if err
.kind() == io
::ErrorKind
::Other
&& self.verbose
{
270 eprintln
!("an IO error occurred: {}", err
);
274 request
.io_fail(err
).map_err(Error
::from
)
277 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
279 eprintln
!("internal error: {}, bailing out", err
);
284 if let Err(err
) = final_result
{
285 // either we failed to send the error code to fuse, or the above was not an
286 // `io::Error`, so in this case notify the main loop:
290 .expect("failed to propagate error to main loop");
294 async
fn main(self, fuse
: Fuse
) -> Result
<(), Error
> {
295 Arc
::new(self).main_do(fuse
).await
298 async
fn main_do(self: Arc
<Self>, fuse
: Fuse
) -> Result
<(), Error
> {
299 let (err_send
, mut err_recv
) = futures
::channel
::mpsc
::unbounded
::<Error
>();
300 let mut fuse
= fuse
.fuse(); // make this a futures::stream::FusedStream!
303 request
= fuse
.try_next() => match request?
{
305 tokio
::spawn(Arc
::clone(&self).handle_request(request
, err_send
.clone()));
309 err
= err_recv
.next() => match err
{
310 Some(err
) => if self.verbose
{
311 eprintln
!("cancelling fuse main loop due to error: {}", err
);
314 None
=> panic
!("error channel was closed unexpectedly"),
321 async
fn handle_request(
324 mut err_sender
: UnboundedSender
<Error
>,
326 let result
: Result
<(), Error
> = match request
{
327 Request
::Lookup(request
) => {
328 match self.lookup(request
.parent
, &request
.file_name
).await
{
329 Ok((entry
, lookup
)) => match request
.reply(&entry
) {
334 Err(err
) => Err(Error
::from(err
)),
336 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
339 Request
::Forget(request
) => match self.forget(request
.inode
, request
.count
as usize) {
344 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
346 Request
::Getattr(request
) => match self.getattr(request
.inode
).await
{
347 Ok(stat
) => request
.reply(&stat
, std
::f64::MAX
).map_err(Error
::from
),
348 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
350 Request
::ReaddirPlus(mut request
) => match self.readdirplus(&mut request
).await
{
351 Ok(lookups
) => match request
.reply() {
358 Err(err
) => Err(Error
::from(err
)),
360 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
362 Request
::Read(request
) => {
363 match self.read(request
.inode
, request
.size
, request
.offset
).await
{
364 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
365 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
368 Request
::Readlink(request
) => match self.readlink(request
.inode
).await
{
369 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
370 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
372 Request
::ListXAttrSize(request
) => match self.listxattrs(request
.inode
).await
{
376 .fold(0, |sum
, i
| sum
+ i
.name().to_bytes_with_nul().len()),
378 .map_err(Error
::from
),
379 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
381 Request
::ListXAttr(mut request
) => match self.listxattrs_into(&mut request
).await
{
382 Ok(ReplyBufState
::Ok
) => request
.reply().map_err(Error
::from
),
383 Ok(ReplyBufState
::Full
) => request
.fail_full().map_err(Error
::from
),
384 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
386 Request
::GetXAttrSize(request
) => {
387 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
388 Ok(xattr
) => request
.reply(xattr
.value().len()).map_err(Error
::from
),
389 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
392 Request
::GetXAttr(request
) => {
393 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
394 Ok(xattr
) => request
.reply(xattr
.value()).map_err(Error
::from
),
395 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
400 eprintln
!("Received unexpected fuse request");
402 other
.fail(libc
::ENOSYS
).map_err(Error
::from
)
406 if let Err(err
) = result
{
410 .expect("failed to propagate error to main loop");
414 fn get_lookup(&self, inode
: u64) -> Result
<LookupRef
, Error
> {
415 let lookups
= self.lookups
.read().unwrap();
416 if let Some(lookup
) = lookups
.get(&inode
) {
417 return Ok(lookup
.get_ref(self));
419 io_return
!(libc
::ENOENT
);
422 async
fn open_dir(&self, inode
: u64) -> Result
<Directory
, Error
> {
423 if inode
== ROOT_ID
{
424 Ok(self.accessor
.open_root().await?
)
425 } else if !is_dir_inode(inode
) {
426 io_return
!(libc
::ENOTDIR
);
428 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? }
)
432 async
fn open_entry(&self, lookup
: &LookupRef
<'_
>) -> io
::Result
<FileEntry
> {
435 .open_file_at_range(&lookup
.entry_range_info
)
440 fn open_content(&self, lookup
: &LookupRef
) -> Result
<FileContents
, Error
> {
441 if is_dir_inode(lookup
.inode
) {
442 io_return
!(libc
::EISDIR
);
445 match lookup
.content_range
.clone() {
446 Some(range
) => Ok(unsafe { self.accessor.open_contents_at_range(range) }
),
447 None
=> io_return
!(libc
::EBADF
),
451 fn make_lookup(&self, parent
: u64, inode
: u64, entry
: &FileEntry
) -> Result
<LookupRef
, Error
> {
452 let lookups
= self.lookups
.read().unwrap();
453 if let Some(lookup
) = lookups
.get(&inode
) {
454 return Ok(lookup
.get_ref(self));
458 let entry
= Lookup
::new(
461 entry
.entry_range_info().clone(),
462 entry
.content_range()?
,
464 let reference
= entry
.get_ref(self);
465 entry
.refs
.store(1, Ordering
::Release
);
467 let mut lookups
= self.lookups
.write().unwrap();
468 if let Some(lookup
) = lookups
.get(&inode
) {
469 return Ok(lookup
.get_ref(self));
472 lookups
.insert(inode
, entry
);
477 fn forget(&self, inode
: u64, count
: usize) -> Result
<(), Error
> {
478 let node
= self.get_lookup(inode
)?
;
487 ) -> Result
<(EntryParam
, LookupRef
<'_
>), Error
> {
488 let dir
= self.open_dir(parent
).await?
;
490 let entry
= match { dir }
.lookup(file_name
).await?
{
491 Some(entry
) => entry
,
492 None
=> io_return
!(libc
::ENOENT
),
495 let entry
= if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
496 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
497 // entire rest of the archive until we figure out something better...
498 let entry
= self.accessor
.follow_hardlink(&entry
).await?
;
500 if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
501 // hardlinks must not point to other hardlinks...
502 io_return
!(libc
::ELOOP
);
510 let response
= to_entry(&entry
)?
;
511 let inode
= response
.inode
;
512 Ok((response
, self.make_lookup(parent
, inode
, &entry
)?
))
515 async
fn getattr(&self, inode
: u64) -> Result
<libc
::stat
, Error
> {
517 self.accessor
.open_file_at_range(&self.get_lookup(inode
)?
.entry_range_info
).await?
519 to_stat(inode
, &entry
)
522 async
fn readdirplus(
524 request
: &mut requests
::ReaddirPlus
,
525 ) -> Result
<Vec
<LookupRef
<'_
>>, Error
> {
526 let mut lookups
= Vec
::new();
527 let offset
= usize::try_from(request
.offset
)
528 .map_err(|_
| io_format_err
!("directory offset out of range"))?
;
530 let dir
= self.open_dir(request
.inode
).await?
;
531 let dir_lookup
= self.get_lookup(request
.inode
)?
;
533 let entry_count
= dir
.read_dir().count() as isize;
535 let mut next
= offset
as isize;
536 let mut iter
= dir
.read_dir().skip(offset
);
537 while let Some(file
) = iter
.next().await
{
539 let file
= file?
.decode_entry().await?
;
540 let stat
= to_stat(to_inode(&file
), &file
)?
;
541 let name
= file
.file_name();
542 match request
.add_entry(name
, &stat
, next
, 1, std
::f64::MAX
, std
::f64::MAX
)?
{
543 ReplyBufState
::Ok
=> (),
544 ReplyBufState
::Full
=> return Ok(lookups
),
546 lookups
.push(self.make_lookup(request
.inode
, stat
.st_ino
, &file
)?
);
549 if next
== entry_count
{
551 let file
= dir
.lookup_self().await?
;
552 let stat
= to_stat(to_inode(&file
), &file
)?
;
553 let name
= OsStr
::new(".");
554 match request
.add_entry(name
, &stat
, next
, 1, std
::f64::MAX
, std
::f64::MAX
)?
{
555 ReplyBufState
::Ok
=> (),
556 ReplyBufState
::Full
=> return Ok(lookups
),
558 lookups
.push(LookupRef
::clone(&dir_lookup
));
561 if next
== entry_count
+ 1 {
563 let lookup
= self.get_lookup(dir_lookup
.parent
)?
;
564 let parent_dir
= self.open_dir(lookup
.inode
).await?
;
565 let file
= parent_dir
.lookup_self().await?
;
566 let stat
= to_stat(to_inode(&file
), &file
)?
;
567 let name
= OsStr
::new("..");
568 match request
.add_entry(name
, &stat
, next
, 1, std
::f64::MAX
, std
::f64::MAX
)?
{
569 ReplyBufState
::Ok
=> (),
570 ReplyBufState
::Full
=> return Ok(lookups
),
572 lookups
.push(lookup
);
578 async
fn read(&self, inode
: u64, len
: usize, offset
: u64) -> Result
<Vec
<u8>, Error
> {
579 let file
= self.get_lookup(inode
)?
;
580 let content
= self.open_content(&file
)?
;
581 let mut buf
= vec
::undefined(len
);
582 let got
= content
.read_at(&mut buf
, offset
).await?
;
587 async
fn readlink(&self, inode
: u64) -> Result
<OsString
, Error
> {
588 let lookup
= self.get_lookup(inode
)?
;
589 let file
= self.open_entry(&lookup
).await?
;
590 match file
.get_symlink() {
591 None
=> io_return
!(libc
::EINVAL
),
592 Some(link
) => Ok(link
.to_owned()),
596 async
fn listxattrs(&self, inode
: u64) -> Result
<Vec
<pxar
::format
::XAttr
>, Error
> {
597 let lookup
= self.get_lookup(inode
)?
;
604 let mut xattrs
= metadata
.xattrs
;
606 use pxar
::format
::XAttr
;
608 if let Some(fcaps
) = metadata
.fcaps
{
609 xattrs
.push(XAttr
::new(xattr
::xattr_name_fcaps().to_bytes(), fcaps
.data
));
612 // TODO: Special cases:
613 // b"system.posix_acl_access
614 // b"system.posix_acl_default
616 // For these we need to be able to create posix acl format entries, at that point we could
617 // just ditch libacl as well...
622 async
fn listxattrs_into(
624 request
: &mut requests
::ListXAttr
,
625 ) -> Result
<ReplyBufState
, Error
> {
626 let xattrs
= self.listxattrs(request
.inode
).await?
;
628 for entry
in xattrs
{
629 match request
.add_c_string(entry
.name()) {
630 ReplyBufState
::Ok
=> (),
631 ReplyBufState
::Full
=> return Ok(ReplyBufState
::Full
),
635 Ok(ReplyBufState
::Ok
)
638 async
fn getxattr(&self, inode
: u64, xattr
: &OsStr
) -> Result
<pxar
::format
::XAttr
, Error
> {
639 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
640 // xattr for an entry...
641 let xattrs
= self.listxattrs(inode
).await?
;
642 for entry
in xattrs
{
643 if entry
.name().to_bytes() == xattr
.as_bytes() {
647 io_return
!(libc
::ENODATA
);
652 fn to_entry(entry
: &FileEntry
) -> Result
<EntryParam
, Error
> {
653 to_entry_param(to_inode(&entry
), &entry
)
657 fn to_inode(entry
: &FileEntry
) -> u64 {
659 entry
.entry_range_info().entry_range
.end
661 entry
.entry_range_info().entry_range
.start
| NON_DIRECTORY_INODE
665 fn to_entry_param(inode
: u64, entry
: &pxar
::Entry
) -> Result
<EntryParam
, Error
> {
666 Ok(EntryParam
::simple(inode
, to_stat(inode
, entry
)?
))
669 fn to_stat(inode
: u64, entry
: &pxar
::Entry
) -> Result
<libc
::stat
, Error
> {
670 let nlink
= if entry
.is_dir() { 2 }
else { 1 }
;
672 let metadata
= entry
.metadata();
674 let mut stat
: libc
::stat
= unsafe { mem::zeroed() }
;
676 stat
.st_nlink
= nlink
;
677 stat
.st_mode
= u32::try_from(metadata
.stat
.mode
)
678 .map_err(|err
| format_err
!("mode does not fit into st_mode field: {}", err
))?
;
679 stat
.st_size
= i64::try_from(entry
.file_size().unwrap_or(0))
680 .map_err(|err
| format_err
!("size does not fit into st_size field: {}", err
))?
;
681 stat
.st_uid
= metadata
.stat
.uid
;
682 stat
.st_gid
= metadata
.stat
.gid
;
683 stat
.st_atime
= metadata
.stat
.mtime
.secs
;
684 stat
.st_atime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
685 stat
.st_mtime
= metadata
.stat
.mtime
.secs
;
686 stat
.st_mtime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
687 stat
.st_ctime
= metadata
.stat
.mtime
.secs
;
688 stat
.st_ctime_nsec
= metadata
.stat
.mtime
.nanos
as _
;