]> git.proxmox.com Git - proxmox-backup.git/blob - src/pxar/fuse.rs
a5001cbe33bbf6b54b523f4c3cb419a737bcb378
[proxmox-backup.git] / src / pxar / fuse.rs
1 //! Asynchronous fuse implementation.
2
3 use std::collections::BTreeMap;
4 use std::convert::TryFrom;
5 use std::ffi::{OsStr, OsString};
6 use std::future::Future;
7 use std::io;
8 use std::mem;
9 use std::ops::Range;
10 use std::os::unix::ffi::OsStrExt;
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicUsize, Ordering};
14 use std::sync::{Arc, RwLock};
15 use std::task::{Context, Poll};
16
17 use anyhow::{format_err, Error};
18 use futures::channel::mpsc::UnboundedSender;
19 use futures::select;
20 use futures::sink::SinkExt;
21 use futures::stream::{StreamExt, TryStreamExt};
22
23 use proxmox::tools::vec;
24 use pxar::accessor::{self, EntryRangeInfo, ReadAt};
25
26 use proxmox_fuse::requests::{self, FuseRequest};
27 use proxmox_fuse::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID};
28
29 use crate::tools::xattr;
30
/// Marker bit for inodes that do not refer to directories.
///
/// The highest bit of the inode number is set for every non-directory entry, so the way an
/// inode has to be accessed can be derived from its number alone.
const NON_DIRECTORY_INODE: u64 = 1u64 << 63;

/// Returns `true` if `inode` does not carry the [`NON_DIRECTORY_INODE`] marker bit.
#[inline]
fn is_dir_inode(inode: u64) -> bool {
    (inode & NON_DIRECTORY_INODE) == 0
}
38
/// Our reader type instance used for accessors.
///
/// This is a shared (`Arc`), type-erased [`ReadAt`] implementation which is additionally
/// `Send + Sync + 'static`.
pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;

/// Our Accessor type instance: the `accessor::aio` accessor reading through a [`Reader`].
pub type Accessor = accessor::aio::Accessor<Reader>;

/// Our Directory type instance: the `accessor::aio` directory type backed by a [`Reader`].
pub type Directory = accessor::aio::Directory<Reader>;

/// Our FileEntry type instance: the `accessor::aio` file entry type backed by a [`Reader`].
pub type FileEntry = accessor::aio::FileEntry<Reader>;

/// Our FileContents type instance: the `accessor::aio` file contents type backed by a
/// [`Reader`].
pub type FileContents = accessor::aio::FileContents<Reader>;
53
/// A fuse session for a pxar archive.
///
/// `Session` implements [`Future`]: polling it polls the boxed future stored in `fut`, which
/// `Session::mount` creates from the session's main loop.
pub struct Session {
    /// The boxed main loop future; its output is the result of the whole session.
    fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync + 'static>>,
}
57
58 impl Session {
59 /// Create a fuse session for an archive.
60 pub async fn mount_path(
61 archive_path: &Path,
62 options: &OsStr,
63 verbose: bool,
64 mountpoint: &Path,
65 ) -> Result<Self, Error> {
66 // TODO: Add a buffered/caching ReadAt layer?
67 let file = std::fs::File::open(archive_path)?;
68 let file_size = file.metadata()?.len();
69 let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
70 let accessor = Accessor::new(reader, file_size).await?;
71 Self::mount(accessor, options, verbose, mountpoint)
72 }
73
74 /// Create a new fuse session for the given pxar `Accessor`.
75 pub fn mount(
76 accessor: Accessor,
77 options: &OsStr,
78 verbose: bool,
79 path: &Path,
80 ) -> Result<Self, Error> {
81 let fuse = Fuse::builder("pxar-mount")?
82 .debug()
83 .options_os(options)?
84 .enable_readdirplus()
85 .enable_read()
86 .enable_readlink()
87 .enable_read_xattr()
88 .build()?
89 .mount(path)?;
90
91 let session = SessionImpl::new(accessor, verbose);
92
93 Ok(Self {
94 fut: Box::pin(session.main(fuse)),
95 })
96 }
97 }
98
99 impl Future for Session {
100 type Output = Result<(), Error>;
101
102 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
103 Pin::new(&mut self.fut).poll(cx)
104 }
105 }
106
/// We use this to return an errno value back to the kernel.
///
/// Expands to a `return Err(..)` whose error is an `io::Error` created from the raw OS error
/// code `$errno` and converted with `.into()` into the error type of the surrounding function.
macro_rules! io_return {
    ($errno:expr) => {
        return Err(::std::io::Error::from_raw_os_error($errno).into());
    };
}
113
/// Format an "other" error, see `io_bail` below for details.
///
/// Takes `format!`-style arguments and evaluates to an `io::Error` of kind
/// `io::ErrorKind::Other` carrying the formatted message. Unlike `io_bail`, this does not
/// return from the surrounding function.
macro_rules! io_format_err {
    ($($fmt:tt)*) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, format!($($fmt)*))
    }
}
120
/// We use this to bail out of a function in an unexpected error case. This will cause the fuse
/// request to be answered with a generic `EIO` error code. The error message contained in here
/// will be printed to stderr if the verbose flag is used, otherwise silently dropped.
///
/// Takes `format!`-style arguments and expands to a `return Err(..)` of the error produced by
/// `io_format_err`, converted with `.into()`.
macro_rules! io_bail {
    ($($fmt:tt)*) => { return Err(io_format_err!($($fmt)*).into()); }
}
127
/// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
/// accessed.
struct Lookup {
    /// Reference count of this entry; `Lookup::new` initializes it to 1.
    refs: AtomicUsize,

