use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::sync::{Arc, Mutex};
+use std::path::PathBuf;
+use std::pin::Pin;
use anyhow::{bail, format_err, Error};
use futures::*;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::{http_err, identity, list_subdirs_api_method, sortable};
-use pxar::accessor::aio::Accessor;
+use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
use pxar::EntryKind;
use crate::api2::types::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::server::WorkerTask;
-use crate::tools::{self, AsyncReaderStream, WrappedReaderStream};
+use crate::tools::{
+ self,
+ zip::{ZipEncoder, ZipEntry},
+ AsyncChannelWriter, AsyncReaderStream, WrappedReaderStream,
+};
+
use crate::config::acl::{
PRIV_DATASTORE_AUDIT,
PRIV_DATASTORE_MODIFY,
Ok(res.into())
}
+fn recurse_files<T, W>(
+ mut zip: ZipEncoder<W>,
+ mut decoder: Accessor<T>,
+ prefix: PathBuf,
+ file: FileEntry<T>,
+) -> Pin<Box<dyn Future<Output = Result<(ZipEncoder<W>, Accessor<T>), Error>> + Send + 'static>>
+where
+ T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static,
+ W: tokio::io::AsyncWrite + Unpin + Send + 'static,
+{
+ Box::pin(async move {
+ let metadata = file.entry().metadata();
+ let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
+
+ match file.kind() {
+ EntryKind::File { .. } => {
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ true,
+ );
+ zip.add_entry(entry, Some(file.contents().await?))
+ .await
+ .map_err(|err| format_err!("could not send file entry: {}", err))?;
+ }
+ EntryKind::Hardlink(_) => {
+ let realfile = decoder.follow_hardlink(&file).await?;
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ true,
+ );
+ zip.add_entry(entry, Some(realfile.contents().await?))
+ .await
+ .map_err(|err| format_err!("could not send file entry: {}", err))?;
+ }
+ EntryKind::Directory => {
+ let dir = file.enter_directory().await?;
+ let mut readdir = dir.read_dir();
+ let entry = ZipEntry::new(
+ path,
+ metadata.stat.mtime.secs,
+ metadata.stat.mode as u16,
+ false,
+ );
+ zip.add_entry::<FileContents<T>>(entry, None).await?;
+ while let Some(entry) = readdir.next().await {
+ let entry = entry?.decode_entry().await?;
+ let (zip_tmp, decoder_tmp) =
+ recurse_files(zip, decoder, prefix.clone(), entry).await?;
+ zip = zip_tmp;
+ decoder = decoder_tmp;
+ }
+ }
+ _ => {} // ignore all else
+ };
+
+ Ok((zip, decoder))
+ })
+}
+
#[sortable]
pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&ApiHandler::AsyncHttp(&pxar_file_download),
.lookup(OsStr::from_bytes(file_path)).await?
.ok_or(format_err!("error opening '{:?}'", file_path))?;
- let file = match file.kind() {
- EntryKind::File { .. } => file,
- EntryKind::Hardlink(_) => {
- decoder.follow_hardlink(&file).await?
- },
- // TODO symlink
- other => bail!("cannot download file of type {:?}", other),
- };
+ let body = match file.kind() {
+ EntryKind::File { .. } => Body::wrap_stream(
+ AsyncReaderStream::new(file.contents().await?).map_err(move |err| {
+ eprintln!("error during streaming of file '{:?}' - {}", filepath, err);
+ err
+ }),
+ ),
+ EntryKind::Hardlink(_) => Body::wrap_stream(
+ AsyncReaderStream::new(decoder.follow_hardlink(&file).await?.contents().await?)
+ .map_err(move |err| {
+ eprintln!(
+ "error during streaming of hardlink '{:?}' - {}",
+ filepath, err
+ );
+ err
+ }),
+ ),
+ EntryKind::Directory => {
+ let (sender, receiver) = tokio::sync::mpsc::channel(100);
+ let mut prefix = PathBuf::new();
+ let mut components = file.entry().path().components();
+ components.next_back(); // discar last
+ for comp in components {
+ prefix.push(comp);
+ }
- let body = Body::wrap_stream(
- AsyncReaderStream::new(file.contents().await?)
- .map_err(move |err| {
- eprintln!("error during streaming of '{:?}' - {}", filepath, err);
+ let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
+ let zipencoder = ZipEncoder::new(channelwriter);
+
+ crate::server::spawn_internal_task(async move {
+ let (mut zipencoder, _) = recurse_files(zipencoder, decoder, prefix, file)
+ .await
+ .map_err(|err| eprintln!("error during creating of zip: {}", err))?;
+
+ zipencoder
+ .finish()
+ .await
+ .map_err(|err| eprintln!("error during finishing of zip: {}", err))
+ });
+
+ Body::wrap_stream(receiver.map_err(move |err| {
+ eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
err
- })
- );
+ }))
+ }
+ other => bail!("cannot download file of type {:?}", other),
+ };
// fixme: set other headers ?
Ok(Response::builder()