1 //! Asynchronous fuse implementation.
3 use std
::collections
::BTreeMap
;
4 use std
::convert
::TryFrom
;
5 use std
::ffi
::{OsStr, OsString}
;
6 use std
::future
::Future
;
10 use std
::os
::unix
::ffi
::OsStrExt
;
13 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
14 use std
::sync
::{Arc, RwLock}
;
15 use std
::task
::{Context, Poll}
;
17 use anyhow
::{format_err, Error}
;
18 use futures
::channel
::mpsc
::UnboundedSender
;
20 use futures
::sink
::SinkExt
;
21 use futures
::stream
::{StreamExt, TryStreamExt}
;
24 use pxar
::accessor
::{self, EntryRangeInfo, ReadAt}
;
26 use proxmox_fuse
::requests
::{self, FuseRequest}
;
27 use proxmox_fuse
::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID}
;
28 use proxmox_sys
::fs
::xattr
;
/// Marker bit carried by the inode number of every non-directory entry, so that an inode
/// number alone tells us whether it refers to a directory or to some other kind of entry.
const NON_DIRECTORY_INODE: u64 = 1 << 63;

/// Returns `true` if `inode` does not carry the [`NON_DIRECTORY_INODE`] marker bit.
fn is_dir_inode(inode: u64) -> bool {
    inode & NON_DIRECTORY_INODE == 0
}
38 /// Our reader type instance used for accessors.
39 pub type Reader
= Arc
<dyn ReadAt
+ Send
+ Sync
+ '
static>;
41 /// Our Accessor type instance.
42 pub type Accessor
= accessor
::aio
::Accessor
<Reader
>;
44 /// Our Directory type instance.
45 pub type Directory
= accessor
::aio
::Directory
<Reader
>;
47 /// Our FileEntry type instance.
48 pub type FileEntry
= accessor
::aio
::FileEntry
<Reader
>;
50 /// Our FileContents type instance.
51 pub type FileContents
= accessor
::aio
::FileContents
<Reader
>;
54 fut
: Pin
<Box
<dyn Future
<Output
= Result
<(), Error
>> + Send
+ Sync
+ '
static>>,
58 /// Create a fuse session for an archive.
59 pub async
fn mount_path(
64 ) -> Result
<Self, Error
> {
65 // TODO: Add a buffered/caching ReadAt layer?
66 let file
= std
::fs
::File
::open(archive_path
)?
;
67 let file_size
= file
.metadata()?
.len();
68 let reader
: Reader
= Arc
::new(accessor
::sync
::FileReader
::new(file
));
69 let accessor
= Accessor
::new(reader
, file_size
).await?
;
70 Self::mount(accessor
, options
, verbose
, mountpoint
)
73 /// Create a new fuse session for the given pxar `Accessor`.
79 ) -> Result
<Self, Error
> {
80 let fuse
= Fuse
::builder("pxar-mount")?
90 let session
= SessionImpl
::new(accessor
, verbose
);
93 fut
: Box
::pin(session
.main(fuse
)),
98 impl Future
for Session
{
99 type Output
= Result
<(), Error
>;
101 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
102 Pin
::new(&mut self.fut
).poll(cx
)
/// We use this to return an errno value back to the kernel.
///
/// Expands to an early `return Err(...)` carrying `std::io::Error::from_raw_os_error($errno)`,
/// converted with `Into` into the enclosing function's error type. The double braces make the
/// expansion a single block expression, so it can be used both as a statement and in
/// expression position (e.g. as a `match` arm).
macro_rules! io_return {
    ($errno:expr) => {{
        return Err(::std::io::Error::from_raw_os_error($errno).into());
    }};
}

/// Format an "other" error, see `io_bail` below for details.
///
/// Evaluates to a `std::io::Error` of kind `ErrorKind::Other` whose message is produced by
/// `format!` from the macro arguments.
macro_rules! io_format_err {
    ($($fmt:tt)*) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, format!($($fmt)*))
    };
}