    /// The inode number of this entry; also the key under which it is stored in the session's
    /// lookup table.
    inode: u64,
    /// The inode number of the parent this entry was created with; used to resolve `..` when
    /// listing a directory.
    parent: u64,
    /// The range information used to open the entry in the archive again.
    entry_range_info: EntryRangeInfo,
    /// The range of the entry's contents, if any. Without it the contents cannot be opened.
    content_range: Option<Range<u64>>,
}
138
139 impl Lookup {
140 fn new(
141 inode: u64,
142 parent: u64,
143 entry_range_info: EntryRangeInfo,
144 content_range: Option<Range<u64>>,
145 ) -> Box<Lookup> {
146 Box::new(Self {
147 refs: AtomicUsize::new(1),
148 inode,
149 parent,
150 entry_range_info,
151 content_range,
152 })
153 }
154
155 /// Decrease the reference count by `count`. Note that this must not include the reference held
156 /// by `self` itself, so this must not decrease the count below 2.
157 fn forget(&self, count: usize) -> Result<(), Error> {
158 loop {
159 let old = self.refs.load(Ordering::Acquire);
160 if count >= old {
161 io_bail!("reference count underflow");
162 }
163 let new = old - count;
164 match self
165 .refs
166 .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
167 {
168 Ok(_) => break Ok(()),
169 Err(_) => continue,
170 }
171 }
172 }
173
174 fn get_ref<'a>(&self, session: &'a SessionImpl) -> LookupRef<'a> {
175 if self.refs.fetch_add(1, Ordering::AcqRel) == 0 {
176 panic!("atomic refcount increased from 0 to 1");
177 }
178
179 LookupRef {
180 session,
181 lookup: self as *const Lookup,
182 }
183 }
184 }
185
/// A counted reference to a [`Lookup`].
///
/// Created by `Lookup::get_ref`, which increments the reference count. Dropping the reference
/// decrements the count again, unless it was released with `leak`.
struct LookupRef<'a> {
    /// The session whose `lookups` table the entry is removed from once the count hits zero.
    session: &'a SessionImpl,
    /// Raw pointer to the referenced `Lookup`; replaced by a null pointer in `leak`.
    lookup: *const Lookup,
}

