]> git.proxmox.com Git - proxmox-backup.git/blob - src/pxar/fuse.rs
require square brackets for ipv6 addresses
[proxmox-backup.git] / src / pxar / fuse.rs
1 //! Asynchronous fuse implementation.
2
3 use std::collections::BTreeMap;
4 use std::convert::TryFrom;
5 use std::ffi::{OsStr, OsString};
6 use std::future::Future;
7 use std::io;
8 use std::mem;
9 use std::ops::Range;
10 use std::os::unix::ffi::OsStrExt;
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicUsize, Ordering};
14 use std::sync::{Arc, RwLock};
15 use std::task::{Context, Poll};
16
17 use anyhow::{format_err, Error};
18 use futures::channel::mpsc::UnboundedSender;
19 use futures::select;
20 use futures::sink::SinkExt;
21 use futures::stream::{StreamExt, TryStreamExt};
22
23 use proxmox::tools::vec;
24 use pxar::accessor::{self, EntryRangeInfo, ReadAt};
25
26 use proxmox_fuse::requests::{self, FuseRequest};
27 use proxmox_fuse::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID};
28
29 use crate::tools::xattr;
30
/// Flag bit set in the inode number of every non-directory entry, so that the inode alone tells
/// us how the entry has to be accessed.
const NON_DIRECTORY_INODE: u64 = 1 << 63;

