1 //! Asynchronous fuse implementation.
3 use std
::collections
::BTreeMap
;
4 use std
::convert
::TryFrom
;
5 use std
::ffi
::{OsStr, OsString}
;
6 use std
::future
::Future
;
10 use std
::os
::unix
::ffi
::OsStrExt
;
13 use std
::sync
::atomic
::{AtomicUsize, Ordering}
;
14 use std
::sync
::{Arc, RwLock}
;
15 use std
::task
::{Context, Poll}
;
17 use anyhow
::{format_err, Error}
;
18 use futures
::channel
::mpsc
::UnboundedSender
;
20 use futures
::sink
::SinkExt
;
21 use futures
::stream
::{StreamExt, TryStreamExt}
;
24 use pxar
::accessor
::{self, EntryRangeInfo, ReadAt}
;
26 use proxmox_fuse
::requests
::{self, FuseRequest}
;
27 use proxmox_fuse
::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID}
;
28 use proxmox_lang
::io_format_err
;
29 use proxmox_sys
::fs
::xattr
;
/// Marker bit (the most significant bit of the inode number) that we set on inodes which do not
/// refer to directories, so the inode number alone tells us how the entry has to be accessed.
const NON_DIRECTORY_INODE: u64 = 0x8000_0000_0000_0000;
35 fn is_dir_inode(inode
: u64) -> bool
{
36 0 == (inode
& NON_DIRECTORY_INODE
)
/// Our reader type instance used for accessors.
///
/// A reference-counted, type-erased [`ReadAt`] implementation which additionally has to be
/// `Send + Sync + 'static`.
pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;
/// Our Accessor type instance.
///
/// This is `accessor::aio::Accessor` instantiated with [`Reader`] as its input type.
pub type Accessor = accessor::aio::Accessor<Reader>;
/// Our Directory type instance.
///
/// This is `accessor::aio::Directory` instantiated with [`Reader`]; it is what the session's
/// `open_dir` hands out.
pub type Directory = accessor::aio::Directory<Reader>;
/// Our FileEntry type instance.
///
/// This is `accessor::aio::FileEntry` instantiated with [`Reader`]; the inode and stat helpers
/// in this file operate on it.
pub type FileEntry = accessor::aio::FileEntry<Reader>;
/// Our FileContents type instance.
///
/// This is `accessor::aio::FileContents` instantiated with [`Reader`]; the session's
/// `open_content` returns it and `read` fetches data from it via `read_at`.
pub type FileContents = accessor::aio::FileContents<Reader>;
55 fut
: Pin
<Box
<dyn Future
<Output
= Result
<(), Error
>> + Send
+ Sync
+ '
static>>,
59 /// Create a fuse session for an archive.
60 pub async
fn mount_path(
65 ) -> Result
<Self, Error
> {
66 // TODO: Add a buffered/caching ReadAt layer?
67 let file
= std
::fs
::File
::open(archive_path
)?
;
68 let file_size
= file
.metadata()?
.len();
69 let reader
: Reader
= Arc
::new(accessor
::sync
::FileReader
::new(file
));
70 let accessor
= Accessor
::new(reader
, file_size
).await?
;
71 Self::mount(accessor
, options
, verbose
, mountpoint
)
74 /// Create a new fuse session for the given pxar `Accessor`.
80 ) -> Result
<Self, Error
> {
81 let fuse
= Fuse
::builder("pxar-mount")?
91 let session
= SessionImpl
::new(accessor
, verbose
);
94 fut
: Box
::pin(session
.main(fuse
)),
99 impl Future
for Session
{
100 type Output
= Result
<(), Error
>;
102 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
103 Pin
::new(&mut self.fut
).poll(cx
)
107 /// We use this to return an errno value back to the kernel.
108 macro_rules
! io_return
{
110 return Err(::std
::io
::Error
::from_raw_os_error($errno
).into());
114 /// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
121 entry_range_info
: EntryRangeInfo
,
122 content_range
: Option
<Range
<u64>>,
129 entry_range_info
: EntryRangeInfo
,
130 content_range
: Option
<Range
<u64>>,
133 refs
: AtomicUsize
::new(1),
141 /// Decrease the reference count by `count`. Note that this must not include the reference held
142 /// by `self` itself, so this must not decrease the count below 2.
143 fn forget(&self, count
: usize) -> Result
<(), Error
> {
145 let old
= self.refs
.load(Ordering
::Acquire
);
// We use this to bail out of a function in an unexpected error case. This will cause the fuse
// request to be answered with a generic `EIO` error code. The error message contained in here
// will be printed to stderr if the verbose flag is used, otherwise silently dropped.
150 return Err(io_format_err
!("reference count underflow").into());
152 let new
= old
- count
;
155 .compare_exchange(old
, new
, Ordering
::SeqCst
, Ordering
::SeqCst
)
157 Ok(_
) => break Ok(()),
163 fn get_ref
<'a
>(&self, session
: &'a SessionImpl
) -> LookupRef
<'a
> {
164 if self.refs
.fetch_add(1, Ordering
::AcqRel
) == 0 {
165 panic
!("atomic refcount increased from 0 to 1");
170 lookup
: self as *const Lookup
,
/// A counted reference to a [`Lookup`] entry.
///
/// Handed out by `Lookup::get_ref`, which increments the entry's `refs` counter. Dropping the
/// reference decrements the counter again and, if the counter was at 1 before, removes the entry
/// for that inode from the session's `lookups` map.
struct LookupRef<'a> {
    /// The session this reference belongs to; its `lookups` map is modified on drop.
    session: &'a SessionImpl,
    /// Raw pointer to the referenced `Lookup`. It is null only after `leak` has swapped it out.
    lookup: *const Lookup,
}
// The raw `*const Lookup` field prevents `Send` and `Sync` from being derived automatically, so
// both are asserted by hand here.
// NOTE(review): soundness presumably relies on the pointee being a `Box<Lookup>` held in the
// session's `lookups` map and kept alive by the reference count - confirm.
unsafe impl<'a> Send for LookupRef<'a> {}
unsafe impl<'a> Sync for LookupRef<'a> {}
183 impl<'a
> Clone
for LookupRef
<'a
> {
184 fn clone(&self) -> Self {
185 self.get_ref(self.session
)
/// Gives transparent read access to the referenced [`Lookup`].
impl<'a> std::ops::Deref for LookupRef<'a> {
    type Target = Lookup;

    fn deref(&self) -> &Self::Target {
        // NOTE(review): this relies on `lookup` being non-null and pointing to a live `Lookup`.
        // The only place that nulls the pointer is `leak`, which consumes the reference, and
        // `Drop` checks for null before touching the pointee - confirm nothing else can observe
        // a null pointer here.
        unsafe { &*self.lookup }
    }
}
197 impl<'a
> Drop
for LookupRef
<'a
> {
199 if self.lookup
.is_null() {
203 if self.refs
.fetch_sub(1, Ordering
::AcqRel
) == 1 {
204 let inode
= self.inode
;
205 drop(self.session
.lookups
.write().unwrap().remove(&inode
));
impl<'a> LookupRef<'a> {
    /// Consumes this reference and returns a plain `&'a Lookup` to the same entry.
    ///
    /// The stored pointer is replaced with null before `self` goes out of scope; `Drop` checks
    /// for a null pointer first.
    /// NOTE(review): presumably that check makes `Drop` skip the reference count decrement, so
    /// the count taken by this reference stays with the entry - confirm against the `Drop` impl.
    fn leak(mut self) -> &'a Lookup {
        unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
    }
}
219 lookups
: RwLock
<BTreeMap
<u64, Box
<Lookup
>>>,
223 fn new(accessor
: Accessor
, verbose
: bool
) -> Self {
224 let root
= Lookup
::new(
227 EntryRangeInfo
::toplevel(0..accessor
.size()),
231 let mut tree
= BTreeMap
::new();
232 tree
.insert(ROOT_ID
, root
);
237 lookups
: RwLock
::new(tree
),
241 /// Here's how we deal with errors:
243 /// Any error will be printed if the verbose flag was set, otherwise the message will be
244 /// silently dropped.
246 /// Opaque errors will cause the fuse main loop to bail out with that error.
/// `io::Error`s will cause the fuse request to be responded to with the given `io::Error`. An
249 /// `io::ErrorKind::Other` translates to a generic `EIO`.
252 request
: impl FuseRequest
,
254 mut sender
: UnboundedSender
<Error
>,
256 let final_result
= match err
.downcast
::<io
::Error
>() {
258 if err
.kind() == io
::ErrorKind
::Other
&& self.verbose
{
259 eprintln
!("an IO error occurred: {}", err
);
263 request
.io_fail(err
).map_err(Error
::from
)
266 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
268 eprintln
!("internal error: {}, bailing out", err
);
273 if let Err(err
) = final_result
{
274 // either we failed to send the error code to fuse, or the above was not an
275 // `io::Error`, so in this case notify the main loop:
279 .expect("failed to propagate error to main loop");
283 async
fn main(self, fuse
: Fuse
) -> Result
<(), Error
> {
284 Arc
::new(self).main_do(fuse
).await
287 async
fn main_do(self: Arc
<Self>, fuse
: Fuse
) -> Result
<(), Error
> {
288 let (err_send
, mut err_recv
) = futures
::channel
::mpsc
::unbounded
::<Error
>();
289 let mut fuse
= fuse
.fuse(); // make this a futures::stream::FusedStream!
292 request
= fuse
.try_next() => match request?
{
294 tokio
::spawn(Arc
::clone(&self).handle_request(request
, err_send
.clone()));
298 err
= err_recv
.next() => match err
{
299 Some(err
) => if self.verbose
{
300 eprintln
!("cancelling fuse main loop due to error: {}", err
);
303 None
=> panic
!("error channel was closed unexpectedly"),
310 async
fn handle_request(
313 mut err_sender
: UnboundedSender
<Error
>,
315 let result
: Result
<(), Error
> = match request
{
316 Request
::Lookup(request
) => {
317 match self.lookup(request
.parent
, &request
.file_name
).await
{
318 Ok((entry
, lookup
)) => match request
.reply(&entry
) {
323 Err(err
) => Err(Error
::from(err
)),
325 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
328 Request
::Forget(request
) => match self.forget(request
.inode
, request
.count
as usize) {
333 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
335 Request
::Getattr(request
) => match self.getattr(request
.inode
).await
{
336 Ok(stat
) => request
.reply(&stat
, f64::MAX
).map_err(Error
::from
),
337 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
339 Request
::ReaddirPlus(mut request
) => match self.readdirplus(&mut request
).await
{
340 Ok(lookups
) => match request
.reply() {
347 Err(err
) => Err(Error
::from(err
)),
349 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
351 Request
::Read(request
) => {
352 match self.read(request
.inode
, request
.size
, request
.offset
).await
{
353 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
354 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
357 Request
::Readlink(request
) => match self.readlink(request
.inode
).await
{
358 Ok(data
) => request
.reply(&data
).map_err(Error
::from
),
359 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
361 Request
::ListXAttrSize(request
) => match self.listxattrs(request
.inode
).await
{
365 .fold(0, |sum
, i
| sum
+ i
.name().to_bytes_with_nul().len()),
367 .map_err(Error
::from
),
368 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
370 Request
::ListXAttr(mut request
) => match self.listxattrs_into(&mut request
).await
{
371 Ok(ReplyBufState
::Ok
) => request
.reply().map_err(Error
::from
),
372 Ok(ReplyBufState
::Full
) => request
.fail_full().map_err(Error
::from
),
373 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
375 Request
::GetXAttrSize(request
) => {
376 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
377 Ok(xattr
) => request
.reply(xattr
.value().len()).map_err(Error
::from
),
378 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
381 Request
::GetXAttr(request
) => {
382 match self.getxattr(request
.inode
, &request
.attr_name
).await
{
383 Ok(xattr
) => request
.reply(xattr
.value()).map_err(Error
::from
),
384 Err(err
) => return self.handle_err(request
, err
, err_sender
).await
,
389 eprintln
!("Received unexpected fuse request");
391 other
.fail(libc
::ENOSYS
).map_err(Error
::from
)
395 if let Err(err
) = result
{
399 .expect("failed to propagate error to main loop");
403 fn get_lookup(&self, inode
: u64) -> Result
<LookupRef
, Error
> {
404 let lookups
= self.lookups
.read().unwrap();
405 if let Some(lookup
) = lookups
.get(&inode
) {
406 return Ok(lookup
.get_ref(self));
408 io_return
!(libc
::ENOENT
);
411 async
fn open_dir(&self, inode
: u64) -> Result
<Directory
, Error
> {
412 if inode
== ROOT_ID
{
413 Ok(self.accessor
.open_root().await?
)
414 } else if !is_dir_inode(inode
) {
415 io_return
!(libc
::ENOTDIR
);
417 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? }
)
421 async
fn open_entry(&self, lookup
: &LookupRef
<'_
>) -> io
::Result
<FileEntry
> {
424 .open_file_at_range(&lookup
.entry_range_info
)
429 fn open_content(&self, lookup
: &LookupRef
) -> Result
<FileContents
, Error
> {
430 if is_dir_inode(lookup
.inode
) {
431 io_return
!(libc
::EISDIR
);
434 match lookup
.content_range
.clone() {
435 Some(range
) => Ok(unsafe { self.accessor.open_contents_at_range(range) }
),
436 None
=> io_return
!(libc
::EBADF
),
440 fn make_lookup(&self, parent
: u64, inode
: u64, entry
: &FileEntry
) -> Result
<LookupRef
, Error
> {
441 let lookups
= self.lookups
.read().unwrap();
442 if let Some(lookup
) = lookups
.get(&inode
) {
443 return Ok(lookup
.get_ref(self));
447 let entry
= Lookup
::new(
450 entry
.entry_range_info().clone(),
451 entry
.content_range()?
,
453 let reference
= entry
.get_ref(self);
454 entry
.refs
.store(1, Ordering
::Release
);
456 let mut lookups
= self.lookups
.write().unwrap();
457 if let Some(lookup
) = lookups
.get(&inode
) {
458 return Ok(lookup
.get_ref(self));
461 lookups
.insert(inode
, entry
);
466 fn forget(&self, inode
: u64, count
: usize) -> Result
<(), Error
> {
467 let node
= self.get_lookup(inode
)?
;
476 ) -> Result
<(EntryParam
, LookupRef
<'_
>), Error
> {
477 let dir
= self.open_dir(parent
).await?
;
479 let entry
= match { dir }
.lookup(file_name
).await?
{
480 Some(entry
) => entry
,
481 None
=> io_return
!(libc
::ENOENT
),
484 let entry
= if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
485 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
486 // entire rest of the archive until we figure out something better...
487 let entry
= self.accessor
.follow_hardlink(&entry
).await?
;
489 if let pxar
::EntryKind
::Hardlink(_
) = entry
.kind() {
490 // hardlinks must not point to other hardlinks...
491 io_return
!(libc
::ELOOP
);
499 let response
= to_entry(&entry
)?
;
500 let inode
= response
.inode
;
501 Ok((response
, self.make_lookup(parent
, inode
, &entry
)?
))
504 async
fn getattr(&self, inode
: u64) -> Result
<libc
::stat
, Error
> {
506 self.accessor
.open_file_at_range(&self.get_lookup(inode
)?
.entry_range_info
).await?
508 to_stat(inode
, &entry
)
511 async
fn readdirplus(
513 request
: &mut requests
::ReaddirPlus
,
514 ) -> Result
<Vec
<LookupRef
<'_
>>, Error
> {
515 let mut lookups
= Vec
::new();
516 let offset
= usize::try_from(request
.offset
)
517 .map_err(|_
| io_format_err
!("directory offset out of range"))?
;
519 let dir
= self.open_dir(request
.inode
).await?
;
520 let dir_lookup
= self.get_lookup(request
.inode
)?
;
522 let entry_count
= dir
.read_dir().count() as isize;
524 let mut next
= offset
as isize;
525 let mut iter
= dir
.read_dir().skip(offset
);
526 while let Some(file
) = iter
.next().await
{
528 let file
= file?
.decode_entry().await?
;
529 let stat
= to_stat(to_inode(&file
), &file
)?
;
530 let name
= file
.file_name();
531 match request
.add_entry(name
, &stat
, next
, 1, f64::MAX
, f64::MAX
)?
{
532 ReplyBufState
::Ok
=> (),
533 ReplyBufState
::Full
=> return Ok(lookups
),
535 lookups
.push(self.make_lookup(request
.inode
, stat
.st_ino
, &file
)?
);
538 if next
== entry_count
{
540 let file
= dir
.lookup_self().await?
;
541 let stat
= to_stat(to_inode(&file
), &file
)?
;
542 let name
= OsStr
::new(".");
543 match request
.add_entry(name
, &stat
, next
, 1, f64::MAX
, f64::MAX
)?
{
544 ReplyBufState
::Ok
=> (),
545 ReplyBufState
::Full
=> return Ok(lookups
),
547 lookups
.push(LookupRef
::clone(&dir_lookup
));
550 if next
== entry_count
+ 1 {
552 let lookup
= self.get_lookup(dir_lookup
.parent
)?
;
553 let parent_dir
= self.open_dir(lookup
.inode
).await?
;
554 let file
= parent_dir
.lookup_self().await?
;
555 let stat
= to_stat(to_inode(&file
), &file
)?
;
556 let name
= OsStr
::new("..");
557 match request
.add_entry(name
, &stat
, next
, 1, f64::MAX
, f64::MAX
)?
{
558 ReplyBufState
::Ok
=> (),
559 ReplyBufState
::Full
=> return Ok(lookups
),
561 lookups
.push(lookup
);
567 async
fn read(&self, inode
: u64, len
: usize, offset
: u64) -> Result
<Vec
<u8>, Error
> {
568 let file
= self.get_lookup(inode
)?
;
569 let content
= self.open_content(&file
)?
;
570 let mut buf
= vec
::undefined(len
);
571 let got
= content
.read_at(&mut buf
, offset
).await?
;
576 async
fn readlink(&self, inode
: u64) -> Result
<OsString
, Error
> {
577 let lookup
= self.get_lookup(inode
)?
;
578 let file
= self.open_entry(&lookup
).await?
;
579 match file
.get_symlink() {
580 None
=> io_return
!(libc
::EINVAL
),
581 Some(link
) => Ok(link
.to_owned()),
585 async
fn listxattrs(&self, inode
: u64) -> Result
<Vec
<pxar
::format
::XAttr
>, Error
> {
586 let lookup
= self.get_lookup(inode
)?
;
593 let mut xattrs
= metadata
.xattrs
;
595 use pxar
::format
::XAttr
;
597 if let Some(fcaps
) = metadata
.fcaps
{
598 xattrs
.push(XAttr
::new(xattr
::xattr_name_fcaps().to_bytes(), fcaps
.data
));
601 // TODO: Special cases:
//     b"system.posix_acl_access"
//     b"system.posix_acl_default"
605 // For these we need to be able to create posix acl format entries, at that point we could
606 // just ditch libacl as well...
611 async
fn listxattrs_into(
613 request
: &mut requests
::ListXAttr
,
614 ) -> Result
<ReplyBufState
, Error
> {
615 let xattrs
= self.listxattrs(request
.inode
).await?
;
617 for entry
in xattrs
{
618 match request
.add_c_string(entry
.name()) {
619 ReplyBufState
::Ok
=> (),
620 ReplyBufState
::Full
=> return Ok(ReplyBufState
::Full
),
624 Ok(ReplyBufState
::Ok
)
627 async
fn getxattr(&self, inode
: u64, xattr
: &OsStr
) -> Result
<pxar
::format
::XAttr
, Error
> {
628 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
629 // xattr for an entry...
630 let xattrs
= self.listxattrs(inode
).await?
;
631 for entry
in xattrs
{
632 if entry
.name().to_bytes() == xattr
.as_bytes() {
636 io_return
!(libc
::ENODATA
);
641 fn to_entry(entry
: &FileEntry
) -> Result
<EntryParam
, Error
> {
642 to_entry_param(to_inode(entry
), entry
)
646 fn to_inode(entry
: &FileEntry
) -> u64 {
648 entry
.entry_range_info().entry_range
.end
650 entry
.entry_range_info().entry_range
.start
| NON_DIRECTORY_INODE
654 fn to_entry_param(inode
: u64, entry
: &pxar
::Entry
) -> Result
<EntryParam
, Error
> {
655 Ok(EntryParam
::simple(inode
, to_stat(inode
, entry
)?
))
658 fn to_stat(inode
: u64, entry
: &pxar
::Entry
) -> Result
<libc
::stat
, Error
> {
659 let nlink
= if entry
.is_dir() { 2 }
else { 1 }
;
661 let metadata
= entry
.metadata();
663 let mut stat
: libc
::stat
= unsafe { mem::zeroed() }
;
665 stat
.st_nlink
= nlink
;
666 stat
.st_mode
= u32::try_from(metadata
.stat
.mode
)
667 .map_err(|err
| format_err
!("mode does not fit into st_mode field: {}", err
))?
;
668 stat
.st_size
= i64::try_from(entry
.file_size().unwrap_or(0))
669 .map_err(|err
| format_err
!("size does not fit into st_size field: {}", err
))?
;
670 stat
.st_uid
= metadata
.stat
.uid
;
671 stat
.st_gid
= metadata
.stat
.gid
;
672 stat
.st_atime
= metadata
.stat
.mtime
.secs
;
673 stat
.st_atime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
674 stat
.st_mtime
= metadata
.stat
.mtime
.secs
;
675 stat
.st_mtime_nsec
= metadata
.stat
.mtime
.nanos
as _
;
676 stat
.st_ctime
= metadata
.stat
.mtime
.secs
;
677 stat
.st_ctime_nsec
= metadata
.stat
.mtime
.nanos
as _
;