// NOTE(review): the raw pointer field suppresses the automatic `Send`/`Sync` impls. These manual
// impls presumably rely on the pointee only being modified through its atomic `refs` field and
// on the pointee staying alive while references exist — confirm.
unsafe impl<'a> Send for LookupRef<'a> {}
unsafe impl<'a> Sync for LookupRef<'a> {}
193
194 impl<'a> Clone for LookupRef<'a> {
195 fn clone(&self) -> Self {
196 self.get_ref(self.session)
197 }
198 }
199
200 impl<'a> std::ops::Deref for LookupRef<'a> {
201 type Target = Lookup;
202
203 fn deref(&self) -> &Self::Target {
204 unsafe { &*self.lookup }
205 }
206 }
207
208 impl<'a> Drop for LookupRef<'a> {
209 fn drop(&mut self) {
210 if self.lookup.is_null() {
211 return;
212 }
213
214 if self.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
215 let inode = self.inode;
216 drop(self.session.lookups.write().unwrap().remove(&inode));
217 }
218 }
219 }
220
221 impl<'a> LookupRef<'a> {
222 fn leak(mut self) -> &'a Lookup {
223 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
224 }
225 }
226
/// The state shared by all request handlers of a fuse session.
struct SessionImpl {
    /// Accessor used to open directories, entries and file contents of the archive.
    accessor: Accessor,
    /// Whether error messages are printed via `eprintln!`.
    verbose: bool,
    /// The lookup table, keyed by inode number. `SessionImpl::new` seeds it with an entry for
    /// `ROOT_ID`.
    lookups: RwLock<BTreeMap<u64, Box<Lookup>>>,
}
232
233 impl SessionImpl {
234 fn new(accessor: Accessor, verbose: bool) -> Self {
235 let root = Lookup::new(
236 ROOT_ID,
237 ROOT_ID,
238 EntryRangeInfo::toplevel(0..accessor.size()),
239 None,
240 );
241
242 let mut tree = BTreeMap::new();
243 tree.insert(ROOT_ID, root);
244
245 Self {
246 accessor,
247 verbose,
248 lookups: RwLock::new(tree),
249 }
250 }
251
252 /// Here's how we deal with errors:
253 ///
254 /// Any error will be printed if the verbose flag was set, otherwise the message will be
255 /// silently dropped.
256 ///
257 /// Opaque errors will cause the fuse main loop to bail out with that error.
258 ///
259 /// `io::Error`s will cause the fuse request to responded to with the given `io::Error`. An
260 /// `io::ErrorKind::Other` translates to a generic `EIO`.
261 async fn handle_err(
262 &self,
263 request: impl FuseRequest,
264 err: Error,
265 mut sender: UnboundedSender<Error>,
266 ) {
267 let final_result = match err.downcast::<io::Error>() {
268 Ok(err) => {
269 if err.kind() == io::ErrorKind::Other && self.verbose {
270 eprintln!("an IO error occurred: {}", err);
271 }
272
273 // fail the request
274 request.io_fail(err).map_err(Error::from)
275 }
276 Err(err) => {
277 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
278 if self.verbose {
279 eprintln!("internal error: {}, bailing out", err);
280 }
281 Err(err)
282 }
283 };
284 if let Err(err) = final_result {
285 // either we failed to send the error code to fuse, or the above was not an
286 // `io::Error`, so in this case notify the main loop:
287 sender
288 .send(err)
289 .await
290 .expect("failed to propagate error to main loop");
291 }
292 }
293
294 async fn main(self, fuse: Fuse) -> Result<(), Error> {
295 Arc::new(self).main_do(fuse).await
296 }
297
298 async fn main_do(self: Arc<Self>, fuse: Fuse) -> Result<(), Error> {
299 let (err_send, mut err_recv) = futures::channel::mpsc::unbounded::<Error>();
300 let mut fuse = fuse.fuse(); // make this a futures::stream::FusedStream!
301 loop {
302 select! {
303 request = fuse.try_next() => match request? {
304 Some(request) => {
305 tokio::spawn(Arc::clone(&self).handle_request(request, err_send.clone()));
306 }
307 None => break,
308 },
309 err = err_recv.next() => match err {
310 Some(err) => if self.verbose {
311 eprintln!("cancelling fuse main loop due to error: {}", err);
312 return Err(err);
313 },
314 None => panic!("error channel was closed unexpectedly"),
315 },
316 }
317 }
318 Ok(())
319 }
320
/// Handle a single fuse request.
///
/// Each supported request type is dispatched to the matching method. If that method fails,
/// the request and the error are passed on to `handle_err`. If sending the reply fails, the
/// error is sent to the main loop through `err_sender`. Request types without a dedicated arm
/// are failed with `ENOSYS`.
///
/// # Panics
///
/// Panics if an error cannot be sent to the main loop.
async fn handle_request(
    self: Arc<Self>,
    request: Request,
    mut err_sender: UnboundedSender<Error>,
) {
    let result: Result<(), Error> = match request {
        Request::Lookup(request) => {
            match self.lookup(request.parent, &request.file_name).await {
                Ok((entry, lookup)) => match request.reply(&entry) {
                    Ok(()) => {
                        // The reply went out, so keep the reference count incremented.
                        // NOTE(review): presumably balanced by a later `Forget` — confirm.
                        lookup.leak();
                        Ok(())
                    }
                    Err(err) => Err(Error::from(err)),
                },
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        Request::Forget(request) => match self.forget(request.inode, request.count as usize) {
            Ok(()) => {
                request.reply();
                Ok(())
            }
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::Getattr(request) => match self.getattr(request.inode).await {
            Ok(stat) => request.reply(&stat, std::f64::MAX).map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::ReaddirPlus(mut request) => match self.readdirplus(&mut request).await {
            Ok(lookups) => match request.reply() {
                Ok(()) => {
                    // As with `Lookup`: every entry that was handed out keeps its reference.
                    for i in lookups {
                        i.leak();
                    }
                    Ok(())
                }
                Err(err) => Err(Error::from(err)),
            },
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::Read(request) => {
            match self.read(request.inode, request.size, request.offset).await {
                Ok(data) => request.reply(&data).map_err(Error::from),
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        Request::Readlink(request) => match self.readlink(request.inode).await {
            Ok(data) => request.reply(&data).map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::ListXAttrSize(request) => match self.listxattrs(request.inode).await {
            // The reported size is the sum of all attribute names including their
            // terminating nul bytes.
            Ok(data) => request
                .reply(
                    data.into_iter()
                        .fold(0, |sum, i| sum + i.name().to_bytes_with_nul().len()),
                )
                .map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::ListXAttr(mut request) => match self.listxattrs_into(&mut request).await {
            Ok(ReplyBufState::Ok) => request.reply().map_err(Error::from),
            Ok(ReplyBufState::Full) => request.fail_full().map_err(Error::from),
            Err(err) => return self.handle_err(request, err, err_sender).await,
        },
        Request::GetXAttrSize(request) => {
            match self.getxattr(request.inode, &request.attr_name).await {
                Ok(xattr) => request.reply(xattr.value().len()).map_err(Error::from),
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        Request::GetXAttr(request) => {
            match self.getxattr(request.inode, &request.attr_name).await {
                Ok(xattr) => request.reply(xattr.value()).map_err(Error::from),
                Err(err) => return self.handle_err(request, err, err_sender).await,
            }
        }
        other => {
            if self.verbose {
                eprintln!("Received unexpected fuse request");
            }
            other.fail(libc::ENOSYS).map_err(Error::from)
        }
    };

    // Failing to send a reply is reported to the main loop.
    if let Err(err) = result {
        err_sender
            .send(err)
            .await
            .expect("failed to propagate error to main loop");
    }
}
413
414 fn get_lookup(&self, inode: u64) -> Result<LookupRef, Error> {
415 let lookups = self.lookups.read().unwrap();
416 if let Some(lookup) = lookups.get(&inode) {
417 return Ok(lookup.get_ref(self));
418 }
419 io_return!(libc::ENOENT);
420 }
421
422 async fn open_dir(&self, inode: u64) -> Result<Directory, Error> {
423 if inode == ROOT_ID {
424 Ok(self.accessor.open_root().await?)
425 } else if !is_dir_inode(inode) {
426 io_return!(libc::ENOTDIR);
427 } else {
428 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? })
429 }
430 }
431
432 async fn open_entry(&self, lookup: &LookupRef<'_>) -> io::Result<FileEntry> {
433 unsafe {
434 self.accessor
435 .open_file_at_range(&lookup.entry_range_info)
436 .await
437 }
438 }
439
440 fn open_content(&self, lookup: &LookupRef) -> Result<FileContents, Error> {
441 if is_dir_inode(lookup.inode) {
442 io_return!(libc::EISDIR);
443 }
444
445 match lookup.content_range.clone() {
446 Some(range) => Ok(unsafe { self.accessor.open_contents_at_range(range) }),
447 None => io_return!(libc::EBADF),
448 }
449 }
450
451 fn make_lookup(&self, parent: u64, inode: u64, entry: &FileEntry) -> Result<LookupRef, Error> {
452 let lookups = self.lookups.read().unwrap();
453 if let Some(lookup) = lookups.get(&inode) {
454 return Ok(lookup.get_ref(self));
455 }
456 drop(lookups);
457
458 let entry = Lookup::new(
459 inode,
460 parent,
461 entry.entry_range_info().clone(),
462 entry.content_range()?,
463 );
464 let reference = entry.get_ref(self);
465 entry.refs.store(1, Ordering::Release);
466
467 let mut lookups = self.lookups.write().unwrap();
468 if let Some(lookup) = lookups.get(&inode) {
469 return Ok(lookup.get_ref(self));
470 }
471
472 lookups.insert(inode, entry);
473 drop(lookups);
474 Ok(reference)
475 }
476
477 fn forget(&self, inode: u64, count: usize) -> Result<(), Error> {
478 let node = self.get_lookup(inode)?;
479 node.forget(count)?;
480 Ok(())
481 }
482
483 async fn lookup(
484 &'_ self,
485 parent: u64,
486 file_name: &OsStr,
487 ) -> Result<(EntryParam, LookupRef<'_>), Error> {
488 let dir = self.open_dir(parent).await?;
489
490 let entry = match { dir }.lookup(file_name).await? {
491 Some(entry) => entry,
492 None => io_return!(libc::ENOENT),
493 };
494
495 let entry = if let pxar::EntryKind::Hardlink(_) = entry.kind() {
496 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
497 // entire rest of the archive until we figure out something better...
498 let entry = self.accessor.follow_hardlink(&entry).await?;
499
500 if let pxar::EntryKind::Hardlink(_) = entry.kind() {
501 // hardlinks must not point to other hardlinks...
502 io_return!(libc::ELOOP);
503 }
504
505 entry
506 } else {
507 entry
508 };
509
510 let response = to_entry(&entry)?;
511 let inode = response.inode;
512 Ok((response, self.make_lookup(parent, inode, &entry)?))
513 }
514
515 async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
516 let entry = unsafe {
517 self.accessor.open_file_at_range(&self.get_lookup(inode)?.entry_range_info).await?
518 };
519 to_stat(inode, &entry)
520 }
521
522 async fn readdirplus(
523 &'_ self,
524 request: &mut requests::ReaddirPlus,
525 ) -> Result<Vec<LookupRef<'_>>, Error> {
526 let mut lookups = Vec::new();
527 let offset = usize::try_from(request.offset)
528 .map_err(|_| io_format_err!("directory offset out of range"))?;
529
530 let dir = self.open_dir(request.inode).await?;
531 let dir_lookup = self.get_lookup(request.inode)?;
532
533 let entry_count = dir.read_dir().count() as isize;
534
535 let mut next = offset as isize;
536 let mut iter = dir.read_dir().skip(offset);
537 while let Some(file) = iter.next().await {
538 next += 1;
539 let file = file?.decode_entry().await?;
540 let stat = to_stat(to_inode(&file), &file)?;
541 let name = file.file_name();
542 match request.add_entry(name, &stat, next, 1, std::f64::MAX, std::f64::MAX)? {
543 ReplyBufState::Ok => (),
544 ReplyBufState::Full => return Ok(lookups),
545 }
546 lookups.push(self.make_lookup(request.inode, stat.st_ino, &file)?);
547 }
548
549 if next == entry_count {
550 next += 1;
551 let file = dir.lookup_self().await?;
552 let stat = to_stat(to_inode(&file), &file)?;
553 let name = OsStr::new(".");
554 match request.add_entry(name, &stat, next, 1, std::f64::MAX, std::f64::MAX)? {
555 ReplyBufState::Ok => (),
556 ReplyBufState::Full => return Ok(lookups),
557 }
558 lookups.push(LookupRef::clone(&dir_lookup));
559 }
560
561 if next == entry_count + 1 {
562 next += 1;
563 let lookup = self.get_lookup(dir_lookup.parent)?;
564 let parent_dir = self.open_dir(lookup.inode).await?;
565 let file = parent_dir.lookup_self().await?;
566 let stat = to_stat(to_inode(&file), &file)?;
567 let name = OsStr::new("..");
568 match request.add_entry(name, &stat, next, 1, std::f64::MAX, std::f64::MAX)? {
569 ReplyBufState::Ok => (),
570 ReplyBufState::Full => return Ok(lookups),
571 }
572 lookups.push(lookup);
573 }
574
575 Ok(lookups)
576 }
577
578 async fn read(&self, inode: u64, len: usize, offset: u64) -> Result<Vec<u8>, Error> {
579 let file = self.get_lookup(inode)?;
580 let content = self.open_content(&file)?;
581 let mut buf = vec::undefined(len);
582 let got = content.read_at(&mut buf, offset).await?;
583 buf.truncate(got);
584 Ok(buf)
585 }
586
587 async fn readlink(&self, inode: u64) -> Result<OsString, Error> {
588 let lookup = self.get_lookup(inode)?;
589 let file = self.open_entry(&lookup).await?;
590 match file.get_symlink() {
591 None => io_return!(libc::EINVAL),
592 Some(link) => Ok(link.to_owned()),
593 }
594 }
595
596 async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
597 let lookup = self.get_lookup(inode)?;
598 let metadata = self
599 .open_entry(&lookup)
600 .await?
601 .into_entry()
602 .into_metadata();
603
604 let mut xattrs = metadata.xattrs;
605
606 use pxar::format::XAttr;
607
608 if let Some(fcaps) = metadata.fcaps {
609 xattrs.push(XAttr::new(xattr::xattr_name_fcaps().to_bytes(), fcaps.data));
610 }
611
612 // TODO: Special cases:
613 // b"system.posix_acl_access
614 // b"system.posix_acl_default
615 //
616 // For these we need to be able to create posix acl format entries, at that point we could
617 // just ditch libacl as well...
618
619 Ok(xattrs)
620 }
621
622 async fn listxattrs_into(
623 &self,
624 request: &mut requests::ListXAttr,
625 ) -> Result<ReplyBufState, Error> {
626 let xattrs = self.listxattrs(request.inode).await?;
627
628 for entry in xattrs {
629 match request.add_c_string(entry.name()) {
630 ReplyBufState::Ok => (),
631 ReplyBufState::Full => return Ok(ReplyBufState::Full),
632 }
633 }
634
635 Ok(ReplyBufState::Ok)
636 }
637
638 async fn getxattr(&self, inode: u64, xattr: &OsStr) -> Result<pxar::format::XAttr, Error> {
639 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
640 // xattr for an entry...
641 let xattrs = self.listxattrs(inode).await?;
642 for entry in xattrs {
643 if entry.name().to_bytes() == xattr.as_bytes() {
644 return Ok(entry);
645 }
646 }
647 io_return!(libc::ENODATA);
648 }
649 }
650
651 #[inline]
652 fn to_entry(entry: &FileEntry) -> Result<EntryParam, Error> {
653 to_entry_param(to_inode(&entry), &entry)
654 }
655
656 #[inline]
657 fn to_inode(entry: &FileEntry) -> u64 {
658 if entry.is_dir() {
659 entry.entry_range_info().entry_range.end
660 } else {
661 entry.entry_range_info().entry_range.start | NON_DIRECTORY_INODE
662 }
663 }
664
665 fn to_entry_param(inode: u64, entry: &pxar::Entry) -> Result<EntryParam, Error> {
666 Ok(EntryParam::simple(inode, to_stat(inode, entry)?))
667 }
668
669 fn to_stat(inode: u64, entry: &pxar::Entry) -> Result<libc::stat, Error> {
670 let nlink = if entry.is_dir() { 2 } else { 1 };
671
672 let metadata = entry.metadata();
673
674 let mut stat: libc::stat = unsafe { mem::zeroed() };
675 stat.st_ino = inode;
676 stat.st_nlink = nlink;
677 stat.st_mode = u32::try_from(metadata.stat.mode)
678 .map_err(|err| format_err!("mode does not fit into st_mode field: {}", err))?;
679 stat.st_size = i64::try_from(entry.file_size().unwrap_or(0))
680 .map_err(|err| format_err!("size does not fit into st_size field: {}", err))?;
681 stat.st_uid = metadata.stat.uid;
682 stat.st_gid = metadata.stat.gid;
683 stat.st_atime = metadata.stat.mtime.secs;
684 stat.st_atime_nsec = metadata.stat.mtime.nanos as _;
685 stat.st_mtime = metadata.stat.mtime.secs;
686 stat.st_mtime_nsec = metadata.stat.mtime.nanos as _;
687 stat.st_ctime = metadata.stat.mtime.secs;
688 stat.st_ctime_nsec = metadata.stat.mtime.nanos as _;
689 Ok(stat)
690 }