]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/pxar/fuse.rs
update proxmox-metrics dependency to 0.3.1
[proxmox-backup.git] / pbs-client / src / pxar / fuse.rs
1 //! Asynchronous fuse implementation.
2
3 use std::collections::BTreeMap;
4 use std::convert::TryFrom;
5 use std::ffi::{OsStr, OsString};
6 use std::future::Future;
7 use std::io;
8 use std::mem;
9 use std::ops::Range;
10 use std::os::unix::ffi::OsStrExt;
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicUsize, Ordering};
14 use std::sync::{Arc, RwLock};
15 use std::task::{Context, Poll};
16
17 use anyhow::{format_err, Error};
18 use futures::channel::mpsc::UnboundedSender;
19 use futures::select;
20 use futures::sink::SinkExt;
21 use futures::stream::{StreamExt, TryStreamExt};
22
23 use proxmox_io::vec;
24 use pxar::accessor::{self, EntryRangeInfo, ReadAt};
25
26 use proxmox_fuse::requests::{self, FuseRequest};
27 use proxmox_fuse::{EntryParam, Fuse, ReplyBufState, Request, ROOT_ID};
28 use proxmox_lang::io_format_err;
29 use proxmox_sys::fs::xattr;
30
/// Marker bit for inode numbers of non-directory entries.
///
/// Non-directory inodes are the start offset of their entry range with this bit set, so they can
/// be told apart from directory inodes (which are the end offset of their entry range) and
/// accessed accordingly.
const NON_DIRECTORY_INODE: u64 = 1u64 << 63;

/// Returns `true` when `inode` does not carry the [`NON_DIRECTORY_INODE`] marker bit.
#[inline]
fn is_dir_inode(inode: u64) -> bool {
    let marker = inode & NON_DIRECTORY_INODE;
    marker == 0
}
38
/// Type-erased, thread-safe (`Send + Sync`) random-access reader used by all accessor types
/// below.
pub type Reader = Arc<dyn ReadAt + Send + Sync + 'static>;

/// Asynchronous pxar archive accessor reading through a [`Reader`].
pub type Accessor = accessor::aio::Accessor<Reader>;

/// Asynchronous pxar directory handle reading through a [`Reader`].
pub type Directory = accessor::aio::Directory<Reader>;

/// Asynchronous pxar file entry reading through a [`Reader`].
pub type FileEntry = accessor::aio::FileEntry<Reader>;

