1 //! Asynchronous fuse implementation.
3 use std
::collections
::BTreeMap
;
4 use std
::convert
::TryFrom
;
5 use std
::ffi
::{OsStr, OsString}
;
6 use std
::future
::Future
;
10 use std
::os
::unix
::ffi
::OsStrExt
;
13 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
14 use std
::sync
::{Arc, RwLock}
;
15 use std
::task
::{Context, Poll}
;
17 use anyhow
::{format_err, Error}
;
18 use futures
::channel
::mpsc
::UnboundedSender
;
20 use futures
::sink
::SinkExt
;
21 use futures
::stream
::{StreamExt, TryStreamExt}
;
23 use proxmox
::tools
::vec
;
24 use pxar
::accessor
::{self, EntryRangeInfo, ReadAt}
;
26 use proxmox_fuse
::requests
::{self, FuseRequest}
;
27 use proxmox_fuse
::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID}
;
29 use crate::tools
::xattr
;
/// Marker bit carried by the inode number of every non-directory entry, so that the kind of
/// access needed for an entry can be told from its inode number alone.
const NON_DIRECTORY_INODE: u64 = 1 << 63;

/// Returns `true` when `inode` does not carry the [`NON_DIRECTORY_INODE`] marker bit.
fn is_dir_inode(inode: u64) -> bool {
    (inode & NON_DIRECTORY_INODE) == 0
}
/// Our reader type instance used for accessors.
///
/// A reference-counted, type-erased [`ReadAt`] implementation which is `Send + Sync + 'static`.
pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;

/// Our Accessor type instance: `accessor::aio::Accessor` specialized for [`Reader`].
pub type Accessor = accessor::aio::Accessor<Reader>;

/// Our Directory type instance: `accessor::aio::Directory` specialized for [`Reader`].
pub type Directory = accessor::aio::Directory<Reader>;

/// Our FileEntry type instance: `accessor::aio::FileEntry` specialized for [`Reader`].
pub type FileEntry = accessor::aio::FileEntry<Reader>;

