]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pxar/fuse.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / pbs-client / src / pxar / fuse.rs
1 //! Asynchronous fuse implementation.
2
3 use std::collections::BTreeMap;
4 use std::convert::TryFrom;
5 use std::ffi::{OsStr, OsString};
6 use std::future::Future;
7 use std::io;
8 use std::mem;
9 use std::ops::Range;
10 use std::os::unix::ffi::OsStrExt;
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicUsize, Ordering};
14 use std::sync::{Arc, RwLock};
15 use std::task::{Context, Poll};
16
17 use anyhow::{format_err, Error};
18 use futures::channel::mpsc::UnboundedSender;
19 use futures::select;
20 use futures::sink::SinkExt;
21 use futures::stream::{StreamExt, TryStreamExt};
22
23 use proxmox_io::vec;
24 use pxar::accessor::{self, EntryRangeInfo, ReadAt};
25
26 use proxmox_fuse::requests::{self, FuseRequest};
27 use proxmox_fuse::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID};
28 use proxmox_sys::fs::xattr;
29
/// Tag bit set in the inode number of every non-directory entry (files, symlinks, ...).
///
/// Non-directory inodes are the entry's start offset with this bit set, directory inodes are
/// plain archive offsets without it, so the entry kind can be told from the inode alone.
const NON_DIRECTORY_INODE: u64 = 1u64 << 63;

/// Returns `true` when `inode` does not carry the [`NON_DIRECTORY_INODE`] tag bit.
#[inline]
fn is_dir_inode(inode: u64) -> bool {
    (inode & NON_DIRECTORY_INODE) == 0
}
37
/// Our reader type instance used for accessors.
///
/// A shared, thread-safe random-access (`ReadAt`) source for the archive bytes.
pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;

/// Our Accessor type instance.
///
/// pxar's asynchronous accessor over [`Reader`]; the session uses it to open the root,
/// directories, entries and file contents.
pub type Accessor = accessor::aio::Accessor<Reader>;

/// Our Directory type instance (an opened directory of the archive, read via [`Reader`]).
pub type Directory = accessor::aio::Directory<Reader>;

/// Our FileEntry type instance (a single decoded archive entry, read via [`Reader`]).
pub type FileEntry = accessor::aio::FileEntry<Reader>;

/// Our FileContents type instance (the content range of a file entry, read via [`Reader`]).
pub type FileContents = accessor::aio::FileContents<Reader>;
52
/// A mounted pxar FUSE session.
///
/// The session is itself a future: polling it drives the FUSE main loop. It completes with
/// `Ok(())` once the kernel's request stream ends, or with an error if the main loop fails.
pub struct Session {
    /// The boxed main-loop future created by `Session::mount`.
    fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync + 'static>>,
}
56
57 impl Session {
58 /// Create a fuse session for an archive.
59 pub async fn mount_path(
60 archive_path: &Path,
61 options: &OsStr,
62 verbose: bool,
63 mountpoint: &Path,
64 ) -> Result<Self, Error> {
65 // TODO: Add a buffered/caching ReadAt layer?
66 let file = std::fs::File::open(archive_path)?;
67 let file_size = file.metadata()?.len();
68 let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
69 let accessor = Accessor::new(reader, file_size).await?;
70 Self::mount(accessor, options, verbose, mountpoint)
71 }
72
73 /// Create a new fuse session for the given pxar `Accessor`.
74 pub fn mount(
75 accessor: Accessor,
76 options: &OsStr,
77 verbose: bool,
78 path: &Path,
79 ) -> Result<Self, Error> {
80 let fuse = Fuse::builder("pxar-mount")?
81 .debug()
82 .options_os(options)?
83 .enable_readdirplus()
84 .enable_read()
85 .enable_readlink()
86 .enable_read_xattr()
87 .build()?
88 .mount(path)?;
89
90 let session = SessionImpl::new(accessor, verbose);
91
92 Ok(Self {
93 fut: Box::pin(session.main(fuse)),
94 })
95 }
96 }
97
98 impl Future for Session {
99 type Output = Result<(), Error>;
100
101 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
102 Pin::new(&mut self.fut).poll(cx)
103 }
104 }
105
/// We use this to return an errno value back to the kernel.
///
/// Expands to `return Err(io::Error::from_raw_os_error(errno).into())`. The expansion has no
/// trailing semicolon so it is valid both as a statement (`io_return!(libc::ENOENT);`) and as
/// an expression, e.g. a match arm (`None => io_return!(libc::ENOENT),`).
macro_rules! io_return {
    ($errno:expr) => {
        return Err(::std::io::Error::from_raw_os_error($errno).into())
    };
}