/// Asynchronous accessor for the contents of a pxar file entry, reading through a [`Reader`].
pub type FileContents = accessor::aio::FileContents<Reader>;
50
51 /// Our FileContents type instance.
52 pub type FileContents = accessor::aio::FileContents<Reader>;
53
54 pub struct Session {
55 fut: Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync + 'static>>,
56 }
57
58 impl Session {
59 /// Create a fuse session for an archive.
60 pub async fn mount_path(
61 archive_path: &Path,
62 options: &OsStr,
63 verbose: bool,
64 mountpoint: &Path,
65 ) -> Result<Self, Error> {
66 // TODO: Add a buffered/caching ReadAt layer?
67 let file = std::fs::File::open(archive_path)?;
68 let file_size = file.metadata()?.len();
69 let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
70 let accessor = Accessor::new(reader, file_size).await?;
71 Self::mount(accessor, options, verbose, mountpoint)
72 }
73
74 /// Create a new fuse session for the given pxar `Accessor`.
75 pub fn mount(
76 accessor: Accessor,
77 options: &OsStr,
78 verbose: bool,
79 path: &Path,
80 ) -> Result<Self, Error> {
81 let fuse = Fuse::builder("pxar-mount")?
82 .debug()
83 .options_os(options)?
84 .enable_readdirplus()
85 .enable_read()
86 .enable_readlink()
87 .enable_read_xattr()
88 .build()?
89 .mount(path)?;
90
91 let session = SessionImpl::new(accessor, verbose);
92
93 Ok(Self {
94 fut: Box::pin(session.main(fuse)),
95 })
96 }
97 }
98
99 impl Future for Session {
100 type Output = Result<(), Error>;
101
102 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
103 Pin::new(&mut self.fut).poll(cx)
104 }
105 }
106
107 /// We use this to return an errno value back to the kernel.
108 macro_rules! io_return {
109 ($errno:expr) => {{
110 return Err(::std::io::Error::from_raw_os_error($errno).into());
111 }};
112 }
113
114 /// This is what we need to cache as a "lookup" entry. The kernel assumes that these are easily
115 /// accessed.
116 struct Lookup {
117 refs: AtomicUsize,
118
119 inode: u64,
120 parent: u64,
121 entry_range_info: EntryRangeInfo,
122 content_range: Option<Range<u64>>,
123 }
124
125 impl Lookup {
126 fn new(
127 inode: u64,
128 parent: u64,
129 entry_range_info: EntryRangeInfo,
130 content_range: Option<Range<u64>>,
131 ) -> Box<Lookup> {
132 Box::new(Self {
133 refs: AtomicUsize::new(1),
134 inode,
135 parent,
136 entry_range_info,
137 content_range,
138 })
139 }
140
141 /// Decrease the reference count by `count`. Note that this must not include the reference held
142 /// by `self` itself, so this must not decrease the count below 2.
143 fn forget(&self, count: usize) -> Result<(), Error> {
144 loop {
145 let old = self.refs.load(Ordering::Acquire);
146 if count >= old {
147 // We use this to bail out of a functionin an unexpected error case. This will cause the fuse
148 // request to be answered with a generic `EIO` error code. The error message contained in here
149 // will be printed to stdout if the verbose flag is used, otherwise silently dropped.
150 return Err(io_format_err!("reference count underflow").into());
151 }
152 let new = old - count;
153 match self
154 .refs
155 .compare_exchange(old, new, Ordering::SeqCst, Ordering::SeqCst)
156 {
157 Ok(_) => break Ok(()),
158 Err(_) => continue,
159 }
160 }
161 }
162
163 fn get_ref<'a>(&self, session: &'a SessionImpl) -> LookupRef<'a> {
164 if self.refs.fetch_add(1, Ordering::AcqRel) == 0 {
165 panic!("atomic refcount increased from 0 to 1");
166 }
167
168 LookupRef {
169 session,
170 lookup: self as *const Lookup,
171 }
172 }
173 }
174
175 struct LookupRef<'a> {
176 session: &'a SessionImpl,
177 lookup: *const Lookup,
178 }
179
180 unsafe impl<'a> Send for LookupRef<'a> {}
181 unsafe impl<'a> Sync for LookupRef<'a> {}
182
183 impl<'a> Clone for LookupRef<'a> {
184 fn clone(&self) -> Self {
185 self.get_ref(self.session)
186 }
187 }
188
189 impl<'a> std::ops::Deref for LookupRef<'a> {
190 type Target = Lookup;
191
192 fn deref(&self) -> &Self::Target {
193 unsafe { &*self.lookup }
194 }
195 }
196
197 impl<'a> Drop for LookupRef<'a> {
198 fn drop(&mut self) {
199 if self.lookup.is_null() {
200 return;
201 }
202
203 if self.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
204 let inode = self.inode;
205 drop(self.session.lookups.write().unwrap().remove(&inode));
206 }
207 }
208 }
209
210 impl<'a> LookupRef<'a> {
211 fn leak(mut self) -> &'a Lookup {
212 unsafe { &*mem::replace(&mut self.lookup, std::ptr::null()) }
213 }
214 }
215
216 struct SessionImpl {
217 accessor: Accessor,
218 verbose: bool,
219 lookups: RwLock<BTreeMap<u64, Box<Lookup>>>,
220 }
221
222 impl SessionImpl {
223 fn new(accessor: Accessor, verbose: bool) -> Self {
224 let root = Lookup::new(
225 ROOT_ID,
226 ROOT_ID,
227 EntryRangeInfo::toplevel(0..accessor.size()),
228 None,
229 );
230
231 let mut tree = BTreeMap::new();
232 tree.insert(ROOT_ID, root);
233
234 Self {
235 accessor,
236 verbose,
237 lookups: RwLock::new(tree),
238 }
239 }
240
241 /// Here's how we deal with errors:
242 ///
243 /// Any error will be printed if the verbose flag was set, otherwise the message will be
244 /// silently dropped.
245 ///
246 /// Opaque errors will cause the fuse main loop to bail out with that error.
247 ///
248 /// `io::Error`s will cause the fuse request to responded to with the given `io::Error`. An
249 /// `io::ErrorKind::Other` translates to a generic `EIO`.
250 async fn handle_err(
251 &self,
252 request: impl FuseRequest,
253 err: Error,
254 mut sender: UnboundedSender<Error>,
255 ) {
256 let final_result = match err.downcast::<io::Error>() {
257 Ok(err) => {
258 if err.kind() == io::ErrorKind::Other && self.verbose {
259 eprintln!("an IO error occurred: {}", err);
260 }
261
262 // fail the request
263 request.io_fail(err).map_err(Error::from)
264 }
265 Err(err) => {
266 // `bail` (non-`io::Error`) is used for fatal errors which should actually cancel:
267 if self.verbose {
268 eprintln!("internal error: {}, bailing out", err);
269 }
270 Err(err)
271 }
272 };
273 if let Err(err) = final_result {
274 // either we failed to send the error code to fuse, or the above was not an
275 // `io::Error`, so in this case notify the main loop:
276 sender
277 .send(err)
278 .await
279 .expect("failed to propagate error to main loop");
280 }
281 }
282
283 async fn main(self, fuse: Fuse) -> Result<(), Error> {
284 Arc::new(self).main_do(fuse).await
285 }
286
287 async fn main_do(self: Arc<Self>, fuse: Fuse) -> Result<(), Error> {
288 let (err_send, mut err_recv) = futures::channel::mpsc::unbounded::<Error>();
289 let mut fuse = fuse.fuse(); // make this a futures::stream::FusedStream!
290 loop {
291 select! {
292 request = fuse.try_next() => match request? {
293 Some(request) => {
294 tokio::spawn(Arc::clone(&self).handle_request(request, err_send.clone()));
295 }
296 None => break,
297 },
298 err = err_recv.next() => match err {
299 Some(err) => if self.verbose {
300 eprintln!("cancelling fuse main loop due to error: {}", err);
301 return Err(err);
302 },
303 None => panic!("error channel was closed unexpectedly"),
304 },
305 }
306 }
307 Ok(())
308 }
309
310 async fn handle_request(
311 self: Arc<Self>,
312 request: Request,
313 mut err_sender: UnboundedSender<Error>,
314 ) {
315 let result: Result<(), Error> = match request {
316 Request::Lookup(request) => {
317 match self.lookup(request.parent, &request.file_name).await {
318 Ok((entry, lookup)) => match request.reply(&entry) {
319 Ok(()) => {
320 lookup.leak();
321 Ok(())
322 }
323 Err(err) => Err(Error::from(err)),
324 },
325 Err(err) => return self.handle_err(request, err, err_sender).await,
326 }
327 }
328 Request::Forget(request) => match self.forget(request.inode, request.count as usize) {
329 Ok(()) => {
330 request.reply();
331 Ok(())
332 }
333 Err(err) => return self.handle_err(request, err, err_sender).await,
334 },
335 Request::Getattr(request) => match self.getattr(request.inode).await {
336 Ok(stat) => request.reply(&stat, f64::MAX).map_err(Error::from),
337 Err(err) => return self.handle_err(request, err, err_sender).await,
338 },
339 Request::ReaddirPlus(mut request) => match self.readdirplus(&mut request).await {
340 Ok(lookups) => match request.reply() {
341 Ok(()) => {
342 for i in lookups {
343 i.leak();
344 }
345 Ok(())
346 }
347 Err(err) => Err(Error::from(err)),
348 },
349 Err(err) => return self.handle_err(request, err, err_sender).await,
350 },
351 Request::Read(request) => {
352 match self.read(request.inode, request.size, request.offset).await {
353 Ok(data) => request.reply(&data).map_err(Error::from),
354 Err(err) => return self.handle_err(request, err, err_sender).await,
355 }
356 }
357 Request::Readlink(request) => match self.readlink(request.inode).await {
358 Ok(data) => request.reply(&data).map_err(Error::from),
359 Err(err) => return self.handle_err(request, err, err_sender).await,
360 },
361 Request::ListXAttrSize(request) => match self.listxattrs(request.inode).await {
362 Ok(data) => request
363 .reply(
364 data.into_iter()
365 .fold(0, |sum, i| sum + i.name().to_bytes_with_nul().len()),
366 )
367 .map_err(Error::from),
368 Err(err) => return self.handle_err(request, err, err_sender).await,
369 },
370 Request::ListXAttr(mut request) => match self.listxattrs_into(&mut request).await {
371 Ok(ReplyBufState::Ok) => request.reply().map_err(Error::from),
372 Ok(ReplyBufState::Full) => request.fail_full().map_err(Error::from),
373 Err(err) => return self.handle_err(request, err, err_sender).await,
374 },
375 Request::GetXAttrSize(request) => {
376 match self.getxattr(request.inode, &request.attr_name).await {
377 Ok(xattr) => request.reply(xattr.value().len()).map_err(Error::from),
378 Err(err) => return self.handle_err(request, err, err_sender).await,
379 }
380 }
381 Request::GetXAttr(request) => {
382 match self.getxattr(request.inode, &request.attr_name).await {
383 Ok(xattr) => request.reply(xattr.value()).map_err(Error::from),
384 Err(err) => return self.handle_err(request, err, err_sender).await,
385 }
386 }
387 other => {
388 if self.verbose {
389 eprintln!("Received unexpected fuse request");
390 }
391 other.fail(libc::ENOSYS).map_err(Error::from)
392 }
393 };
394
395 if let Err(err) = result {
396 err_sender
397 .send(err)
398 .await
399 .expect("failed to propagate error to main loop");
400 }
401 }
402
403 fn get_lookup(&self, inode: u64) -> Result<LookupRef, Error> {
404 let lookups = self.lookups.read().unwrap();
405 if let Some(lookup) = lookups.get(&inode) {
406 return Ok(lookup.get_ref(self));
407 }
408 io_return!(libc::ENOENT);
409 }
410
411 async fn open_dir(&self, inode: u64) -> Result<Directory, Error> {
412 if inode == ROOT_ID {
413 Ok(self.accessor.open_root().await?)
414 } else if !is_dir_inode(inode) {
415 io_return!(libc::ENOTDIR);
416 } else {
417 Ok(unsafe { self.accessor.open_dir_at_end(inode).await? })
418 }
419 }
420
421 async fn open_entry(&self, lookup: &LookupRef<'_>) -> io::Result<FileEntry> {
422 unsafe {
423 self.accessor
424 .open_file_at_range(&lookup.entry_range_info)
425 .await
426 }
427 }
428
429 fn open_content(&self, lookup: &LookupRef) -> Result<FileContents, Error> {
430 if is_dir_inode(lookup.inode) {
431 io_return!(libc::EISDIR);
432 }
433
434 match lookup.content_range.clone() {
435 Some(range) => Ok(unsafe { self.accessor.open_contents_at_range(range) }),
436 None => io_return!(libc::EBADF),
437 }
438 }
439
440 fn make_lookup(&self, parent: u64, inode: u64, entry: &FileEntry) -> Result<LookupRef, Error> {
441 let lookups = self.lookups.read().unwrap();
442 if let Some(lookup) = lookups.get(&inode) {
443 return Ok(lookup.get_ref(self));
444 }
445 drop(lookups);
446
447 let entry = Lookup::new(
448 inode,
449 parent,
450 entry.entry_range_info().clone(),
451 entry.content_range()?,
452 );
453 let reference = entry.get_ref(self);
454 entry.refs.store(1, Ordering::Release);
455
456 let mut lookups = self.lookups.write().unwrap();
457 if let Some(lookup) = lookups.get(&inode) {
458 return Ok(lookup.get_ref(self));
459 }
460
461 lookups.insert(inode, entry);
462 drop(lookups);
463 Ok(reference)
464 }
465
466 fn forget(&self, inode: u64, count: usize) -> Result<(), Error> {
467 let node = self.get_lookup(inode)?;
468 node.forget(count)?;
469 Ok(())
470 }
471
472 async fn lookup(
473 &'_ self,
474 parent: u64,
475 file_name: &OsStr,
476 ) -> Result<(EntryParam, LookupRef<'_>), Error> {
477 let dir = self.open_dir(parent).await?;
478
479 let entry = match { dir }.lookup(file_name).await? {
480 Some(entry) => entry,
481 None => io_return!(libc::ENOENT),
482 };
483
484 let entry = if let pxar::EntryKind::Hardlink(_) = entry.kind() {
485 // we don't know the file's end-offset, so we'll just allow the decoder to decode the
486 // entire rest of the archive until we figure out something better...
487 let entry = self.accessor.follow_hardlink(&entry).await?;
488
489 if let pxar::EntryKind::Hardlink(_) = entry.kind() {
490 // hardlinks must not point to other hardlinks...
491 io_return!(libc::ELOOP);
492 }
493
494 entry
495 } else {
496 entry
497 };
498
499 let response = to_entry(&entry)?;
500 let inode = response.inode;
501 Ok((response, self.make_lookup(parent, inode, &entry)?))
502 }
503
504 async fn getattr(&self, inode: u64) -> Result<libc::stat, Error> {
505 let entry = unsafe {
506 self.accessor.open_file_at_range(&self.get_lookup(inode)?.entry_range_info).await?
507 };
508 to_stat(inode, &entry)
509 }
510
511 async fn readdirplus(
512 &'_ self,
513 request: &mut requests::ReaddirPlus,
514 ) -> Result<Vec<LookupRef<'_>>, Error> {
515 let mut lookups = Vec::new();
516 let offset = usize::try_from(request.offset)
517 .map_err(|_| io_format_err!("directory offset out of range"))?;
518
519 let dir = self.open_dir(request.inode).await?;
520 let dir_lookup = self.get_lookup(request.inode)?;
521
522 let entry_count = dir.read_dir().count() as isize;
523
524 let mut next = offset as isize;
525 let mut iter = dir.read_dir().skip(offset);
526 while let Some(file) = iter.next().await {
527 next += 1;
528 let file = file?.decode_entry().await?;
529 let stat = to_stat(to_inode(&file), &file)?;
530 let name = file.file_name();
531 match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
532 ReplyBufState::Ok => (),
533 ReplyBufState::Full => return Ok(lookups),
534 }
535 lookups.push(self.make_lookup(request.inode, stat.st_ino, &file)?);
536 }
537
538 if next == entry_count {
539 next += 1;
540 let file = dir.lookup_self().await?;
541 let stat = to_stat(to_inode(&file), &file)?;
542 let name = OsStr::new(".");
543 match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
544 ReplyBufState::Ok => (),
545 ReplyBufState::Full => return Ok(lookups),
546 }
547 lookups.push(LookupRef::clone(&dir_lookup));
548 }
549
550 if next == entry_count + 1 {
551 next += 1;
552 let lookup = self.get_lookup(dir_lookup.parent)?;
553 let parent_dir = self.open_dir(lookup.inode).await?;
554 let file = parent_dir.lookup_self().await?;
555 let stat = to_stat(to_inode(&file), &file)?;
556 let name = OsStr::new("..");
557 match request.add_entry(name, &stat, next, 1, f64::MAX, f64::MAX)? {
558 ReplyBufState::Ok => (),
559 ReplyBufState::Full => return Ok(lookups),
560 }
561 lookups.push(lookup);
562 }
563
564 Ok(lookups)
565 }
566
567 async fn read(&self, inode: u64, len: usize, offset: u64) -> Result<Vec<u8>, Error> {
568 let file = self.get_lookup(inode)?;
569 let content = self.open_content(&file)?;
570 let mut buf = vec::undefined(len);
571 let got = content.read_at(&mut buf, offset).await?;
572 buf.truncate(got);
573 Ok(buf)
574 }
575
576 async fn readlink(&self, inode: u64) -> Result<OsString, Error> {
577 let lookup = self.get_lookup(inode)?;
578 let file = self.open_entry(&lookup).await?;
579 match file.get_symlink() {
580 None => io_return!(libc::EINVAL),
581 Some(link) => Ok(link.to_owned()),
582 }
583 }
584
585 async fn listxattrs(&self, inode: u64) -> Result<Vec<pxar::format::XAttr>, Error> {
586 let lookup = self.get_lookup(inode)?;
587 let metadata = self
588 .open_entry(&lookup)
589 .await?
590 .into_entry()
591 .into_metadata();
592
593 let mut xattrs = metadata.xattrs;
594
595 use pxar::format::XAttr;
596
597 if let Some(fcaps) = metadata.fcaps {
598 xattrs.push(XAttr::new(xattr::xattr_name_fcaps().to_bytes(), fcaps.data));
599 }
600
601 // TODO: Special cases:
602 // b"system.posix_acl_access
603 // b"system.posix_acl_default
604 //
605 // For these we need to be able to create posix acl format entries, at that point we could
606 // just ditch libacl as well...
607
608 Ok(xattrs)
609 }
610
611 async fn listxattrs_into(
612 &self,
613 request: &mut requests::ListXAttr,
614 ) -> Result<ReplyBufState, Error> {
615 let xattrs = self.listxattrs(request.inode).await?;
616
617 for entry in xattrs {
618 match request.add_c_string(entry.name()) {
619 ReplyBufState::Ok => (),
620 ReplyBufState::Full => return Ok(ReplyBufState::Full),
621 }
622 }
623
624 Ok(ReplyBufState::Ok)
625 }
626
627 async fn getxattr(&self, inode: u64, xattr: &OsStr) -> Result<pxar::format::XAttr, Error> {
628 // TODO: pxar::Accessor could probably get a more optimized method to fetch a specific
629 // xattr for an entry...
630 let xattrs = self.listxattrs(inode).await?;
631 for entry in xattrs {
632 if entry.name().to_bytes() == xattr.as_bytes() {
633 return Ok(entry);
634 }
635 }
636 io_return!(libc::ENODATA);
637 }
638 }
639
640 #[inline]
641 fn to_entry(entry: &FileEntry) -> Result<EntryParam, Error> {
642 to_entry_param(to_inode(entry), entry)
643 }
644
645 #[inline]
646 fn to_inode(entry: &FileEntry) -> u64 {
647 if entry.is_dir() {
648 entry.entry_range_info().entry_range.end
649 } else {
650 entry.entry_range_info().entry_range.start | NON_DIRECTORY_INODE
651 }
652 }
653
654 fn to_entry_param(inode: u64, entry: &pxar::Entry) -> Result<EntryParam, Error> {
655 Ok(EntryParam::simple(inode, to_stat(inode, entry)?))
656 }
657
658 fn to_stat(inode: u64, entry: &pxar::Entry) -> Result<libc::stat, Error> {
659 let nlink = if entry.is_dir() { 2 } else { 1 };
660
661 let metadata = entry.metadata();
662
663 let mut stat: libc::stat = unsafe { mem::zeroed() };
664 stat.st_ino = inode;
665 stat.st_nlink = nlink;
666 stat.st_mode = u32::try_from(metadata.stat.mode)
667 .map_err(|err| format_err!("mode does not fit into st_mode field: {}", err))?;
668 stat.st_size = i64::try_from(entry.file_size().unwrap_or(0))
669 .map_err(|err| format_err!("size does not fit into st_size field: {}", err))?;
670 stat.st_uid = metadata.stat.uid;
671 stat.st_gid = metadata.stat.gid;
672 stat.st_atime = metadata.stat.mtime.secs;
673 stat.st_atime_nsec = metadata.stat.mtime.nanos as _;
674 stat.st_mtime = metadata.stat.mtime.secs;
675 stat.st_mtime_nsec = metadata.stat.mtime.nanos as _;
676 stat.st_ctime = metadata.stat.mtime.secs;
677 stat.st_ctime_nsec = metadata.stat.mtime.nanos as _;
678 Ok(stat)
679 }