git.proxmox.com Git - proxmox-backup.git/commitdiff
move fuse code from pbs-client to pbs-pxar-fuse
author Wolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 6 Dec 2022 08:17:03 +0000 (09:17 +0100)
committer Wolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 6 Dec 2022 08:17:04 +0000 (09:17 +0100)
The fuse code is used by pxar-bin and proxmox-backup-client
for mounting, but pbs-client is used by more consumers
(e.g. the proxmox-backup-qemu library, which really doesn't
need to pull in any fuse dependencies).

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
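
For context, a minimal sketch (not part of this commit) of how a consumer mounts an
archive through the new crate after this move, assuming a tokio runtime and
hypothetical archive/mountpoint paths; it mirrors what pxar-bin does in the hunks below:

use std::ffi::OsStr;
use std::path::Path;

use anyhow::Error;

async fn mount_archive_sketch() -> Result<(), Error> {
    let archive = Path::new("backup.pxar");      // assumed archive path
    let mountpoint = Path::new("/mnt/pxar");     // assumed mount point
    let options = OsStr::new("ro,default_permissions");

    // verbose = false; Session implements Future<Output = Result<(), Error>>,
    // so awaiting it runs the fuse main loop until the archive is unmounted.
    let session =
        pbs_pxar_fuse::Session::mount_path(archive, options, false, mountpoint).await?;
    session.await
}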
13 files changed:
Cargo.toml
Makefile
pbs-client/Cargo.toml
pbs-client/src/catalog_shell.rs
pbs-client/src/pxar/fuse.rs [deleted file]
pbs-client/src/pxar/mod.rs
pbs-pxar-fuse/Cargo.toml [new file with mode: 0644]
pbs-pxar-fuse/src/lib.rs [new file with mode: 0644]
proxmox-backup-client/Cargo.toml
proxmox-backup-client/src/catalog.rs
proxmox-backup-client/src/mount.rs
pxar-bin/Cargo.toml
pxar-bin/src/main.rs

index 38e9c1f2db75b8cf7d0ae531fa879d1644f48768..de83e1fa3727c23ba0cfea4bfb12f25875c76327 100644 (file)
@@ -25,7 +25,7 @@ members = [
     "pbs-config",
     "pbs-datastore",
     "pbs-fuse-loop",
-    "proxmox-rrd",
+    "pbs-pxar-fuse",
     "pbs-tape",
     "pbs-tools",
 
@@ -33,6 +33,8 @@ members = [
     "proxmox-backup-client",
     "proxmox-file-restore",
     "proxmox-restore-daemon",
+    "proxmox-rrd",
+
     "pxar-bin",
 ]
 
index 87b5ca01d3543a0305486162cf852b8dff7bd965..2b6377dd6f63f77b8e24545287f8ee9f9e240d92 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -38,13 +38,14 @@ SUBCRATES := \
        pbs-config \
        pbs-datastore \
        pbs-fuse-loop \
-       proxmox-rrd \
+       pbs-pxar-fuse \
        pbs-tape \
        pbs-tools \
        proxmox-backup-banner \
        proxmox-backup-client \
        proxmox-file-restore \
        proxmox-restore-daemon \
+       proxmox-rrd \
        pxar-bin
 
 ifeq ($(BUILD_MODE), release)
index a288e0e75aa42474ea150ead66bafd68ffa42459..40b6bc991e8590bd1adea732ec1838f31d9e25d1 100644 (file)
@@ -35,7 +35,6 @@ pathpatterns = "0.1.2"
 
 proxmox-async = "0.4"
 proxmox-compression = "0.1.1"
-proxmox-fuse = "0.1.3"
 proxmox-http = { version = "0.7", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-io = { version = "1.0.1", features = [ "tokio" ] }
 proxmox-lang = "1.1"
index 25e27b3783652fe2eaacbaf0b0bf5b55cb26b111..98af5699ea1128f54569843b43b3ff455f2eb6cd 100644 (file)
@@ -17,16 +17,20 @@ use pathpatterns::{MatchEntry, MatchList, MatchPattern, MatchType, PatternFlag};
 use proxmox_router::cli::{self, CliCommand, CliCommandMap, CliHelper, CommandLineInterface};
 use proxmox_schema::api;
 use proxmox_sys::fs::{create_path, CreateOptions};
+use pxar::accessor::ReadAt;
 use pxar::{EntryKind, Metadata};
 
 use pbs_datastore::catalog::{self, DirEntryAttribute};
 use proxmox_async::runtime::block_in_place;
 
-use crate::pxar::fuse::{Accessor, FileEntry};
 use crate::pxar::Flags;
 
 type CatalogReader = pbs_datastore::catalog::CatalogReader<std::fs::File>;
 
+type Reader = std::sync::Arc<dyn ReadAt + Send + Sync + 'static>;
+type Accessor = pxar::accessor::aio::Accessor<Reader>;
+type FileEntry = pxar::accessor::aio::FileEntry<Reader>;
+
 const MAX_SYMLINK_COUNT: usize = 40;
 
 static mut SHELL: Option<usize> = None;
diff --git a/pbs-client/src/pxar/fuse.rs b/pbs-client/src/pxar/fuse.rs
deleted file mode 100644 (file)
index ee0870d..0000000
+++ /dev/null
@@ -1,684 +0,0 @@
-//! Asynchronous fuse implementation.
-
-use std::collections::BTreeMap;
-use std::convert::TryFrom;
-use std::ffi::{OsStr, OsString};
-use std::future::Future;
-use std::io;
-use std::mem;
-use std::ops::Range;
-use std::os::unix::ffi::OsStrExt;
-use std::path::Path;
-use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
-use std::task::{Context, Poll};
-
-use anyhow::{format_err, Error};
-use futures::channel::mpsc::UnboundedSender;
-use futures::select;
-use futures::sink::SinkExt;
-use futures::stream::{StreamExt, TryStreamExt};
-
-use proxmox_io::vec;
-use pxar::accessor::{self, EntryRangeInfo, ReadAt};
-
-use proxmox_fuse::requests::{self, FuseRequest};
-use proxmox_fuse::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID};
-use proxmox_lang::io_format_err;
-use proxmox_sys::fs::xattr;
-
-/// We mark inodes for regular files this way so we know how to access them.
-const NON_DIRECTORY_INODE: u64 = 1u64 << 63;
-
-#[inline]
-fn is_dir_inode(inode: u64) -> bool {
-    0 == (inode & NON_DIRECTORY_INODE)
-}
-
-/// Our reader type instance used for accessors.
-pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;
-
-/// Our Accessor type instance.
-pub type Accessor = accessor::aio::Accessor<Reader>;
-
-/// Our Directory type instance.
-pub type Directory = accessor::aio::Directory<Reader>;
-
-/// Our FileEntry type instance.
-pub type FileEntry = accessor::aio::FileEntry<Reader>;
-
-/// Our FileContents type instance.
-pub type FileContents = accessor::aio::FileContents<Reader>;
-
-pub struct Session {
-    fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync + 'static>>,
-}
-
-impl Session {
-    /// Create a fuse session for an archive.
-    pub async fn mount_path(
-        archive_path: &Path,
-        options: &OsStr,
-        verbose: bool,
-        mountpoint: &Path,
-    ) -> Result<Self, Error> {
-        // TODO: Add a buffered/caching ReadAt layer?
-        let file = std::fs::File::open(archive_path)?;
-        let file_size = file.metadata()?.len();
-        let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
-        let accessor = Accessor::new(reader, file_size).await?;
-        Self::mount(accessor, options, verbose, mountpoint)
-    }
-
-    /// Create a new fuse session for the given pxar `Accessor`.
-    pub fn mount(
-        accessor: Accessor,
-        options: &OsStr,
-        verbose: bool,
-        path: &Path,
-    ) -> Result<Self, Error> {
-        let fuse = Fuse::builder("pxar-mount")?
-            .debug()
-            .options_os(options)?
-            .enable_readdirplus()
-            .enable_read()
-            .enable_readlink()
-            .enable_read_xattr()
-            .build()?
-            .mount(path)?;
-
-        let session = SessionImpl::new(accessor, verbose);
-
-        Ok(Self {
-            fut: Box::pin(session.main(fuse)),
-        })
-    }
-}
-
-impl Future for Session {
-    type Output = Result<(), Error>;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
-        Pin::new(&mut self.fut).poll(cx)
-    }
-}
-
-/// We use this to return an errno value back to the kernel.
-macro_rules! io_return {
-    ($errno:expr) => {{
-        return Err(::std::io::Error::from_raw_os_error($errno).into());
-    }};
-}
-
-/// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
-/// accessed.
-struct Lookup {
-    refs: AtomicUsize,
-
-    inode: u64,
-    parent: u64,
-    entry_range_info: EntryRangeInfo,
-    content_range: Option<Range<u64>>,
-}
-
-impl Lookup {
-    fn new(
-        inode: u64,
-        parent: u64,
-        entry_range_info: EntryRangeInfo,
-        content_range: Option<Range<u64>>,
-    ) -> Box<Lookup> {
-        Box::new(Self {
-            refs: AtomicUsize::new(1),
-            inode,
-            parent,
-            entry_range_info,
-            content_range,
-        })
-    }
-
-    /// Decrease the reference count by `count`. Note that this must not include the reference held
-    /// by `self` itself, so this must not decrease the count below 2.
-    fn forget(&self, count: usize) -> Result<(), Error> {
-        loop {
-            let old = self.refs.load(Ordering::Acquire);
-            if count >= old {
-                // We use this to bail out of a function in an unexpected error case. This will cause the fuse
-                // request to be answered with a generic `EIO` error code. The error message contained in here
-                // will be printed to stdout if the verbose flag is used, otherwise silently dropped.
-                return Err(io_format_err!("reference count underflow").into());
-            }
-            let new = old - count;
-            match self
-                .refs
-                .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
-            {
-                Ok(_) => break Ok(()),
-                Err(_) => continue,
-            }
-        }
-    }
-
-    fn get_ref<'a>(&self, session: &'a SessionImpl) -> LookupRef<'a> {
-        if self.refs.fetch_add(1, Ordering::AcqRel) == 0 {
-            panic!("atomic refcount increased from 0 to 1");
-        }
-
-        LookupRef {
-            session,
-            lookup: self as *const Lookup,
-        }
-    }
-}
-
-struct LookupRef<'a> {
-    session: &'a SessionImpl,
-    lookup: *const Lookup,
-}
-
-unsafe impl<'a> Send for LookupRef<'a> {}
-unsafe impl<'a> Sync for LookupRef<'a> {}
-
-impl<'a> Clone for LookupRef<'a> {
-    fn clone(&self) -> Self {
-        self.get_ref(self.session)
-    }
-}
-
-impl<'a> std::ops::Deref for LookupRef<'a> {
-    type Target = Lookup;
-
-    fn deref(&self) -> &Self::Target {
-        unsafe { &*self.lookup }
-    }
-}
-
-impl<'a> Drop for LookupRef<'a> {
-    fn drop(&mut self) {
-        if self.lookup.is_null() {
-            return;
-        }
-
-        if self.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
-            let inode = self.inode;
-            drop(self.session.lookups.write().unwrap().remove(&inode));
-        }
-    }
-}
-
-impl<'a> LookupRef<'a> {
-    fn leak(mut self) -> &'a Lookup {
-        unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
-    }
-}
-
-struct SessionImpl {
-    accessor: Accessor,
-    verbose: bool,
-    lookups: RwLock<BTreeMap<u64, Box<Lookup>>>,
-}
-
-impl SessionImpl {
-    fn new(accessor: Accessor, verbose: bool) -> Self {
-        let root = Lookup::new(
-            ROOT_ID,
-            ROOT_ID,
-            EntryRangeInfo::toplevel(0..accessor.size()),
-            None,
-        );
-
-        let mut tree = BTreeMap::new();
-        tree.insert(ROOT_ID, root);
-
-        Self {
-            accessor,
-            verbose,
-            lookups: RwLock::new(tree),
-        }
-    }
-
-    /// Here's how we deal with errors:
-    ///
-    /// Any error will be logged if a log level of at least 'debug' was set, otherwise the
-    /// message will be silently dropped.
-    ///
-    /// Opaque errors will cause the fuse main loop to bail out with that error.
-    ///
-    /// `io::Error`s will cause the fuse request to be responded to with the given `io::Error`. An
-    /// `io::ErrorKind::Other` translates to a generic `EIO`.
-    async fn handle_err(
-        &self,
-        request: impl FuseRequest,
-        err: Error,
-        mut sender: UnboundedSender<Error>,
-    ) {
-        let final_result = match err.downcast::<io::Error>() {
-            Ok(err) => {
-                if err.kind() == io::ErrorKind::Other {
-                    log::error!("an IO error occurred: {}", err);
-                }
-
-                // fail the request
-                request.io_fail(err).map_err(Error::from)
-            }
-            Err(err) => {
-                // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
-                log::error!("internal error: {}, bailing out", err);
-                Err(err)
-            }
-        };
-        if let Err(err) = final_result {
-            // either we failed to send the error code to fuse, or the above was not an
-            // `io::Error`, so in this case notify the main loop:
-            sender
-                .send(err)
-                .await
-                .expect("failed to propagate error to main loop");
-        }
-    }
-
-    async fn main(self, fuse: Fuse) -> Result<(), Error> {
-        Arc::new(self).main_do(fuse).await
-    }
-
-    async fn main_do(self: Arc<Self>, fuse: Fuse) -> Result<(), Error> {
-        let (err_send, mut err_recv) = futures::channel::mpsc::unbounded::<Error>();
-        let mut fuse = fuse.fuse(); // make this a futures::stream::FusedStream!
-        loop {
-            select! {
-                request = fuse.try_next() => match request? {
-                    Some(request) => {
-                        tokio::spawn(Arc::clone(&self).handle_request(request, err_send.clone()));
-                    }
-                    None => break,
-                },
-                err = err_recv.next() => match err {
-                    Some(err) => if self.verbose {
-                        log::error!("cancelling fuse main loop due to error: {}", err);
-                        return Err(err);
-                    },
-                    None => panic!("error channel was closed unexpectedly"),
-                },
-            }
-        }
-        Ok(())
-    }
-
-    async fn handle_request(
-        self: Arc<Self>,
-        request: Request,
-        mut err_sender: UnboundedSender<Error>,
-    ) {
-        let result: Result<(), Error> = match request {
-            Request::Lookup(request) => {
-                match self.lookup(request.parent, &request.file_name).await {
-                    Ok((entry, lookup)) => match request.reply(&entry) {
-                        Ok(()) => {
-                            lookup.leak();
-                            Ok(())
-                        }
-                        Err(err) => Err(Error::from(err)),
-                    },
-                    Err(err) => return self.handle_err(request, err, err_sender).await,
-                }
-            }
-            Request::Forget(request) => match self.forget(request.inode, request.count as usize) {
-                Ok(()) => {
-                    request.reply();
-                    Ok(())
-                }
-                Err(err) => return self.handle_err(request, err, err_sender).await,
-            },
-            Request::Getattr(request) => match self.getattr(request.inode).await {
-                Ok(stat) => request.reply(&stat, f64::MAX).map_err(Error::from),
-                Err(err) => return self.handle_err(request, err, err_sender).await,
-            },
-            Request::ReaddirPlus(mut request) => match self.readdirplus(&mut request).await {
-                Ok(lookups) => match request.reply() {
-                    Ok(()) => {
-                        for i in lookups {
-                            i.leak();
-                        }
-                        Ok(())
-                    }
-                    Err(err) => Err(Error::from(err)),
-                },
-                Err(err) => return self.handle_err(request, err, err_sender).await,
-            },
-            Request::Read(request) => {
-                match self.read(request.inode, request.size, request.offset).await {
-                    Ok(data) => request.reply(&data).map_err(Error::from),
-                    Err(err) => return self.handle_err(request, err, err_sender).await,
-                }
-            }
-            Request::Readlink(request) => match self.readlink(request.inode).await {
-                Ok(data) => request.reply(&data).map_err(Error::from),
-                Err(err) => return self.handle_err(request, err, err_sender).await,
-            },
-            Request::ListXAttrSize(request) => match self.listxattrs(request.inode).await {
-                Ok(data) => request
-                    .reply(
-                        data.into_iter()
-                            .fold(0, |sum, i| sum + i.name().to_bytes_with_nul().len()),
-                    )
-                    .map_err(Error::from),
-                Err(err) => return self.handle_err(request, err, err_sender).await,
-            },
-            Request::ListXAttr(mut request) => match self.listxattrs_into(&mut request).await {
-                Ok(ReplyBufState::Ok) => request.reply().map_err(Error::from),
-                Ok(ReplyBufState::Full) => request.fail_full().map_err(Error::from),
-                Err(err) => return self.handle_err(request, err, err_sender).await,
-            },
-            Request::GetXAttrSize(request) => {
-                match self.getxattr(request.inode, &request.attr_name).await {
-                    Ok(xattr) => request.reply(xattr.value().len()).map_err(Error::from),
-                    Err(err) => return self.handle_err(request, err, err_sender).await,
-                }
-            }
-            Request::GetXAttr(request) => {
-                match self.getxattr(request.inode, &request.attr_name).await {
-                    Ok(xattr) => request.reply(xattr.value()).map_err(Error::from),
-                    Err(err) => return self.handle_err(request, err, err_sender).await,
-                }
-            }
-            other => {
-                log::error!("Received unexpected fuse request");
-                other.fail(libc::ENOSYS).map_err(Error::from)
-            }
-        };
-
-        if let Err(err) = result {
-            err_sender
-                .send(err)
-                .await
-                .expect("failed to propagate error to main loop");
-        }
-    }
-
-    fn get_lookup(&self, inode: u64) -> Result<LookupRef, Error> {
-        let lookups = self.lookups.read().unwrap();
-        if let Some(lookup) = lookups.get(&inode) {
-            return Ok(lookup.get_ref(self));
-        }
-        io_return!(libc::ENOENT);
-    }
-
-    async fn open_dir(&self, inode: u64) -> Result<Directory, Error> {
-        if inode == ROOT_ID {
-            Ok(self.accessor.open_root().await?)
-        } else if !is_dir_inode(inode) {
-            io_return!(libc::ENOTDIR);
-        } else {
-            Ok(unsafe { self.accessor.open_dir_at_end(inode).await? })
-        }
-    }
-
-    async fn open_entry(&self, lookup: &LookupRef<'_>) -> io::Result<FileEntry> {
-        unsafe {
-            self.accessor
-                .open_file_at_range(&lookup.entry_range_info)
-                .await
-        }
-    }
-
-    fn open_content(&self, lookup: &LookupRef) -> Result<FileContents, Error> {
-        if is_dir_inode(lookup.inode) {
-            io_return!(libc::EISDIR);
-        }
-
-        match lookup.content_range.clone() {
-            Some(range) => Ok(unsafe { self.accessor.open_contents_at_range(range) }),
-            None => io_return!(libc::EBADF),
-        }
-    }
-
-    fn make_lookup(&self, parent: u64, inode: u64, entry: &FileEntry) -> Result<LookupRef, Error> {
-        let lookups = self.lookups.read().unwrap();
-        if let Some(lookup) = lookups.get(&inode) {
-            return Ok(lookup.get_ref(self));
-        }
-        drop(lookups);
-
-        let entry = Lookup::new(
-            inode,
-            parent,
-            entry.entry_range_info().clone(),
-            entry.content_range()?,
-        );
-        let reference = entry.get_ref(self);
-        entry.refs.store(1, Ordering::Release);
-
-        let mut lookups = self.lookups.write().unwrap();
-        if let Some(lookup) = lookups.get(&inode) {
-            return Ok(lookup.get_ref(self));
-        }
-
-        lookups.insert(inode, entry);
-        drop(lookups);
-        Ok(reference)
-    }
-
-    fn forget(&self, inode: u64, count: usize) -> Result<(), Error> {
-        let node = self.get_lookup(inode)?;
-        node.forget(count)?;
-        Ok(())
-    }
-
-    async fn lookup(
-        &'_ self,
-        parent: u64,
-        file_name: &OsStr,
-    ) -> Result<(EntryParam, LookupRef<'_>), Error> {
-        let dir = self.open_dir(parent).await?;
-
-        let entry = match { dir }.lookup(file_name).await? {
-            Some(entry) => entry,
-            None => io_return!(libc::ENOENT),
-        };
-
-        let entry = if let pxar::EntryKind::Hardlink(_) = entry.kind() {
-            // we don't know the file's end-offset, so we'll just allow the decoder to decode the
-            // entire rest of the archive until we figure out something better...
-            let entry = self.accessor.follow_hardlink(&entry).await?;
-
-            if let pxar::EntryKind::Hardlink(_) = entry.kind() {
-                // hardlinks must not point to other hardlinks...
-                io_return!(libc::ELOOP);
-            }
-
-            entry
-        } else {
-            entry
-        };
-
-        let response = to_entry(&entry)?;
-        let inode = response.inode;
-        Ok((response, self.make_lookup(parent, inode, &entry)?))
-    }
-
-    async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
-        let entry = unsafe {
-            self.accessor
-                .open_file_at_range(&self.get_lookup(inode)?.entry_range_info)
-                .await?
-        };
-        to_stat(inode, &entry)
-    }
-
-    async fn readdirplus(
-        &'_ self,
-        request: &mut requests::ReaddirPlus,
-    ) -> Result<Vec<LookupRef<'_>>, Error> {
-        let mut lookups = Vec::new();
-        let offset = usize::try_from(request.offset)
-            .map_err(|_| io_format_err!("directory offset out of range"))?;
-
-        let dir = self.open_dir(request.inode).await?;
-        let dir_lookup = self.get_lookup(request.inode)?;
-
-        let entry_count = dir.read_dir().count() as isize;
-
-        let mut next = offset as isize;
-        let mut iter = dir.read_dir().skip(offset);
-        while let Some(file) = iter.next().await {
-            next += 1;
-            let file = file?.decode_entry().await?;
-            let stat = to_stat(to_inode(&file), &file)?;
-            let name = file.file_name();
-            match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
-                ReplyBufState::Ok => (),
-                ReplyBufState::Full => return Ok(lookups),
-            }
-            lookups.push(self.make_lookup(request.inode, stat.st_ino, &file)?);
-        }
-
-        if next == entry_count {
-            next += 1;
-            let file = dir.lookup_self().await?;
-            let stat = to_stat(to_inode(&file), &file)?;
-            let name = OsStr::new(".");
-            match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
-                ReplyBufState::Ok => (),
-                ReplyBufState::Full => return Ok(lookups),
-            }
-            lookups.push(LookupRef::clone(&dir_lookup));
-        }
-
-        if next == entry_count + 1 {
-            next += 1;
-            let lookup = self.get_lookup(dir_lookup.parent)?;
-            let parent_dir = self.open_dir(lookup.inode).await?;
-            let file = parent_dir.lookup_self().await?;
-            let stat = to_stat(to_inode(&file), &file)?;
-            let name = OsStr::new("..");
-            match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
-                ReplyBufState::Ok => (),
-                ReplyBufState::Full => return Ok(lookups),
-            }
-            lookups.push(lookup);
-        }
-
-        Ok(lookups)
-    }
-
-    async fn read(&self, inode: u64, len: usize, offset: u64) -> Result<Vec<u8>, Error> {
-        let file = self.get_lookup(inode)?;
-        let content = self.open_content(&file)?;
-        let mut buf = vec::undefined(len);
-        let mut pos = 0;
-        // fuse's read is different from normal read - no short reads allowed except for EOF!
-        // the returned data will be 0-byte padded up to len by fuse
-        loop {
-            let got = content
-                .read_at(&mut buf[pos..], offset + pos as u64)
-                .await?;
-            pos += got;
-            if got == 0 || pos >= len {
-                break;
-            }
-        }
-        buf.truncate(pos);
-        Ok(buf)
-    }
-
-    async fn readlink(&self, inode: u64) -> Result<OsString, Error> {
-        let lookup = self.get_lookup(inode)?;
-        let file = self.open_entry(&lookup).await?;
-        match file.get_symlink() {
-            None => io_return!(libc::EINVAL),
-            Some(link) => Ok(link.to_owned()),
-        }
-    }
-
-    async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
-        let lookup = self.get_lookup(inode)?;
-        let metadata = self.open_entry(&lookup).await?.into_entry().into_metadata();
-
-        let mut xattrs = metadata.xattrs;
-
-        use pxar::format::XAttr;
-
-        if let Some(fcaps) = metadata.fcaps {
-            xattrs.push(XAttr::new(xattr::xattr_name_fcaps().to_bytes(), fcaps.data));
-        }
-
-        // TODO: Special cases:
-        //     b"system.posix_acl_access
-        //     b"system.posix_acl_default
-        //
-        // For these we need to be able to create posix acl format entries, at that point we could
-        // just ditch libacl as well...
-
-        Ok(xattrs)
-    }
-
-    async fn listxattrs_into(
-        &self,
-        request: &mut requests::ListXAttr,
-    ) -> Result<ReplyBufState, Error> {
-        let xattrs = self.listxattrs(request.inode).await?;
-
-        for entry in xattrs {
-            match request.add_c_string(entry.name()) {
-                ReplyBufState::Ok => (),
-                ReplyBufState::Full => return Ok(ReplyBufState::Full),
-            }
-        }
-
-        Ok(ReplyBufState::Ok)
-    }
-
-    async fn getxattr(&self, inode: u64, xattr: &OsStr) -> Result<pxar::format::XAttr, Error> {
-        // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
-        // xattr for an entry...
-        let xattrs = self.listxattrs(inode).await?;
-        for entry in xattrs {
-            if entry.name().to_bytes() == xattr.as_bytes() {
-                return Ok(entry);
-            }
-        }
-        io_return!(libc::ENODATA);
-    }
-}
-
-#[inline]
-fn to_entry(entry: &FileEntry) -> Result<EntryParam, Error> {
-    to_entry_param(to_inode(entry), entry)
-}
-
-#[inline]
-fn to_inode(entry: &FileEntry) -> u64 {
-    if entry.is_dir() {
-        entry.entry_range_info().entry_range.end
-    } else {
-        entry.entry_range_info().entry_range.start | NON_DIRECTORY_INODE
-    }
-}
-
-fn to_entry_param(inode: u64, entry: &pxar::Entry) -> Result<EntryParam, Error> {
-    Ok(EntryParam::simple(inode, to_stat(inode, entry)?))
-}
-
-fn to_stat(inode: u64, entry: &pxar::Entry) -> Result<libc::stat, Error> {
-    let nlink = if entry.is_dir() { 2 } else { 1 };
-
-    let metadata = entry.metadata();
-
-    let mut stat: libc::stat = unsafe { mem::zeroed() };
-    stat.st_ino = inode;
-    stat.st_nlink = nlink;
-    stat.st_mode = u32::try_from(metadata.stat.mode)
-        .map_err(|err| format_err!("mode does not fit into st_mode field: {}", err))?;
-    stat.st_size = i64::try_from(entry.file_size().unwrap_or(0))
-        .map_err(|err| format_err!("size does not fit into st_size field: {}", err))?;
-    stat.st_uid = metadata.stat.uid;
-    stat.st_gid = metadata.stat.gid;
-    stat.st_atime = metadata.stat.mtime.secs;
-    stat.st_atime_nsec = metadata.stat.mtime.nanos as _;
-    stat.st_mtime = metadata.stat.mtime.secs;
-    stat.st_mtime_nsec = metadata.stat.mtime.nanos as _;
-    stat.st_ctime = metadata.stat.mtime.secs;
-    stat.st_ctime_nsec = metadata.stat.mtime.nanos as _;
-    Ok(stat)
-}
index 725fc2d922dffdb830d0ea9fd8ee71d6f55ac3d0..a158101d6d8c9464f8490fb463d6c0241c7d7e18 100644 (file)
@@ -50,7 +50,6 @@
 pub(crate) mod create;
 pub(crate) mod dir_stack;
 pub(crate) mod extract;
-pub mod fuse;
 pub(crate) mod metadata;
 pub(crate) mod tools;
 
diff --git a/pbs-pxar-fuse/Cargo.toml b/pbs-pxar-fuse/Cargo.toml
new file mode 100644 (file)
index 0000000..cb0abfe
--- /dev/null
@@ -0,0 +1,19 @@
+[package]
+name = "pbs-pxar-fuse"
+version = "0.1.0"
+edition = "2021"
+description = "pxar fuse file system code"
+
+[dependencies]
+anyhow = "1.0"
+futures = "0.3"
+libc = "0.2"
+log = "0.4"
+tokio = "1.6"
+
+proxmox-fuse = "0.1.3"
+proxmox-io = "1.0.1"
+proxmox-lang = "1.1"
+proxmox-sys = "0.4.1"
+
+pxar = { version = "0.10.2", features = [ "tokio-io" ] }
diff --git a/pbs-pxar-fuse/src/lib.rs b/pbs-pxar-fuse/src/lib.rs
new file mode 100644 (file)
index 0000000..ee0870d
--- /dev/null
@@ -0,0 +1,684 @@
+//! Asynchronous fuse implementation.
+
+use std::collections::BTreeMap;
+use std::convert::TryFrom;
+use std::ffi::{OsStr, OsString};
+use std::future::Future;
+use std::io;
+use std::mem;
+use std::ops::Range;
+use std::os::unix::ffi::OsStrExt;
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, RwLock};
+use std::task::{Context, Poll};
+
+use anyhow::{format_err, Error};
+use futures::channel::mpsc::UnboundedSender;
+use futures::select;
+use futures::sink::SinkExt;
+use futures::stream::{StreamExt, TryStreamExt};
+
+use proxmox_io::vec;
+use pxar::accessor::{self, EntryRangeInfo, ReadAt};
+
+use proxmox_fuse::requests::{self, FuseRequest};
+use proxmox_fuse::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID};
+use proxmox_lang::io_format_err;
+use proxmox_sys::fs::xattr;
+
+/// We mark inodes for regular files this way so we know how to access them.
+const NON_DIRECTORY_INODE: u64 = 1u64 << 63;
+
+#[inline]
+fn is_dir_inode(inode: u64) -> bool {
+    0 == (inode & NON_DIRECTORY_INODE)
+}
+
+/// Our reader type instance used for accessors.
+pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;
+
+/// Our Accessor type instance.
+pub type Accessor = accessor::aio::Accessor<Reader>;
+
+/// Our Directory type instance.
+pub type Directory = accessor::aio::Directory<Reader>;
+
+/// Our FileEntry type instance.
+pub type FileEntry = accessor::aio::FileEntry<Reader>;
+
+/// Our FileContents type instance.
+pub type FileContents = accessor::aio::FileContents<Reader>;
+
+pub struct Session {
+    fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync + 'static>>,
+}
+
+impl Session {
+    /// Create a fuse session for an archive.
+    pub async fn mount_path(
+        archive_path: &Path,
+        options: &OsStr,
+        verbose: bool,
+        mountpoint: &Path,
+    ) -> Result<Self, Error> {
+        // TODO: Add a buffered/caching ReadAt layer?
+        let file = std::fs::File::open(archive_path)?;
+        let file_size = file.metadata()?.len();
+        let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
+        let accessor = Accessor::new(reader, file_size).await?;
+        Self::mount(accessor, options, verbose, mountpoint)
+    }
+
+    /// Create a new fuse session for the given pxar `Accessor`.
+    pub fn mount(
+        accessor: Accessor,
+        options: &OsStr,
+        verbose: bool,
+        path: &Path,
+    ) -> Result<Self, Error> {
+        let fuse = Fuse::builder("pxar-mount")?
+            .debug()
+            .options_os(options)?
+            .enable_readdirplus()
+            .enable_read()
+            .enable_readlink()
+            .enable_read_xattr()
+            .build()?
+            .mount(path)?;
+
+        let session = SessionImpl::new(accessor, verbose);
+
+        Ok(Self {
+            fut: Box::pin(session.main(fuse)),
+        })
+    }
+}
+
+impl Future for Session {
+    type Output = Result<(), Error>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        Pin::new(&mut self.fut).poll(cx)
+    }
+}
+
+/// We use this to return an errno value back to the kernel.
+macro_rules! io_return {
+    ($errno:expr) => {{
+        return Err(::std::io::Error::from_raw_os_error($errno).into());
+    }};
+}
+
+/// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
+/// accessed.
+struct Lookup {
+    refs: AtomicUsize,
+
+    inode: u64,
+    parent: u64,
+    entry_range_info: EntryRangeInfo,
+    content_range: Option<Range<u64>>,
+}
+
+impl Lookup {
+    fn new(
+        inode: u64,
+        parent: u64,
+        entry_range_info: EntryRangeInfo,
+        content_range: Option<Range<u64>>,
+    ) -> Box<Lookup> {
+        Box::new(Self {
+            refs: AtomicUsize::new(1),
+            inode,
+            parent,
+            entry_range_info,
+            content_range,
+        })
+    }
+
+    /// Decrease the reference count by `count`. Note that this must not include the reference held
+    /// by `self` itself, so this must not decrease the count below 2.
+    fn forget(&self, count: usize) -> Result<(), Error> {
+        loop {
+            let old = self.refs.load(Ordering::Acquire);
+            if count >= old {
+                // We use this to bail out of a function in an unexpected error case. This will cause the fuse
+                // request to be answered with a generic `EIO` error code. The error message contained in here
+                // will be printed to stdout if the verbose flag is used, otherwise silently dropped.
+                return Err(io_format_err!("reference count underflow").into());
+            }
+            let new = old - count;
+            match self
+                .refs
+                .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
+            {
+                Ok(_) => break Ok(()),
+                Err(_) => continue,
+            }
+        }
+    }
+
+    fn get_ref<'a>(&self, session: &'a SessionImpl) -> LookupRef<'a> {
+        if self.refs.fetch_add(1, Ordering::AcqRel) == 0 {
+            panic!("atomic refcount increased from 0 to 1");
+        }
+
+        LookupRef {
+            session,
+            lookup: self as *const Lookup,
+        }
+    }
+}
+
+struct LookupRef<'a> {
+    session: &'a SessionImpl,
+    lookup: *const Lookup,
+}
+
+unsafe impl<'a> Send for LookupRef<'a> {}
+unsafe impl<'a> Sync for LookupRef<'a> {}
+
+impl<'a> Clone for LookupRef<'a> {
+    fn clone(&self) -> Self {
+        self.get_ref(self.session)
+    }
+}
+
+impl<'a> std::ops::Deref for LookupRef<'a> {
+    type Target = Lookup;
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { &*self.lookup }
+    }
+}
+
+impl<'a> Drop for LookupRef<'a> {
+    fn drop(&mut self) {
+        if self.lookup.is_null() {
+            return;
+        }
+
+        if self.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
+            let inode = self.inode;
+            drop(self.session.lookups.write().unwrap().remove(&inode));
+        }
+    }
+}
+
+impl<'a> LookupRef<'a> {
+    fn leak(mut self) -> &'a Lookup {
+        unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
+    }
+}
+
+struct SessionImpl {
+    accessor: Accessor,
+    verbose: bool,
+    lookups: RwLock<BTreeMap<u64, Box<Lookup>>>,
+}
+
+impl SessionImpl {
+    fn new(accessor: Accessor, verbose: bool) -> Self {
+        let root = Lookup::new(
+            ROOT_ID,
+            ROOT_ID,
+            EntryRangeInfo::toplevel(0..accessor.size()),
+            None,
+        );
+
+        let mut tree = BTreeMap::new();
+        tree.insert(ROOT_ID, root);
+
+        Self {
+            accessor,
+            verbose,
+            lookups: RwLock::new(tree),
+        }
+    }
+
+    /// Here's how we deal with errors:
+    ///
+    /// Any error will be logged if a log level of at least 'debug' was set, otherwise the
+    /// message will be silently dropped.
+    ///
+    /// Opaque errors will cause the fuse main loop to bail out with that error.
+    ///
+    /// `io::Error`s will cause the fuse request to be responded to with the given `io::Error`. An
+    /// `io::ErrorKind::Other` translates to a generic `EIO`.
+    async fn handle_err(
+        &self,
+        request: impl FuseRequest,
+        err: Error,
+        mut sender: UnboundedSender<Error>,
+    ) {
+        let final_result = match err.downcast::<io::Error>() {
+            Ok(err) => {
+                if err.kind() == io::ErrorKind::Other {
+                    log::error!("an IO error occurred: {}", err);
+                }
+
+                // fail the request
+                request.io_fail(err).map_err(Error::from)
+            }
+            Err(err) => {
+                // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
+                log::error!("internal error: {}, bailing out", err);
+                Err(err)
+            }
+        };
+        if let Err(err) = final_result {
+            // either we failed to send the error code to fuse, or the above was not an
+            // `io::Error`, so in this case notify the main loop:
+            sender
+                .send(err)
+                .await
+                .expect("failed to propagate error to main loop");
+        }
+    }
+
+    async fn main(self, fuse: Fuse) -> Result<(), Error> {
+        Arc::new(self).main_do(fuse).await
+    }
+
+    async fn main_do(self: Arc<Self>, fuse: Fuse) -> Result<(), Error> {
+        let (err_send, mut err_recv) = futures::channel::mpsc::unbounded::<Error>();
+        let mut fuse = fuse.fuse(); // make this a futures::stream::FusedStream!
+        loop {
+            select! {
+                request = fuse.try_next() => match request? {
+                    Some(request) => {
+                        tokio::spawn(Arc::clone(&self).handle_request(request, err_send.clone()));
+                    }
+                    None => break,
+                },
+                err = err_recv.next() => match err {
+                    Some(err) => if self.verbose {
+                        log::error!("cancelling fuse main loop due to error: {}", err);
+                        return Err(err);
+                    },
+                    None => panic!("error channel was closed unexpectedly"),
+                },
+            }
+        }
+        Ok(())
+    }
+
+    async fn handle_request(
+        self: Arc<Self>,
+        request: Request,
+        mut err_sender: UnboundedSender<Error>,
+    ) {
+        let result: Result<(), Error> = match request {
+            Request::Lookup(request) => {
+                match self.lookup(request.parent, &request.file_name).await {
+                    Ok((entry, lookup)) => match request.reply(&entry) {
+                        Ok(()) => {
+                            lookup.leak();
+                            Ok(())
+                        }
+                        Err(err) => Err(Error::from(err)),
+                    },
+                    Err(err) => return self.handle_err(request, err, err_sender).await,
+                }
+            }
+            Request::Forget(request) => match self.forget(request.inode, request.count as usize) {
+                Ok(()) => {
+                    request.reply();
+                    Ok(())
+                }
+                Err(err) => return self.handle_err(request, err, err_sender).await,
+            },
+            Request::Getattr(request) => match self.getattr(request.inode).await {
+                Ok(stat) => request.reply(&stat, f64::MAX).map_err(Error::from),
+                Err(err) => return self.handle_err(request, err, err_sender).await,
+            },
+            Request::ReaddirPlus(mut request) => match self.readdirplus(&mut request).await {
+                Ok(lookups) => match request.reply() {
+                    Ok(()) => {
+                        for i in lookups {
+                            i.leak();
+                        }
+                        Ok(())
+                    }
+                    Err(err) => Err(Error::from(err)),
+                },
+                Err(err) => return self.handle_err(request, err, err_sender).await,
+            },
+            Request::Read(request) => {
+                match self.read(request.inode, request.size, request.offset).await {
+                    Ok(data) => request.reply(&data).map_err(Error::from),
+                    Err(err) => return self.handle_err(request, err, err_sender).await,
+                }
+            }
+            Request::Readlink(request) => match self.readlink(request.inode).await {
+                Ok(data) => request.reply(&data).map_err(Error::from),
+                Err(err) => return self.handle_err(request, err, err_sender).await,
+            },
+            Request::ListXAttrSize(request) => match self.listxattrs(request.inode).await {
+                Ok(data) => request
+                    .reply(
+                        data.into_iter()
+                            .fold(0, |sum, i| sum + i.name().to_bytes_with_nul().len()),
+                    )
+                    .map_err(Error::from),
+                Err(err) => return self.handle_err(request, err, err_sender).await,
+            },
+            Request::ListXAttr(mut request) => match self.listxattrs_into(&mut request).await {
+                Ok(ReplyBufState::Ok) => request.reply().map_err(Error::from),
+                Ok(ReplyBufState::Full) => request.fail_full().map_err(Error::from),
+                Err(err) => return self.handle_err(request, err, err_sender).await,
+            },
+            Request::GetXAttrSize(request) => {
+                match self.getxattr(request.inode, &request.attr_name).await {
+                    Ok(xattr) => request.reply(xattr.value().len()).map_err(Error::from),
+                    Err(err) => return self.handle_err(request, err, err_sender).await,
+                }
+            }
+            Request::GetXAttr(request) => {
+                match self.getxattr(request.inode, &request.attr_name).await {
+                    Ok(xattr) => request.reply(xattr.value()).map_err(Error::from),
+                    Err(err) => return self.handle_err(request, err, err_sender).await,
+                }
+            }
+            other => {
+                log::error!("Received unexpected fuse request");
+                other.fail(libc::ENOSYS).map_err(Error::from)
+            }
+        };
+
+        if let Err(err) = result {
+            err_sender
+                .send(err)
+                .await
+                .expect("failed to propagate error to main loop");
+        }
+    }
+
+    fn get_lookup(&self, inode: u64) -> Result<LookupRef, Error> {
+        let lookups = self.lookups.read().unwrap();
+        if let Some(lookup) = lookups.get(&inode) {
+            return Ok(lookup.get_ref(self));
+        }
+        io_return!(libc::ENOENT);
+    }
+
+    async fn open_dir(&self, inode: u64) -> Result<Directory, Error> {
+        if inode == ROOT_ID {
+            Ok(self.accessor.open_root().await?)
+        } else if !is_dir_inode(inode) {
+            io_return!(libc::ENOTDIR);
+        } else {
+            Ok(unsafe { self.accessor.open_dir_at_end(inode).await? })
+        }
+    }
+
+    async fn open_entry(&self, lookup: &LookupRef<'_>) -> io::Result<FileEntry> {
+        unsafe {
+            self.accessor
+                .open_file_at_range(&lookup.entry_range_info)
+                .await
+        }
+    }
+
+    fn open_content(&self, lookup: &LookupRef) -> Result<FileContents, Error> {
+        if is_dir_inode(lookup.inode) {
+            io_return!(libc::EISDIR);
+        }
+
+        match lookup.content_range.clone() {
+            Some(range) => Ok(unsafe { self.accessor.open_contents_at_range(range) }),
+            None => io_return!(libc::EBADF),
+        }
+    }
+
+    fn make_lookup(&self, parent: u64, inode: u64, entry: &FileEntry) -> Result<LookupRef, Error> {
+        let lookups = self.lookups.read().unwrap();
+        if let Some(lookup) = lookups.get(&inode) {
+            return Ok(lookup.get_ref(self));
+        }
+        drop(lookups);
+
+        let entry = Lookup::new(
+            inode,
+            parent,
+            entry.entry_range_info().clone(),
+            entry.content_range()?,
+        );
+        let reference = entry.get_ref(self);
+        entry.refs.store(1, Ordering::Release);
+
+        let mut lookups = self.lookups.write().unwrap();
+        if let Some(lookup) = lookups.get(&inode) {
+            return Ok(lookup.get_ref(self));
+        }
+
+        lookups.insert(inode, entry);
+        drop(lookups);
+        Ok(reference)
+    }
+
+    fn forget(&self, inode: u64, count: usize) -> Result<(), Error> {
+        let node = self.get_lookup(inode)?;
+        node.forget(count)?;
+        Ok(())
+    }
+
+    async fn lookup(
+        &'_ self,
+        parent: u64,
+        file_name: &OsStr,
+    ) -> Result<(EntryParam, LookupRef<'_>), Error> {
+        let dir = self.open_dir(parent).await?;
+
+        let entry = match { dir }.lookup(file_name).await? {
+            Some(entry) => entry,
+            None => io_return!(libc::ENOENT),
+        };
+
+        let entry = if let pxar::EntryKind::Hardlink(_) = entry.kind() {
+            // we don't know the file's end-offset, so we'll just allow the decoder to decode the
+            // entire rest of the archive until we figure out something better...
+            let entry = self.accessor.follow_hardlink(&entry).await?;
+
+            if let pxar::EntryKind::Hardlink(_) = entry.kind() {
+                // hardlinks must not point to other hardlinks...
+                io_return!(libc::ELOOP);
+            }
+
+            entry
+        } else {
+            entry
+        };
+
+        let response = to_entry(&entry)?;
+        let inode = response.inode;
+        Ok((response, self.make_lookup(parent, inode, &entry)?))
+    }
+
+    async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
+        let entry = unsafe {
+            self.accessor
+                .open_file_at_range(&self.get_lookup(inode)?.entry_range_info)
+                .await?
+        };
+        to_stat(inode, &entry)
+    }
+
+    async fn readdirplus(
+        &'_ self,
+        request: &mut requests::ReaddirPlus,
+    ) -> Result<Vec<LookupRef<'_>>, Error> {
+        let mut lookups = Vec::new();
+        let offset = usize::try_from(request.offset)
+            .map_err(|_| io_format_err!("directory offset out of range"))?;
+
+        let dir = self.open_dir(request.inode).await?;
+        let dir_lookup = self.get_lookup(request.inode)?;
+
+        let entry_count = dir.read_dir().count() as isize;
+
+        let mut next = offset as isize;
+        let mut iter = dir.read_dir().skip(offset);
+        while let Some(file) = iter.next().await {
+            next += 1;
+            let file = file?.decode_entry().await?;
+            let stat = to_stat(to_inode(&file), &file)?;
+            let name = file.file_name();
+            match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
+                ReplyBufState::Ok => (),
+                ReplyBufState::Full => return Ok(lookups),
+            }
+            lookups.push(self.make_lookup(request.inode, stat.st_ino, &file)?);
+        }
+
+        if next == entry_count {
+            next += 1;
+            let file = dir.lookup_self().await?;
+            let stat = to_stat(to_inode(&file), &file)?;
+            let name = OsStr::new(".");
+            match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
+                ReplyBufState::Ok => (),
+                ReplyBufState::Full => return Ok(lookups),
+            }
+            lookups.push(LookupRef::clone(&dir_lookup));
+        }
+
+        if next == entry_count + 1 {
+            next += 1;
+            let lookup = self.get_lookup(dir_lookup.parent)?;
+            let parent_dir = self.open_dir(lookup.inode).await?;
+            let file = parent_dir.lookup_self().await?;
+            let stat = to_stat(to_inode(&file), &file)?;
+            let name = OsStr::new("..");
+            match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
+                ReplyBufState::Ok => (),
+                ReplyBufState::Full => return Ok(lookups),
+            }
+            lookups.push(lookup);
+        }
+
+        Ok(lookups)
+    }
+
+    async fn read(&self, inode: u64, len: usize, offset: u64) -> Result<Vec<u8>, Error> {
+        let file = self.get_lookup(inode)?;
+        let content = self.open_content(&file)?;
+        let mut buf = vec::undefined(len);
+        let mut pos = 0;
+        // fuse's read is different from normal read - no short reads allowed except for EOF!
+        // the returned data will be 0-byte padded up to len by fuse
+        loop {
+            let got = content
+                .read_at(&mut buf[pos..], offset + pos as u64)
+                .await?;
+            pos += got;
+            if got == 0 || pos >= len {
+                break;
+            }
+        }
+        buf.truncate(pos);
+        Ok(buf)
+    }
+
+    async fn readlink(&self, inode: u64) -> Result<OsString, Error> {
+        let lookup = self.get_lookup(inode)?;
+        let file = self.open_entry(&lookup).await?;
+        match file.get_symlink() {
+            None => io_return!(libc::EINVAL),
+            Some(link) => Ok(link.to_owned()),
+        }
+    }
+
+    async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
+        let lookup = self.get_lookup(inode)?;
+        let metadata = self.open_entry(&lookup).await?.into_entry().into_metadata();
+
+        let mut xattrs = metadata.xattrs;
+
+        use pxar::format::XAttr;
+
+        if let Some(fcaps) = metadata.fcaps {
+            xattrs.push(XAttr::new(xattr::xattr_name_fcaps().to_bytes(), fcaps.data));
+        }
+
+        // TODO: Special cases:
+        //     b"system.posix_acl_access
+        //     b"system.posix_acl_default
+        //
+        // For these we need to be able to create posix acl format entries, at that point we could
+        // just ditch libacl as well...
+
+        Ok(xattrs)
+    }
+
+    async fn listxattrs_into(
+        &self,
+        request: &mut requests::ListXAttr,
+    ) -> Result<ReplyBufState, Error> {
+        let xattrs = self.listxattrs(request.inode).await?;
+
+        for entry in xattrs {
+            match request.add_c_string(entry.name()) {
+                ReplyBufState::Ok => (),
+                ReplyBufState::Full => return Ok(ReplyBufState::Full),
+            }
+        }
+
+        Ok(ReplyBufState::Ok)
+    }
+
+    async fn getxattr(&self, inode: u64, xattr: &OsStr) -> Result<pxar::format::XAttr, Error> {
+        // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
+        // xattr for an entry...
+        let xattrs = self.listxattrs(inode).await?;
+        for entry in xattrs {
+            if entry.name().to_bytes() == xattr.as_bytes() {
+                return Ok(entry);
+            }
+        }
+        io_return!(libc::ENODATA);
+    }
+}
+
+#[inline]
+fn to_entry(entry: &FileEntry) -> Result<EntryParam, Error> {
+    to_entry_param(to_inode(entry), entry)
+}
+
+#[inline]
+fn to_inode(entry: &FileEntry) -> u64 {
+    if entry.is_dir() {
+        entry.entry_range_info().entry_range.end
+    } else {
+        entry.entry_range_info().entry_range.start | NON_DIRECTORY_INODE
+    }
+}
+
+fn to_entry_param(inode: u64, entry: &pxar::Entry) -> Result<EntryParam, Error> {
+    Ok(EntryParam::simple(inode, to_stat(inode, entry)?))
+}
+
+fn to_stat(inode: u64, entry: &pxar::Entry) -> Result<libc::stat, Error> {
+    let nlink = if entry.is_dir() { 2 } else { 1 };
+
+    let metadata = entry.metadata();
+
+    let mut stat: libc::stat = unsafe { mem::zeroed() };
+    stat.st_ino = inode;
+    stat.st_nlink = nlink;
+    stat.st_mode = u32::try_from(metadata.stat.mode)
+        .map_err(|err| format_err!("mode does not fit into st_mode field: {}", err))?;
+    stat.st_size = i64::try_from(entry.file_size().unwrap_or(0))
+        .map_err(|err| format_err!("size does not fit into st_size field: {}", err))?;
+    stat.st_uid = metadata.stat.uid;
+    stat.st_gid = metadata.stat.gid;
+    stat.st_atime = metadata.stat.mtime.secs;
+    stat.st_atime_nsec = metadata.stat.mtime.nanos as _;
+    stat.st_mtime = metadata.stat.mtime.secs;
+    stat.st_mtime_nsec = metadata.stat.mtime.nanos as _;
+    stat.st_ctime = metadata.stat.mtime.secs;
+    stat.st_ctime_nsec = metadata.stat.mtime.nanos as _;
+    Ok(stat)
+}
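
A note on the inode scheme in the file above: directories use the end offset of their
entry range as inode (top bit clear), while regular files use the start offset with
bit 63 set, which is exactly what is_dir_inode() tests. A minimal standalone sketch of
that encoding, assuming archive offsets stay below 2^63:

const NON_DIRECTORY_INODE: u64 = 1u64 << 63;

// Directory inode: end offset of the entry's archive range, marker bit left clear.
fn dir_inode(entry_range_end: u64) -> u64 {
    entry_range_end
}

// Regular file inode: start offset of the entry's archive range with the marker bit set.
fn file_inode(entry_range_start: u64) -> u64 {
    entry_range_start | NON_DIRECTORY_INODE
}

fn is_dir_inode(inode: u64) -> bool {
    inode & NON_DIRECTORY_INODE == 0
}

fn main() {
    assert!(is_dir_inode(dir_inode(4096)));
    assert!(!is_dir_inode(file_inode(512)));
}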
index bce6a0d1bc228cacc4356e558c8a6b9aec8dd776..518b2004b0041e32f2851becb17ed70753aefbb5 100644 (file)
@@ -9,8 +9,8 @@ anyhow = "1.0"
 futures = "0.3"
 hyper = { version = "0.14", features = [ "full" ] }
 libc = "0.2"
-nix = "0.24"
 log = "0.4"
+nix = "0.24"
 openssl = "0.10"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -24,16 +24,18 @@ pathpatterns = "0.1.2"
 pxar = { version = "0.10.2", features = [ "tokio-io" ] }
 
 proxmox-async = "0.4"
+proxmox-fuse = "0.1.3"
 proxmox-io = "1.0.1"
 proxmox-router = { version = "1.3.0", features = [ "cli" ] }
 proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] }
-proxmox-time = "1"
 proxmox-sys = { version = "0.4.1",  features = [ "sortable-macro" ] }
+proxmox-time = "1"
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
-pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
+pbs-config = { path = "../pbs-config" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-fuse-loop = { path = "../pbs-fuse-loop" }
+pbs-pxar-fuse = { path = "../pbs-pxar-fuse" }
 pbs-tools = { path = "../pbs-tools" }
index 6566147e8bd33c1048db135448856f5bb398d51d..8c8c1458dfe936999ad86b6f36addfb7dc55ff1f 100644 (file)
@@ -219,8 +219,8 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
     );
     let reader = BufferedDynamicReader::new(index, chunk_reader);
     let archive_size = reader.archive_size();
-    let reader: pbs_client::pxar::fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
-    let decoder = pbs_client::pxar::fuse::Accessor::new(reader, archive_size).await?;
+    let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
+    let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
 
     client.download(CATALOG_NAME, &mut tmpfile).await?;
     let index = DynamicIndexReader::new(tmpfile)
index 385c23ed7c510ac5ee4f2a674fe2250aa6963126..afec2553fdd03ba872b16b2656a4a718a7eca404 100644 (file)
@@ -295,16 +295,12 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
         );
         let reader = BufferedDynamicReader::new(index, chunk_reader);
         let archive_size = reader.archive_size();
-        let reader: pbs_client::pxar::fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
-        let decoder = pbs_client::pxar::fuse::Accessor::new(reader, archive_size).await?;
-
-        let session = pbs_client::pxar::fuse::Session::mount(
-            decoder,
-            options,
-            false,
-            Path::new(target.unwrap()),
-        )
-        .map_err(|err| format_err!("pxar mount failed: {}", err))?;
+        let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
+        let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
+
+        let session =
+            pbs_pxar_fuse::Session::mount(decoder, options, false, Path::new(target.unwrap()))
+                .map_err(|err| format_err!("pxar mount failed: {}", err))?;
 
         daemonize()?;
 
index 4c8cefefa354f87b7536d18ebe85a1b3df00b233..f025e02c56c8ab7415156815b7f76adcab88686f 100644 (file)
@@ -17,12 +17,13 @@ serde_json = "1.0"
 tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
 
 pathpatterns = "0.1.2"
-#proxmox = "0.15.3"
+pxar = { version = "0.10.2", features = [ "tokio-io" ] }
+
 proxmox-async = "0.4"
-proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] }
 proxmox-router = "1.3.0"
+proxmox-schema = { version = "1.3.1", features = [ "api-macro" ] }
 proxmox-sys = "0.4.1"
-pxar = { version = "0.10.2", features = [ "tokio-io" ] }
 
 pbs-client = { path = "../pbs-client" }
+pbs-pxar-fuse = { path = "../pbs-pxar-fuse" }
 pbs-tools = { path = "../pbs-tools" }
index 9d7cf9def5113376b505a641667a8e4a9c232fbf..908873211496ff9e2fec58494e5a7710ae06e371 100644 (file)
@@ -12,9 +12,7 @@ use futures::select;
 use tokio::signal::unix::{signal, SignalKind};
 
 use pathpatterns::{MatchEntry, MatchType, PatternFlag};
-use pbs_client::pxar::{
-    format_single_line_entry, fuse, Flags, PxarExtractOptions, ENCODER_MAX_ENTRIES,
-};
+use pbs_client::pxar::{format_single_line_entry, Flags, PxarExtractOptions, ENCODER_MAX_ENTRIES};
 
 use proxmox_router::cli::*;
 use proxmox_schema::api;
@@ -382,7 +380,7 @@ async fn mount_archive(archive: String, mountpoint: String, verbose: bool) -> Re
     let mountpoint = Path::new(&mountpoint);
     let options = OsStr::new("ro,default_permissions");
 
-    let session = fuse::Session::mount_path(archive, options, verbose, mountpoint)
+    let session = pbs_pxar_fuse::Session::mount_path(archive, options, verbose, mountpoint)
         .await
         .map_err(|err| format_err!("pxar mount failed: {}", err))?;