/// Format an "other" error, see `io_bail` below for details.
macro_rules! io_format_err {
    ($($fmt:tt)*) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, format!($($fmt)*))
    };
}

/// We use this to bail out of a function in an unexpected error case. This will cause the fuse
/// request to be answered with a generic `EIO` error code. The error message contained in here
/// will be printed to stderr if the verbose flag is used, otherwise silently dropped.
///
/// Like `io_return`, the expansion has no trailing semicolon so it works in expression position.
macro_rules! io_bail {
    ($($fmt:tt)*) => {
        return Err(io_format_err!($($fmt)*).into())
    };
}
126
/// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
/// accessed.
struct Lookup {
    /// Reference count: references handed to the kernel (via `LookupRef::leak`) plus all live
    /// `LookupRef`s. The entry is removed from the session's table when it drops to zero.
    refs: AtomicUsize,

    /// The inode number this entry is registered under in the session's lookup table.
    inode: u64,
    /// Inode of the parent directory (the root is registered as its own parent).
    parent: u64,
    /// Location of the entry in the archive, used to reopen it.
    entry_range_info: EntryRangeInfo,
    /// Byte range of the file's contents; `None` for entries without contents (reading them
    /// fails with `EBADF`).
    content_range: Option<Range<u64>>,
}
137
138 impl Lookup {
139 fn new(
140 inode: u64,
141 parent: u64,
142 entry_range_info: EntryRangeInfo,
143 content_range: Option<Range<u64>>,
144 ) -> Box<Lookup> {
145 Box::new(Self {
146 refs: AtomicUsize::new(1),
147 inode,
148 parent,
149 entry_range_info,
150 content_range,
151 })
152 }
153
154 /// Decrease the reference count by `count`. Note that this must not include the reference held
155 /// by `self` itself, so this must not decrease the count below 2.
156 fn forget(&self, count: usize) -> Result<(), Error> {
157 loop {
158 let old = self.refs.load(Ordering::Acquire);
159 if count >= old {
160 io_bail!("reference count underflow");
161 }
162 let new = old - count;
163 match self
164 .refs
165 .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
166 {
167 Ok(_) => break Ok(()),
168 Err(_) => continue,
169 }
170 }
171 }
172
173 fn get_ref<'a>(&self, session: &'a SessionImpl) -> LookupRef<'a> {
174 if self.refs.fetch_add(1, Ordering::AcqRel) == 0 {
175 panic!("atomic refcount increased from 0 to 1");
176 }
177
178 LookupRef {
179 session,
180 lookup: self as *const Lookup,
181 }
182 }
183 }
184
/// A counted reference to a `Lookup` stored in the session's lookup table.
///
/// Holds a raw pointer into the boxed entry. `leak()` sets the pointer to null to hand the
/// reference over (used after a successful reply to the kernel) without decrementing the
/// count; `Drop` skips null pointers.
struct LookupRef<'a> {
    session: &'a SessionImpl,
    lookup: *const Lookup,
}

// NOTE(review): the raw pointer makes this type `!Send`/`!Sync` by default. These impls assume
// the pointed-to `Lookup` and `SessionImpl` are themselves safe to share between threads (the
// refcount is atomic and the table sits behind an `RwLock`). Re-check when changing either type.
unsafe impl<'a> Send for LookupRef<'a> {}
unsafe impl<'a> Sync for LookupRef<'a> {}
192
193 impl<'a> Clone for LookupRef<'a> {
194 fn clone(&self) -> Self {
195 self.get_ref(self.session)
196 }
197 }
198
/// Gives read access to the referenced `Lookup`.
impl<'a> std::ops::Deref for LookupRef<'a> {
    type Target = Lookup;