/// Our FileContents type instance: `accessor::aio::FileContents` specialized for [`Reader`].
pub type FileContents = accessor::aio::FileContents<Reader>;
55 fut
: Pin
<Box
<dyn Future
<Output
= Result
<(), Error
>> + Send
+ Sync
+ '
static>>,
59 /// Create a fuse session for an archive.
60 pub async
fn mount_path(
65 ) -> Result
<Self, Error
> {
66 // TODO: Add a buffered/caching ReadAt layer?
67 let file
= std
::fs
::File
::open(archive_path
)?
;
68 let file_size
= file
.metadata()?
.len();
69 let reader
: Reader
= Arc
::new(accessor
::sync
::FileReader
::new(file
));
70 let accessor
= Accessor
::new(reader
, file_size
).await?
;
71 Self::mount(accessor
, options
, verbose
, mountpoint
)
74 /// Create a new fuse session for the given pxar `Accessor`.
80 ) -> Result
<Self, Error
> {
81 let fuse
= Fuse
::builder("pxar-mount")?
91 let session
= SessionImpl
::new(accessor
, verbose
);
94 fut
: Box
::pin(session
.main(fuse
)),
99 impl Future
for Session
{
100 type Output
= Result
<(), Error
>;
102 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
103 Pin
::new(&mut self.fut
).poll(cx
)
107 /// We use this to return an errno value back to the kernel.
108 macro_rules
! io_return
{
110 return Err(::std
::io
::Error
::from_raw_os_error($errno
).into());
114 /// Format an "other" error, see `io_bail` below for details.
115 macro_rules
! io_format_err
{
117 ::std
::io
::Error
::new(::std
::io
::ErrorKind
::Other
, format
!($
($fmt
)*))
/// Bail out of the enclosing function in an unexpected error case.
///
/// The arguments are passed to `io_format_err!` (format-string syntax) and the resulting error
/// is returned via `return Err(..)`, converted with `.into()`. The fuse request is then answered
/// with a generic `EIO` error code. The message is printed to stderr if the verbose flag is
/// used, otherwise it is silently dropped.
macro_rules! io_bail {
    ($($arg:tt)*) => {
        return Err(io_format_err!($($arg)*).into());
    };
}
128 /// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
135 entry_range_info
: EntryRangeInfo
,
136 content_range
: Option
<Range
<u64>>,
143 entry_range_info
: EntryRangeInfo
,
144 content_range
: Option
<Range
<u64>>,
147 refs
: AtomicUsize
::new(1),
155 /// Decrease the reference count by `count`. Note that this must not include the reference held
156 /// by `self` itself, so this must not decrease the count below 2.
157 fn forget(&self, count
: usize) -> Result
<(), Error
> {
159 let old
= self.refs
.load(Ordering
::Acquire
);
161 io_bail
!("reference count underflow");
163 let new
= old
- count
;
166 .compare_exchange(old
, new
, Ordering
::SeqCst
, Ordering
::SeqCst
)
168 Ok(_
) => break Ok(()),
174 fn get_ref
<'a
>(&self, session
: &'a SessionImpl
) -> LookupRef
<'a
> {
175 if self.refs
.fetch_add(1, Ordering
::AcqRel
) == 0 {
176 panic
!("atomic refcount increased from 0 to 1");
181 lookup
: self as *const Lookup
,
/// A counted reference to a [`Lookup`] entry of a session.
///
/// Stores the owning session next to a raw pointer to the entry. Dropping the reference
/// decrements the entry's `refs` counter again, unless the reference was leaked.
struct LookupRef<'a> {
    /// Session whose `lookups` table the entry is removed from once the counter drops to zero.
    session: &'a SessionImpl,
    /// Raw pointer to the referenced entry; replaced by a null pointer when leaked.
    lookup: *const Lookup,
}
// The raw `*const Lookup` field prevents `Send` and `Sync` from being implemented automatically.
// NOTE(review): soundness presumably relies on the pointee being a `Box<Lookup>` that stays in
// `SessionImpl::lookups` for as long as counted references exist — confirm.
unsafe impl<'a> Send for LookupRef<'a> {}
unsafe impl<'a> Sync for LookupRef<'a> {}
194 impl<'a
> Clone
for LookupRef
<'a
> {
195 fn clone(&self) -> Self {
196 self.get_ref(self.session
)
impl<'a> std::ops::Deref for LookupRef<'a> {
    type Target = Lookup;

    /// Borrow the referenced [`Lookup`] entry.
    fn deref(&self) -> &Self::Target {
        // NOTE(review): the raw pointer is dereferenced without a null check. `leak` is the only
        // visible place that nulls it, and it consumes the reference — confirm no other path does.
        unsafe { &*self.lookup }
    }
}
208 impl<'a
> Drop
for LookupRef
<'a
> {
210 if self.lookup
.is_null() {
214 if self.refs
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
215 let inode
= self.inode
;
216 drop(self.session
.lookups
.write().unwrap().remove(&inode
));
221 impl<'a
> LookupRef
<'a
> {
222 fn leak(mut self) -> &'a Lookup
{
223 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
230 lookups
: RwLock
<BTreeMap
<u64, Box
<Lookup
>>>,
234 fn new(accessor
: Accessor
, verbose
: bool
) -> Self {
235 let root
= Lookup
::new(
238 EntryRangeInfo
::toplevel(0..accessor
.size()),
242 let mut tree
= BTreeMap
::new();
243 tree
.insert(ROOT_ID
, root
);
248 lookups
: RwLock
::new(tree
),
252 /// Here's how we deal with errors:
254 /// Any error will be printed if the verbose flag was set, otherwise the message will be
255 /// silently dropped.
257 /// Opaque errors will cause the fuse main loop to bail out with that error.
/// `io::Error`s will cause the fuse request to be responded to with the given `io::Error`. An
/// `io::ErrorKind::Other` translates to a generic `EIO`.
263 request
: impl FuseRequest
,
265 mut sender
: UnboundedSender
<Error
>,
267 let final_result
= match err
.downcast
::<io
::Error
>() {
269 if err
.kind() == io
::ErrorKind
::Other
{
271 eprintln
!("an IO error occurred: {}", err
);
276 request
.io_fail(err
).map_err(Error
::from
)
279 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
281 eprintln
!("internal error: {}, bailing out", err
);
286 if let Err(err
) = final_result
{
287 // either we failed to send the error code to fuse, or the above was not an
288 // `io::Error`, so in this case notify the main loop:
292 .expect("failed to propagate error to main loop");
296 async
fn main(self, fuse
: Fuse
) -> Result
<(), Error
> {
297 Arc
::new(self).main_do(fuse
).await
300 async
fn main_do(self: Arc
<Self>, fuse
: Fuse
) -> Result
<(), Error
> {
301 let (err_send
, mut err_recv
) = futures
::channel
::mpsc
::unbounded
::<Error
>();
302 let mut fuse
= fuse
.fuse(); // make this a futures::stream::FusedStream!
305 request
= fuse
.try_next() => match request?
{
307 tokio
::spawn(Arc
::clone(&self).handle_request(request
, err_send
.clone()));
311 err
= err_recv
.next() => match err
{
312 Some(err
) => if self.verbose
{
313 eprintln
!("cancelling fuse main loop due to error: {}", err
);
316 None
=> panic
!("error channel was closed unexpectedly"),
323 async
fn handle_request(
326 mut err_sender
: UnboundedSender
<Error
>,
328 let result
: Result
<(), Error
> = match request
{
329 Request
::Lookup(request
) => {
330 match self.lookup(request
.parent
, &request
.file_name
).await
{
331 Ok((entry
, lookup
)) => match request
.reply(&entry
) {
336 Err(err
) => Err(Error
::from(err
)),
338 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
341 Request
::Forget(request
) => match self.forget(request
.inode
, request
.count
as usize) {
346 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
348 Request
::Getattr(request
) => match self.getattr(request
.inode
).await
{
349 Ok(stat
) => request
.reply(&stat
, std
::f64::MAX
).map_err(Error
::from
),
350 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
352 Request
::ReaddirPlus(mut request
) => match self.readdirplus(&mut request
).await
{
353 Ok(lookups
) => match request
.reply() {
360 Err(err
) => Err(Error
::from(err
)),
362 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
364 Request
::Read(request
) => {
365 match self.read(request
.inode
, request
.size
, request
.offset
).await
{
366 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
367 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
370 Request
::Readlink(request
) => match self.readlink(request
.inode
).await
{
371 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
372 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
374 Request
::ListXAttrSize(request
) => match self.listxattrs(request
.inode
).await
{
378 .fold(0, |sum
, i
| sum
+ i
.name().to_bytes_with_nul().len()),
380 .map_err(Error
::from
),
381 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
383 Request
::ListXAttr(mut request
) => match self.listxattrs_into(&mut request
).await
{
384 Ok(ReplyBufState
::Ok
) => request
.reply().map_err(Error
::from
),
385 Ok(ReplyBufState
::Full
) => request
.fail_full().map_err(Error
::from
),
386 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
388 Request
::GetXAttrSize(request
) => {
389 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
390 Ok(xattr
) => request
.reply(xattr
.value().len()).map_err(Error
::from
),
391 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
394 Request
::GetXAttr(request
) => {
395 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
396 Ok(xattr
) => request
.reply(xattr
.value()).map_err(Error
::from
),
397 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
402 eprintln
!("Received unexpected fuse request");
404 other
.fail(libc
::ENOSYS
).map_err(Error
::from
)
408 if let Err(err
) = result
{
412 .expect("failed to propagate error to main loop");
416 fn get_lookup(&self, inode
: u64) -> Result
<LookupRef
, Error
> {
417 let lookups
= self.lookups
.read().unwrap();
418 if let Some(lookup
) = lookups
.get(&inode
) {
419 return Ok(lookup
.get_ref(self));
421 io_return
!(libc
::ENOENT
);
424 async
fn open_dir(&self, inode
: u64) -> Result
<Directory
, Error
> {
425 if inode
== ROOT_ID
{
426 Ok(self.accessor
.open_root().await?
)
427 } else if !is_dir_inode(inode
) {
428 io_return
!(libc
::ENOTDIR
);
430 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? }
)
434 async
fn open_entry(&self, lookup
: &LookupRef
<'_
>) -> io
::Result
<FileEntry
> {
437 .open_file_at_range(&lookup
.entry_range_info
)
442 fn open_content(&self, lookup
: &LookupRef
) -> Result
<FileContents
, Error
> {
443 if is_dir_inode(lookup
.inode
) {
444 io_return
!(libc
::EISDIR
);
447 match lookup
.content_range
.clone() {
448 Some(range
) => Ok(unsafe { self.accessor.open_contents_at_range(range) }
),
449 None
=> io_return
!(libc
::EBADF
),
453 fn make_lookup(&self, parent
: u64, inode
: u64, entry
: &FileEntry
) -> Result
<LookupRef
, Error
> {
454 let lookups
= self.lookups
.read().unwrap();
455 if let Some(lookup
) = lookups
.get(&inode
) {
456 return Ok(lookup
.get_ref(self));
460 let entry
= Lookup
::new(
463 entry
.entry_range_info().clone(),
464 entry
.content_range()?
,
466 let reference
= entry
.get_ref(self);
467 entry
.refs
.store(1, Ordering
::Release
);
469 let mut lookups
= self.lookups
.write().unwrap();
470 if let Some(lookup
) = lookups
.get(&inode
) {
471 return Ok(lookup
.get_ref(self));
474 lookups
.insert(inode
, entry
);
479 fn forget(&self, inode
: u64, count
: usize) -> Result
<(), Error
> {
480 let node
= self.get_lookup(inode
)?
;
489 ) -> Result
<(EntryParam
, LookupRef
<'a
>), Error
> {
490 let dir
= self.open_dir(parent
).await?
;
492 let entry
= match { dir }
.lookup(file_name
).await?
{
493 Some(entry
) => entry
,
494 None
=> io_return
!(libc
::ENOENT
),
497 let entry
= if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
498 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
499 // entire rest of the archive until we figure out something better...
500 let entry
= self.accessor
.follow_hardlink(&entry
).await?
;
502 if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
503 // hardlinks must not point to other hardlinks...
504 io_return
!(libc
::ELOOP
);
512 let response
= to_entry(&entry
)?
;
513 let inode
= response
.inode
;
514 Ok((response
, self.make_lookup(parent
, inode
, &entry
)?
))
517 async
fn getattr(&self, inode
: u64) -> Result
<libc
::stat
, Error
> {
519 self.accessor
.open_file_at_range(&self.get_lookup(inode
)?
.entry_range_info
).await?
521 to_stat(inode
, &entry
)
524 async
fn readdirplus
<'a
>(
526 request
: &mut requests
::ReaddirPlus
,
527 ) -> Result
<Vec
<LookupRef
<'a
>>, Error
> {
528 let mut lookups
= Vec
::new();
529 let offset
= usize::try_from(request
.offset
)
530 .map_err(|_
| io_format_err
!("directory offset out of range"))?
;
532 let dir
= self.open_dir(request
.inode
).await?
;
533 let dir_lookup
= self.get_lookup(request
.inode
)?
;
535 let entry_count
= dir
.read_dir().count() as isize;
537 let mut next
= offset
as isize;
538 let mut iter
= dir
.read_dir().skip(offset
);
539 while let Some(file
) = iter
.next().await
{
541 let file
= file?
.decode_entry().await?
;
542 let stat
= to_stat(to_inode(&file
), &file
)?
;
543 let name
= file
.file_name();
544 match request
.add_entry(name
, &stat
, next
, 1, std
::f64::MAX
, std
::f64::MAX
)?
{
545 ReplyBufState
::Ok
=> (),
546 ReplyBufState
::Full
=> return Ok(lookups
),
548 lookups
.push(self.make_lookup(request
.inode
, stat
.st_ino
, &file
)?
);
551 if next
== entry_count
{
553 let file
= dir
.lookup_self().await?
;
554 let stat
= to_stat(to_inode(&file
), &file
)?
;
555 let name
= OsStr
::new(".");
556 match request
.add_entry(name
, &stat
, next
, 1, std
::f64::MAX
, std
::f64::MAX
)?
{
557 ReplyBufState
::Ok
=> (),
558 ReplyBufState
::Full
=> return Ok(lookups
),
560 lookups
.push(LookupRef
::clone(&dir_lookup
));
563 if next
== entry_count
+ 1 {
565 let lookup
= self.get_lookup(dir_lookup
.parent
)?
;
566 let parent_dir
= self.open_dir(lookup
.inode
).await?
;
567 let file
= parent_dir
.lookup_self().await?
;
568 let stat
= to_stat(to_inode(&file
), &file
)?
;
569 let name
= OsStr
::new("..");
570 match request
.add_entry(name
, &stat
, next
, 1, std
::f64::MAX
, std
::f64::MAX
)?
{
571 ReplyBufState
::Ok
=> (),
572 ReplyBufState
::Full
=> return Ok(lookups
),
574 lookups
.push(lookup
);
580 async
fn read(&self, inode
: u64, len
: usize, offset
: u64) -> Result
<Vec
<u8>, Error
> {
581 let file
= self.get_lookup(inode
)?
;
582 let content
= self.open_content(&file
)?
;
583 let mut buf
= vec
::undefined(len
);
584 let got
= content
.read_at(&mut buf
, offset
).await?
;
589 async
fn readlink(&self, inode
: u64) -> Result
<OsString
, Error
> {
590 let lookup
= self.get_lookup(inode
)?
;
591 let file
= self.open_entry(&lookup
).await?
;
592 match file
.get_symlink() {
593 None
=> io_return
!(libc
::EINVAL
),
594 Some(link
) => Ok(link
.to_owned()),
598 async
fn listxattrs(&self, inode
: u64) -> Result
<Vec
<pxar
::format
::XAttr
>, Error
> {
599 let lookup
= self.get_lookup(inode
)?
;
606 let mut xattrs
= metadata
.xattrs
;
608 use pxar
::format
::XAttr
;
610 if let Some(fcaps
) = metadata
.fcaps
{
611 xattrs
.push(XAttr
::new(xattr
::xattr_name_fcaps().to_bytes(), fcaps
.data
));
614 // TODO: Special cases:
//     b"system.posix_acl_access"
//     b"system.posix_acl_default"
618 // For these we need to be able to create posix acl format entries, at that point we could
619 // just ditch libacl as well...
624 async
fn listxattrs_into(
626 request
: &mut requests
::ListXAttr
,
627 ) -> Result
<ReplyBufState
, Error
> {
628 let xattrs
= self.listxattrs(request
.inode
).await?
;
630 for entry
in xattrs
{
631 match request
.add_c_string(entry
.name()) {
632 ReplyBufState
::Ok
=> (),
633 ReplyBufState
::Full
=> return Ok(ReplyBufState
::Full
),
637 Ok(ReplyBufState
::Ok
)
640 async
fn getxattr(&self, inode
: u64, xattr
: &OsStr
) -> Result
<pxar
::format
::XAttr
, Error
> {
641 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
642 // xattr for an entry...
643 let xattrs
= self.listxattrs(inode
).await?
;
644 for entry
in xattrs
{
645 if entry
.name().to_bytes() == xattr
.as_bytes() {
649 io_return
!(libc
::ENODATA
);
654 fn to_entry(entry
: &FileEntry
) -> Result
<EntryParam
, Error
> {
655 to_entry_param(to_inode(&entry
), &entry
)
659 fn to_inode(entry
: &FileEntry
) -> u64 {
661 entry
.entry_range_info().entry_range
.end
663 entry
.entry_range_info().entry_range
.start
| NON_DIRECTORY_INODE
667 fn to_entry_param(inode
: u64, entry
: &pxar
::Entry
) -> Result
<EntryParam
, Error
> {
668 Ok(EntryParam
::simple(inode
, to_stat(inode
, entry
)?
))
671 fn to_stat(inode
: u64, entry
: &pxar
::Entry
) -> Result
<libc
::stat
, Error
> {
672 let nlink
= if entry
.is_dir() { 2 }
else { 1 }
;
674 let metadata
= entry
.metadata();
676 let mut stat
: libc
::stat
= unsafe { mem::zeroed() }
;
678 stat
.st_nlink
= nlink
;
679 stat
.st_mode
= u32::try_from(metadata
.stat
.mode
)
680 .map_err(|err
| format_err
!("mode does not fit into st_mode field: {}", err
))?
;
681 stat
.st_size
= i64::try_from(entry
.file_size().unwrap_or(0))
682 .map_err(|err
| format_err
!("size does not fit into st_size field: {}", err
))?
;
683 stat
.st_uid
= metadata
.stat
.uid
;
684 stat
.st_gid
= metadata
.stat
.gid
;
685 stat
.st_atime
= metadata
.stat
.mtime
.secs
;
686 stat
.st_atime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
687 stat
.st_mtime
= metadata
.stat
.mtime
.secs
;
688 stat
.st_mtime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
689 stat
.st_ctime
= metadata
.stat
.mtime
.secs
;
690 stat
.st_ctime_nsec
= metadata
.stat
.mtime
.nanos
as _
;