/// We use this to bail out of a function in an unexpected error case. This will cause the fuse
/// request to be answered with a generic `EIO` error code. The error message contained in here
/// will be printed to stderr if the verbose flag is used, otherwise silently dropped.
macro_rules! io_bail {
    ($($fmt:tt)*) => {
        return Err(io_format_err!($($fmt)*).into());
    };
}
127 /// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
134 entry_range_info
: EntryRangeInfo
,
135 content_range
: Option
<Range
<u64>>,
142 entry_range_info
: EntryRangeInfo
,
143 content_range
: Option
<Range
<u64>>,
146 refs
: AtomicUsize
::new(1),
154 /// Decrease the reference count by `count`. Note that this must not include the reference held
155 /// by `self` itself, so this must not decrease the count below 2.
156 fn forget(&self, count
: usize) -> Result
<(), Error
> {
158 let old
= self.refs
.load(Ordering
::Acquire
);
160 io_bail
!("reference count underflow");
162 let new
= old
- count
;
165 .compare_exchange(old
, new
, Ordering
::SeqCst
, Ordering
::SeqCst
)
167 Ok(_
) => break Ok(()),
173 fn get_ref
<'a
>(&self, session
: &'a SessionImpl
) -> LookupRef
<'a
> {
174 if self.refs
.fetch_add(1, Ordering
::AcqRel
) == 0 {
175 panic
!("atomic refcount increased from 0 to 1");
180 lookup
: self as *const Lookup
,
185 struct LookupRef
<'a
> {
186 session
: &'a SessionImpl
,
187 lookup
: *const Lookup
,
190 unsafe impl<'a
> Send
for LookupRef
<'a
> {}
191 unsafe impl<'a
> Sync
for LookupRef
<'a
> {}
193 impl<'a
> Clone
for LookupRef
<'a
> {
194 fn clone(&self) -> Self {
195 self.get_ref(self.session
)
199 impl<'a
> std
::ops
::Deref
for LookupRef
<'a
> {
200 type Target
= Lookup
;
202 fn deref(&self) -> &Self::Target
{
203 unsafe { &*self.lookup }
207 impl<'a
> Drop
for LookupRef
<'a
> {
209 if self.lookup
.is_null() {
213 if self.refs
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
214 let inode
= self.inode
;
215 drop(self.session
.lookups
.write().unwrap().remove(&inode
));
220 impl<'a
> LookupRef
<'a
> {
221 fn leak(mut self) -> &'a Lookup
{
222 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
229 lookups
: RwLock
<BTreeMap
<u64, Box
<Lookup
>>>,
233 fn new(accessor
: Accessor
, verbose
: bool
) -> Self {
234 let root
= Lookup
::new(
237 EntryRangeInfo
::toplevel(0..accessor
.size()),
241 let mut tree
= BTreeMap
::new();
242 tree
.insert(ROOT_ID
, root
);
247 lookups
: RwLock
::new(tree
),
251 /// Here's how we deal with errors:
253 /// Any error will be printed if the verbose flag was set, otherwise the message will be
254 /// silently dropped.
256 /// Opaque errors will cause the fuse main loop to bail out with that error.
258 /// `io::Error`s will cause the fuse request to be responded to with the given `io::Error`. An
259 /// `io::ErrorKind::Other` translates to a generic `EIO`.
262 request
: impl FuseRequest
,
264 mut sender
: UnboundedSender
<Error
>,
266 let final_result
= match err
.downcast
::<io
::Error
>() {
268 if err
.kind() == io
::ErrorKind
::Other
&& self.verbose
{
269 eprintln
!("an IO error occurred: {}", err
);
273 request
.io_fail(err
).map_err(Error
::from
)
276 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
278 eprintln
!("internal error: {}, bailing out", err
);
283 if let Err(err
) = final_result
{
284 // either we failed to send the error code to fuse, or the above was not an
285 // `io::Error`, so in this case notify the main loop:
289 .expect("failed to propagate error to main loop");
293 async
fn main(self, fuse
: Fuse
) -> Result
<(), Error
> {
294 Arc
::new(self).main_do(fuse
).await
297 async
fn main_do(self: Arc
<Self>, fuse
: Fuse
) -> Result
<(), Error
> {
298 let (err_send
, mut err_recv
) = futures
::channel
::mpsc
::unbounded
::<Error
>();
299 let mut fuse
= fuse
.fuse(); // make this a futures::stream::FusedStream!
302 request
= fuse
.try_next() => match request?
{
304 tokio
::spawn(Arc
::clone(&self).handle_request(request
, err_send
.clone()));
308 err
= err_recv
.next() => match err
{
309 Some(err
) => if self.verbose
{
310 eprintln
!("cancelling fuse main loop due to error: {}", err
);
313 None
=> panic
!("error channel was closed unexpectedly"),
320 async
fn handle_request(
323 mut err_sender
: UnboundedSender
<Error
>,
325 let result
: Result
<(), Error
> = match request
{
326 Request
::Lookup(request
) => {
327 match self.lookup(request
.parent
, &request
.file_name
).await
{
328 Ok((entry
, lookup
)) => match request
.reply(&entry
) {
333 Err(err
) => Err(Error
::from(err
)),
335 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
338 Request
::Forget(request
) => match self.forget(request
.inode
, request
.count
as usize) {
343 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
345 Request
::Getattr(request
) => match self.getattr(request
.inode
).await
{
346 Ok(stat
) => request
.reply(&stat
, f64::MAX
).map_err(Error
::from
),
347 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
349 Request
::ReaddirPlus(mut request
) => match self.readdirplus(&mut request
).await
{
350 Ok(lookups
) => match request
.reply() {
357 Err(err
) => Err(Error
::from(err
)),
359 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
361 Request
::Read(request
) => {
362 match self.read(request
.inode
, request
.size
, request
.offset
).await
{
363 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
364 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
367 Request
::Readlink(request
) => match self.readlink(request
.inode
).await
{
368 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
369 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
371 Request
::ListXAttrSize(request
) => match self.listxattrs(request
.inode
).await
{
375 .fold(0, |sum
, i
| sum
+ i
.name().to_bytes_with_nul().len()),
377 .map_err(Error
::from
),
378 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
380 Request
::ListXAttr(mut request
) => match self.listxattrs_into(&mut request
).await
{
381 Ok(ReplyBufState
::Ok
) => request
.reply().map_err(Error
::from
),
382 Ok(ReplyBufState
::Full
) => request
.fail_full().map_err(Error
::from
),
383 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
385 Request
::GetXAttrSize(request
) => {
386 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
387 Ok(xattr
) => request
.reply(xattr
.value().len()).map_err(Error
::from
),
388 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
391 Request
::GetXAttr(request
) => {
392 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
393 Ok(xattr
) => request
.reply(xattr
.value()).map_err(Error
::from
),
394 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
399 eprintln
!("Received unexpected fuse request");
401 other
.fail(libc
::ENOSYS
).map_err(Error
::from
)
405 if let Err(err
) = result
{
409 .expect("failed to propagate error to main loop");
413 fn get_lookup(&self, inode
: u64) -> Result
<LookupRef
, Error
> {
414 let lookups
= self.lookups
.read().unwrap();
415 if let Some(lookup
) = lookups
.get(&inode
) {
416 return Ok(lookup
.get_ref(self));
418 io_return
!(libc
::ENOENT
);
421 async
fn open_dir(&self, inode
: u64) -> Result
<Directory
, Error
> {
422 if inode
== ROOT_ID
{
423 Ok(self.accessor
.open_root().await?
)
424 } else if !is_dir_inode(inode
) {
425 io_return
!(libc
::ENOTDIR
);
427 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? }
)
431 async
fn open_entry(&self, lookup
: &LookupRef
<'_
>) -> io
::Result
<FileEntry
> {
434 .open_file_at_range(&lookup
.entry_range_info
)
439 fn open_content(&self, lookup
: &LookupRef
) -> Result
<FileContents
, Error
> {
440 if is_dir_inode(lookup
.inode
) {
441 io_return
!(libc
::EISDIR
);
444 match lookup
.content_range
.clone() {
445 Some(range
) => Ok(unsafe { self.accessor.open_contents_at_range(range) }
),
446 None
=> io_return
!(libc
::EBADF
),
450 fn make_lookup(&self, parent
: u64, inode
: u64, entry
: &FileEntry
) -> Result
<LookupRef
, Error
> {
451 let lookups
= self.lookups
.read().unwrap();
452 if let Some(lookup
) = lookups
.get(&inode
) {
453 return Ok(lookup
.get_ref(self));
457 let entry
= Lookup
::new(
460 entry
.entry_range_info().clone(),
461 entry
.content_range()?
,
463 let reference
= entry
.get_ref(self);
464 entry
.refs
.store(1, Ordering
::Release
);
466 let mut lookups
= self.lookups
.write().unwrap();
467 if let Some(lookup
) = lookups
.get(&inode
) {
468 return Ok(lookup
.get_ref(self));
471 lookups
.insert(inode
, entry
);
476 fn forget(&self, inode
: u64, count
: usize) -> Result
<(), Error
> {
477 let node
= self.get_lookup(inode
)?
;
486 ) -> Result
<(EntryParam
, LookupRef
<'_
>), Error
> {
487 let dir
= self.open_dir(parent
).await?
;
489 let entry
= match { dir }
.lookup(file_name
).await?
{
490 Some(entry
) => entry
,
491 None
=> io_return
!(libc
::ENOENT
),
494 let entry
= if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
495 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
496 // entire rest of the archive until we figure out something better...
497 let entry
= self.accessor
.follow_hardlink(&entry
).await?
;
499 if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
500 // hardlinks must not point to other hardlinks...
501 io_return
!(libc
::ELOOP
);
509 let response
= to_entry(&entry
)?
;
510 let inode
= response
.inode
;
511 Ok((response
, self.make_lookup(parent
, inode
, &entry
)?
))
514 async
fn getattr(&self, inode
: u64) -> Result
<libc
::stat
, Error
> {
516 self.accessor
.open_file_at_range(&self.get_lookup(inode
)?
.entry_range_info
).await?
518 to_stat(inode
, &entry
)
521 async
fn readdirplus(
523 request
: &mut requests
::ReaddirPlus
,
524 ) -> Result
<Vec
<LookupRef
<'_
>>, Error
> {
525 let mut lookups
= Vec
::new();
526 let offset
= usize::try_from(request
.offset
)
527 .map_err(|_
| io_format_err
!("directory offset out of range"))?
;
529 let dir
= self.open_dir(request
.inode
).await?
;
530 let dir_lookup
= self.get_lookup(request
.inode
)?
;
532 let entry_count
= dir
.read_dir().count() as isize;
534 let mut next
= offset
as isize;
535 let mut iter
= dir
.read_dir().skip(offset
);
536 while let Some(file
) = iter
.next().await
{
538 let file
= file?
.decode_entry().await?
;
539 let stat
= to_stat(to_inode(&file
), &file
)?
;
540 let name
= file
.file_name();
541 match request
.add_entry(name
, &stat
, next
, 1, f64::MAX
, f64::MAX
)?
{
542 ReplyBufState
::Ok
=> (),
543 ReplyBufState
::Full
=> return Ok(lookups
),
545 lookups
.push(self.make_lookup(request
.inode
, stat
.st_ino
, &file
)?
);
548 if next
== entry_count
{
550 let file
= dir
.lookup_self().await?
;
551 let stat
= to_stat(to_inode(&file
), &file
)?
;
552 let name
= OsStr
::new(".");
553 match request
.add_entry(name
, &stat
, next
, 1, f64::MAX
, f64::MAX
)?
{
554 ReplyBufState
::Ok
=> (),
555 ReplyBufState
::Full
=> return Ok(lookups
),
557 lookups
.push(LookupRef
::clone(&dir_lookup
));
560 if next
== entry_count
+ 1 {
562 let lookup
= self.get_lookup(dir_lookup
.parent
)?
;
563 let parent_dir
= self.open_dir(lookup
.inode
).await?
;
564 let file
= parent_dir
.lookup_self().await?
;
565 let stat
= to_stat(to_inode(&file
), &file
)?
;
566 let name
= OsStr
::new("..");
567 match request
.add_entry(name
, &stat
, next
, 1, f64::MAX
, f64::MAX
)?
{
568 ReplyBufState
::Ok
=> (),
569 ReplyBufState
::Full
=> return Ok(lookups
),
571 lookups
.push(lookup
);
577 async
fn read(&self, inode
: u64, len
: usize, offset
: u64) -> Result
<Vec
<u8>, Error
> {
578 let file
= self.get_lookup(inode
)?
;
579 let content
= self.open_content(&file
)?
;
580 let mut buf
= vec
::undefined(len
);
581 let got
= content
.read_at(&mut buf
, offset
).await?
;
586 async
fn readlink(&self, inode
: u64) -> Result
<OsString
, Error
> {
587 let lookup
= self.get_lookup(inode
)?
;
588 let file
= self.open_entry(&lookup
).await?
;
589 match file
.get_symlink() {
590 None
=> io_return
!(libc
::EINVAL
),
591 Some(link
) => Ok(link
.to_owned()),
595 async
fn listxattrs(&self, inode
: u64) -> Result
<Vec
<pxar
::format
::XAttr
>, Error
> {
596 let lookup
= self.get_lookup(inode
)?
;
603 let mut xattrs
= metadata
.xattrs
;
605 use pxar
::format
::XAttr
;
607 if let Some(fcaps
) = metadata
.fcaps
{
608 xattrs
.push(XAttr
::new(xattr
::xattr_name_fcaps().to_bytes(), fcaps
.data
));
611 // TODO: Special cases:
612 // b"system.posix_acl_access
613 // b"system.posix_acl_default
615 // For these we need to be able to create posix acl format entries, at that point we could
616 // just ditch libacl as well...
621 async
fn listxattrs_into(
623 request
: &mut requests
::ListXAttr
,
624 ) -> Result
<ReplyBufState
, Error
> {
625 let xattrs
= self.listxattrs(request
.inode
).await?
;
627 for entry
in xattrs
{
628 match request
.add_c_string(entry
.name()) {
629 ReplyBufState
::Ok
=> (),
630 ReplyBufState
::Full
=> return Ok(ReplyBufState
::Full
),
634 Ok(ReplyBufState
::Ok
)
637 async
fn getxattr(&self, inode
: u64, xattr
: &OsStr
) -> Result
<pxar
::format
::XAttr
, Error
> {
638 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
639 // xattr for an entry...
640 let xattrs
= self.listxattrs(inode
).await?
;
641 for entry
in xattrs
{
642 if entry
.name().to_bytes() == xattr
.as_bytes() {
646 io_return
!(libc
::ENODATA
);
651 fn to_entry(entry
: &FileEntry
) -> Result
<EntryParam
, Error
> {
652 to_entry_param(to_inode(&entry
), &entry
)
656 fn to_inode(entry
: &FileEntry
) -> u64 {
658 entry
.entry_range_info().entry_range
.end
660 entry
.entry_range_info().entry_range
.start
| NON_DIRECTORY_INODE
664 fn to_entry_param(inode
: u64, entry
: &pxar
::Entry
) -> Result
<EntryParam
, Error
> {
665 Ok(EntryParam
::simple(inode
, to_stat(inode
, entry
)?
))
668 fn to_stat(inode
: u64, entry
: &pxar
::Entry
) -> Result
<libc
::stat
, Error
> {
669 let nlink
= if entry
.is_dir() { 2 }
else { 1 }
;
671 let metadata
= entry
.metadata();
673 let mut stat
: libc
::stat
= unsafe { mem::zeroed() }
;
675 stat
.st_nlink
= nlink
;
676 stat
.st_mode
= u32::try_from(metadata
.stat
.mode
)
677 .map_err(|err
| format_err
!("mode does not fit into st_mode field: {}", err
))?
;
678 stat
.st_size
= i64::try_from(entry
.file_size().unwrap_or(0))
679 .map_err(|err
| format_err
!("size does not fit into st_size field: {}", err
))?
;
680 stat
.st_uid
= metadata
.stat
.uid
;
681 stat
.st_gid
= metadata
.stat
.gid
;
682 stat
.st_atime
= metadata
.stat
.mtime
.secs
;
683 stat
.st_atime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
684 stat
.st_mtime
= metadata
.stat
.mtime
.secs
;
685 stat
.st_mtime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
686 stat
.st_ctime
= metadata
.stat
.mtime
.secs
;
687 stat
.st_ctime_nsec
= metadata
.stat
.mtime
.nanos
as _
;