]> git.proxmox.com Git - proxmox-backup.git/commitdiff
remove proxmox-protocol subcrate
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Thu, 22 Aug 2019 12:03:43 +0000 (14:03 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Thu, 22 Aug 2019 12:08:25 +0000 (14:08 +0200)
AFAICT we have no use for it anymore, its api entry points
are gone. If we do end up needing something from it, it's
still in the git history anyway. (And about two thirds of it
can be made much less awkward by utilizing async-await
anyway, so no love lost there...)

Moved the chunker back into src/backup/chunker.rs

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
20 files changed:
Cargo.toml
proxmox-protocol/Cargo.toml [deleted file]
proxmox-protocol/proxmox-protocol.h [deleted file]
proxmox-protocol/src/c_chunker.rs [deleted file]
proxmox-protocol/src/c_client.rs [deleted file]
proxmox-protocol/src/c_connector.rs [deleted file]
proxmox-protocol/src/chunk_stream.rs [deleted file]
proxmox-protocol/src/chunker.rs [deleted file]
proxmox-protocol/src/client.rs [deleted file]
proxmox-protocol/src/common.rs [deleted file]
proxmox-protocol/src/connect.rs [deleted file]
proxmox-protocol/src/lib.rs [deleted file]
proxmox-protocol/src/protocol.rs [deleted file]
proxmox-protocol/src/server.rs [deleted file]
proxmox-protocol/src/tools.rs [deleted file]
proxmox-protocol/src/types.rs [deleted file]
src/backup.rs
src/backup/chunk_stream.rs
src/backup/chunker.rs [new file with mode: 0644]
src/backup/dynamic_index.rs

index 2dae29057e3603388d90a647ad8d0caa5ee0a260..10a07e3220c78448c8fa4c684b3362af50924de0 100644 (file)
@@ -10,7 +10,6 @@ path = "src/lib.rs"
 
 [dependencies]
 proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1" }
-proxmox-protocol = { path = "proxmox-protocol" }
 log = "0.4"
 syslog = "4.0"
 failure = "0.1"
diff --git a/proxmox-protocol/Cargo.toml b/proxmox-protocol/Cargo.toml
deleted file mode 100644 (file)
index 5f99547..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-[package]
-name = "proxmox-protocol"
-version = "0.1.0"
-authors = [
-    "Dietmar Maurer <dietmar@proxmox.com>",
-    "Wolfgang Bumiller <w.bumiller@proxmox.com>",
-]
-edition = "2018"
-
-[lib]
-crate-type = ['lib', 'cdylib']
-
-[dependencies]
-chrono = "0.4"
-endian_trait = { version = "0.6", features = ["arrays"] }
-errno = "0.2"
-failure = "0.1"
-libc = "0.2"
-openssl = "0.10"
-serde_json = "1.0"
-url = "1.7"
diff --git a/proxmox-protocol/proxmox-protocol.h b/proxmox-protocol/proxmox-protocol.h
deleted file mode 100644 (file)
index ef4d4c3..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-#include <stdbool.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef int64_t proxmox_backup_read_cb(void *opaque, void *buffer, uint64_t size);
-typedef int64_t proxmox_backup_write_cb(void *opaque, const void *buffer, uint64_t size);
-typedef void proxmox_backup_drop_cb(void *opaque);
-
-typedef struct ProxmoxBackup ProxmoxBackup;
-
-extern ProxmoxBackup *proxmox_backup_new(
-       void *opaque,
-       proxmox_backup_read_cb *read_cb,
-       proxmox_backup_write_cb *write_cb,
-       proxmox_backup_drop_cb *drop_cb);
-
-extern void proxmox_backup_done(ProxmoxBackup *self);
-
-extern void proxmox_backup_clear_err(ProxmoxBackup *self);
-extern const char* proxmox_backup_get_error(const ProxmoxBackup *self);
-
-extern bool proxmox_backup_is_eof(const ProxmoxBackup *self);
-
-extern int proxmox_backup_wait_for_handshake(ProxmoxBackup *self);
-
-extern int proxmox_backup_query_hashes(ProxmoxBackup *self, const char *file_name);
-extern int proxmox_backup_wait_for_hashes(ProxmoxBackup *self);
-
-extern bool proxmox_backup_is_chunk_available(ProxmoxBackup *self, const void *digest);
-extern int proxmox_backup_upload_chunk(
-       ProxmoxBackup *self,
-       const void *digest,
-       const void *data,
-       uint64_t size);
-extern int proxmox_backup_continue_upload(ProxmoxBackup *self);
-
-extern int proxmox_backup_poll_read(ProxmoxBackup *self);
-extern int proxmox_backup_poll_send(ProxmoxBackup *self);
-
-extern int proxmox_backup_wait_for_id(ProxmoxBackup *self, int id);
-extern int proxmox_backup_discard_id(ProxmoxBackup *self, int id);
-
-extern int proxmox_backup_create(
-    ProxmoxBackup *self,
-    bool dynamic,
-    const char *backup_type,
-    const char *backup_id,
-    int64_t time_epoch,
-    const char *file_name,
-    size_t chunk_size,
-    int64_t file_size,
-    bool is_new);
-
-extern int proxmox_backup_dynamic_data(
-    ProxmoxBackup *self,
-    int stream,
-    const void *digest,
-    uint64_t size);
-
-extern int proxmox_backup_fixed_data(
-    ProxmoxBackup *self,
-    int stream,
-    size_t index,
-    const void *digest);
-
-extern int proxmox_backup_finish_backup(
-    ProxmoxBackup *self,
-    int stream,
-    char **remote_path);
-
-typedef struct ProxmoxChunker ProxmoxChunker;
-extern ProxmoxChunker *proxmox_chunker_new(uint64_t chunk_size_avg);
-extern void proxmox_chunker_done(ProxmoxChunker *self);
-extern uint64_t proxmox_chunker_scan(ProxmoxChunker *self, const void *data, size_t size);
-
-extern void proxmox_chunk_digest(const void *data, size_t size, uint8_t (*digest)[32]);
-
-typedef struct ProxmoxConnector ProxmoxConnector;
-extern ProxmoxConnector *proxmox_connector_new(
-       const char *user,
-       const char *server,
-       const char *store);
-extern void proxmox_connector_drop(ProxmoxConnector *self);
-extern int proxmox_connector_set_password(ProxmoxConnector *self, const char *password);
-extern int proxmox_connector_set_ticket(
-       ProxmoxConnector *self,
-       const char *ticket,
-       const char *token);
-extern void proxmox_connector_set_certificate_validation(ProxmoxConnector *self, bool on);
-extern ProxmoxBackup *proxmox_connector_connect(ProxmoxConnector *self);
-
-#ifdef __cplusplus
-}
-#endif
diff --git a/proxmox-protocol/src/c_chunker.rs b/proxmox-protocol/src/c_chunker.rs
deleted file mode 100644 (file)
index c24730e..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-//! C API for the Chunker.
-
-use std::os::raw::c_void;
-
-use libc::size_t;
-
-use crate::Chunker;
-
-/// Creates a new chunker instance.
-#[no_mangle]
-pub extern "C" fn proxmox_chunker_new(chunk_size_avg: size_t) -> *mut Chunker {
-    Box::leak(Box::new(Chunker::new(chunk_size_avg as usize)))
-}
-
-/// Drops an instance of a chunker. The pointer must be valid or `NULL`.
-#[no_mangle]
-pub extern "C" fn proxmox_chunker_done(me: *mut Chunker) {
-    if !me.is_null() {
-        unsafe {
-            Box::from_raw(me);
-        }
-    }
-}
-
-/// Scan the specified data for a chunk border. Returns 0 if none was found, or a positive offset
-/// to a border.
-#[no_mangle]
-pub extern "C" fn proxmox_chunker_scan(
-    me: *mut Chunker,
-    data: *const c_void,
-    size: size_t,
-) -> size_t {
-    let me = unsafe { &mut *me };
-    me.scan(unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }) as size_t
-}
-
-/// Compute a chunk digest. This is mostly a convenience method to avoid having to lookup the right
-/// digest method for your language of choice.
-#[no_mangle]
-pub extern "C" fn proxmox_chunk_digest(
-    data: *const c_void,
-    size: size_t,
-    out_digest: *mut [u8; 32],
-) {
-    let digest = crate::FixedChunk::from_data(unsafe {
-        std::slice::from_raw_parts(data as *const u8, size as usize)
-    });
-    unsafe { *out_digest = digest.0 };
-}
diff --git a/proxmox-protocol/src/c_client.rs b/proxmox-protocol/src/c_client.rs
deleted file mode 100644 (file)
index ee0dc65..0000000
+++ /dev/null
@@ -1,519 +0,0 @@
-//! For the C API we need to provide a `Client` compatible with C. In rust `Client` takes a
-//! `T: Read + Write`, so we need to provide a way for C to provide callbacks to us to
-//! implement this.
-
-use std::ffi::{CStr, CString};
-use std::io::{self, Read, Write};
-use std::os::raw::{c_char, c_int, c_void};
-
-use failure::{bail, format_err, Error};
-use libc::size_t;
-
-/// Read callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`,
-/// the rest are the usual read function parameters. This should return the number of bytes
-/// actually read, zero on EOF, or a negative `errno` value on error (eg. `-EAGAIN`).
-pub type ReadFn = extern "C" fn(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64;
-
-/// Write callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`,
-/// the rest are the usual write function parameters. This should return the number of bytes
-/// actually written, or a negative `errno` value on error (eg. `-EAGAIN`).
-pub type WriteFn = extern "C" fn(opaque: *mut c_void, buf: *const u8, size: u64) -> i64;
-
-/// Optional drop callback. This is called when the Client gets destroyed and allows freeing
-/// resources associated with the opaque object behind the C API socket.
-pub type DropFn = extern "C" fn(opaque: *mut c_void);
-
-/// Stores the external C callbacks for communicating with the protocol socket.
-pub struct CApiSocket {
-    opaque: *mut c_void,
-    read: ReadFn,
-    write: WriteFn,
-    drop: Option<DropFn>,
-}
-
-impl CApiSocket {
-    fn from_io<T: Read + Write>(stream: T) -> Self {
-        let opaque = Box::leak(Box::new(stream));
-        Self {
-            opaque: opaque as *mut T as _,
-            read: c_read_fn::<T>,
-            write: c_write_fn::<T>,
-            drop: Some(c_drop_fn::<T>),
-        }
-    }
-}
-
-/// A client instance using C callbacks for reading from and writing to the protocol socket.
-pub struct CClient {
-    client: crate::Client<CApiSocket>,
-    error: Option<CString>,
-    upload: Option<(*const u8, usize)>,
-}
-
-impl CClient {
-    fn set_error(&mut self, err: Error) -> c_int {
-        self.error = Some(match CString::new(err.to_string()) {
-            Ok(cs) => cs,
-            Err(_) => CString::new("<bad bytes in error string>").unwrap(),
-        });
-        -1
-    }
-
-    #[inline(always)]
-    fn bool_result(&mut self, res: Result<bool, Error>) -> c_int {
-        match res {
-            Ok(false) => 0,
-            Ok(true) => 1,
-            Err(e) => self.set_error(e),
-        }
-    }
-
-    #[inline(always)]
-    fn bool_call<F>(&mut self, func: F) -> c_int
-    where
-        F: FnOnce(&mut crate::Client<CApiSocket>) -> Result<bool, Error>,
-    {
-        let res = func(&mut self.client);
-        self.bool_result(res)
-    }
-
-    #[inline(always)]
-    fn int_call<F>(&mut self, func: F) -> c_int
-    where
-        F: FnOnce(&mut crate::Client<CApiSocket>) -> Result<c_int, Error>,
-    {
-        match func(&mut self.client) {
-            Ok(v) => v,
-            Err(e) => self.set_error(e.into()),
-        }
-    }
-}
-
-impl Read for CApiSocket {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let rc = (self.read)(self.opaque, buf.as_mut_ptr(), buf.len() as u64);
-        if rc < 0 {
-            Err(io::Error::from_raw_os_error((-rc) as i32))
-        } else {
-            Ok(rc as usize)
-        }
-    }
-}
-
-impl Write for CApiSocket {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let rc = (self.write)(self.opaque, buf.as_ptr(), buf.len() as u64);
-        if rc < 0 {
-            Err(io::Error::from_raw_os_error((-rc) as i32))
-        } else {
-            Ok(rc as usize)
-        }
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        Ok(())
-    }
-}
-
-impl Drop for CApiSocket {
-    fn drop(&mut self) {
-        if let Some(drop) = self.drop {
-            drop(self.opaque);
-        }
-    }
-}
-
-extern "C" fn c_read_fn<T: Read>(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64 {
-    let stream = unsafe { &mut *(opaque as *mut T) };
-    let buf = unsafe { std::slice::from_raw_parts_mut(buf, size as usize) };
-
-    match stream.read(buf) {
-        Ok(size) => size as i64,
-        Err(err) => {
-            match err.raw_os_error() {
-                Some(err) => -(err as i64),
-                None => {
-                    eprintln!("error reading from stream: {}", err);
-                    -libc::EIO as i64
-                }
-            }
-        },
-    }
-}
-
-extern "C" fn c_write_fn<T: Write>(opaque: *mut c_void, buf: *const u8, size: u64) -> i64 {
-    let stream = unsafe { &mut *(opaque as *mut T) };
-    let buf = unsafe { std::slice::from_raw_parts(buf, size as usize) };
-
-    match stream.write(buf) {
-        Ok(size) => size as i64,
-        Err(err) => {
-            match err.raw_os_error() {
-                Some(err) => -(err as i64),
-                None => {
-                    eprintln!("error writing to stream: {}", err);
-                    -libc::EIO as i64
-                }
-            }
-        },
-    }
-}
-
-extern "C" fn c_drop_fn<T>(opaque: *mut c_void) {
-    unsafe {
-        Box::from_raw(opaque as *mut T);
-    }
-}
-
-pub(crate) fn make_c_compatible_client<T: Read + Write>(stream: T) -> crate::Client<CApiSocket> {
-    crate::Client::new(CApiSocket::from_io(stream))
-}
-
-pub(crate) fn make_c_client(client: crate::Client<CApiSocket>) -> *mut CClient {
-    Box::leak(Box::new(CClient {
-        client,
-        error: None,
-        upload: None,
-    }))
-}
-
-/// Creates a new instance of a backup protocol client.
-///
-/// # Arguments
-///
-/// * `opaque` - An opaque pointer passed to the two provided callback methods.
-/// * `read` - The read callback.
-/// * `write` - The write callback.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_new(
-    opaque: *mut c_void,
-    read: ReadFn,
-    write: WriteFn,
-    drop: Option<DropFn>,
-) -> *mut CClient {
-    make_c_client(crate::Client::new(CApiSocket {
-        opaque,
-        read,
-        write,
-        drop,
-    }))
-}
-
-/// Drops an instance of a backup protocol client. The pointer must be valid or `NULL`.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_done(me: *mut CClient) {
-    if !me.is_null() {
-        unsafe {
-            Box::from_raw(me);
-        }
-    }
-}
-
-/// Returns a C String describing the last error or `NULL` if there was none.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_get_error(me: *const CClient) -> *const c_char {
-    let me = unsafe { &*me };
-    match me.error {
-        Some(ref e) => e.as_ptr(),
-        None => std::ptr::null(),
-    }
-}
-
-/// Returns true if the `read` callback had previously returned `EOF`.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_is_eof(me: *const CClient) -> bool {
-    let me = unsafe { &*me };
-    me.client.eof()
-}
-
-/// The data polling methods usually pass errors from the callbacks through to the original caller.
-/// Since the protocol needs to be non-blocking-IO safe and therefore able to resumine at any point
-/// where `-EAGAIN` can be returned by the callbacks, it is up to the caller which errors are to be
-/// considered fatal, but any error returned by callbacks which is not `-EAGAIN` will result in an
-/// internal error flag to be set which has to be cleared before trying to resume normal
-/// operations.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_clear_err(me: *mut CClient) {
-    let me = unsafe { &mut *me };
-    me.client.clear_err();
-    me.error = None;
-}
-
-/// Polls for data and checks whether the protocol handshake has been made successfully.
-/// Returns `1` if the handshake was successful, `0` if it is not yet complete or `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_wait_for_handshake(me: *mut CClient) -> c_int {
-    let me = unsafe { &mut *me };
-    me.bool_call(move |c| c.wait_for_handshake())
-}
-
-fn check_string(s: *const c_char) -> Result<&'static str, Error> {
-    if s.is_null() {
-        bail!("NULL string");
-    }
-    Ok(std::str::from_utf8(unsafe {
-        CStr::from_ptr(s).to_bytes()
-    })?)
-}
-
-/// Request the list of hashes for a backup file in order to prevent duplicates from being sent to
-/// the server. This simply causes an internal list to be filled. Only one such operation can be
-/// performed simultaneously. To wait for its completion see `proxmox_backup_wait_for_hashes`.
-///
-/// If the file name is `NULL` or not a valid UTF-8 string, this function returns an error without
-/// putting the protocol handler in an error state.
-///
-/// Returns `0` on success, `-1` otherwise.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_query_hashes(me: *mut CClient, file_name: *const c_char) -> c_int {
-    let me = unsafe { &mut *me };
-
-    me.int_call(move |client| {
-        let file_name = check_string(file_name)?;
-        client.query_hashes(file_name)?;
-        Ok(0)
-    })
-}
-
-/// If there is an ongoing hash list request, this will poll the data stream.
-///
-/// Returns `1` if the transfer is complete (or there was no transfer to begin with), `0` if it is
-/// incomplete, or `-1` if an error occurred.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_wait_for_hashes(me: *mut CClient) -> c_int {
-    let me = unsafe { &mut *me };
-    me.bool_call(move |c| c.wait_for_hashes())
-}
-
-/// Check if a chunk of the provided digest is known to the this client instance. Note that this
-/// does not query the server for this information, and is only useful after a call to
-/// `proxmox_backup_query_hashes` or after uploading something.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_is_chunk_available(me: *const CClient, digest: *const u8) -> bool {
-    let me = unsafe { &*me };
-    let digest = unsafe { &*(digest as *const [u8; 32]) };
-    me.client.is_chunk_available(digest)
-}
-
-/// Begin uploading a chunk to the server. This attempts to upload the data right away, but if the
-/// writer may fail due to non-blocking I/O in which case the `proxmox_backup_continue_upload`
-/// function must be used.
-///
-/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately,
-/// or `-1` on error.
-///
-/// The ID returned on success can be used to wait for the server to acknowledge that the chunk has
-/// been written successfully. Use `proxmox_backup_wait_for_id` to do this. If confirmation is not
-/// required, the ID should be released via `proxmox_backup_discard_id`.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_upload_chunk(
-    me: *mut CClient,
-    digest: *const u8,
-    data: *const u8,
-    size: u64,
-) -> c_int {
-    let me = unsafe { &mut *me };
-    let digest: &[u8; 32] = unsafe { &*(digest as *const [u8; 32]) };
-    let size = size as usize;
-    let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, size) };
-    match me.client.upload_chunk(digest, slice) {
-        Ok(Some(id)) => id.0 as c_int,
-        Ok(None) => {
-            me.upload = Some((data, size));
-            0
-        }
-        Err(e) => me.set_error(e),
-    }
-}
-
-/// If an upload did not finish immediately (`proxmox_backup_upload_chunk` returned `0`), this
-/// function must be used to retry sending the rest of the data.
-///
-/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately,
-/// or `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_continue_upload(me: *mut CClient) -> c_int {
-    let me = unsafe { &mut *me };
-    match me.upload {
-        Some((data, len)) => {
-            let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, len) };
-            match me.client.continue_upload_chunk(slice) {
-                Ok(Some(id)) => id.0 as c_int,
-                Ok(None) => 0,
-                Err(e) => me.set_error(e),
-            }
-        }
-        None => me.set_error(format_err!("no upload currently running")),
-    }
-}
-
-/// Run the main receive loop. Returns `0` on success, `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_poll_read(me: *mut CClient) -> c_int {
-    let me = unsafe { &mut *me };
-    match me.client.poll_read(false) {
-        Ok(_) => 0,
-        Err(e) => me.set_error(e),
-    }
-}
-
-/// Run the main send loop. If the `write` callback returned `-EAGAIN`, during an operation, the
-/// protocol handler keeps the data to be sent in a write queue. This function will attempt to
-/// continue writing out the remaining data. See individual function descriptions for when this is
-/// necessary.
-///
-/// Returns `1` if the queue is now empty, `0` if there is still data in the queue, or `-1` on
-/// error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_poll_send(me: *mut CClient) -> c_int {
-    let me = unsafe { &mut *me };
-    me.bool_call(move |c| Ok(c.poll_send()?.unwrap_or(true)))
-}
-
-/// Run the main receive loop and check for confirmation of a stream with the specified ID.
-///
-/// Returns `1` if the transaction was confirmed, `0` if not, or `-1` on error.
-///
-/// Note that once this function returned `1` for an ID, the id is considered to be free for
-/// recycling and should not be used for further calls.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_wait_for_id(me: *mut CClient, id: c_int) -> c_int {
-    let me = unsafe { &mut *me };
-    me.bool_call(move |c| c.wait_for_id(crate::StreamId(id as u8)))
-}
-
-/// Notifies the protocol handler that we do not bother waiting for confirmation of an ID. The ID
-/// may immediately be recycled for future transactions, thus the user should not use it for any
-/// further function calls.
-///
-/// Returns `0` on success, `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_discard_id(me: *mut CClient, id: c_int) -> c_int {
-    let me = unsafe { &mut *me };
-    match me.client.discard_id(crate::StreamId(id as u8)) {
-        Ok(_) => 0,
-        Err(e) => me.set_error(e),
-    }
-}
-
-/// Create a new backup. The returned ID should be waited upon via `proxmox_backup_wait_for_id`,
-/// which returns true once the server confirmed the creation of the backup.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_create(
-    me: *mut CClient,
-    dynamic: bool,
-    backup_type: *const c_char, // "host", "ct", "vm"
-    backup_id: *const c_char,
-    time_epoch: i64,
-    file_name: *const c_char,
-    chunk_size: size_t,
-    file_size: i64,
-    is_new: bool,
-) -> c_int {
-    let me = unsafe { &mut *me };
-    me.int_call(move |client| {
-        let index_type = match dynamic {
-            false => crate::IndexType::Fixed,
-            _ => crate::IndexType::Dynamic,
-        };
-
-        let backup_type = check_string(backup_type)?;
-        let backup_id = check_string(backup_id)?;
-        let file_name = check_string(file_name)?;
-
-        Ok(client
-            .create_backup(
-                index_type,
-                backup_type,
-                backup_id,
-                time_epoch,
-                file_name,
-                chunk_size as _,
-                if file_size < 0 {
-                    None
-                } else {
-                    Some(file_size as u64)
-                },
-                is_new,
-            )?
-            .0 as c_int)
-    })
-}
-
-/// Send a dynamic chunk entry.
-///
-/// If the entry was sent out successfully this returns `1`. If the `write` callback returned
-/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send`
-/// should be used to continue sending the data.
-/// On error `-1` is returned.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_dynamic_data(
-    me: *mut CClient,
-    stream: c_int,
-    digest: *const [u8; 32],
-    size: u64,
-) -> c_int {
-    let me = unsafe { &mut *me };
-    me.bool_call(move |client| {
-        client.dynamic_data(crate::BackupStream(stream as u8), unsafe { &*digest }, size)
-    })
-}
-
-/// Send a fixed chunk entry.
-///
-/// If the entry was sent out successfully this returns `1`. If the `write` callback returned
-/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send`
-/// should be used to continue sending the data.
-/// On error `-1` is returned.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_fixed_data(
-    me: *mut CClient,
-    stream: c_int,
-    index: size_t,
-    digest: *const [u8; 32],
-) -> c_int {
-    let me = unsafe { &mut *me };
-    me.bool_call(move |client| {
-        client.fixed_data(crate::BackupStream(stream as u8), index as usize, unsafe {
-            &*digest
-        })
-    })
-}
-
-/// Finish a running backup.
-///
-/// Tells the server that the backup is supposed to be considered complete. If the request could be
-/// sent out entirely `1` is returned. If the underlying socket is non-blocking and the packet
-/// wasn't finished `0` is returned, after which `proxmox_backup_poll_send` should be used.
-///
-/// Once the request was sent out successfully, the client should wait for acknowledgement by the
-/// remote server via `proxmox_backup_wait_for_id`, passing the backup stream ID as parameter.
-///
-/// Finally, if the client wishes to know the exact name the server stored the file under, the
-/// `remote_path` parameter can be non-`NULL` to receive a string containing the file name, which
-/// must be freed by the caller!
-///
-/// Returns: `1` on success, possibly `0` for non-blocking I/O, `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_finish_backup(
-    me: *mut CClient,
-    stream: c_int,
-    remote_path: *mut *mut c_char,
-) -> c_int {
-    let me = unsafe { &mut *me };
-    me.int_call(move |client| {
-        let (path, ack) = client.finish_backup(crate::BackupStream(stream as u8))?;
-
-        if !remote_path.is_null() {
-            // would be shorter with the unstable map_or_else
-            let cstr = CString::new(path)
-                .map(|cs| cs.into_raw())
-                .unwrap_or(std::ptr::null_mut());
-            unsafe {
-                *remote_path = cstr;
-            }
-        }
-
-        Ok(if ack { 1 } else { 0 })
-    })
-}
diff --git a/proxmox-protocol/src/c_connector.rs b/proxmox-protocol/src/c_connector.rs
deleted file mode 100644 (file)
index bb82d0f..0000000
+++ /dev/null
@@ -1,145 +0,0 @@
-//! C API for the `Connector`.
-
-use std::ffi::CStr;
-use std::os::raw::{c_char, c_int};
-
-use crate::Connector;
-
-#[inline(always)]
-fn with_errno<T>(err: c_int, value: T) -> T {
-    errno::set_errno(errno::Errno(err));
-    value
-}
-
-#[inline]
-fn checkstr(ptr: *const c_char) -> Option<String> {
-    if ptr.is_null() {
-        return None;
-    }
-
-    let cstr = unsafe { CStr::from_ptr(ptr) };
-    match cstr.to_str() {
-        Ok(s) => Some(s.to_string()),
-        Err(_) => None,
-    }
-}
-
-#[inline(always)]
-fn wrap_buildcall<F>(me: *mut Connector, func: F)
-where
-    F: FnOnce(Connector) -> Connector,
-{
-    let me = unsafe { &mut *me };
-    let moved_me = std::mem::replace(me, unsafe { std::mem::uninitialized() });
-    std::mem::forget(std::mem::replace(me, func(moved_me)));
-}
-
-/// Create a connector object.
-///
-/// Returns a valid pointer or `NULL` on error, with `errno` set.
-///
-/// Errors:
-///   * `EINVAL`: a required parameter was `NULL` or contained invalid bytes.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_new(
-    user: *const c_char,
-    server: *const c_char,
-    store: *const c_char,
-) -> *mut Connector {
-    let (user, server, store) = match (checkstr(user), checkstr(server), checkstr(store)) {
-        (Some(user), Some(server), Some(store)) => (user, server, store),
-        _ => return with_errno(libc::EINVAL, std::ptr::null_mut()),
-    };
-
-    Box::leak(Box::new(Connector::new(user, server, store)))
-}
-
-/// If a connector is not required anymore and has not been used up via a call to
-/// `proxmox_connector_connect`, this can be used to free the associated resources.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_drop(me: *mut Connector) {
-    unsafe { Box::from_raw(me) };
-}
-
-/// Use a password
-///
-/// Returns `0` on success, a negative `errno` value on error.
-///
-/// Errors:
-///   * `EINVAL`: a required parameter was `NULL` or contained invalid bytes.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_set_password(
-    me: *mut Connector,
-    password: *const c_char,
-) -> c_int {
-    let password = match checkstr(password) {
-        Some(pw) =>  pw,
-        _ => return -libc::EINVAL,
-    };
-
-    wrap_buildcall(me, move |me| me.password(password));
-
-    0
-}
-
-/// Use an existing ticket.
-///
-/// Returns `0` on success, a negative `errno` value on error.
-///
-/// Errors:
-///   * `EINVAL`: a required parameter was `NULL` or contained invalid bytes.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_set_ticket(
-    me: *mut Connector,
-    ticket: *const c_char,
-    token: *const c_char,
-) -> c_int {
-    let (ticket, token) = match (checkstr(ticket), checkstr(token)) {
-        (Some(ticket), Some(token)) => (ticket, token),
-        _ => return -libc::EINVAL,
-    };
-
-    wrap_buildcall(me, move |me| me.ticket(ticket, token));
-
-    0
-}
-
-/// Change whether certificate validation should be used on the connector.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_set_certificate_validation(me: *mut Connector, on: bool) {
-    let me = unsafe { &mut *me };
-
-    wrap_buildcall(me, move |me| me.certificate_validation(on));
-}
-
-/// Initiate the connection. This consumes the Connector, invalidating the pointer to it!
-///
-/// Returns a `ProxmoxBackup*`, or `NULL` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_connect(
-    me: *mut Connector,
-) -> *mut crate::c_client::CClient {
-    let boxed = unsafe { Box::from_raw(me) };
-    let me = *boxed;
-    match me.do_connect() {
-        Ok(stream) => {
-            let mut client = crate::c_client::make_c_compatible_client(stream);
-            match client.wait_for_handshake() {
-                Ok(true) => crate::c_client::make_c_client(client),
-                Ok(false) => {
-                    // This is a synchronous blocking connection, so this should be impossible:
-                    eprintln!("proxmox backup protocol error handshake did not complete?");
-                    std::ptr::null_mut()
-                }
-                Err(err) => {
-                    eprintln!("error during handshake with backup server: {}", err);
-                    std::ptr::null_mut()
-                }
-            }
-        }
-        Err(err) => {
-            eprintln!("error connecting to backup server: {}", err);
-            std::ptr::null_mut()
-        }
-    }
-}
diff --git a/proxmox-protocol/src/chunk_stream.rs b/proxmox-protocol/src/chunk_stream.rs
deleted file mode 100644 (file)
index 4502b90..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-use std::io::Read;
-
-use failure::Error;
-
-use crate::Chunker;
-
-pub struct ChunkStream<T: Read> {
-    input: T,
-    buffer: Vec<u8>,
-    fill: usize,
-    pos: usize,
-    keep: bool,
-    eof: bool,
-    chunker: Chunker,
-}
-
-impl<T: Read> ChunkStream<T> {
-    pub fn new(input: T) -> Self {
-        Self {
-            input,
-            buffer: Vec::new(),
-            fill: 0,
-            pos: 0,
-            keep: false,
-            eof: false,
-            chunker: Chunker::new(4 * 1024 * 1024),
-        }
-    }
-
-    pub fn stream(&mut self) -> &mut Self {
-        self
-    }
-
-    fn fill_buf(&mut self) -> Result<bool, Error> {
-        if self.fill == self.buffer.len() {
-            let mut more = self.buffer.len(); // just double it
-            if more == 0 {
-                more = 1024 * 1024; // at the start, make a 1M buffer
-            }
-            // we need more data:
-            self.buffer.reserve(more);
-            unsafe {
-                self.buffer.set_len(self.buffer.capacity());
-            }
-        }
-
-        match self.input.read(&mut self.buffer[self.fill..]) {
-            Ok(more) => {
-                if more == 0 {
-                    self.eof = true;
-                }
-                self.fill += more;
-                Ok(true)
-            }
-            Err(err) => {
-                if err.kind() == std::io::ErrorKind::WouldBlock {
-                    Ok(false)
-                } else {
-                    Err(err.into())
-                }
-            }
-        }
-    }
-
-    fn consume(&mut self) {
-        assert!(self.fill >= self.pos);
-
-        let remaining = self.fill - self.pos;
-        unsafe {
-            std::ptr::copy_nonoverlapping(
-                &self.buffer[self.pos] as *const u8,
-                self.buffer.as_mut_ptr(),
-                remaining,
-            );
-        }
-        self.fill = remaining;
-        self.pos = 0;
-    }
-
-    pub fn next(&mut self) {
-        self.keep = false;
-    }
-
-    // This crate should not depend on the futures create, so we use another Option instead of
-    // Async<T>.
-    pub fn get(&mut self) -> Result<Option<Option<&[u8]>>, Error> {
-        if self.keep {
-            return Ok(Some(Some(&self.buffer[0..self.pos])));
-        }
-
-        if self.eof {
-            return Ok(Some(None));
-        }
-
-        if self.pos != 0 {
-            self.consume();
-        }
-
-        loop {
-            match self.fill_buf() {
-                Ok(true) => (),
-                Ok(false) => return Ok(None),
-                Err(err) => return Err(err),
-            }
-
-            // Note that if we hit EOF we hit a hard boundary...
-            let boundary = self.chunker.scan(&self.buffer[self.pos..self.fill]);
-            if boundary == 0 && !self.eof {
-                self.pos = self.fill;
-                continue;
-            }
-
-            self.pos += boundary;
-            self.keep = true;
-            return Ok(Some(Some(&self.buffer[0..self.pos])));
-        }
-    }
-}
diff --git a/proxmox-protocol/src/chunker.rs b/proxmox-protocol/src/chunker.rs
deleted file mode 100644 (file)
index 088d6b6..0000000
+++ /dev/null
@@ -1,306 +0,0 @@
-
-/// Note: window size 32 or 64, is faster because we can
-/// speedup modulo operations, but always computes hash 0
-/// for constant data streams .. 0,0,0,0,0,0
-/// so we use a modified the chunk boundary test too not
-/// use hash value 0 to detect a boundary.
-const CA_CHUNKER_WINDOW_SIZE: usize = 64;
-
-/// Slinding window chunker (Buzhash)
-///
-/// This is a rewrite of *casync* chunker (cachunker.h) in rust.
-///
-/// Hashing by cyclic polynomial (also called Buzhash) has the benefit
-/// of avoiding multiplications, using barrel shifts instead. For more
-/// information please take a look at the [Rolling
-/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) artikel from
-/// wikipedia.
-
-pub struct Chunker {
-    h: u32,
-    window_size: usize,
-    chunk_size: usize,
-
-    chunk_size_min: usize,
-    chunk_size_max: usize,
-    _chunk_size_avg: usize,
-
-    _discriminator: u32,
-
-    break_test_mask: u32,
-    break_test_minimum: u32,
-
-    window: [u8; CA_CHUNKER_WINDOW_SIZE],
-}
-
-const BUZHASH_TABLE: [u32; 256] = [
-    0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68,
-    0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
-    0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c,
-    0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
-    0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa,
-    0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68,
-    0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7,
-    0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93,
-    0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc,
-    0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00,
-    0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2,
-    0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2,
-    0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e,
-    0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9,
-    0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c,
-    0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5,
-    0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d,
-    0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed,
-    0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae,
-    0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f,
-    0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874,
-    0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4,
-    0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493,
-    0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85,
-    0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929,
-    0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6,
-    0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617,
-    0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6,
-    0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f,
-    0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f,
-    0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50,
-    0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2,
-    0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c,
-    0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b,
-    0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729,
-    0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae,
-    0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328,
-    0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f,
-    0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475,
-    0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18,
-    0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09,
-    0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a,
-    0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293,
-    0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140,
-    0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661,
-    0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9,
-    0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73,
-    0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9,
-    0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2,
-    0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc,
-    0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49,
-    0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b,
-    0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a,
-    0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c,
-    0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1,
-    0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319,
-    0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1,
-    0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb,
-    0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c,
-    0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b,
-    0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d,
-    0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964,
-    0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131,
-    0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
-];
-
-impl Chunker {
-
-    /// Create a new Chunker instance, which produces and average
-    /// chunk size of `chunk_size_avg` (need to be a power of two). We
-    /// allow variation from `chunk_size_avg/4` up to a maximum of
-    /// `chunk_size_avg*4`.
-    pub fn new(chunk_size_avg: usize) -> Self {
-        // The chunk cut discriminator. In order to get an average
-        // chunk size of avg, we cut whenever for a hash value "h" at
-        // byte "i" given the descriminator "d(avg)": h(i) mod d(avg)
-        // == d(avg) - 1. Note that the discriminator calculated like
-        // this only yields correct results as long as the minimal
-        // chunk size is picked as avg/4, and the maximum chunk size
-        // as avg*4. If they are picked differently the result might
-        // be skewed into either direction.
-        let avg = chunk_size_avg as f64;
-        let discriminator = (avg / (-1.42888852e-7 * avg + 1.33237515)) as u32;
-
-        if chunk_size_avg.count_ones() != 1 {
-            panic!("got unexpected chunk size - not a power of two.");
-        }
-
-        let break_test_mask = (chunk_size_avg*2 - 1) as u32;
-        let break_test_minimum =  break_test_mask - 2;
-
-        Self {
-            h: 0,
-            window_size: 0,
-            chunk_size: 0,
-            chunk_size_min: chunk_size_avg>>2,
-            chunk_size_max: chunk_size_avg<<2,
-            _chunk_size_avg: chunk_size_avg,
-            _discriminator:  discriminator,
-            break_test_mask: break_test_mask,
-            break_test_minimum: break_test_minimum,
-            window: [0u8; CA_CHUNKER_WINDOW_SIZE],
-        }
-    }
-
-    /// Scans the specified data for a chunk border. Returns 0 if none
-    /// was found (and the function should be called with more data
-    /// later on), or another value indicating the position of a
-    /// border.
-    pub fn scan(&mut self, data: &[u8]) -> usize {
-
-        let window_len = self.window.len();
-        let data_len = data.len();
-
-        let mut pos = 0;
-
-        if self.window_size <  window_len {
-            let need =  window_len - self.window_size;
-            let copy_len = if need < data_len { need } else { data_len };
-
-            for _i in 0..copy_len {
-                let byte = data[pos];
-                self.window[self.window_size] = byte;
-                self.h = self.h.rotate_left(1) ^ BUZHASH_TABLE[byte as usize];
-                pos += 1;
-                self.window_size += 1;
-            }
-
-            self.chunk_size += copy_len;
-
-            // return if window is still not full
-            if self.window_size <  window_len {
-                return 0;
-            }
-        }
-
-        //let mut idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
-        let mut idx = self.chunk_size & 0x3f;
-
-        while pos < data_len {
-            // roll window
-            let enter = data[pos];
-            let leave = self.window[idx];
-            self.h = self.h.rotate_left(1) ^
-                //BUZHASH_TABLE[leave as usize].rotate_left(CA_CHUNKER_WINDOW_SIZE as u32) ^
-                BUZHASH_TABLE[leave as usize] ^
-                BUZHASH_TABLE[enter as usize];
-
-            self.chunk_size += 1;
-            pos += 1;
-
-            self.window[idx] = enter;
-
-            if self.shall_break() {
-                self.h = 0;
-                self.chunk_size = 0;
-                self.window_size = 0;
-                return pos;
-            }
-
-            //idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
-            idx = self.chunk_size & 0x3f;
-            //idx += 1; if idx >= CA_CHUNKER_WINDOW_SIZE { idx = 0 };
-        }
-
-        0
-    }
-
-    // fast implementation avoiding modulo
-   // #[inline(always)]
-    fn shall_break(&self) -> bool {
-
-        if self.chunk_size >= self.chunk_size_max { return true; }
-
-        if self.chunk_size < self.chunk_size_min { return false; }
-
-        //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
-
-        //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
-
-        (self.h & self.break_test_mask) >= self.break_test_minimum
-    }
-
-    // This is the original implementation from casync
-    /*
-    #[inline(always)]
-    fn shall_break_orig(&self) -> bool {
-
-        if self.chunk_size >= self.chunk_size_max { return true; }
-
-        if self.chunk_size < self.chunk_size_min { return false; }
-
-        (self.h % self.discriminator) == (self.discriminator - 1)
-    }
-     */
-}
-
-#[test]
-fn test_chunker1() {
-
-    let mut buffer = Vec::new();
-
-    for i in 0..256*1024 {
-        for j in 0..4 {
-            let byte = ((i >> (j<<3))&0xff) as u8;
-            buffer.push(byte);
-        }
-    }
-    let mut chunker = Chunker::new(64*1024);
-
-    let mut pos = 0;
-    let mut last = 0;
-
-    let mut chunks1: Vec<(usize, usize)> = vec![];
-    let mut chunks2: Vec<(usize, usize)> = vec![];
-
-    // test1: feed single bytes
-    while pos < buffer.len() {
-        let k = chunker.scan(&buffer[pos..pos+1]);
-        pos += 1;
-        if k != 0 {
-            let prev = last;
-            last = pos;
-            chunks1.push((prev, pos-prev));
-         }
-    }
-    chunks1.push((last, buffer.len() - last));
-
-    let mut chunker = Chunker::new(64*1024);
-
-    let mut pos = 0;
-
-    // test2: feed with whole buffer
-    while pos < buffer.len() {
-        let k = chunker.scan(&buffer[pos..]);
-        if k != 0 {
-            chunks2.push((pos, k));
-            pos += k;
-         } else {
-            break;
-        }
-    }
-
-    chunks2.push((pos, buffer.len() - pos));
-
-    if chunks1 != chunks2 {
-
-        let mut size1 = 0;
-        for (_offset, len) in &chunks1 {
-            size1 += len;
-        }
-        println!("Chunks1:{}\n{:?}\n", size1, chunks1);
-
-        let mut size2 = 0;
-        for (_offset, len) in &chunks2 {
-            size2 += len;
-        }
-        println!("Chunks2:{}\n{:?}\n", size2, chunks2);
-
-        if size1 != 256*4*1024 {
-            panic!("wrong size for chunks1");
-        }
-        if size2 != 256*4*1024 {
-            panic!("wrong size for chunks2");
-        }
-
-        panic!("got different chunks");
-    }
-
-}
diff --git a/proxmox-protocol/src/client.rs b/proxmox-protocol/src/client.rs
deleted file mode 100644 (file)
index 86cc6c0..0000000
+++ /dev/null
@@ -1,603 +0,0 @@
-use std::borrow::Borrow;
-use std::collections::hash_map;
-use std::collections::{HashMap, HashSet};
-use std::io::{Read, Write};
-use std::mem;
-use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
-
-use endian_trait::Endian;
-use failure::*;
-
-use crate::common;
-use crate::protocol::*;
-use crate::tools::swapped_data_to_buf;
-use crate::{ChunkEntry, FixedChunk, IndexType};
-
-#[derive(Clone, Copy, Eq, PartialEq)]
-#[repr(transparent)]
-pub struct BackupStream(pub(crate) u8);
-
-#[derive(Clone, Copy, Eq, PartialEq)]
-#[repr(transparent)]
-pub struct StreamId(pub(crate) u8);
-
-impl From<BackupStream> for StreamId {
-    fn from(v: BackupStream) -> Self {
-        Self(v.0)
-    }
-}
-
-struct BackupStreamData {
-    id: u8,
-    index_type: IndexType,
-    pos: u64,
-    path: Option<String>,
-}
-
-pub enum AckState {
-    Waiting,      // no ack received yet.
-    Received,     // already received an ack, but the user hasn't seen it yet.
-    Ignore,       // client doesn't care.
-    AwaitingData, // waiting for something other than an 'Ok' packet
-}
-
-pub struct Client<S>
-where
-    S: Read + Write,
-{
-    chunks: RwLock<HashSet<FixedChunk>>,
-    common: common::Connection<S>,
-    handshake_done: bool,
-
-    cur_id: u8,
-    free_ids: Vec<u8>,
-    waiting_ids: HashMap<u8, AckState>,
-    hash_download: Option<u8>,
-
-    upload_chunk: Option<FixedChunk>,
-    upload_id: u8,
-    upload_pos: usize,
-    upload_state: u8,
-
-    streams: HashMap<u8, BackupStreamData>,
-}
-
-type Result<T> = std::result::Result<T, Error>;
-
-impl<S> Client<S>
-where
-    S: Read + Write,
-{
-    pub fn new(socket: S) -> Self {
-        Self {
-            chunks: RwLock::new(HashSet::new()),
-            common: common::Connection::new(socket),
-            handshake_done: false,
-
-            cur_id: 1,
-            free_ids: Vec::new(),
-            waiting_ids: HashMap::new(),
-            hash_download: None,
-
-            upload_state: 0,
-            upload_pos: 0,
-            upload_id: 0,
-            upload_chunk: None,
-
-            streams: HashMap::new(),
-        }
-    }
-
-    pub fn eof(&self) -> bool {
-        self.common.eof
-    }
-
-    pub fn error(&self) -> bool {
-        self.common.error
-    }
-
-    /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
-    pub fn clear_err(&mut self) {
-        self.common.clear_err()
-    }
-
-    pub fn wait_for_handshake(&mut self) -> Result<bool> {
-        if !self.handshake_done {
-            self.poll_read(true)?;
-        }
-        Ok(self.handshake_done)
-    }
-
-    pub fn query_hashes(&mut self, file_name: &str) -> Result<()> {
-        if self.hash_download.is_some() {
-            bail!("hash query already in progress");
-        }
-
-        let id = self.next_id()?;
-        let mut packet = Packet::builder(id, PacketType::GetHashList);
-        packet
-            .write_data(client::GetHashList {
-                name_length: file_name.len() as u16,
-            })
-            .write_buf(file_name.as_bytes());
-        self.common.queue_data(packet.finish())?;
-        self.hash_download = Some(id);
-        Ok(())
-    }
-
-    pub fn wait_for_hashes(&mut self) -> Result<bool> {
-        while self.hash_download.is_some() {
-            if !self.poll_read(true)? {
-                break;
-            }
-        }
-        Ok(self.hash_download.is_none())
-    }
-
-    fn chunk_read_lock(&self) -> Result<RwLockReadGuard<HashSet<FixedChunk>>> {
-        self.chunks
-            .read()
-            .map_err(|_| format_err!("lock poisoned, disconnecting client..."))
-    }
-
-    pub fn is_chunk_available<T: Borrow<FixedChunk>>(&self, chunk: &T) -> bool {
-        self.chunk_read_lock().unwrap().contains(chunk.borrow())
-    }
-
-    /// Attempts to upload a chunk. Returns an error state only on fatal errors. If the underlying
-    /// writer returns a `WouldBlock` error this returns `None` and `continue_upload_chunk` has to
-    /// be called until the chunk is uploaded completely. During this time no other operations
-    /// should be performed on the this object!
-    /// See `continue_upload_chunk()` for a description of the returned value.
-    pub fn upload_chunk<T>(&mut self, info: &T, data: &[u8]) -> Result<Option<StreamId>>
-    where
-        T: Borrow<FixedChunk>,
-    {
-        if self.upload_chunk.is_some() {
-            bail!("cannot concurrently upload multiple chunks");
-        }
-
-        self.upload_id = self.next_id()?;
-        self.upload_chunk = Some(info.borrow().clone());
-        self.next_upload_state(0);
-        self.continue_upload_chunk(data)
-    }
-
-    fn next_upload_state(&mut self, state: u8) {
-        self.upload_state = state;
-        self.upload_pos = 0;
-    }
-
-    // This is split into a static method not borrowing self so the buffer can point into self...
-    fn do_upload_write(
-        writer: &mut common::Connection<S>,
-        buf: &[u8],
-    ) -> Result<Option<(usize, bool)>> {
-        match writer.write_some(buf) {
-            Ok(put) => Ok(Some((put, put == buf.len()))),
-            Err(e) => {
-                if e.kind() == std::io::ErrorKind::WouldBlock {
-                    Ok(None)
-                } else {
-                    Err(e.into())
-                }
-            }
-        }
-    }
-
-    fn after_upload_write(&mut self, put: Option<(usize, bool)>, next_state: u8) -> bool {
-        match put {
-            None => return false,
-            Some((put, done)) => {
-                if done {
-                    self.next_upload_state(next_state);
-                } else {
-                    self.upload_pos += put;
-                }
-                done
-            }
-        }
-    }
-
-    fn upload_write(&mut self, buf: &[u8], next_state: u8) -> Result<bool> {
-        let wrote = Self::do_upload_write(&mut self.common, buf)?;
-        Ok(self.after_upload_write(wrote, next_state))
-    }
-
-    /// If an `upload_chunk()` call returned `Ok(false)` this needs to be used to complete the
-    /// upload process as the chunk may have already been partially written to the socket.
-    /// This function will return `Ok(false)` on `WouldBlock` errors just like `upload_chunk()`
-    /// will, after which the caller should wait for the writer to become write-ready and then
-    /// call this method again.
-    /// Once the complete chunk packet has been written to the underlying socket, this returns a
-    /// packet ID which can be waited upon for completion
-    pub fn continue_upload_chunk(&mut self, data: &[u8]) -> Result<Option<StreamId>> {
-        loop {
-            match self.upload_state {
-                // Writing the packet header:
-                0 => {
-                    let len = mem::size_of::<Packet>()
-                        + mem::size_of::<client::UploadChunk>()
-                        + data.len();
-                    let packet = Packet {
-                        id: self.upload_id,
-                        pkttype: PacketType::UploadChunk as _,
-                        length: len as _,
-                    }
-                    .to_le();
-                    let buf = unsafe { swapped_data_to_buf(&packet) };
-                    if !self.upload_write(&buf[self.upload_pos..], 1)? {
-                        return Ok(None);
-                    }
-                }
-                // Writing the hash:
-                1 => {
-                    let chunk = self.upload_chunk.as_ref().unwrap();
-                    let buf = &chunk.0[self.upload_pos..];
-                    let wrote = Self::do_upload_write(&mut self.common, buf)?;
-                    if !self.after_upload_write(wrote, 2) {
-                        return Ok(None);
-                    }
-                }
-                // Writing the data:
-                2 => {
-                    if !self.upload_write(&data[self.upload_pos..], 3)? {
-                        return Ok(None);
-                    }
-                }
-                // Done:
-                3 => {
-                    self.upload_chunk = None;
-                    self.expect_ok_for_id(self.upload_id);
-                    return Ok(Some(StreamId(self.upload_id)));
-                }
-                n => bail!("bad chunk upload state: {}", n),
-            }
-        }
-    }
-
-    // generic data polling method, returns true if at least one packet was received
-    pub fn poll_read(&mut self, one: bool) -> Result<bool> {
-        if self.common.eof {
-            // polls after EOF are errors:
-            bail!("server disconnected");
-        }
-
-        if !self.common.poll_read()? {
-            // On the client side we do not expect a server-side disconnect, so error out!
-            if self.common.eof {
-                bail!("server disconnected");
-            }
-            return Ok(false);
-        }
-
-        loop {
-            match self.common.current_packet_type {
-                PacketType::Ok => self.recv_ok()?,
-                PacketType::Hello => self.recv_hello()?,
-                PacketType::HashListPart => self.recv_hash_list()?,
-                PacketType::BackupCreated => self.backup_created()?,
-                _ => bail!(
-                    "server sent an unexpected packet of type {}",
-                    self.common.current_packet_type as u32,
-                ),
-            }
-            self.common.next()?;
-            if one || !self.common.poll_read()? {
-                break;
-            }
-        }
-        Ok(true)
-    }
-
-    // None => nothing was queued
-    // Some(true) => queue finished
-    // Some(false) => queue not finished
-    pub fn poll_send(&mut self) -> Result<Option<bool>> {
-        self.common.poll_send()
-    }
-
-    // private helpermethods
-
-    fn next_id(&mut self) -> Result<u8> {
-        if let Some(id) = self.free_ids.pop() {
-            return Ok(id);
-        }
-        if self.cur_id < 0xff {
-            self.cur_id += 1;
-            return Ok(self.cur_id - 1);
-        }
-        bail!("too many concurrent transactions");
-    }
-
-    fn free_id(&mut self, id: u8) {
-        self.free_ids.push(id);
-    }
-
-    fn expect_ok_for_id(&mut self, id: u8) {
-        self.waiting_ids.insert(id, AckState::Waiting);
-    }
-
-    fn expect_response_for_id(&mut self, id: u8) {
-        self.waiting_ids.insert(id, AckState::AwaitingData);
-    }
-
-    // Methods handling received packets:
-
-    fn recv_hello(&mut self) -> Result<()> {
-        let hello = self.common.read_unaligned_data::<server::Hello>(0)?;
-
-        if hello.magic != server::HELLO_MAGIC {
-            bail!("received an invalid hello packet");
-        }
-
-        let ver = hello.version;
-        if ver != server::PROTOCOL_VERSION {
-            bail!(
-                "hello packet contained incompatible protocol version: {} (!= {})",
-                ver,
-                server::PROTOCOL_VERSION,
-            );
-        }
-
-        self.handshake_done = true;
-        self.free_id(0);
-        Ok(())
-    }
-
-    fn chunk_write_lock(&self) -> Result<RwLockWriteGuard<HashSet<FixedChunk>>> {
-        self.chunks
-            .write()
-            .map_err(|_| format_err!("lock poisoned, disconnecting client..."))
-    }
-
-    fn recv_hash_list(&mut self) -> Result<()> {
-        let data = self.common.packet_data();
-        if data.len() == 0 {
-            // No more hashes, we're done
-            self.hash_download = None;
-            return Ok(());
-        }
-
-        if (data.len() % mem::size_of::<FixedChunk>()) != 0 {
-            bail!("hash list contains invalid size");
-        }
-
-        let chunk_list: &[FixedChunk] = unsafe {
-            std::slice::from_raw_parts(
-                data.as_ptr() as *const FixedChunk,
-                data.len() / mem::size_of::<FixedChunk>(),
-            )
-        };
-
-        let mut my_chunks = self.chunk_write_lock()?;
-        for c in chunk_list {
-            eprintln!("Got chunk '{}'", c.digest_to_hex());
-            my_chunks.insert(c.clone());
-        }
-
-        Ok(())
-    }
-
-    fn ack_id(&mut self, id: u8, data_packet: bool) -> Result<()> {
-        use hash_map::Entry::*;
-
-        match self.waiting_ids.entry(id) {
-            Vacant(_) => bail!("received unexpected packet for transaction id {}", id),
-            Occupied(mut entry) => match entry.get() {
-                AckState::Ignore => {
-                    entry.remove();
-                }
-                AckState::Received => bail!("duplicate Ack received for transaction id {}", id),
-                AckState::Waiting => {
-                    if data_packet {
-                        bail!("received data packet while expecting simple Ok for {}", id);
-                    }
-                    *entry.get_mut() = AckState::Received;
-                }
-                AckState::AwaitingData => {
-                    if !data_packet {
-                        bail!(
-                            "received empty Ok while waiting for data on stream id {}",
-                            id
-                        );
-                    }
-                    *entry.get_mut() = AckState::Received;
-                }
-            },
-        }
-        Ok(())
-    }
-
-    fn recv_ok(&mut self) -> Result<()> {
-        self.ack_id(self.common.current_packet.id, false)
-    }
-
-    pub fn wait_for_id(&mut self, id: StreamId) -> Result<bool> {
-        if !self.waiting_ids.contains_key(&id.0) {
-            bail!("wait_for_id() called on unexpected id {}", id.0);
-        }
-
-        loop {
-            if !self.poll_read(true)? {
-                return Ok(false);
-            }
-
-            use hash_map::Entry::*;
-            match self.waiting_ids.entry(id.0) {
-                Vacant(_) => return Ok(true),
-                Occupied(entry) => match entry.get() {
-                    AckState::Received => {
-                        entry.remove();
-                        return Ok(true);
-                    }
-                    _ => continue,
-                },
-            }
-        }
-    }
-
-    pub fn discard_id(&mut self, id: StreamId) -> Result<()> {
-        use hash_map::Entry::*;
-        match self.waiting_ids.entry(id.0) {
-            Vacant(_) => bail!("discard_id called with unknown id {}", id.0),
-            Occupied(mut entry) => match entry.get() {
-                AckState::Ignore => (),
-                AckState::Received => {
-                    entry.remove();
-                }
-                AckState::Waiting | AckState::AwaitingData => {
-                    *entry.get_mut() = AckState::Ignore;
-                }
-            },
-        }
-        Ok(())
-    }
-
-    pub fn create_backup(
-        &mut self,
-        index_type: IndexType,
-        backup_type: &str,
-        id: &str,
-        timestamp: i64,
-        file_name: &str,
-        chunk_size: usize,
-        file_size: Option<u64>,
-        is_new: bool,
-    ) -> Result<BackupStream> {
-        let backup_type = backup_type::name_to_id(backup_type)?;
-
-        if id.len() > 0xff {
-            bail!("id too long");
-        }
-
-        if file_name.len() > 0xff {
-            bail!("file name too long");
-        }
-
-        let mut flags: backup_flags::Type = 0;
-        if is_new {
-            flags |= backup_flags::EXCL;
-        }
-        if index_type == IndexType::Dynamic {
-            flags |= backup_flags::DYNAMIC_CHUNKS;
-            if file_size.is_some() {
-                bail!("file size must be None on dynamic backup streams");
-            }
-        } else if file_size.is_none() {
-            bail!("file size is mandatory for fixed backup streams");
-        }
-
-        let packet_id = self.next_id()?;
-        let mut packet = Packet::builder(packet_id, PacketType::CreateBackup);
-        packet
-            .write_data(client::CreateBackup {
-                backup_type,
-                id_length: id.len() as _,
-                timestamp: timestamp as u64,
-                flags,
-                name_length: file_name.len() as _,
-                chunk_size: chunk_size as _,
-                file_size: file_size.unwrap_or(0) as u64,
-            })
-            .write_buf(id.as_bytes())
-            .write_buf(file_name.as_bytes());
-
-        self.streams.insert(
-            packet_id,
-            BackupStreamData {
-                id: packet_id,
-                index_type,
-                pos: 0,
-                path: None,
-            },
-        );
-
-        self.expect_response_for_id(packet_id);
-        self.common.queue_data(packet.finish())?;
-        Ok(BackupStream(packet_id))
-    }
-
-    fn backup_created(&mut self) -> Result<()> {
-        let info = self
-            .common
-            .read_unaligned_data::<server::BackupCreated>(0)?;
-        let data = &self.common.packet_data()[mem::size_of_val(&info)..];
-        if data.len() != info.path_length as usize {
-            bail!("backup-created packet has invalid length");
-        }
-        let name = std::str::from_utf8(data)?;
-        let pkt_id = self.common.current_packet.id;
-        self.streams
-            .get_mut(&pkt_id)
-            .ok_or_else(|| format_err!("BackupCreated response for invalid stream: {}", pkt_id))?
-            .path = Some(name.to_string());
-        self.ack_id(pkt_id, true)?;
-        Ok(())
-    }
-
-    pub fn dynamic_chunk(&mut self, stream: BackupStream, entry: &ChunkEntry) -> Result<bool> {
-        self.dynamic_data(stream, &entry.hash, entry.size)
-    }
-
-    pub fn dynamic_data<T: Borrow<FixedChunk>>(
-        &mut self,
-        stream: BackupStream,
-        digest: &T,
-        size: u64,
-    ) -> Result<bool> {
-        let data = self
-            .streams
-            .get_mut(&stream.0)
-            .ok_or_else(|| format_err!("no such active backup stream"))?;
-
-        if data.index_type != IndexType::Dynamic {
-            bail!("dynamic_data called for stream of static chunks");
-        }
-
-        let mut packet = Packet::builder(data.id, PacketType::BackupDataDynamic);
-        packet
-            .write_data(data.pos as u64)
-            .write_buf(&digest.borrow().0);
-        data.pos += size;
-
-        self.common.queue_data(packet.finish())
-    }
-
-    pub fn fixed_data<T: Borrow<FixedChunk>>(
-        &mut self,
-        stream: BackupStream,
-        index: usize,
-        digest: &T,
-    ) -> Result<bool> {
-        let data = self
-            .streams
-            .get_mut(&stream.0)
-            .ok_or_else(|| format_err!("no such active backup stream"))?;
-
-        if data.index_type != IndexType::Fixed {
-            bail!("fixed_data called for stream of dynamic chunks");
-        }
-
-        let mut packet = Packet::builder(data.id, PacketType::BackupDataFixed);
-        packet
-            .write_data(index as u64)
-            .write_buf(&digest.borrow().0);
-
-        self.common.queue_data(packet.finish())
-    }
-
-    pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(String, bool)> {
-        let path = self
-            .streams
-            .remove(&stream.0)
-            .ok_or_else(|| format_err!("no such active backup stream"))?
-            .path
-            .unwrap_or_else(|| "<no remote name received>".to_string());
-        let done = self
-            .common
-            .queue_data(Packet::simple(stream.0, PacketType::BackupFinished))?;
-        self.expect_ok_for_id(stream.0);
-        Ok((path, done))
-    }
-}
diff --git a/proxmox-protocol/src/common.rs b/proxmox-protocol/src/common.rs
deleted file mode 100644 (file)
index 3c7012b..0000000
+++ /dev/null
@@ -1,274 +0,0 @@
-use std::io::{self, Read, Write};
-use std::mem;
-use std::ptr;
-
-use failure::*;
-
-use endian_trait::Endian;
-
-use crate::protocol::*;
-
-type Result<T> = std::result::Result<T, Error>;
-
-pub(crate) struct Connection<S>
-where
-    S: Read + Write,
-{
-    socket: S,
-    pub buffer: Vec<u8>,
-    pub current_packet: Packet,
-    pub current_packet_type: PacketType,
-    pub error: bool,
-    pub eof: bool,
-    upload_queue: Option<(Vec<u8>, usize)>,
-}
-
-impl<S> Connection<S>
-where
-    S: Read + Write,
-{
-    pub fn new(socket: S) -> Self {
-        Self {
-            socket,
-            buffer: Vec::new(),
-            current_packet: unsafe { mem::zeroed() },
-            current_packet_type: PacketType::Error,
-            error: false,
-            eof: false,
-            upload_queue: None,
-        }
-    }
-
-    pub fn write_some(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        self.socket.write(buf)
-    }
-
-    /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
-    pub fn clear_err(&mut self) {
-        self.error = false;
-    }
-
-    // None => nothing was queued
-    // Some(true) => queue finished
-    // Some(false) => queue not finished
-    pub fn poll_send(&mut self) -> Result<Option<bool>> {
-        if let Some((ref data, ref mut pos)) = self.upload_queue {
-            loop {
-                match self.socket.write(&data[*pos..]) {
-                    Ok(put) => {
-                        *pos += put;
-                        if *pos == data.len() {
-                            self.upload_queue = None;
-                            return Ok(Some(true));
-                        }
-                        // Keep writing
-                        continue;
-                    }
-                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
-                        return Ok(Some(false));
-                    }
-                    Err(e) => return Err(e.into()),
-                }
-            }
-        } else {
-            Ok(None)
-        }
-    }
-
-    // Returns true when the data was also sent out, false if the queue is now full.
-    // For now we only allow a single dataset to be queued at once.
-    pub fn queue_data(&mut self, buf: Vec<u8>) -> Result<bool> {
-        if self.upload_queue.is_some() {
-            bail!("upload queue clash");
-        }
-
-        self.upload_queue = Some((buf, 0));
-
-        match self.poll_send()? {
-            None => unreachable!(), // We literally just set self.upload_queue to Some(value)
-            Some(v) => Ok(v),
-        }
-    }
-
-    // Returns 'true' if there's data available, 'false' if there isn't (if the
-    // underlying reader returned `WouldBlock` or the `read()` was short).
-    // Other errors are propagated.
-    pub fn poll_read(&mut self) -> Result<bool> {
-        if self.eof {
-            return Ok(false);
-        }
-
-        if self.error {
-            eprintln!("refusing to read from a client in error state");
-            bail!("client is in error state");
-        }
-
-        match self.poll_data_do() {
-            Ok(has_packet) => Ok(has_packet),
-            Err(e) => {
-                self.error = true;
-                Err(e)
-            }
-        }
-    }
-
-    fn poll_data_do(&mut self) -> Result<bool> {
-        if !self.read_packet()? {
-            return Ok(false);
-        }
-
-        if self.current_packet.length > MAX_PACKET_SIZE {
-            bail!("client tried to send a huge packet");
-        }
-
-        if !self.fill_packet()? {
-            return Ok(false);
-        }
-
-        Ok(true)
-    }
-
-    pub fn packet_length(&self) -> usize {
-        self.current_packet.length as usize
-    }
-
-    pub fn packet_data(&self) -> &[u8] {
-        let beg = mem::size_of::<Packet>();
-        let end = self.packet_length();
-        &self.buffer[beg..end]
-    }
-
-    pub fn next(&mut self) -> Result<()> {
-        let pktlen = self.packet_length();
-        unsafe {
-            if self.buffer.len() != pktlen {
-                std::ptr::copy_nonoverlapping(
-                    &self.buffer[pktlen],
-                    &mut self.buffer[0],
-                    self.buffer.len() - pktlen,
-                );
-            }
-            self.buffer.set_len(self.buffer.len() - pktlen);
-        }
-        Ok(())
-    }
-
-    // NOTE: After calling this you must `self.buffer.set_len()` when done!
-    #[must_use]
-    fn buffer_set_min_size(&mut self, size: usize) -> usize {
-        if self.buffer.capacity() < size {
-            self.buffer.reserve(size - self.buffer.len());
-        }
-        let start = self.buffer.len();
-        unsafe {
-            self.buffer.set_len(size);
-        }
-        start
-    }
-
-    fn fill_buffer(&mut self, size: usize) -> Result<bool> {
-        if self.buffer.len() >= size {
-            return Ok(true);
-        }
-        let mut filled = self.buffer_set_min_size(size);
-        loop {
-            // We don't use read_exact to not block too long or busy-read on nonblocking sockets...
-            match self.socket.read(&mut self.buffer[filled..]) {
-                Ok(got) => {
-                    if got == 0 {
-                        self.eof = true;
-                        unsafe {
-                            self.buffer.set_len(filled);
-                        }
-                        return Ok(false);
-                    }
-                    filled += got;
-                    if filled >= size {
-                        unsafe {
-                            self.buffer.set_len(filled);
-                        }
-                        return Ok(true);
-                    }
-                    // reloop
-                }
-                Err(e) => {
-                    unsafe {
-                        self.buffer.set_len(filled);
-                    }
-                    return Err(e.into());
-                }
-            }
-        }
-    }
-
-    fn read_packet_do(&mut self) -> Result<bool> {
-        if !self.fill_buffer(mem::size_of::<Packet>())? {
-            return Ok(false);
-        }
-
-        self.current_packet = self.read_unaligned::<Packet>(0)?.from_le();
-
-        self.current_packet_type = match PacketType::try_from(self.current_packet.pkttype) {
-            Some(t) => t,
-            None => bail!("unexpected packet type"),
-        };
-
-        let length = self.current_packet.length;
-        if (length as usize) < mem::size_of::<Packet>() {
-            bail!("received packet of bad length ({})", length);
-        }
-
-        Ok(true)
-    }
-
-    fn read_packet(&mut self) -> Result<bool> {
-        match self.read_packet_do() {
-            Ok(b) => Ok(b),
-            Err(e) => {
-                if let Some(ioe) = e.downcast_ref::<std::io::Error>() {
-                    if ioe.kind() == io::ErrorKind::WouldBlock {
-                        return Ok(false);
-                    }
-                }
-                Err(e)
-            }
-        }
-    }
-
-    fn read_unaligned<T: Endian>(&self, offset: usize) -> Result<T> {
-        if offset + mem::size_of::<T>() > self.buffer.len() {
-            bail!("buffer underrun");
-        }
-        Ok(unsafe { ptr::read_unaligned(&self.buffer[offset] as *const _ as *const T) }.from_le())
-    }
-
-    pub fn read_unaligned_data<T: Endian>(&self, offset: usize) -> Result<T> {
-        self.read_unaligned(offset + mem::size_of::<Packet>())
-    }
-
-    fn fill_packet(&mut self) -> Result<bool> {
-        self.fill_buffer(self.current_packet.length as usize)
-    }
-
-    // convenience helpers:
-
-    pub fn assert_size(&self, size: usize) -> Result<()> {
-        if self.packet_data().len() != size {
-            bail!(
-                "protocol error: invalid packet size (type {})",
-                self.current_packet.pkttype,
-            );
-        }
-        Ok(())
-    }
-
-    pub fn assert_atleast(&self, size: usize) -> Result<()> {
-        if self.packet_data().len() < size {
-            bail!(
-                "protocol error: invalid packet size (type {})",
-                self.current_packet.pkttype,
-            );
-        }
-        Ok(())
-    }
-}
diff --git a/proxmox-protocol/src/connect.rs b/proxmox-protocol/src/connect.rs
deleted file mode 100644 (file)
index 37b653b..0000000
+++ /dev/null
@@ -1,225 +0,0 @@
-//! This module provides a `Connector` used to log into a Proxmox Backup API server and connect to
-//! the proxmox protocol via an HTTP Upgrade request.
-
-use std::io::{Read, Write};
-use std::net::TcpStream;
-
-use failure::{bail, format_err, Error};
-use openssl::ssl::{self, SslStream};
-use url::form_urlencoded;
-
-use crate::Client;
-
-enum Authentication {
-    Password(String),
-    Ticket(String, String),
-}
-
-/// Connector used to log into a Proxmox Backup API server and open a backup protocol connection.
-/// If successful, this will create a `Client` used to communicate over the Proxmox Backup
-/// Protocol.
-pub struct Connector {
-    user: String,
-    server: String,
-    store: String,
-    auth: Option<Authentication>,
-    certificate_validation: bool,
-}
-
-fn build_login(host: &str, user: &str, pass: &str) -> Vec<u8> {
-    let formdata = form_urlencoded::Serializer::new(String::new())
-        .append_pair("username", user)
-        .append_pair("password", pass)
-        .finish();
-
-    format!("\
-        POST /api2/json/access/ticket HTTP/1.1\r\n\
-        host: {}\r\n\
-        content-length: {}\r\n\
-        content-type: application/x-www-form-urlencoded\r\n\
-        \r\n\
-        {}",
-        host,
-        formdata.as_bytes().len(),
-        formdata,
-    )
-    .into_bytes()
-}
-
-fn build_protocol_connect(host: &str, store: &str, ticket: &str, token: &str) -> Vec<u8> {
-    format!("\
-        GET /api2/json/admin/datastore/{}/test-upload HTTP/1.1\r\n\
-        host: {}\r\n\
-        connection: upgrade\r\n\
-        upgrade: proxmox-backup-protocol-1\r\n\
-        cookie: PBSAuthCookie={}\r\n\
-        CSRFPreventionToken: {}\r\n\
-        \r\n",
-        store,
-        host,
-        ticket,
-        token,
-    )
-    .into_bytes()
-}
-
-// Minimalistic http response reader. The only things we care about here are the status code and
-// the payload...
-fn read_http_response<T: Read>(sock: T) -> Result<(u16, Vec<u8>), Error> {
-    use std::io::BufRead;
-    let mut reader = std::io::BufReader::new(sock);
-
-    let mut status = String::new();
-    reader.read_line(&mut status)?;
-
-    let status = status.trim_end();
-    let mut parts = status.splitn(3, ' ');
-    let _version = parts
-        .next()
-        .ok_or_else(|| format_err!("bad http response (missing version)"))?;
-    let code = parts
-        .next()
-        .ok_or_else(|| format_err!("bad http response (missing status code)"))?;
-    let _reason = parts.next();
-
-    let code: u16 = code.parse()?;
-
-    // We need the payload's length if there is one:
-    let mut length: Option<u32> = None;
-    let mut line = String::new();
-    loop {
-        line.clear();
-        reader.read_line(&mut line)?;
-        let line = line.trim_end();
-
-        if line.len() == 0 {
-            break;
-        }
-
-        let parts: Vec<&str> = line.splitn(2, ':').collect();
-        if parts.len() != 2 {
-            bail!("invalid header in http response");
-        }
-
-        let name = parts[0].trim().to_lowercase().to_string();
-        let value = parts[1].trim();
-
-        // The only important header (important to know how much we need to read!)
-        if name == "content-length" {
-            length = Some(value.parse()?);
-        }
-
-        // Don't care about any other header contents currently...
-    }
-
-    match length {
-        None => Ok((code, Vec::new())),
-        Some(length) => {
-            let length = length as usize;
-
-            let mut out = Vec::with_capacity(length);
-            unsafe {
-                out.set_len(length);
-            }
-
-            reader.read_exact(&mut out)?;
-            Ok((code, out))
-        },
-    }
-}
-
-fn parse_login_response(data: &[u8]) -> Result<(String, String), Error> {
-    let value: serde_json::Value = serde_json::from_slice(data)?;
-    let ticket = value["data"]["ticket"]
-        .as_str()
-        .ok_or_else(|| format_err!("no ticket found in login response"))?
-        .to_string();
-    let token = value["data"]["CSRFPreventionToken"]
-        .as_str()
-        .ok_or_else(|| format_err!("no token found in login response"))?
-        .to_string();
-    Ok((ticket, token))
-}
-
-impl Connector {
-    /// Create a new connector for a specified user, server and remote backup store.
-    pub fn new(user: String, server: String, store: String) -> Self {
-        Self {
-            user,
-            server,
-            store,
-            auth: None,
-            certificate_validation: true,
-        }
-    }
-
-    /// Use a password to authenticate with the remote server.
-    pub fn password(mut self, pass: String) -> Self {
-        self.auth = Some(Authentication::Password(pass));
-        self
-    }
-
-    /// Use an already existing ticket to connect to the server.
-    pub fn ticket(mut self, ticket: String, token: String) -> Self {
-        self.auth = Some(Authentication::Ticket(ticket, token));
-        self
-    }
-
-    /// Disable TLS certificate validation.
-    pub fn certificate_validation(mut self, on: bool) -> Self {
-        self.certificate_validation = on;
-        self
-    }
-
-    pub(crate) fn do_connect(self) -> Result<SslStream<TcpStream>, Error> {
-        if self.auth.is_none() {
-            bail!("missing authentication");
-        }
-
-        let stream = TcpStream::connect(&self.server)?;
-
-        let mut connector = ssl::SslConnector::builder(ssl::SslMethod::tls())?;
-        if !self.certificate_validation {
-            connector.set_verify(ssl::SslVerifyMode::NONE);
-        }
-        let connector = connector.build();
-
-        let mut stream = connector.connect(&self.server, stream)?;
-        let (ticket, token) = match self.auth {
-            None => unreachable!(), // checked above
-            Some(Authentication::Password(password)) => {
-                let login_request = build_login(&self.server, &self.user, &password);
-                stream.write_all(&login_request)?;
-
-                let (code, ticket) = read_http_response(&mut stream)?;
-                if code != 200 {
-                    bail!("login failed");
-                }
-
-                parse_login_response(&ticket)?
-            }
-            Some(Authentication::Ticket(ticket, token)) => (ticket, token),
-        };
-
-        let protocol_request = build_protocol_connect(&self.server, &self.store, &ticket, &token);
-        stream.write_all(&protocol_request)?;
-        let (code, _empty_body) = read_http_response(&mut stream)?;
-        if code != 101 {
-            bail!("expected 101 Switching Protocol, received code: {}", code);
-        }
-
-        Ok(stream)
-    }
-
-    /// This creates creates a synchronous client (via blocking I/O), tries to authenticate with
-    /// the server and connect to the protocol endpoint. On success, a `Client` is returned.
-    pub fn connect(self) -> Result<Client<SslStream<TcpStream>>, Error> {
-        let stream = self.do_connect()?;
-
-        let mut client = Client::new(stream);
-        if !client.wait_for_handshake()? {
-            bail!("handshake failed");
-        }
-        Ok(client)
-    }
-}
diff --git a/proxmox-protocol/src/lib.rs b/proxmox-protocol/src/lib.rs
deleted file mode 100644 (file)
index d08e94c..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-pub(crate) mod common;
-
-pub mod protocol;
-pub mod server;
-pub mod tools;
-
-mod chunk_stream;
-pub use chunk_stream::*;
-
-mod chunker;
-pub use chunker::*;
-
-mod client;
-pub use client::*;
-
-mod connect;
-pub use connect::*;
-
-mod types;
-pub use types::*;
-
-pub mod c_chunker;
-pub mod c_client;
-pub mod c_connector;
diff --git a/proxmox-protocol/src/protocol.rs b/proxmox-protocol/src/protocol.rs
deleted file mode 100644 (file)
index 1cd958f..0000000
+++ /dev/null
@@ -1,281 +0,0 @@
-use std::mem;
-
-use endian_trait::Endian;
-
-// There's no reason to have more than that in a single packet...
-pub const MAX_PACKET_SIZE: u32 = 16 * 1024 * 1024;
-
-// Each packet has a transaction ID (eg. when uploading multiple disks each
-// upload is a separate stream).
-#[derive(Endian)]
-#[repr(C, packed)]
-pub struct Packet {
-    pub id: u8,      // request/command id
-    pub pkttype: u8, // packet type
-    pub length: u32, // data length before the next packet
-
-                     // content is attached directly afterwards
-}
-
-impl Packet {
-    pub fn builder(id: u8, pkttype: PacketType) -> PacketBuilder {
-        PacketBuilder::new(id, pkttype)
-    }
-
-    pub fn simple(id: u8, pkttype: PacketType) -> Vec<u8> {
-        Self::builder(id, pkttype).finish()
-    }
-}
-
-#[derive(Endian, Clone, Copy)]
-#[repr(u8)]
-pub enum ErrorId {
-    Generic,
-    Busy,
-}
-
-#[repr(u8)]
-#[derive(Clone, Copy)]
-pub enum PacketType {
-    /// First packet sent by the server.
-    Hello,
-
-    /// Generic acknowledgement.
-    Ok,
-
-    /// Error packet sent by the server, this cancels the request for which it is produced.
-    Error,
-
-    /// The client wants the list of available hashes in order to know which ones require an
-    /// upload.
-    ///
-    /// The body should contain a backup file name of which to retrieve the hash list.
-    ///
-    /// Server responds with a sequence of ``HashListPart`` packets.
-    GetHashList,
-
-    /// Array of hashes. The number of hashes in a packet is calculated from the packet length as
-    /// provided in the ``Packet`` struct. An empty packet indicates the end of the list. We send a
-    /// sequence of such packets because we don't know whether the server will be keeping the list
-    /// in memory yet, so it might not know the number in advance and may be iterating through
-    /// directories until it hits an end. It can produce the network packets asynchronously while
-    /// walking the chunk dir.
-    HashListPart,
-
-    /// Client requests to download chunks via a hash list from the server. The number of chunks
-    /// can be derived from the length of this request, so it works similar to ``HashListPart``,
-    /// but there's only 1 chunk list per request ID.
-    ///
-    /// The server responds with a sequence of ``Chunk`` packets or ``Error``.
-    DownloadChunks,
-
-    /// The response to ``DownloadChunks``. One packet per requested chunk.
-    Chunk,
-
-    /// The upload of a chunk can happen independent from the ongoing backup
-    /// streams. Server responds with an ``OK``.
-    UploadChunk,
-
-    /// Create a file in a new or existing backup. Contains all the metadata of
-    /// a file.
-    ///
-    /// The server responds with ``BackupCreated`` or ``Error``. On ``BackupCreated`` the client
-    /// may proceed to send as many ``BackupData...`` packets as necessary to fill the file.
-    /// The sequence is finished by the client with a ``BackupFinished``.
-    CreateBackup,
-
-    /// Successful from the server to a client's ``CreateBackup`` packet. Contains the server side
-    /// path relative to the store.
-    BackupCreated,
-
-    /// This packet contains an array of references to fixed sized chunks.  Clients should upload
-    /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent
-    /// chunk is an error.
-    ///
-    /// The server produces an ``Error`` packet in case of an error.
-    BackupDataFixed,
-
-    /// This packet contains an array of references to dynamic sized chunks.  Clients should upload
-    /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent
-    /// chunk is an error.
-    ///
-    /// The server produces an ``Error`` packet in case of an error.
-    BackupDataDynamic,
-
-    /// This ends a backup file. The server responds with an ``OK`` or an ``Error`` packet.
-    BackupFinished,
-}
-
-// Nightly has a std::convert::TryFrom, actually...
-impl PacketType {
-    pub fn try_from(v: u8) -> Option<Self> {
-        if v <= PacketType::BackupFinished as u8 {
-            Some(unsafe { std::mem::transmute(v) })
-        } else {
-            None
-        }
-    }
-}
-
-// Not using bitflags! for Endian derive...
-pub mod backup_flags {
-    pub type Type = u8;
-    /// The backup must not exist yet.
-    pub const EXCL: Type = 0x00000001;
-    /// The data represents a raw file
-    pub const RAW: Type = 0x00000002;
-    /// The data uses dynamically sized chunks (catar file)
-    pub const DYNAMIC_CHUNKS: Type = 0x00000004;
-}
-
-pub mod backup_type {
-    pub type Type = u8;
-    pub const VM: Type = 0;
-    pub const CT: Type = 1;
-    pub const HOST: Type = 2;
-
-    use failure::{bail, Error};
-    pub fn id_to_name(id: Type) -> Result<&'static str, Error> {
-        Ok(match id {
-            VM => "vm",
-            CT => "ct",
-            HOST => "host",
-            n => bail!("unknown backup type id: {}", n),
-        })
-    }
-
-    pub fn name_to_id(id: &str) -> Result<Type, Error> {
-        Ok(match id {
-            "vm" => VM,
-            "ct" => CT,
-            "host" => HOST,
-            n => bail!("unknown backup type name: {}", n),
-        })
-    }
-}
-
-#[repr(C, packed)]
-#[derive(Endian)]
-pub struct DynamicChunk {
-    pub offset: u64,
-    pub digest: [u8; 32],
-}
-
-pub mod server {
-    use endian_trait::Endian;
-
-    pub const PROTOCOL_VERSION: u32 = 1;
-
-    pub const HELLO_MAGIC: [u8; 8] = *b"PMXBCKUP";
-
-    pub const HELLO_VERSION: u32 = 1; // the current version
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct Hello {
-        pub magic: [u8; 8],
-        pub version: u32,
-    }
-
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct Error {
-        pub id: u8,
-    }
-
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct Chunk {
-        pub hash: super::DynamicChunk,
-        // Data follows here...
-    }
-
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct BackupCreated {
-        pub path_length: u16,
-        // path follows here
-    }
-}
-
-pub mod client {
-    use endian_trait::Endian;
-
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct UploadChunk {
-        pub hash: crate::FixedChunk,
-    }
-
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct CreateBackup {
-        pub backup_type: super::backup_type::Type,
-        pub id_length: u8,  // length of the ID string
-        pub timestamp: u64, // seconds since the epoch
-        pub flags: super::backup_flags::Type,
-        pub name_length: u8, // file name length
-        pub chunk_size: u32, // average or "fixed" chunk size
-        pub file_size: u64,  // size for fixed size files (must be 0 if DYNAMIC_CHUNKS is set)
-
-                             // ``id_length`` bytes of ID follow
-                             // ``name_length`` bytes of file name follow
-                             // Further packets contain the data or chunks
-    }
-
-    #[derive(Endian)]
-    #[repr(C, packed)]
-    pub struct GetHashList {
-        pub name_length: u16,
-        // name follows as payload
-    }
-}
-
-pub struct PacketBuilder {
-    data: Vec<u8>,
-}
-
-impl PacketBuilder {
-    pub fn new(id: u8, pkttype: PacketType) -> Self {
-        let data = Vec::with_capacity(mem::size_of::<Packet>());
-        let mut me = Self { data };
-        me.write_data(
-            Packet {
-                id,
-                pkttype: pkttype as _,
-                length: 0,
-            }
-            .to_le(),
-        );
-        me
-    }
-
-    pub fn reserve(&mut self, more: usize) -> &mut Self {
-        self.data.reserve(more);
-        self
-    }
-
-    pub fn write_buf(&mut self, buf: &[u8]) -> &mut Self {
-        self.data.extend_from_slice(buf);
-        self
-    }
-
-    pub fn write_data<T: Endian>(&mut self, data: T) -> &mut Self {
-        self.write_data_noswap(&data.to_le())
-    }
-
-    pub fn write_data_noswap<T>(&mut self, data: &T) -> &mut Self {
-        self.write_buf(unsafe {
-            std::slice::from_raw_parts(data as *const T as *const u8, mem::size_of::<T>())
-        })
-    }
-
-    pub fn finish(mut self) -> Vec<u8> {
-        let length = self.data.len();
-        assert!(length >= mem::size_of::<Packet>());
-        unsafe {
-            let head = self.data.as_mut_ptr() as *mut Packet;
-            std::ptr::write_unaligned((&mut (*head).length) as *mut u32, (length as u32).to_le());
-        }
-        self.data
-    }
-}
diff --git a/proxmox-protocol/src/server.rs b/proxmox-protocol/src/server.rs
deleted file mode 100644 (file)
index 20e8417..0000000
+++ /dev/null
@@ -1,466 +0,0 @@
-use std::collections::hash_map::{self, HashMap};
-use std::io::{Read, Write};
-use std::{mem, ptr};
-
-use failure::*;
-
-use endian_trait::Endian;
-
-use crate::common;
-use crate::protocol::*;
-use crate::ChunkEntry;
-use crate::FixedChunk;
-
-type Result<T> = std::result::Result<T, Error>;
-
-pub trait ChunkList: Send {
-    fn next(&mut self) -> Result<Option<&[u8; 32]>>;
-}
-
-/// This provides callbacks used by a `Connection` when it receives a packet.
-pub trait HandleClient {
-    /// The protocol handler will call this when the client produces an irrecoverable error.
-    fn error(&self);
-
-    /// The client wants the list of hashes, the provider should provide an iterator over chunk
-    /// entries.
-    fn get_chunk_list(&self, backup_name: &str) -> Result<Box<dyn ChunkList>>;
-
-    /// The client has uploaded a chunk, we should add it to the chunk store. Return whether the
-    /// chunk was new.
-    fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool>;
-
-    /// The client wants to create a backup. Since multiple backup streams can happen in parallel,
-    /// this should return a handler used to create the individual streams.
-    /// The handler will be informed about success via the ``finish()`` method.
-    fn create_backup(
-        &self,
-        backup_type: &str,
-        id: &str,
-        timestamp: i64,
-        new: bool,
-    ) -> Result<Box<dyn HandleBackup + Send>>;
-}
-
-/// A single backup may contain multiple files. Currently we represent this via a hierarchy where
-/// the `HandleBackup` trait is instantiated for each backup, which is responsible for
-/// instantiating the `BackupFile` trait objects.
-pub trait HandleBackup {
-    /// All individual streams for this backup have been successfully finished.
-    fn finish(&mut self) -> Result<()>;
-
-    /// Create a specific file in this backup.
-    fn create_file(
-        &self,
-        name: &str,
-        fixed_size: Option<u64>,
-        chunk_size: usize,
-    ) -> Result<Box<dyn BackupFile + Send>>;
-}
-
-/// This handles backup files created by calling `create_file` on a `Backup`.
-pub trait BackupFile {
-    /// Backup use the server-local timestamp formatting, so we want to be able to tell the client
-    /// the real remote path:
-    fn relative_path(&self) -> &str;
-
-    /// The client wants to add a chunk to a fixed index file at a certain position.
-    fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>;
-
-    /// The client wants to add a chunks to a dynamic index file.
-    fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>;
-
-    /// This notifies the handler that the backup has finished successfully. This should commit the
-    /// data to the store for good. After this the client will receive an "ok".
-    ///
-    /// If the Drop handler gets called before this method, the backup was aborted due to an error
-    /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be
-    /// performed.
-    fn finish(&mut self) -> Result<()>;
-}
-
-#[derive(Clone, Eq, Hash, PartialEq)]
-struct BackupId(backup_type::Type, String, i64);
-
-/// Associates a socket with the server side of the backup protocol.
-/// The communcation channel should be `Read + Write` and may be non-blocking (provided it
-/// correctly returns `io::ErrorKind::WouldBlock`).
-/// The handler has to implement the `HandleClient` trait to provide callbacks used while
-/// communicating with the client.
-pub struct Connection<S, H>
-where
-    S: Read + Write,
-    H: HandleClient,
-{
-    handler: H,
-    common: common::Connection<S>,
-
-    // states:
-
-    // If this is set we are currently transferring our hash list to the client:
-    hash_list: Option<(
-        u8, // data stream ID
-        Box<dyn ChunkList>,
-    )>,
-
-    // currently active 'backups' (handlers for a specific BackupDir)
-    backups: HashMap<BackupId, Box<dyn HandleBackup + Send>>,
-
-    // currently active backup *file* streams
-    backup_files: HashMap<u8, Box<dyn BackupFile + Send>>,
-}
-
-impl<S, H> Connection<S, H>
-where
-    S: Read + Write,
-    H: HandleClient,
-{
-    pub fn new(socket: S, handler: H) -> Result<Self> {
-        let mut me = Self {
-            handler,
-            common: common::Connection::new(socket),
-            hash_list: None,
-            backups: HashMap::new(),
-            backup_files: HashMap::new(),
-        };
-
-        me.send_hello()?;
-        Ok(me)
-    }
-
-    fn send_hello(&mut self) -> Result<()> {
-        let mut packet = Packet::builder(0, PacketType::Hello);
-        packet.write_data(server::Hello {
-            magic: server::HELLO_MAGIC,
-            version: server::PROTOCOL_VERSION,
-        });
-        self.common.queue_data(packet.finish())?;
-        Ok(())
-    }
-
-    pub fn eof(&self) -> bool {
-        self.common.eof
-    }
-
-    /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
-    pub fn clear_err(&mut self) {
-        self.common.clear_err()
-    }
-
-    pub fn main(&mut self) -> Result<()> {
-        self.poll_read()?;
-        self.poll_send()?;
-        Ok(())
-    }
-
-    // If this returns an error it is considered fatal and the connection should be dropped!
-    fn poll_read(&mut self) -> Result<()> {
-        if self.common.eof {
-            // polls after EOF are errors:
-            bail!("client disconnected");
-        }
-
-        if !self.common.poll_read()? {
-            // No data available
-            if self.common.eof {
-                bail!("client disconnected");
-            }
-            return Ok(());
-        }
-
-        // we received a packet, handle it:
-
-        loop {
-            use PacketType::*;
-            match self.common.current_packet_type {
-                GetHashList => self.hash_list_requested()?,
-                UploadChunk => self.receive_chunk()?,
-                CreateBackup => self.create_backup()?,
-                BackupDataDynamic => self.backup_data_dynamic()?,
-                BackupDataFixed => self.backup_data_fixed()?,
-                BackupFinished => self.backup_finished()?,
-                _ => bail!(
-                    "client sent an unexpected packet of type {}",
-                    self.common.current_packet_type as u32,
-                ),
-            };
-            self.common.next()?;
-            if !self.common.poll_read()? {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-
-    fn poll_send(&mut self) -> Result<()> {
-        if self.common.error {
-            eprintln!("refusing to send datato client in error state");
-            bail!("client is in error state");
-        }
-
-        if let Some(false) = self.common.poll_send()? {
-            // send queue is not finished, don't add anything else...
-            return Ok(());
-        }
-
-        // Queue has either finished or was empty, see if we should enqueue more data:
-        if self.hash_list.is_some() {
-            return self.send_hash_list();
-        }
-        Ok(())
-    }
-
-    fn hash_list_requested(&mut self) -> Result<()> {
-        // Verify protocol: GetHashList is an empty packet.
-        let request = self.common.read_unaligned_data::<client::GetHashList>(0)?;
-        self.common
-            .assert_size(mem::size_of_val(&request) + request.name_length as usize)?;
-        let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..];
-        let name = std::str::from_utf8(name_bytes)?;
-
-        // We support only one active hash list stream:
-        if self.hash_list.is_some() {
-            return self.respond_error(ErrorId::Busy);
-        }
-
-        self.hash_list = Some((
-            self.common.current_packet.id,
-            self.handler.get_chunk_list(name)?,
-        ));
-
-        Ok(())
-    }
-
-    fn send_hash_list(&mut self) -> Result<()> {
-        loop {
-            let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap();
-
-            let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::<Packet>())
-                / mem::size_of::<FixedChunk>();
-
-            let mut packet = Packet::builder(*stream_id, PacketType::HashListPart);
-            packet.reserve(mem::size_of::<FixedChunk>() * max_chunks_per_packet);
-
-            let mut count = 0;
-            for _ in 0..max_chunks_per_packet {
-                let entry: &[u8; 32] = match hash_iter.next() {
-                    Ok(Some(entry)) => entry,
-                    Ok(None) => break,
-                    Err(e) => {
-                        eprintln!("error sending chunk list to client: {}", e);
-                        continue;
-                    }
-                };
-
-                packet.write_buf(entry);
-                count += 1;
-            }
-
-            let can_send_more = self.common.queue_data(packet.finish())?;
-
-            if count == 0 {
-                // We just sent the EOF packet, clear our iterator state!
-                self.hash_list = None;
-                break;
-            }
-
-            if !can_send_more {
-                break;
-            }
-        }
-        Ok(())
-    }
-
-    fn respond_error(&mut self, kind: ErrorId) -> Result<()> {
-        self.respond_value(PacketType::Error, kind)?;
-        Ok(())
-    }
-
-    fn respond_value<T: Endian>(&mut self, pkttype: PacketType, data: T) -> Result<()> {
-        let mut packet = Packet::builder(self.common.current_packet.id, pkttype);
-        packet.write_data(data);
-        self.common.queue_data(packet.finish())?;
-        Ok(())
-    }
-
-    fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> {
-        self.common
-            .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?;
-        Ok(())
-    }
-
-    fn respond_ok(&mut self) -> Result<()> {
-        self.respond_empty(PacketType::Ok)
-    }
-
-    fn receive_chunk(&mut self) -> Result<()> {
-        self.common
-            .assert_atleast(mem::size_of::<client::UploadChunk>())?;
-        let data = self.common.packet_data();
-        let (hash, data) = data.split_at(mem::size_of::<FixedChunk>());
-        if data.len() == 0 {
-            bail!("received an empty chunk");
-        }
-        let entry = ChunkEntry::from_data(data);
-        if entry.hash != hash {
-            let cli_hash = crate::tools::digest_to_hex(hash);
-            let data_hash = entry.digest_to_hex();
-            bail!(
-                "client claimed data with digest {} has digest {}",
-                data_hash,
-                cli_hash
-            );
-        }
-        let _new = self.handler.upload_chunk(&entry, data)?;
-        self.respond_ok()
-    }
-
-    fn create_backup(&mut self) -> Result<()> {
-        if self
-            .backup_files
-            .contains_key(&self.common.current_packet.id)
-        {
-            bail!("stream id already in use...");
-        }
-
-        let create_msg = self.common.read_unaligned_data::<client::CreateBackup>(0)?;
-
-        // simple data:
-        let flags = create_msg.flags;
-        let backup_type = create_msg.backup_type;
-        let time = create_msg.timestamp as i64;
-
-        // text comes from the payload data after the CreateBackup struct:
-        let data = self.common.packet_data();
-        let payload = &data[mem::size_of_val(&create_msg)..];
-
-        // there must be exactly the ID and the file name in the payload:
-        let id_len = create_msg.id_length as usize;
-        let name_len = create_msg.name_length as usize;
-        let expected_len = id_len + name_len;
-        if payload.len() < expected_len {
-            bail!("client sent incomplete CreateBackup request");
-        } else if payload.len() > expected_len {
-            bail!("client sent excess data in CreateBackup request");
-        }
-
-        // id and file name must be utf8:
-        let id = std::str::from_utf8(&payload[0..id_len])
-            .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?;
-        let file_name = std::str::from_utf8(&payload[id_len..])
-            .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?;
-
-        // Sanity check dynamic vs fixed:
-        let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0;
-        let file_size = match (is_dynamic, create_msg.file_size) {
-            (false, size) => Some(size),
-            (true, 0) => None,
-            (true, _) => bail!("file size of dynamic streams must be zero"),
-        };
-
-        // search or create the handler:
-        let hashmap_id = BackupId(backup_type, id.to_string(), time);
-        let handle = match self.backups.entry(hashmap_id) {
-            hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup(
-                backup_type::id_to_name(backup_type)?,
-                id,
-                time,
-                (flags & backup_flags::EXCL) != 0,
-            )?),
-            hash_map::Entry::Occupied(entry) => entry.into_mut(),
-        };
-        let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?;
-
-        let mut response =
-            Packet::builder(self.common.current_packet.id, PacketType::BackupCreated);
-        let path = file.relative_path();
-        if path.len() > 0xffff {
-            bail!("path too long");
-        }
-        response
-            .write_data(server::BackupCreated {
-                path_length: path.len() as _,
-            })
-            .write_buf(path.as_bytes());
-        self.common.queue_data(response.finish())?;
-
-        self.backup_files
-            .insert(self.common.current_packet.id, file);
-
-        Ok(())
-    }
-
-    fn backup_data_dynamic(&mut self) -> Result<()> {
-        let stream_id = self.common.current_packet.id;
-        let file = self
-            .backup_files
-            .get_mut(&stream_id)
-            .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;
-
-        let mut data = self.common.packet_data();
-        // Data consists of (offset: u64, hash: [u8; 32])
-        let entry_len = mem::size_of::<DynamicChunk>();
-
-        while data.len() >= entry_len {
-            let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) };
-            data = &data[entry_len..];
-
-            entry.offset = entry.offset.from_le();
-            file.add_dynamic_data(&entry)?;
-        }
-
-        if data.len() != 0 {
-            bail!(
-                "client sent excess data ({} bytes) after dynamic chunk indices!",
-                data.len()
-            );
-        }
-
-        Ok(())
-    }
-
-    fn backup_data_fixed(&mut self) -> Result<()> {
-        let stream_id = self.common.current_packet.id;
-        let file = self
-            .backup_files
-            .get_mut(&stream_id)
-            .ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?;
-
-        let mut data = self.common.packet_data();
-        // Data consists of (index: u64, hash: [u8; 32])
-        #[repr(C, packed)]
-        struct IndexedChunk {
-            index: u64,
-            digest: FixedChunk,
-        }
-        let entry_len = mem::size_of::<IndexedChunk>();
-
-        while data.len() >= entry_len {
-            let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) };
-            data = &data[entry_len..];
-
-            entry.index = entry.index.from_le();
-            file.add_fixed_data(entry.index, &entry.digest)?;
-        }
-
-        if data.len() != 0 {
-            bail!(
-                "client sent excess data ({} bytes) after dynamic chunk indices!",
-                data.len()
-            );
-        }
-
-        Ok(())
-    }
-
-    fn backup_finished(&mut self) -> Result<()> {
-        let stream_id = self.common.current_packet.id;
-        let mut file = self
-            .backup_files
-            .remove(&stream_id)
-            .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;
-        file.finish()?;
-        self.respond_ok()
-    }
-}
diff --git a/proxmox-protocol/src/tools.rs b/proxmox-protocol/src/tools.rs
deleted file mode 100644 (file)
index a614e28..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-use failure::{bail, Error};
-
-pub fn digest_to_hex(digest: &[u8]) -> String {
-    const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef";
-
-    let mut buf = Vec::<u8>::with_capacity(digest.len() * 2);
-
-    for i in 0..digest.len() {
-        buf.push(HEX_CHARS[(digest[i] >> 4) as usize]);
-        buf.push(HEX_CHARS[(digest[i] & 0xf) as usize]);
-    }
-
-    unsafe { String::from_utf8_unchecked(buf) }
-}
-
-pub unsafe fn swapped_data_to_buf<T>(data: &T) -> &[u8] {
-    std::slice::from_raw_parts(data as *const T as *const u8, std::mem::size_of::<T>())
-}
-
-fn hex_nibble(c: u8) -> Result<u8, Error> {
-    Ok(match c {
-        b'0'..=b'9' => c - b'0',
-        b'a'..=b'f' => c - b'a' + 0xa,
-        b'A'..=b'F' => c - b'A' + 0xa,
-        _ => bail!("not a hex digit: {}", c as char),
-    })
-}
-
-#[inline]
-pub fn parse_hex_digest<T: AsRef<[u8]>>(hex: T) -> Result<[u8; 32], Error> {
-    let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
-
-    let hex = hex.as_ref();
-
-    if hex.len() != 64 {
-        bail!(
-            "invalid hex digest ({} instead of 64 digits long)",
-            hex.len()
-        );
-    }
-
-    for i in 0..32 {
-        digest[i] = (hex_nibble(hex[i * 2])? << 4) + hex_nibble(hex[i * 2 + 1])?;
-    }
-
-    Ok(digest)
-}
diff --git a/proxmox-protocol/src/types.rs b/proxmox-protocol/src/types.rs
deleted file mode 100644 (file)
index 7bf759a..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-use std::borrow::Borrow;
-
-use endian_trait::Endian;
-use failure::*;
-
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub enum IndexType {
-    Fixed,
-    Dynamic,
-}
-
-#[derive(Endian, Clone, Debug, Eq, Hash, PartialEq)]
-#[repr(transparent)]
-pub struct FixedChunk(pub [u8; 32]);
-
-impl FixedChunk {
-    pub fn new(hash: [u8; 32]) -> Self {
-        Self(hash)
-    }
-
-    pub fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Error> {
-        Ok(Self::new(crate::tools::parse_hex_digest(hex.as_ref())?))
-    }
-
-    pub fn from_data(data: &[u8]) -> Self {
-        let mut hasher = openssl::sha::Sha256::new();
-        hasher.update(data);
-        Self::new(hasher.finish())
-    }
-
-    pub fn digest_to_hex(&self) -> String {
-        crate::tools::digest_to_hex(&self.0)
-    }
-}
-
-#[derive(Endian, Clone, Copy, Debug, Hash)]
-#[repr(C, packed)]
-pub struct ChunkEntry {
-    pub hash: [u8; 32],
-    pub size: u64,
-}
-
-impl ChunkEntry {
-    pub fn new(hash: [u8; 32], size: u64) -> Self {
-        Self { hash, size }
-    }
-
-    pub fn from_hex<T: AsRef<[u8]>>(hex: T, size: u64) -> Result<Self, Error> {
-        Ok(Self::new(
-            crate::tools::parse_hex_digest(hex.as_ref())?,
-            size,
-        ))
-    }
-
-    pub fn len(&self) -> u64 {
-        self.size
-    }
-
-    pub fn from_data(data: &[u8]) -> Self {
-        let mut hasher = openssl::sha::Sha256::new();
-        hasher.update(data);
-        Self::new(hasher.finish(), data.len() as u64)
-    }
-
-    pub fn digest_to_hex(&self) -> String {
-        crate::tools::digest_to_hex(&self.hash)
-    }
-
-    pub fn to_fixed(&self) -> FixedChunk {
-        FixedChunk(self.hash)
-    }
-}
-
-impl PartialEq for ChunkEntry {
-    fn eq(&self, other: &Self) -> bool {
-        self.size == other.size && self.hash == other.hash
-    }
-}
-
-impl Eq for ChunkEntry {}
-
-impl Into<FixedChunk> for ChunkEntry {
-    fn into(self) -> FixedChunk {
-        FixedChunk(self.hash)
-    }
-}
-
-impl Borrow<FixedChunk> for ChunkEntry {
-    fn borrow(&self) -> &FixedChunk {
-        unsafe { std::mem::transmute(&self.hash) }
-    }
-}
-
-impl Borrow<FixedChunk> for [u8; 32] {
-    fn borrow(&self) -> &FixedChunk {
-        unsafe { std::mem::transmute(self) }
-    }
-}
index ce6d59221b349aee94a71196eb991ddd18f5bcae..887e7361339c496242389951777a0f22ee64cb2f 100644 (file)
@@ -134,6 +134,9 @@ pub use checksum_reader::*;
 mod checksum_writer;
 pub use checksum_writer::*;
 