    fn deref(&self) -> &Self::Target {
        // SAFETY (assumed): the pointer is non-null for every live `LookupRef`. Only `leak()`
        // nulls it, and that consumes the reference. It points into a boxed `Lookup` that is kept
        // alive while this reference holds part of its count — confirm when touching the table.
        unsafe { &*self.lookup }
    }
}
206
207 impl<'a> Drop for LookupRef<'a> {
208 fn drop(&mut self) {
209 if self.lookup.is_null() {
210 return;
211 }
212
213 if self.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
214 let inode = self.inode;
215 drop(self.session.lookups.write().unwrap().remove(&inode));
216 }
217 }
218 }
219
220 impl<'a> LookupRef<'a> {
221 fn leak(mut self) -> &'a Lookup {
222 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
223 }
224 }
225
/// State shared (through an `Arc`) by the main loop and every request-handling task.
struct SessionImpl {
    /// The pxar archive being served.
    accessor: Accessor,
    /// Whether to print error messages to stderr.
    verbose: bool,
    /// Cached lookup entries by inode number. Entries are boxed so the raw pointers held by
    /// `LookupRef`s keep pointing at the same heap location while the map changes.
    lookups: RwLock<BTreeMap<u64, Box<Lookup>>>,
}
231
232 impl SessionImpl {
233 fn new(accessor: Accessor, verbose: bool) -> Self {
234 let root = Lookup::new(
235 ROOT_ID,
236 ROOT_ID,
237 EntryRangeInfo::toplevel(0..accessor.size()),
238 None,
239 );
240
241 let mut tree = BTreeMap::new();
242 tree.insert(ROOT_ID, root);
243
244 Self {
245 accessor,
246 verbose,
247 lookups: RwLock::new(tree),
248 }
249 }
250
251 /// Here's how we deal with errors:
252 ///
253 /// Any error will be printed if the verbose flag was set, otherwise the message will be
254 /// silently dropped.
255 ///
256 /// Opaque errors will cause the fuse main loop to bail out with that error.
257 ///
258 /// `io::Error`s will cause the fuse request to responded to with the given `io::Error`. An
259 /// `io::ErrorKind::Other` translates to a generic `EIO`.
260 async fn handle_err(
261 &self,
262 request: impl FuseRequest,
263 err: Error,
264 mut sender: UnboundedSender<Error>,
265 ) {
266 let final_result = match err.downcast::<io::Error>() {
267 Ok(err) => {
268 if err.kind() == io::ErrorKind::Other && self.verbose {
269 eprintln!("an IO error occurred: {}", err);
270 }
271
272 // fail the request
273 request.io_fail(err).map_err(Error::from)
274 }
275 Err(err) => {
276 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
277 if self.verbose {
278 eprintln!("internal error: {}, bailing out", err);
279 }
280 Err(err)
281 }
282 };
283 if let Err(err) = final_result {
284 // either we failed to send the error code to fuse, or the above was not an
285 // `io::Error`, so in this case notify the main loop:
286 sender
287 .send(err)
288 .await
289 .expect("failed to propagate error to main loop");
290 }
291 }
292
293 async fn main(self, fuse: Fuse) -> Result<(), Error> {
294 Arc::new(self).main_do(fuse).await
295 }
296
297 async fn main_do(self: Arc<Self>, fuse: Fuse) -> Result<(), Error> {
298 let (err_send, mut err_recv) = futures::channel::mpsc::unbounded::<Error>();
299 let mut fuse = fuse.fuse(); // make this a futures::stream::FusedStream!
300 loop {
301 select! {
302 request = fuse.try_next() => match request? {
303 Some(request) => {
304 tokio::spawn(Arc::clone(&self).handle_request(request, err_send.clone()));
305 }
306 None => break,
307 },
308 err = err_recv.next() => match err {
309 Some(err) => if self.verbose {
310 eprintln!("cancelling fuse main loop due to error: {}", err);
311 return Err(err);
312 },
313 None => panic!("error channel was closed unexpectedly"),
314 },
315 }
316 }
317 Ok(())
318 }
319
/// Answer a single FUSE request.
///
/// Each request is handled in its own task spawned by `main_do`. If the operation itself fails,
/// the error goes through `handle_err`, which either fails the request with an errno or reports
/// a fatal error. If sending a reply fails, the error is forwarded to the main loop via
/// `err_sender`.
async fn handle_request(
    self: Arc<Self>,
    request: Request,
    mut err_sender: UnboundedSender<Error>,
) {
    let result: Result<(), Error> = match request {
        Request::Lookup(request) => {
            match self.lookup(request.parent, &request.file_name).await {
                Ok((entry, lookup)) => match request.reply(&entry) {
                    Ok(()) => {
                        // The reply went out, so the kernel now owns this reference; it is
                        // given back later through a `Forget` request.
                        lookup.leak();
                        Ok(())
                    }
                    Err(err) => Err(Error::from(err)),
                },
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        Request::Forget(request) => match self.forget(request.inode, request.count as usize) {
            Ok(()) => {
                request.reply();
                Ok(())
            }
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::Getattr(request) => match self.getattr(request.inode).await {
            // NOTE(review): `f64::MAX` is presumably the attribute cache timeout (the archive
            // is read-only) — confirm against proxmox_fuse.
            Ok(stat) => request.reply(&stat, f64::MAX).map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::ReaddirPlus(mut request) => match self.readdirplus(&mut request).await {
            Ok(lookups) => match request.reply() {
                Ok(()) => {
                    // Only after a successful reply does the kernel hold these references.
                    for i in lookups {
                        i.leak();
                    }
                    Ok(())
                }
                Err(err) => Err(Error::from(err)),
            },
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::Read(request) => {
            match self.read(request.inode, request.size, request.offset).await {
                Ok(data) => request.reply(&data).map_err(Error::from),
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        Request::Readlink(request) => match self.readlink(request.inode).await {
            Ok(data) => request.reply(&data).map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::ListXAttrSize(request) => match self.listxattrs(request.inode).await {
            // total size of all NUL-terminated attribute names
            Ok(data) => request
                .reply(
                    data.into_iter()
                        .fold(0, |sum, i| sum + i.name().to_bytes_with_nul().len()),
                )
                .map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::ListXAttr(mut request) => match self.listxattrs_into(&mut request).await {
            Ok(ReplyBufState::Ok) => request.reply().map_err(Error::from),
            // the reply buffer could not hold all names
            Ok(ReplyBufState::Full) => request.fail_full().map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::GetXAttrSize(request) => {
            match self.getxattr(request.inode, &request.attr_name).await {
                Ok(xattr) => request.reply(xattr.value().len()).map_err(Error::from),
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        Request::GetXAttr(request) => {
            match self.getxattr(request.inode, &request.attr_name).await {
                Ok(xattr) => request.reply(xattr.value()).map_err(Error::from),
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        // anything not enabled in `Session::mount` is answered with ENOSYS
        other => {
            if self.verbose {
                eprintln!("Received unexpected fuse request");
            }
            other.fail(libc::ENOSYS).map_err(Error::from)
        }
    };

    if let Err(err) = result {
        // sending the reply failed: report it to the main loop
        err_sender
            .send(err)
            .await
            .expect("failed to propagate error to main loop");
    }
}
412
413 fn get_lookup(&self, inode: u64) -> Result<LookupRef, Error> {
414 let lookups = self.lookups.read().unwrap();
415 if let Some(lookup) = lookups.get(&inode) {
416 return Ok(lookup.get_ref(self));
417 }
418 io_return!(libc::ENOENT);
419 }
420
421 async fn open_dir(&self, inode: u64) -> Result<Directory, Error> {
422 if inode == ROOT_ID {
423 Ok(self.accessor.open_root().await?)
424 } else if !is_dir_inode(inode) {
425 io_return!(libc::ENOTDIR);
426 } else {
427 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? })
428 }
429 }
430
431 async fn open_entry(&self, lookup: &LookupRef<'_>) -> io::Result<FileEntry> {
432 unsafe {
433 self.accessor
434 .open_file_at_range(&lookup.entry_range_info)
435 .await
436 }
437 }
438
439 fn open_content(&self, lookup: &LookupRef) -> Result<FileContents, Error> {
440 if is_dir_inode(lookup.inode) {
441 io_return!(libc::EISDIR);
442 }
443
444 match lookup.content_range.clone() {
445 Some(range) => Ok(unsafe { self.accessor.open_contents_at_range(range) }),
446 None => io_return!(libc::EBADF),
447 }
448 }
449
450 fn make_lookup(&self, parent: u64, inode: u64, entry: &FileEntry) -> Result<LookupRef, Error> {
451 let lookups = self.lookups.read().unwrap();
452 if let Some(lookup) = lookups.get(&inode) {
453 return Ok(lookup.get_ref(self));
454 }
455 drop(lookups);
456
457 let entry = Lookup::new(
458 inode,
459 parent,
460 entry.entry_range_info().clone(),
461 entry.content_range()?,
462 );
463 let reference = entry.get_ref(self);
464 entry.refs.store(1, Ordering::Release);
465
466 let mut lookups = self.lookups.write().unwrap();
467 if let Some(lookup) = lookups.get(&inode) {
468 return Ok(lookup.get_ref(self));
469 }
470
471 lookups.insert(inode, entry);
472 drop(lookups);
473 Ok(reference)
474 }
475
476 fn forget(&self, inode: u64, count: usize) -> Result<(), Error> {
477 let node = self.get_lookup(inode)?;
478 node.forget(count)?;
479 Ok(())
480 }
481
482 async fn lookup(
483 &'_ self,
484 parent: u64,
485 file_name: &OsStr,
486 ) -> Result<(EntryParam, LookupRef<'_>), Error> {
487 let dir = self.open_dir(parent).await?;
488
489 let entry = match { dir }.lookup(file_name).await? {
490 Some(entry) => entry,
491 None => io_return!(libc::ENOENT),
492 };
493
494 let entry = if let pxar::EntryKind::Hardlink(_) = entry.kind() {
495 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
496 // entire rest of the archive until we figure out something better...
497 let entry = self.accessor.follow_hardlink(&entry).await?;
498
499 if let pxar::EntryKind::Hardlink(_) = entry.kind() {
500 // hardlinks must not point to other hardlinks...
501 io_return!(libc::ELOOP);
502 }
503
504 entry
505 } else {
506 entry
507 };
508
509 let response = to_entry(&entry)?;
510 let inode = response.inode;
511 Ok((response, self.make_lookup(parent, inode, &entry)?))
512 }
513
514 async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
515 let entry = unsafe {
516 self.accessor.open_file_at_range(&self.get_lookup(inode)?.entry_range_info).await?
517 };
518 to_stat(inode, &entry)
519 }
520
521 async fn readdirplus(
522 &'_ self,
523 request: &mut requests::ReaddirPlus,
524 ) -> Result<Vec<LookupRef<'_>>, Error> {
525 let mut lookups = Vec::new();
526 let offset = usize::try_from(request.offset)
527 .map_err(|_| io_format_err!("directory offset out of range"))?;
528
529 let dir = self.open_dir(request.inode).await?;
530 let dir_lookup = self.get_lookup(request.inode)?;
531
532 let entry_count = dir.read_dir().count() as isize;
533
534 let mut next = offset as isize;
535 let mut iter = dir.read_dir().skip(offset);
536 while let Some(file) = iter.next().await {
537 next += 1;
538 let file = file?.decode_entry().await?;
539 let stat = to_stat(to_inode(&file), &file)?;
540 let name = file.file_name();
541 match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
542 ReplyBufState::Ok => (),
543 ReplyBufState::Full => return Ok(lookups),
544 }
545 lookups.push(self.make_lookup(request.inode, stat.st_ino, &file)?);
546 }
547
548 if next == entry_count {
549 next += 1;
550 let file = dir.lookup_self().await?;
551 let stat = to_stat(to_inode(&file), &file)?;
552 let name = OsStr::new(".");
553 match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
554 ReplyBufState::Ok => (),
555 ReplyBufState::Full => return Ok(lookups),
556 }
557 lookups.push(LookupRef::clone(&dir_lookup));
558 }
559
560 if next == entry_count + 1 {
561 next += 1;
562 let lookup = self.get_lookup(dir_lookup.parent)?;
563 let parent_dir = self.open_dir(lookup.inode).await?;
564 let file = parent_dir.lookup_self().await?;
565 let stat = to_stat(to_inode(&file), &file)?;
566 let name = OsStr::new("..");
567 match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
568 ReplyBufState::Ok => (),
569 ReplyBufState::Full => return Ok(lookups),
570 }
571 lookups.push(lookup);
572 }
573
574 Ok(lookups)
575 }
576
577 async fn read(&self, inode: u64, len: usize, offset: u64) -> Result<Vec<u8>, Error> {
578 let file = self.get_lookup(inode)?;
579 let content = self.open_content(&file)?;
580 let mut buf = vec::undefined(len);
581 let got = content.read_at(&mut buf, offset).await?;
582 buf.truncate(got);
583 Ok(buf)
584 }
585
586 async fn readlink(&self, inode: u64) -> Result<OsString, Error> {
587 let lookup = self.get_lookup(inode)?;
588 let file = self.open_entry(&lookup).await?;
589 match file.get_symlink() {
590 None => io_return!(libc::EINVAL),
591 Some(link) => Ok(link.to_owned()),
592 }
593 }
594
595 async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
596 let lookup = self.get_lookup(inode)?;
597 let metadata = self
598 .open_entry(&lookup)
599 .await?
600 .into_entry()
601 .into_metadata();
602
603 let mut xattrs = metadata.xattrs;
604
605 use pxar::format::XAttr;
606
607 if let Some(fcaps) = metadata.fcaps {
608 xattrs.push(XAttr::new(xattr::xattr_name_fcaps().to_bytes(), fcaps.data));
609 }
610
611 // TODO: Special cases:
612 // b"system.posix_acl_access
613 // b"system.posix_acl_default
614 //
615 // For these we need to be able to create posix acl format entries, at that point we could
616 // just ditch libacl as well...
617
618 Ok(xattrs)
619 }
620
621 async fn listxattrs_into(
622 &self,
623 request: &mut requests::ListXAttr,
624 ) -> Result<ReplyBufState, Error> {
625 let xattrs = self.listxattrs(request.inode).await?;
626
627 for entry in xattrs {
628 match request.add_c_string(entry.name()) {
629 ReplyBufState::Ok => (),
630 ReplyBufState::Full => return Ok(ReplyBufState::Full),
631 }
632 }
633
634 Ok(ReplyBufState::Ok)
635 }
636
637 async fn getxattr(&self, inode: u64, xattr: &OsStr) -> Result<pxar::format::XAttr, Error> {
638 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
639 // xattr for an entry...
640 let xattrs = self.listxattrs(inode).await?;
641 for entry in xattrs {
642 if entry.name().to_bytes() == xattr.as_bytes() {
643 return Ok(entry);
644 }
645 }
646 io_return!(libc::ENODATA);
647 }
648 }
649
650 #[inline]
651 fn to_entry(entry: &FileEntry) -> Result<EntryParam, Error> {
652 to_entry_param(to_inode(&entry), &entry)
653 }
654
655 #[inline]
656 fn to_inode(entry: &FileEntry) -> u64 {
657 if entry.is_dir() {
658 entry.entry_range_info().entry_range.end
659 } else {
660 entry.entry_range_info().entry_range.start | NON_DIRECTORY_INODE
661 }
662 }
663
664 fn to_entry_param(inode: u64, entry: &pxar::Entry) -> Result<EntryParam, Error> {
665 Ok(EntryParam::simple(inode, to_stat(inode, entry)?))
666 }
667
668 fn to_stat(inode: u64, entry: &pxar::Entry) -> Result<libc::stat, Error> {
669 let nlink = if entry.is_dir() { 2 } else { 1 };
670
671 let metadata = entry.metadata();
672
673 let mut stat: libc::stat = unsafe { mem::zeroed() };
674 stat.st_ino = inode;
675 stat.st_nlink = nlink;
676 stat.st_mode = u32::try_from(metadata.stat.mode)
677 .map_err(|err| format_err!("mode does not fit into st_mode field: {}", err))?;
678 stat.st_size = i64::try_from(entry.file_size().unwrap_or(0))
679 .map_err(|err| format_err!("size does not fit into st_size field: {}", err))?;
680 stat.st_uid = metadata.stat.uid;
681 stat.st_gid = metadata.stat.gid;
682 stat.st_atime = metadata.stat.mtime.secs;
683 stat.st_atime_nsec = metadata.stat.mtime.nanos as _;
684 stat.st_mtime = metadata.stat.mtime.secs;
685 stat.st_mtime_nsec = metadata.stat.mtime.nanos as _;
686 stat.st_ctime = metadata.stat.mtime.secs;
687 stat.st_ctime_nsec = metadata.stat.mtime.nanos as _;
688 Ok(stat)
689 }