/// Returns `true` when `inode` does not carry the [`NON_DIRECTORY_INODE`] flag, i.e. it refers
/// to a directory.
#[inline]
fn is_dir_inode(inode: u64) -> bool {
    (inode & NON_DIRECTORY_INODE) == 0
}
38
39 /// Our reader type instance used for accessors.
40 pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;
41
42 /// Our Accessor type instance.
43 pub type Accessor = accessor::aio::Accessor<Reader>;
44
45 /// Our Directory type instance.
46 pub type Directory = accessor::aio::Directory<Reader>;
47
48 /// Our FileEntry type instance.
49 pub type FileEntry = accessor::aio::FileEntry<Reader>;
50
51 /// Our FileContents type instance.
52 pub type FileContents = accessor::aio::FileContents<Reader>;
53
54 pub struct Session {
55 fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync + 'static>>,
56 }
57
58 impl Session {
59 /// Create a fuse session for an archive.
60 pub async fn mount_path(
61 archive_path: &Path,
62 options: &OsStr,
63 verbose: bool,
64 mountpoint: &Path,
65 ) -> Result<Self, Error> {
66 // TODO: Add a buffered/caching ReadAt layer?
67 let file = std::fs::File::open(archive_path)?;
68 let file_size = file.metadata()?.len();
69 let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
70 let accessor = Accessor::new(reader, file_size).await?;
71 Self::mount(accessor, options, verbose, mountpoint)
72 }
73
74 /// Create a new fuse session for the given pxar `Accessor`.
75 pub fn mount(
76 accessor: Accessor,
77 options: &OsStr,
78 verbose: bool,
79 path: &Path,
80 ) -> Result<Self, Error> {
81 let fuse = Fuse::builder("pxar-mount")?
82 .debug()
83 .options_os(options)?
84 .enable_readdirplus()
85 .enable_read()
86 .enable_readlink()
87 .enable_read_xattr()
88 .build()?
89 .mount(path)?;
90
91 let session = SessionImpl::new(accessor, verbose);
92
93 Ok(Self {
94 fut: Box::pin(session.main(fuse)),
95 })
96 }
97 }
98
99 impl Future for Session {
100 type Output = Result<(), Error>;
101
102 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
103 Pin::new(&mut self.fut).poll(cx)
104 }
105 }
106
/// Return early from the current function with an error built from a raw `errno` value. The
/// request handler answers the fuse request with exactly that error code.
///
/// The expansion is a block, so the macro works both as a statement and in expression position
/// (e.g. as a `match` arm) without tripping the `semicolon_in_expressions_from_macros` lint.
macro_rules! io_return {
    ($errno:expr) => {{
        return Err(::std::io::Error::from_raw_os_error($errno).into());
    }};
}
113
/// Build an `io::Error` of kind `Other` from `format!`-style arguments. `io_bail!` uses this to
/// report unexpected failures, which end up as a generic `EIO` for the fuse request.
macro_rules! io_format_err {
    ($($args:tt)*) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, ::std::format!($($args)*))
    };
}
120
/// Bail out of a function in an unexpected error case. This causes the fuse request to be
/// answered with a generic `EIO` error code. The error message contained in here is printed to
/// stderr if the verbose flag is used, otherwise it is silently dropped.
///
/// The expansion is a block, so the macro can also be used in expression position.
macro_rules! io_bail {
    ($($fmt:tt)*) => {{
        return Err(io_format_err!($($fmt)*).into());
    }};
}
127
128 /// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
129 /// accessed.
130 struct Lookup {
131 refs: AtomicUsize,
132
133 inode: u64,
134 parent: u64,
135 entry_range_info: EntryRangeInfo,
136 content_range: Option<Range<u64>>,
137 }
138
139 impl Lookup {
140 fn new(
141 inode: u64,
142 parent: u64,
143 entry_range_info: EntryRangeInfo,
144 content_range: Option<Range<u64>>,
145 ) -> Box<Lookup> {
146 Box::new(Self {
147 refs: AtomicUsize::new(1),
148 inode,
149 parent,
150 entry_range_info,
151 content_range,
152 })
153 }
154
155 /// Decrease the reference count by `count`. Note that this must not include the reference held
156 /// by `self` itself, so this must not decrease the count below 2.
157 fn forget(&self, count: usize) -> Result<(), Error> {
158 loop {
159 let old = self.refs.load(Ordering::Acquire);
160 if count >= old {
161 io_bail!("reference count underflow");
162 }
163 let new = old - count;
164 match self
165 .refs
166 .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
167 {
168 Ok(_) => break Ok(()),
169 Err(_) => continue,
170 }
171 }
172 }
173
174 fn get_ref<'a>(&self, session: &'a SessionImpl) -> LookupRef<'a> {
175 if self.refs.fetch_add(1, Ordering::AcqRel) == 0 {
176 panic!("atomic refcount increased from 0 to 1");
177 }
178
179 LookupRef {
180 session,
181 lookup: self as *const Lookup,
182 }
183 }
184 }
185
186 struct LookupRef<'a> {
187 session: &'a SessionImpl,
188 lookup: *const Lookup,
189 }
190
191 unsafe impl<'a> Send for LookupRef<'a> {}
192 unsafe impl<'a> Sync for LookupRef<'a> {}
193
194 impl<'a> Clone for LookupRef<'a> {
195 fn clone(&self) -> Self {
196 self.get_ref(self.session)
197 }
198 }
199
200 impl<'a> std::ops::Deref for LookupRef<'a> {
201 type Target = Lookup;
202
203 fn deref(&self) -> &Self::Target {
204 unsafe { &*self.lookup }
205 }
206 }
207
208 impl<'a> Drop for LookupRef<'a> {
209 fn drop(&mut self) {
210 if self.lookup.is_null() {
211 return;
212 }
213
214 if self.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
215 let inode = self.inode;
216 drop(self.session.lookups.write().unwrap().remove(&inode));
217 }
218 }
219 }
220
221 impl<'a> LookupRef<'a> {
222 fn leak(mut self) -> &'a Lookup {
223 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
224 }
225 }
226
227 struct SessionImpl {
228 accessor: Accessor,
229 verbose: bool,
230 lookups: RwLock<BTreeMap<u64, Box<Lookup>>>,
231 }
232
233 impl SessionImpl {
234 fn new(accessor: Accessor, verbose: bool) -> Self {
235 let root = Lookup::new(
236 ROOT_ID,
237 ROOT_ID,
238 EntryRangeInfo::toplevel(0..accessor.size()),
239 None,
240 );
241
242 let mut tree = BTreeMap::new();
243 tree.insert(ROOT_ID, root);
244
245 Self {
246 accessor,
247 verbose,
248 lookups: RwLock::new(tree),
249 }
250 }
251
252 /// Here's how we deal with errors:
253 ///
254 /// Any error will be printed if the verbose flag was set, otherwise the message will be
255 /// silently dropped.
256 ///
257 /// Opaque errors will cause the fuse main loop to bail out with that error.
258 ///
259 /// `io::Error`s will cause the fuse request to responded to with the given `io::Error`. An
260 /// `io::ErrorKind::Other` translates to a generic `EIO`.
261 async fn handle_err(
262 &self,
263 request: impl FuseRequest,
264 err: Error,
265 mut sender: UnboundedSender<Error>,
266 ) {
267 let final_result = match err.downcast::<io::Error>() {
268 Ok(err) => {
269 if err.kind() == io::ErrorKind::Other {
270 if self.verbose {
271 eprintln!("an IO error occurred: {}", err);
272 }
273 }
274
275 // fail the request
276 request.io_fail(err).map_err(Error::from)
277 }
278 Err(err) => {
279 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
280 if self.verbose {
281 eprintln!("internal error: {}, bailing out", err);
282 }
283 Err(err)
284 }
285 };
286 if let Err(err) = final_result {
287 // either we failed to send the error code to fuse, or the above was not an
288 // `io::Error`, so in this case notify the main loop:
289 sender
290 .send(err)
291 .await
292 .expect("failed to propagate error to main loop");
293 }
294 }
295
296 async fn main(self, fuse: Fuse) -> Result<(), Error> {
297 Arc::new(self).main_do(fuse).await
298 }
299
300 async fn main_do(self: Arc<Self>, fuse: Fuse) -> Result<(), Error> {
301 let (err_send, mut err_recv) = futures::channel::mpsc::unbounded::<Error>();
302 let mut fuse = fuse.fuse(); // make this a futures::stream::FusedStream!
303 loop {
304 select! {
305 request = fuse.try_next() => match request? {
306 Some(request) => {
307 tokio::spawn(Arc::clone(&self).handle_request(request, err_send.clone()));
308 }
309 None => break,
310 },
311 err = err_recv.next() => match err {
312 Some(err) => if self.verbose {
313 eprintln!("cancelling fuse main loop due to error: {}", err);
314 return Err(err);
315 },
316 None => panic!("error channel was closed unexpectedly"),
317 },
318 }
319 }
320 Ok(())
321 }
322
323 async fn handle_request(
324 self: Arc<Self>,
325 request: Request,
326 mut err_sender: UnboundedSender<Error>,
327 ) {
328 let result: Result<(), Error> = match request {
329 Request::Lookup(request) => {
330 match self.lookup(request.parent, &request.file_name).await {
331 Ok((entry, lookup)) => match request.reply(&entry) {
332 Ok(()) => {
333 lookup.leak();
334 Ok(())
335 }
336 Err(err) => Err(Error::from(err)),
337 },
338 Err(err) => return self.handle_err(request, err, err_sender).await,
339 }
340 }
341 Request::Forget(request) => match self.forget(request.inode, request.count as usize) {
342 Ok(()) => {
343 request.reply();
344 Ok(())
345 }
346 Err(err) => return self.handle_err(request, err, err_sender).await,
347 },
348 Request::Getattr(request) => match self.getattr(request.inode).await {
349 Ok(stat) => request.reply(&stat, std::f64::MAX).map_err(Error::from),
350 Err(err) => return self.handle_err(request, err, err_sender).await,
351 },
352 Request::ReaddirPlus(mut request) => match self.readdirplus(&mut request).await {
353 Ok(lookups) => match request.reply() {
354 Ok(()) => {
355 for i in lookups {
356 i.leak();
357 }
358 Ok(())
359 }
360 Err(err) => Err(Error::from(err)),
361 },
362 Err(err) => return self.handle_err(request, err, err_sender).await,
363 },
364 Request::Read(request) => {
365 match self.read(request.inode, request.size, request.offset).await {
366 Ok(data) => request.reply(&data).map_err(Error::from),
367 Err(err) => return self.handle_err(request, err, err_sender).await,
368 }
369 }
370 Request::Readlink(request) => match self.readlink(request.inode).await {
371 Ok(data) => request.reply(&data).map_err(Error::from),
372 Err(err) => return self.handle_err(request, err, err_sender).await,
373 },
374 Request::ListXAttrSize(request) => match self.listxattrs(request.inode).await {
375 Ok(data) => request
376 .reply(
377 data.into_iter()
378 .fold(0, |sum, i| sum + i.name().to_bytes_with_nul().len()),
379 )
380 .map_err(Error::from),
381 Err(err) => return self.handle_err(request, err, err_sender).await,
382 },
383 Request::ListXAttr(mut request) => match self.listxattrs_into(&mut request).await {
384 Ok(ReplyBufState::Ok) => request.reply().map_err(Error::from),
385 Ok(ReplyBufState::Full) => request.fail_full().map_err(Error::from),
386 Err(err) => return self.handle_err(request, err, err_sender).await,
387 },
388 Request::GetXAttrSize(request) => {
389 match self.getxattr(request.inode, &request.attr_name).await {
390 Ok(xattr) => request.reply(xattr.value().len()).map_err(Error::from),
391 Err(err) => return self.handle_err(request, err, err_sender).await,
392 }
393 }
394 Request::GetXAttr(request) => {
395 match self.getxattr(request.inode, &request.attr_name).await {
396 Ok(xattr) => request.reply(xattr.value()).map_err(Error::from),
397 Err(err) => return self.handle_err(request, err, err_sender).await,
398 }
399 }
400 other => {
401 if self.verbose {
402 eprintln!("Received unexpected fuse request");
403 }
404 other.fail(libc::ENOSYS).map_err(Error::from)
405 }
406 };
407
408 if let Err(err) = result {
409 err_sender
410 .send(err)
411 .await
412 .expect("failed to propagate error to main loop");
413 }
414 }
415
416 fn get_lookup(&self, inode: u64) -> Result<LookupRef, Error> {
417 let lookups = self.lookups.read().unwrap();
418 if let Some(lookup) = lookups.get(&inode) {
419 return Ok(lookup.get_ref(self));
420 }
421 io_return!(libc::ENOENT);
422 }
423
424 async fn open_dir(&self, inode: u64) -> Result<Directory, Error> {
425 if inode == ROOT_ID {
426 Ok(self.accessor.open_root().await?)
427 } else if !is_dir_inode(inode) {
428 io_return!(libc::ENOTDIR);
429 } else {
430 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? })
431 }
432 }
433
434 async fn open_entry(&self, lookup: &LookupRef<'_>) -> io::Result<FileEntry> {
435 unsafe {
436 self.accessor
437 .open_file_at_range(&lookup.entry_range_info)
438 .await
439 }
440 }
441
442 fn open_content(&self, lookup: &LookupRef) -> Result<FileContents, Error> {
443 if is_dir_inode(lookup.inode) {
444 io_return!(libc::EISDIR);
445 }
446
447 match lookup.content_range.clone() {
448 Some(range) => Ok(unsafe { self.accessor.open_contents_at_range(range) }),
449 None => io_return!(libc::EBADF),
450 }
451 }
452
453 fn make_lookup(&self, parent: u64, inode: u64, entry: &FileEntry) -> Result<LookupRef, Error> {
454 let lookups = self.lookups.read().unwrap();
455 if let Some(lookup) = lookups.get(&inode) {
456 return Ok(lookup.get_ref(self));
457 }
458 drop(lookups);
459
460 let entry = Lookup::new(
461 inode,
462 parent,
463 entry.entry_range_info().clone(),
464 entry.content_range()?,
465 );
466 let reference = entry.get_ref(self);
467 entry.refs.store(1, Ordering::Release);
468
469 let mut lookups = self.lookups.write().unwrap();
470 if let Some(lookup) = lookups.get(&inode) {
471 return Ok(lookup.get_ref(self));
472 }
473
474 lookups.insert(inode, entry);
475 drop(lookups);
476 Ok(reference)
477 }
478
479 fn forget(&self, inode: u64, count: usize) -> Result<(), Error> {
480 let node = self.get_lookup(inode)?;
481 node.forget(count)?;
482 Ok(())
483 }
484
485 async fn lookup<'a>(
486 &'a self,
487 parent: u64,
488 file_name: &OsStr,
489 ) -> Result<(EntryParam, LookupRef<'a>), Error> {
490 let dir = self.open_dir(parent).await?;
491
492 let entry = match { dir }.lookup(file_name).await? {
493 Some(entry) => entry,
494 None => io_return!(libc::ENOENT),
495 };
496
497 let entry = if let pxar::EntryKind::Hardlink(_) = entry.kind() {
498 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
499 // entire rest of the archive until we figure out something better...
500 let entry = self.accessor.follow_hardlink(&entry).await?;
501
502 if let pxar::EntryKind::Hardlink(_) = entry.kind() {
503 // hardlinks must not point to other hardlinks...
504 io_return!(libc::ELOOP);
505 }
506
507 entry
508 } else {
509 entry
510 };
511
512 let response = to_entry(&entry)?;
513 let inode = response.inode;
514 Ok((response, self.make_lookup(parent, inode, &entry)?))
515 }
516
517 async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
518 let entry = unsafe {
519 self.accessor.open_file_at_range(&self.get_lookup(inode)?.entry_range_info).await?
520 };
521 to_stat(inode, &entry)
522 }
523
524 async fn readdirplus<'a>(
525 &'a self,
526 request: &mut requests::ReaddirPlus,
527 ) -> Result<Vec<LookupRef<'a>>, Error> {
528 let mut lookups = Vec::new();
529 let offset = usize::try_from(request.offset)
530 .map_err(|_| io_format_err!("directory offset out of range"))?;
531
532 let dir = self.open_dir(request.inode).await?;
533 let dir_lookup = self.get_lookup(request.inode)?;
534
535 let entry_count = dir.read_dir().count() as isize;
536
537 let mut next = offset as isize;
538 let mut iter = dir.read_dir().skip(offset);
539 while let Some(file) = iter.next().await {
540 next += 1;
541 let file = file?.decode_entry().await?;
542 let stat = to_stat(to_inode(&file), &file)?;
543 let name = file.file_name();
544 match request.add_entry(name, &stat, next, 1, std::f64::MAX, std::f64::MAX)? {
545 ReplyBufState::Ok => (),
546 ReplyBufState::Full => return Ok(lookups),
547 }
548 lookups.push(self.make_lookup(request.inode, stat.st_ino, &file)?);
549 }
550
551 if next == entry_count {
552 next += 1;
553 let file = dir.lookup_self().await?;
554 let stat = to_stat(to_inode(&file), &file)?;
555 let name = OsStr::new(".");
556 match request.add_entry(name, &stat, next, 1, std::f64::MAX, std::f64::MAX)? {
557 ReplyBufState::Ok => (),
558 ReplyBufState::Full => return Ok(lookups),
559 }
560 lookups.push(LookupRef::clone(&dir_lookup));
561 }
562
563 if next == entry_count + 1 {
564 next += 1;
565 let lookup = self.get_lookup(dir_lookup.parent)?;
566 let parent_dir = self.open_dir(lookup.inode).await?;
567 let file = parent_dir.lookup_self().await?;
568 let stat = to_stat(to_inode(&file), &file)?;
569 let name = OsStr::new("..");
570 match request.add_entry(name, &stat, next, 1, std::f64::MAX, std::f64::MAX)? {
571 ReplyBufState::Ok => (),
572 ReplyBufState::Full => return Ok(lookups),
573 }
574 lookups.push(lookup);
575 }
576
577 Ok(lookups)
578 }
579
580 async fn read(&self, inode: u64, len: usize, offset: u64) -> Result<Vec<u8>, Error> {
581 let file = self.get_lookup(inode)?;
582 let content = self.open_content(&file)?;
583 let mut buf = vec::undefined(len);
584 let got = content.read_at(&mut buf, offset).await?;
585 buf.truncate(got);
586 Ok(buf)
587 }
588
589 async fn readlink(&self, inode: u64) -> Result<OsString, Error> {
590 let lookup = self.get_lookup(inode)?;
591 let file = self.open_entry(&lookup).await?;
592 match file.get_symlink() {
593 None => io_return!(libc::EINVAL),
594 Some(link) => Ok(link.to_owned()),
595 }
596 }
597
598 async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
599 let lookup = self.get_lookup(inode)?;
600 let metadata = self
601 .open_entry(&lookup)
602 .await?
603 .into_entry()
604 .into_metadata();
605
606 let mut xattrs = metadata.xattrs;
607
608 use pxar::format::XAttr;
609
610 if let Some(fcaps) = metadata.fcaps {
611 xattrs.push(XAttr::new(xattr::xattr_name_fcaps().to_bytes(), fcaps.data));
612 }
613
614 // TODO: Special cases:
615 // b"system.posix_acl_access
616 // b"system.posix_acl_default
617 //
618 // For these we need to be able to create posix acl format entries, at that point we could
619 // just ditch libacl as well...
620
621 Ok(xattrs)
622 }
623
624 async fn listxattrs_into(
625 &self,
626 request: &mut requests::ListXAttr,
627 ) -> Result<ReplyBufState, Error> {
628 let xattrs = self.listxattrs(request.inode).await?;
629
630 for entry in xattrs {
631 match request.add_c_string(entry.name()) {
632 ReplyBufState::Ok => (),
633 ReplyBufState::Full => return Ok(ReplyBufState::Full),
634 }
635 }
636
637 Ok(ReplyBufState::Ok)
638 }
639
640 async fn getxattr(&self, inode: u64, xattr: &OsStr) -> Result<pxar::format::XAttr, Error> {
641 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
642 // xattr for an entry...
643 let xattrs = self.listxattrs(inode).await?;
644 for entry in xattrs {
645 if entry.name().to_bytes() == xattr.as_bytes() {
646 return Ok(entry);
647 }
648 }
649 io_return!(libc::ENODATA);
650 }
651 }
652
653 #[inline]
654 fn to_entry(entry: &FileEntry) -> Result<EntryParam, Error> {
655 to_entry_param(to_inode(&entry), &entry)
656 }
657
658 #[inline]
659 fn to_inode(entry: &FileEntry) -> u64 {
660 if entry.is_dir() {
661 entry.entry_range_info().entry_range.end
662 } else {
663 entry.entry_range_info().entry_range.start | NON_DIRECTORY_INODE
664 }
665 }
666
667 fn to_entry_param(inode: u64, entry: &pxar::Entry) -> Result<EntryParam, Error> {
668 Ok(EntryParam::simple(inode, to_stat(inode, entry)?))
669 }
670
671 fn to_stat(inode: u64, entry: &pxar::Entry) -> Result<libc::stat, Error> {
672 let nlink = if entry.is_dir() { 2 } else { 1 };
673
674 let metadata = entry.metadata();
675
676 let mut stat: libc::stat = unsafe { mem::zeroed() };
677 stat.st_ino = inode;
678 stat.st_nlink = nlink;
679 stat.st_mode = u32::try_from(metadata.stat.mode)
680 .map_err(|err| format_err!("mode does not fit into st_mode field: {}", err))?;
681 stat.st_size = i64::try_from(entry.file_size().unwrap_or(0))
682 .map_err(|err| format_err!("size does not fit into st_size field: {}", err))?;
683 stat.st_uid = metadata.stat.uid;
684 stat.st_gid = metadata.stat.gid;
685 stat.st_atime = metadata.stat.mtime.secs;
686 stat.st_atime_nsec = metadata.stat.mtime.nanos as _;
687 stat.st_mtime = metadata.stat.mtime.secs;
688 stat.st_mtime_nsec = metadata.stat.mtime.nanos as _;
689 stat.st_ctime = metadata.stat.mtime.secs;
690 stat.st_ctime_nsec = metadata.stat.mtime.nanos as _;
691 Ok(stat)
692 }