+mod chunker;
+pub use chunker::*;
+
 mod data_chunk;
 pub use data_chunk::*;
 
@@ -155,8 +158,6 @@ pub use chunk_stream::*;
 mod chunk_stat;
 pub use chunk_stat::*;
 
-pub use proxmox_protocol::Chunker;
-
 mod read_chunk;
 pub use read_chunk::*;
 
index a8a77e091ec4796a19a76a7c7ef2d2366867b79f..d542a3289e4a49eb06dd71100a566afa9debfd9a 100644 (file)
@@ -1,10 +1,9 @@
+use bytes::BytesMut;
 use failure::*;
-
-use proxmox_protocol::Chunker;
-use futures::{Async, Poll};
 use futures::stream::Stream;
+use futures::{Async, Poll};
 
-use bytes::BytesMut;
+use super::Chunker;
 
 /// Split input stream into dynamic sized chunks
 pub struct ChunkStream<S> {
diff --git a/src/backup/chunker.rs b/src/backup/chunker.rs
new file mode 100644 (file)
index 0000000..4ef7d0b
--- /dev/null
@@ -0,0 +1,271 @@
+/// Note: window size 32 or 64, is faster because we can
+/// speedup modulo operations, but always computes hash 0
+/// for constant data streams .. 0,0,0,0,0,0
+/// so we use a modified the chunk boundary test too not
+/// use hash value 0 to detect a boundary.
+const CA_CHUNKER_WINDOW_SIZE: usize = 64;
+
+/// Slinding window chunker (Buzhash)
+///
+/// This is a rewrite of *casync* chunker (cachunker.h) in rust.
+///
+/// Hashing by cyclic polynomial (also called Buzhash) has the benefit
+/// of avoiding multiplications, using barrel shifts instead. For more
+/// information please take a look at the [Rolling
+/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) artikel from
+/// wikipedia.
+
+pub struct Chunker {
+    h: u32,
+    window_size: usize,
+    chunk_size: usize,
+
+    chunk_size_min: usize,
+    chunk_size_max: usize,
+    _chunk_size_avg: usize,
+
+    _discriminator: u32,
+
+    break_test_mask: u32,
+    break_test_minimum: u32,
+
+    window: [u8; CA_CHUNKER_WINDOW_SIZE],
+}
+
+const BUZHASH_TABLE: [u32; 256] = [
+    0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
+    0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
+    0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68,
+    0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93,
+    0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00,
+    0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2,
+    0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9,
+    0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5,
+    0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed,
+    0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f,
+    0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4,
+    0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85,
+    0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6,
+    0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6,
+    0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f,
+    0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2,
+    0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b,
+    0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae,
+    0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f,
+    0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18,
+    0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a,
+    0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140,
+    0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9,
+    0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9,
+    0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc,
+    0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b,
+    0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c,
+    0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319,
+    0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb,
+    0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b,
+    0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964,
+    0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
+];
+
+impl Chunker {
+    /// Create a new Chunker instance, which produces and average
+    /// chunk size of `chunk_size_avg` (need to be a power of two). We
+    /// allow variation from `chunk_size_avg/4` up to a maximum of
+    /// `chunk_size_avg*4`.
+    pub fn new(chunk_size_avg: usize) -> Self {
+        // The chunk cut discriminator. In order to get an average
+        // chunk size of avg, we cut whenever for a hash value "h" at
+        // byte "i" given the descriminator "d(avg)": h(i) mod d(avg)
+        // == d(avg) - 1. Note that the discriminator calculated like
+        // this only yields correct results as long as the minimal
+        // chunk size is picked as avg/4, and the maximum chunk size
+        // as avg*4. If they are picked differently the result might
+        // be skewed into either direction.
+        let avg = chunk_size_avg as f64;
+        let discriminator = (avg / (-1.42888852e-7 * avg + 1.33237515)) as u32;
+
+        if chunk_size_avg.count_ones() != 1 {
+            panic!("got unexpected chunk size - not a power of two.");
+        }
+
+        let break_test_mask = (chunk_size_avg * 2 - 1) as u32;
+        let break_test_minimum = break_test_mask - 2;
+
+        Self {
+            h: 0,
+            window_size: 0,
+            chunk_size: 0,
+            chunk_size_min: chunk_size_avg >> 2,
+            chunk_size_max: chunk_size_avg << 2,
+            _chunk_size_avg: chunk_size_avg,
+            _discriminator: discriminator,
+            break_test_mask: break_test_mask,
+            break_test_minimum: break_test_minimum,
+            window: [0u8; CA_CHUNKER_WINDOW_SIZE],
+        }
+    }
+
+    /// Scans the specified data for a chunk border. Returns 0 if none
+    /// was found (and the function should be called with more data
+    /// later on), or another value indicating the position of a
+    /// border.
+    pub fn scan(&mut self, data: &[u8]) -> usize {
+        let window_len = self.window.len();
+        let data_len = data.len();
+
+        let mut pos = 0;
+
+        if self.window_size < window_len {
+            let need = window_len - self.window_size;
+            let copy_len = if need < data_len { need } else { data_len };
+
+            for _i in 0..copy_len {
+                let byte = data[pos];
+                self.window[self.window_size] = byte;
+                self.h = self.h.rotate_left(1) ^ BUZHASH_TABLE[byte as usize];
+                pos += 1;
+                self.window_size += 1;
+            }
+
+            self.chunk_size += copy_len;
+
+            // return if window is still not full
+            if self.window_size < window_len {
+                return 0;
+            }
+        }
+
+        //let mut idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
+        let mut idx = self.chunk_size & 0x3f;
+
+        while pos < data_len {
+            // roll window
+            let enter = data[pos];
+            let leave = self.window[idx];
+            self.h = self.h.rotate_left(1) ^
+                //BUZHASH_TABLE[leave as usize].rotate_left(CA_CHUNKER_WINDOW_SIZE as u32) ^
+                BUZHASH_TABLE[leave as usize] ^
+                BUZHASH_TABLE[enter as usize];
+
+            self.chunk_size += 1;
+            pos += 1;
+
+            self.window[idx] = enter;
+
+            if self.shall_break() {
+                self.h = 0;
+                self.chunk_size = 0;
+                self.window_size = 0;
+                return pos;
+            }
+
+            //idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
+            idx = self.chunk_size & 0x3f;
+            //idx += 1; if idx >= CA_CHUNKER_WINDOW_SIZE { idx = 0 };
+        }
+
+        0
+    }
+
+    // fast implementation avoiding modulo
+    // #[inline(always)]
+    fn shall_break(&self) -> bool {
+        if self.chunk_size >= self.chunk_size_max {
+            return true;
+        }
+
+        if self.chunk_size < self.chunk_size_min {
+            return false;
+        }
+
+        //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
+
+        //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
+
+        (self.h & self.break_test_mask) >= self.break_test_minimum
+    }
+
+    // This is the original implementation from casync
+    /*
+    #[inline(always)]
+    fn shall_break_orig(&self) -> bool {
+
+        if self.chunk_size >= self.chunk_size_max { return true; }
+
+        if self.chunk_size < self.chunk_size_min { return false; }
+
+        (self.h % self.discriminator) == (self.discriminator - 1)
+    }
+     */
+}
+
+#[test]
+fn test_chunker1() {
+    let mut buffer = Vec::new();
+
+    for i in 0..(256 * 1024) {
+        for j in 0..4 {
+            let byte = ((i >> (j << 3)) & 0xff) as u8;
+            buffer.push(byte);
+        }
+    }
+    let mut chunker = Chunker::new(64 * 1024);
+
+    let mut pos = 0;
+    let mut last = 0;
+
+    let mut chunks1: Vec<(usize, usize)> = vec![];
+    let mut chunks2: Vec<(usize, usize)> = vec![];
+
+    // test1: feed single bytes
+    while pos < buffer.len() {
+        let k = chunker.scan(&buffer[pos..pos + 1]);
+        pos += 1;
+        if k != 0 {
+            let prev = last;
+            last = pos;
+            chunks1.push((prev, pos - prev));
+        }
+    }
+    chunks1.push((last, buffer.len() - last));
+
+    let mut chunker = Chunker::new(64 * 1024);
+
+    let mut pos = 0;
+
+    // test2: feed with whole buffer
+    while pos < buffer.len() {
+        let k = chunker.scan(&buffer[pos..]);
+        if k != 0 {
+            chunks2.push((pos, k));
+            pos += k;
+        } else {
+            break;
+        }
+    }
+
+    chunks2.push((pos, buffer.len() - pos));
+
+    if chunks1 != chunks2 {
+        let mut size1 = 0;
+        for (_offset, len) in &chunks1 {
+            size1 += len;
+        }
+        println!("Chunks1:{}\n{:?}\n", size1, chunks1);
+
+        let mut size2 = 0;
+        for (_offset, len) in &chunks2 {
+            size2 += len;
+        }
+        println!("Chunks2:{}\n{:?}\n", size2, chunks2);
+
+        if size1 != 256 * 4 * 1024 {
+            panic!("wrong size for chunks1");
+        }
+        if size2 != 256 * 4 * 1024 {
+            panic!("wrong size for chunks2");
+        }
+
+        panic!("got different chunks");
+    }
+}
index 6103bdd119633b4be0ac5a7b4ca9c6faaea39b53..90c8b511876e7517f947d5bcd20f86d109e148b2 100644 (file)
@@ -10,12 +10,12 @@ use uuid::Uuid;
 
 use proxmox::tools::io::ReadExt;
 use proxmox::tools::vec;
-use proxmox_protocol::Chunker;
 
+use super::Chunker;
+use super::IndexFile;
 use super::chunk_stat::ChunkStat;
 use super::chunk_store::ChunkStore;
 use super::read_chunk::ReadChunk;
-use super::IndexFile;
 use super::{DataChunk, DataChunkBuilder};
 use crate::tools;