[dependencies]
proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1" }
-proxmox-protocol = { path = "proxmox-protocol" }
log = "0.4"
syslog = "4.0"
failure = "0.1"
+++ /dev/null
-[package]
-name = "proxmox-protocol"
-version = "0.1.0"
-authors = [
- "Dietmar Maurer <dietmar@proxmox.com>",
- "Wolfgang Bumiller <w.bumiller@proxmox.com>",
-]
-edition = "2018"
-
-[lib]
-crate-type = ['lib', 'cdylib']
-
-[dependencies]
-chrono = "0.4"
-endian_trait = { version = "0.6", features = ["arrays"] }
-errno = "0.2"
-failure = "0.1"
-libc = "0.2"
-openssl = "0.10"
-serde_json = "1.0"
-url = "1.7"
+++ /dev/null
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-#include <stdbool.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef int64_t proxmox_backup_read_cb(void *opaque, void *buffer, uint64_t size);
-typedef int64_t proxmox_backup_write_cb(void *opaque, const void *buffer, uint64_t size);
-typedef void proxmox_backup_drop_cb(void *opaque);
-
-typedef struct ProxmoxBackup ProxmoxBackup;
-
-extern ProxmoxBackup *proxmox_backup_new(
- void *opaque,
- proxmox_backup_read_cb *read_cb,
- proxmox_backup_write_cb *write_cb,
- proxmox_backup_drop_cb *drop_cb);
-
-extern void proxmox_backup_done(ProxmoxBackup *self);
-
-extern void proxmox_backup_clear_err(ProxmoxBackup *self);
-extern const char* proxmox_backup_get_error(const ProxmoxBackup *self);
-
-extern bool proxmox_backup_is_eof(const ProxmoxBackup *self);
-
-extern int proxmox_backup_wait_for_handshake(ProxmoxBackup *self);
-
-extern int proxmox_backup_query_hashes(ProxmoxBackup *self, const char *file_name);
-extern int proxmox_backup_wait_for_hashes(ProxmoxBackup *self);
-
-extern bool proxmox_backup_is_chunk_available(ProxmoxBackup *self, const void *digest);
-extern int proxmox_backup_upload_chunk(
- ProxmoxBackup *self,
- const void *digest,
- const void *data,
- uint64_t size);
-extern int proxmox_backup_continue_upload(ProxmoxBackup *self);
-
-extern int proxmox_backup_poll_read(ProxmoxBackup *self);
-extern int proxmox_backup_poll_send(ProxmoxBackup *self);
-
-extern int proxmox_backup_wait_for_id(ProxmoxBackup *self, int id);
-extern int proxmox_backup_discard_id(ProxmoxBackup *self, int id);
-
-extern int proxmox_backup_create(
- ProxmoxBackup *self,
- bool dynamic,
- const char *backup_type,
- const char *backup_id,
- int64_t time_epoch,
- const char *file_name,
- size_t chunk_size,
- int64_t file_size,
- bool is_new);
-
-extern int proxmox_backup_dynamic_data(
- ProxmoxBackup *self,
- int stream,
- const void *digest,
- uint64_t size);
-
-extern int proxmox_backup_fixed_data(
- ProxmoxBackup *self,
- int stream,
- size_t index,
- const void *digest);
-
-extern int proxmox_backup_finish_backup(
- ProxmoxBackup *self,
- int stream,
- char **remote_path);
-
-typedef struct ProxmoxChunker ProxmoxChunker;
-extern ProxmoxChunker *proxmox_chunker_new(uint64_t chunk_size_avg);
-extern void proxmox_chunker_done(ProxmoxChunker *self);
-extern uint64_t proxmox_chunker_scan(ProxmoxChunker *self, const void *data, size_t size);
-
-extern void proxmox_chunk_digest(const void *data, size_t size, uint8_t (*digest)[32]);
-
-typedef struct ProxmoxConnector ProxmoxConnector;
-extern ProxmoxConnector *proxmox_connector_new(
- const char *user,
- const char *server,
- const char *store);
-extern void proxmox_connector_drop(ProxmoxConnector *self);
-extern int proxmox_connector_set_password(ProxmoxConnector *self, const char *password);
-extern int proxmox_connector_set_ticket(
- ProxmoxConnector *self,
- const char *ticket,
- const char *token);
-extern void proxmox_connector_set_certificate_validation(ProxmoxConnector *self, bool on);
-extern ProxmoxBackup *proxmox_connector_connect(ProxmoxConnector *self);
-
-#ifdef __cplusplus
-}
-#endif
+++ /dev/null
-//! C API for the Chunker.
-
-use std::os::raw::c_void;
-
-use libc::size_t;
-
-use crate::Chunker;
-
-/// Creates a new chunker instance.
-#[no_mangle]
-pub extern "C" fn proxmox_chunker_new(chunk_size_avg: size_t) -> *mut Chunker {
- Box::leak(Box::new(Chunker::new(chunk_size_avg as usize)))
-}
-
-/// Drops an instance of a chunker. The pointer must be valid or `NULL`.
-#[no_mangle]
-pub extern "C" fn proxmox_chunker_done(me: *mut Chunker) {
- if !me.is_null() {
- unsafe {
- Box::from_raw(me);
- }
- }
-}
-
-/// Scan the specified data for a chunk border. Returns 0 if none was found, or a positive offset
-/// to a border.
-#[no_mangle]
-pub extern "C" fn proxmox_chunker_scan(
- me: *mut Chunker,
- data: *const c_void,
- size: size_t,
-) -> size_t {
- let me = unsafe { &mut *me };
- me.scan(unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }) as size_t
-}
-
-/// Compute a chunk digest. This is mostly a convenience method to avoid having to lookup the right
-/// digest method for your language of choice.
-#[no_mangle]
-pub extern "C" fn proxmox_chunk_digest(
- data: *const c_void,
- size: size_t,
- out_digest: *mut [u8; 32],
-) {
- let digest = crate::FixedChunk::from_data(unsafe {
- std::slice::from_raw_parts(data as *const u8, size as usize)
- });
- unsafe { *out_digest = digest.0 };
-}
+++ /dev/null
-//! For the C API we need to provide a `Client` compatible with C. In rust `Client` takes a
-//! `T: Read + Write`, so we need to provide a way for C to provide callbacks to us to
-//! implement this.
-
-use std::ffi::{CStr, CString};
-use std::io::{self, Read, Write};
-use std::os::raw::{c_char, c_int, c_void};
-
-use failure::{bail, format_err, Error};
-use libc::size_t;
-
-/// Read callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`,
-/// the rest are the usual read function parameters. This should return the number of bytes
-/// actually read, zero on EOF, or a negative `errno` value on error (eg. `-EAGAIN`).
-pub type ReadFn = extern "C" fn(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64;
-
-/// Write callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`,
-/// the rest are the usual write function parameters. This should return the number of bytes
-/// actually written, or a negative `errno` value on error (eg. `-EAGAIN`).
-pub type WriteFn = extern "C" fn(opaque: *mut c_void, buf: *const u8, size: u64) -> i64;
-
-/// Optional drop callback. This is called when the Client gets destroyed and allows freeing
-/// resources associated with the opaque object behind the C API socket.
-pub type DropFn = extern "C" fn(opaque: *mut c_void);
-
-/// Stores the external C callbacks for communicating with the protocol socket.
-pub struct CApiSocket {
- opaque: *mut c_void,
- read: ReadFn,
- write: WriteFn,
- drop: Option<DropFn>,
-}
-
-impl CApiSocket {
- fn from_io<T: Read + Write>(stream: T) -> Self {
- let opaque = Box::leak(Box::new(stream));
- Self {
- opaque: opaque as *mut T as _,
- read: c_read_fn::<T>,
- write: c_write_fn::<T>,
- drop: Some(c_drop_fn::<T>),
- }
- }
-}
-
-/// A client instance using C callbacks for reading from and writing to the protocol socket.
-pub struct CClient {
- client: crate::Client<CApiSocket>,
- error: Option<CString>,
- upload: Option<(*const u8, usize)>,
-}
-
-impl CClient {
- fn set_error(&mut self, err: Error) -> c_int {
- self.error = Some(match CString::new(err.to_string()) {
- Ok(cs) => cs,
- Err(_) => CString::new("<bad bytes in error string>").unwrap(),
- });
- -1
- }
-
- #[inline(always)]
- fn bool_result(&mut self, res: Result<bool, Error>) -> c_int {
- match res {
- Ok(false) => 0,
- Ok(true) => 1,
- Err(e) => self.set_error(e),
- }
- }
-
- #[inline(always)]
- fn bool_call<F>(&mut self, func: F) -> c_int
- where
- F: FnOnce(&mut crate::Client<CApiSocket>) -> Result<bool, Error>,
- {
- let res = func(&mut self.client);
- self.bool_result(res)
- }
-
- #[inline(always)]
- fn int_call<F>(&mut self, func: F) -> c_int
- where
- F: FnOnce(&mut crate::Client<CApiSocket>) -> Result<c_int, Error>,
- {
- match func(&mut self.client) {
- Ok(v) => v,
- Err(e) => self.set_error(e.into()),
- }
- }
-}
-
-impl Read for CApiSocket {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- let rc = (self.read)(self.opaque, buf.as_mut_ptr(), buf.len() as u64);
- if rc < 0 {
- Err(io::Error::from_raw_os_error((-rc) as i32))
- } else {
- Ok(rc as usize)
- }
- }
-}
-
-impl Write for CApiSocket {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- let rc = (self.write)(self.opaque, buf.as_ptr(), buf.len() as u64);
- if rc < 0 {
- Err(io::Error::from_raw_os_error((-rc) as i32))
- } else {
- Ok(rc as usize)
- }
- }
-
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
- }
-}
-
-impl Drop for CApiSocket {
- fn drop(&mut self) {
- if let Some(drop) = self.drop {
- drop(self.opaque);
- }
- }
-}
-
-extern "C" fn c_read_fn<T: Read>(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64 {
- let stream = unsafe { &mut *(opaque as *mut T) };
- let buf = unsafe { std::slice::from_raw_parts_mut(buf, size as usize) };
-
- match stream.read(buf) {
- Ok(size) => size as i64,
- Err(err) => {
- match err.raw_os_error() {
- Some(err) => -(err as i64),
- None => {
- eprintln!("error reading from stream: {}", err);
- -libc::EIO as i64
- }
- }
- },
- }
-}
-
-extern "C" fn c_write_fn<T: Write>(opaque: *mut c_void, buf: *const u8, size: u64) -> i64 {
- let stream = unsafe { &mut *(opaque as *mut T) };
- let buf = unsafe { std::slice::from_raw_parts(buf, size as usize) };
-
- match stream.write(buf) {
- Ok(size) => size as i64,
- Err(err) => {
- match err.raw_os_error() {
- Some(err) => -(err as i64),
- None => {
- eprintln!("error writing to stream: {}", err);
- -libc::EIO as i64
- }
- }
- },
- }
-}
-
-extern "C" fn c_drop_fn<T>(opaque: *mut c_void) {
- unsafe {
- Box::from_raw(opaque as *mut T);
- }
-}
-
-pub(crate) fn make_c_compatible_client<T: Read + Write>(stream: T) -> crate::Client<CApiSocket> {
- crate::Client::new(CApiSocket::from_io(stream))
-}
-
-pub(crate) fn make_c_client(client: crate::Client<CApiSocket>) -> *mut CClient {
- Box::leak(Box::new(CClient {
- client,
- error: None,
- upload: None,
- }))
-}
-
-/// Creates a new instance of a backup protocol client.
-///
-/// # Arguments
-///
-/// * `opaque` - An opaque pointer passed to the two provided callback methods.
-/// * `read` - The read callback.
-/// * `write` - The write callback.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_new(
- opaque: *mut c_void,
- read: ReadFn,
- write: WriteFn,
- drop: Option<DropFn>,
-) -> *mut CClient {
- make_c_client(crate::Client::new(CApiSocket {
- opaque,
- read,
- write,
- drop,
- }))
-}
-
-/// Drops an instance of a backup protocol client. The pointer must be valid or `NULL`.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_done(me: *mut CClient) {
- if !me.is_null() {
- unsafe {
- Box::from_raw(me);
- }
- }
-}
-
-/// Returns a C String describing the last error or `NULL` if there was none.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_get_error(me: *const CClient) -> *const c_char {
- let me = unsafe { &*me };
- match me.error {
- Some(ref e) => e.as_ptr(),
- None => std::ptr::null(),
- }
-}
-
-/// Returns true if the `read` callback had previously returned `EOF`.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_is_eof(me: *const CClient) -> bool {
- let me = unsafe { &*me };
- me.client.eof()
-}
-
-/// The data polling methods usually pass errors from the callbacks through to the original caller.
-/// Since the protocol needs to be non-blocking-IO safe and therefore able to resumine at any point
-/// where `-EAGAIN` can be returned by the callbacks, it is up to the caller which errors are to be
-/// considered fatal, but any error returned by callbacks which is not `-EAGAIN` will result in an
-/// internal error flag to be set which has to be cleared before trying to resume normal
-/// operations.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_clear_err(me: *mut CClient) {
- let me = unsafe { &mut *me };
- me.client.clear_err();
- me.error = None;
-}
-
-/// Polls for data and checks whether the protocol handshake has been made successfully.
-/// Returns `1` if the handshake was successful, `0` if it is not yet complete or `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_wait_for_handshake(me: *mut CClient) -> c_int {
- let me = unsafe { &mut *me };
- me.bool_call(move |c| c.wait_for_handshake())
-}
-
-fn check_string(s: *const c_char) -> Result<&'static str, Error> {
- if s.is_null() {
- bail!("NULL string");
- }
- Ok(std::str::from_utf8(unsafe {
- CStr::from_ptr(s).to_bytes()
- })?)
-}
-
-/// Request the list of hashes for a backup file in order to prevent duplicates from being sent to
-/// the server. This simply causes an internal list to be filled. Only one such operation can be
-/// performed simultaneously. To wait for its completion see `proxmox_backup_wait_for_hashes`.
-///
-/// If the file name is `NULL` or not a valid UTF-8 string, this function returns an error without
-/// putting the protocol handler in an error state.
-///
-/// Returns `0` on success, `-1` otherwise.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_query_hashes(me: *mut CClient, file_name: *const c_char) -> c_int {
- let me = unsafe { &mut *me };
-
- me.int_call(move |client| {
- let file_name = check_string(file_name)?;
- client.query_hashes(file_name)?;
- Ok(0)
- })
-}
-
-/// If there is an ongoing hash list request, this will poll the data stream.
-///
-/// Returns `1` if the transfer is complete (or there was no transfer to begin with), `0` if it is
-/// incomplete, or `-1` if an error occurred.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_wait_for_hashes(me: *mut CClient) -> c_int {
- let me = unsafe { &mut *me };
- me.bool_call(move |c| c.wait_for_hashes())
-}
-
-/// Check if a chunk of the provided digest is known to the this client instance. Note that this
-/// does not query the server for this information, and is only useful after a call to
-/// `proxmox_backup_query_hashes` or after uploading something.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_is_chunk_available(me: *const CClient, digest: *const u8) -> bool {
- let me = unsafe { &*me };
- let digest = unsafe { &*(digest as *const [u8; 32]) };
- me.client.is_chunk_available(digest)
-}
-
-/// Begin uploading a chunk to the server. This attempts to upload the data right away, but if the
-/// writer may fail due to non-blocking I/O in which case the `proxmox_backup_continue_upload`
-/// function must be used.
-///
-/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately,
-/// or `-1` on error.
-///
-/// The ID returned on success can be used to wait for the server to acknowledge that the chunk has
-/// been written successfully. Use `proxmox_backup_wait_for_id` to do this. If confirmation is not
-/// required, the ID should be released via `proxmox_backup_discard_id`.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_upload_chunk(
- me: *mut CClient,
- digest: *const u8,
- data: *const u8,
- size: u64,
-) -> c_int {
- let me = unsafe { &mut *me };
- let digest: &[u8; 32] = unsafe { &*(digest as *const [u8; 32]) };
- let size = size as usize;
- let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, size) };
- match me.client.upload_chunk(digest, slice) {
- Ok(Some(id)) => id.0 as c_int,
- Ok(None) => {
- me.upload = Some((data, size));
- 0
- }
- Err(e) => me.set_error(e),
- }
-}
-
-/// If an upload did not finish immediately (`proxmox_backup_upload_chunk` returned `0`), this
-/// function must be used to retry sending the rest of the data.
-///
-/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately,
-/// or `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_continue_upload(me: *mut CClient) -> c_int {
- let me = unsafe { &mut *me };
- match me.upload {
- Some((data, len)) => {
- let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, len) };
- match me.client.continue_upload_chunk(slice) {
- Ok(Some(id)) => id.0 as c_int,
- Ok(None) => 0,
- Err(e) => me.set_error(e),
- }
- }
- None => me.set_error(format_err!("no upload currently running")),
- }
-}
-
-/// Run the main receive loop. Returns `0` on success, `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_poll_read(me: *mut CClient) -> c_int {
- let me = unsafe { &mut *me };
- match me.client.poll_read(false) {
- Ok(_) => 0,
- Err(e) => me.set_error(e),
- }
-}
-
-/// Run the main send loop. If the `write` callback returned `-EAGAIN`, during an operation, the
-/// protocol handler keeps the data to be sent in a write queue. This function will attempt to
-/// continue writing out the remaining data. See individual function descriptions for when this is
-/// necessary.
-///
-/// Returns `1` if the queue is now empty, `0` if there is still data in the queue, or `-1` on
-/// error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_poll_send(me: *mut CClient) -> c_int {
- let me = unsafe { &mut *me };
- me.bool_call(move |c| Ok(c.poll_send()?.unwrap_or(true)))
-}
-
-/// Run the main receive loop and check for confirmation of a stream with the specified ID.
-///
-/// Returns `1` if the transaction was confirmed, `0` if not, or `-1` on error.
-///
-/// Note that once this function returned `1` for an ID, the id is considered to be free for
-/// recycling and should not be used for further calls.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_wait_for_id(me: *mut CClient, id: c_int) -> c_int {
- let me = unsafe { &mut *me };
- me.bool_call(move |c| c.wait_for_id(crate::StreamId(id as u8)))
-}
-
-/// Notifies the protocol handler that we do not bother waiting for confirmation of an ID. The ID
-/// may immediately be recycled for future transactions, thus the user should not use it for any
-/// further function calls.
-///
-/// Returns `0` on success, `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_discard_id(me: *mut CClient, id: c_int) -> c_int {
- let me = unsafe { &mut *me };
- match me.client.discard_id(crate::StreamId(id as u8)) {
- Ok(_) => 0,
- Err(e) => me.set_error(e),
- }
-}
-
-/// Create a new backup. The returned ID should be waited upon via `proxmox_backup_wait_for_id`,
-/// which returns true once the server confirmed the creation of the backup.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_create(
- me: *mut CClient,
- dynamic: bool,
- backup_type: *const c_char, // "host", "ct", "vm"
- backup_id: *const c_char,
- time_epoch: i64,
- file_name: *const c_char,
- chunk_size: size_t,
- file_size: i64,
- is_new: bool,
-) -> c_int {
- let me = unsafe { &mut *me };
- me.int_call(move |client| {
- let index_type = match dynamic {
- false => crate::IndexType::Fixed,
- _ => crate::IndexType::Dynamic,
- };
-
- let backup_type = check_string(backup_type)?;
- let backup_id = check_string(backup_id)?;
- let file_name = check_string(file_name)?;
-
- Ok(client
- .create_backup(
- index_type,
- backup_type,
- backup_id,
- time_epoch,
- file_name,
- chunk_size as _,
- if file_size < 0 {
- None
- } else {
- Some(file_size as u64)
- },
- is_new,
- )?
- .0 as c_int)
- })
-}
-
-/// Send a dynamic chunk entry.
-///
-/// If the entry was sent out successfully this returns `1`. If the `write` callback returned
-/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send`
-/// should be used to continue sending the data.
-/// On error `-1` is returned.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_dynamic_data(
- me: *mut CClient,
- stream: c_int,
- digest: *const [u8; 32],
- size: u64,
-) -> c_int {
- let me = unsafe { &mut *me };
- me.bool_call(move |client| {
- client.dynamic_data(crate::BackupStream(stream as u8), unsafe { &*digest }, size)
- })
-}
-
-/// Send a fixed chunk entry.
-///
-/// If the entry was sent out successfully this returns `1`. If the `write` callback returned
-/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send`
-/// should be used to continue sending the data.
-/// On error `-1` is returned.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_fixed_data(
- me: *mut CClient,
- stream: c_int,
- index: size_t,
- digest: *const [u8; 32],
-) -> c_int {
- let me = unsafe { &mut *me };
- me.bool_call(move |client| {
- client.fixed_data(crate::BackupStream(stream as u8), index as usize, unsafe {
- &*digest
- })
- })
-}
-
-/// Finish a running backup.
-///
-/// Tells the server that the backup is supposed to be considered complete. If the request could be
-/// sent out entirely `1` is returned. If the underlying socket is non-blocking and the packet
-/// wasn't finished `0` is returned, after which `proxmox_backup_poll_send` should be used.
-///
-/// Once the request was sent out successfully, the client should wait for acknowledgement by the
-/// remote server via `proxmox_backup_wait_for_id`, passing the backup stream ID as parameter.
-///
-/// Finally, if the client wishes to know the exact name the server stored the file under, the
-/// `remote_path` parameter can be non-`NULL` to receive a string containing the file name, which
-/// must be freed by the caller!
-///
-/// Returns: `1` on success, possibly `0` for non-blocking I/O, `-1` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_backup_finish_backup(
- me: *mut CClient,
- stream: c_int,
- remote_path: *mut *mut c_char,
-) -> c_int {
- let me = unsafe { &mut *me };
- me.int_call(move |client| {
- let (path, ack) = client.finish_backup(crate::BackupStream(stream as u8))?;
-
- if !remote_path.is_null() {
- // would be shorter with the unstable map_or_else
- let cstr = CString::new(path)
- .map(|cs| cs.into_raw())
- .unwrap_or(std::ptr::null_mut());
- unsafe {
- *remote_path = cstr;
- }
- }
-
- Ok(if ack { 1 } else { 0 })
- })
-}
+++ /dev/null
-//! C API for the `Connector`.
-
-use std::ffi::CStr;
-use std::os::raw::{c_char, c_int};
-
-use crate::Connector;
-
-#[inline(always)]
-fn with_errno<T>(err: c_int, value: T) -> T {
- errno::set_errno(errno::Errno(err));
- value
-}
-
-#[inline]
-fn checkstr(ptr: *const c_char) -> Option<String> {
- if ptr.is_null() {
- return None;
- }
-
- let cstr = unsafe { CStr::from_ptr(ptr) };
- match cstr.to_str() {
- Ok(s) => Some(s.to_string()),
- Err(_) => None,
- }
-}
-
-#[inline(always)]
-fn wrap_buildcall<F>(me: *mut Connector, func: F)
-where
- F: FnOnce(Connector) -> Connector,
-{
- let me = unsafe { &mut *me };
- let moved_me = std::mem::replace(me, unsafe { std::mem::uninitialized() });
- std::mem::forget(std::mem::replace(me, func(moved_me)));
-}
-
-/// Create a connector object.
-///
-/// Returns a valid pointer or `NULL` on error, with `errno` set.
-///
-/// Errors:
-/// * `EINVAL`: a required parameter was `NULL` or contained invalid bytes.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_new(
- user: *const c_char,
- server: *const c_char,
- store: *const c_char,
-) -> *mut Connector {
- let (user, server, store) = match (checkstr(user), checkstr(server), checkstr(store)) {
- (Some(user), Some(server), Some(store)) => (user, server, store),
- _ => return with_errno(libc::EINVAL, std::ptr::null_mut()),
- };
-
- Box::leak(Box::new(Connector::new(user, server, store)))
-}
-
-/// If a connector is not required anymore and has not been used up via a call to
-/// `proxmox_connector_connect`, this can be used to free the associated resources.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_drop(me: *mut Connector) {
- unsafe { Box::from_raw(me) };
-}
-
-/// Use a password
-///
-/// Returns `0` on success, a negative `errno` value on error.
-///
-/// Errors:
-/// * `EINVAL`: a required parameter was `NULL` or contained invalid bytes.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_set_password(
- me: *mut Connector,
- password: *const c_char,
-) -> c_int {
- let password = match checkstr(password) {
- Some(pw) => pw,
- _ => return -libc::EINVAL,
- };
-
- wrap_buildcall(me, move |me| me.password(password));
-
- 0
-}
-
-/// Use an existing ticket.
-///
-/// Returns `0` on success, a negative `errno` value on error.
-///
-/// Errors:
-/// * `EINVAL`: a required parameter was `NULL` or contained invalid bytes.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_set_ticket(
- me: *mut Connector,
- ticket: *const c_char,
- token: *const c_char,
-) -> c_int {
- let (ticket, token) = match (checkstr(ticket), checkstr(token)) {
- (Some(ticket), Some(token)) => (ticket, token),
- _ => return -libc::EINVAL,
- };
-
- wrap_buildcall(me, move |me| me.ticket(ticket, token));
-
- 0
-}
-
-/// Change whether certificate validation should be used on the connector.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_set_certificate_validation(me: *mut Connector, on: bool) {
- let me = unsafe { &mut *me };
-
- wrap_buildcall(me, move |me| me.certificate_validation(on));
-}
-
-/// Initiate the connection. This consumes the Connector, invalidating the pointer to it!
-///
-/// Returns a `ProxmoxBackup*`, or `NULL` on error.
-#[no_mangle]
-pub extern "C" fn proxmox_connector_connect(
- me: *mut Connector,
-) -> *mut crate::c_client::CClient {
- let boxed = unsafe { Box::from_raw(me) };
- let me = *boxed;
- match me.do_connect() {
- Ok(stream) => {
- let mut client = crate::c_client::make_c_compatible_client(stream);
- match client.wait_for_handshake() {
- Ok(true) => crate::c_client::make_c_client(client),
- Ok(false) => {
- // This is a synchronous blocking connection, so this should be impossible:
- eprintln!("proxmox backup protocol error handshake did not complete?");
- std::ptr::null_mut()
- }
- Err(err) => {
- eprintln!("error during handshake with backup server: {}", err);
- std::ptr::null_mut()
- }
- }
- }
- Err(err) => {
- eprintln!("error connecting to backup server: {}", err);
- std::ptr::null_mut()
- }
- }
-}
+++ /dev/null
-use std::io::Read;
-
-use failure::Error;
-
-use crate::Chunker;
-
-pub struct ChunkStream<T: Read> {
- input: T,
- buffer: Vec<u8>,
- fill: usize,
- pos: usize,
- keep: bool,
- eof: bool,
- chunker: Chunker,
-}
-
-impl<T: Read> ChunkStream<T> {
- pub fn new(input: T) -> Self {
- Self {
- input,
- buffer: Vec::new(),
- fill: 0,
- pos: 0,
- keep: false,
- eof: false,
- chunker: Chunker::new(4 * 1024 * 1024),
- }
- }
-
- pub fn stream(&mut self) -> &mut Self {
- self
- }
-
- fn fill_buf(&mut self) -> Result<bool, Error> {
- if self.fill == self.buffer.len() {
- let mut more = self.buffer.len(); // just double it
- if more == 0 {
- more = 1024 * 1024; // at the start, make a 1M buffer
- }
- // we need more data:
- self.buffer.reserve(more);
- unsafe {
- self.buffer.set_len(self.buffer.capacity());
- }
- }
-
- match self.input.read(&mut self.buffer[self.fill..]) {
- Ok(more) => {
- if more == 0 {
- self.eof = true;
- }
- self.fill += more;
- Ok(true)
- }
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Ok(false)
- } else {
- Err(err.into())
- }
- }
- }
- }
-
- fn consume(&mut self) {
- assert!(self.fill >= self.pos);
-
- let remaining = self.fill - self.pos;
- unsafe {
- std::ptr::copy_nonoverlapping(
- &self.buffer[self.pos] as *const u8,
- self.buffer.as_mut_ptr(),
- remaining,
- );
- }
- self.fill = remaining;
- self.pos = 0;
- }
-
- pub fn next(&mut self) {
- self.keep = false;
- }
-
- // This crate should not depend on the futures create, so we use another Option instead of
- // Async<T>.
- pub fn get(&mut self) -> Result<Option<Option<&[u8]>>, Error> {
- if self.keep {
- return Ok(Some(Some(&self.buffer[0..self.pos])));
- }
-
- if self.eof {
- return Ok(Some(None));
- }
-
- if self.pos != 0 {
- self.consume();
- }
-
- loop {
- match self.fill_buf() {
- Ok(true) => (),
- Ok(false) => return Ok(None),
- Err(err) => return Err(err),
- }
-
- // Note that if we hit EOF we hit a hard boundary...
- let boundary = self.chunker.scan(&self.buffer[self.pos..self.fill]);
- if boundary == 0 && !self.eof {
- self.pos = self.fill;
- continue;
- }
-
- self.pos += boundary;
- self.keep = true;
- return Ok(Some(Some(&self.buffer[0..self.pos])));
- }
- }
-}
+++ /dev/null
-
-/// Note: window size 32 or 64, is faster because we can
-/// speedup modulo operations, but always computes hash 0
-/// for constant data streams .. 0,0,0,0,0,0
-/// so we use a modified the chunk boundary test too not
-/// use hash value 0 to detect a boundary.
-const CA_CHUNKER_WINDOW_SIZE: usize = 64;
-
-/// Slinding window chunker (Buzhash)
-///
-/// This is a rewrite of *casync* chunker (cachunker.h) in rust.
-///
-/// Hashing by cyclic polynomial (also called Buzhash) has the benefit
-/// of avoiding multiplications, using barrel shifts instead. For more
-/// information please take a look at the [Rolling
-/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) artikel from
-/// wikipedia.
-
-pub struct Chunker {
- h: u32,
- window_size: usize,
- chunk_size: usize,
-
- chunk_size_min: usize,
- chunk_size_max: usize,
- _chunk_size_avg: usize,
-
- _discriminator: u32,
-
- break_test_mask: u32,
- break_test_minimum: u32,
-
- window: [u8; CA_CHUNKER_WINDOW_SIZE],
-}
-
-const BUZHASH_TABLE: [u32; 256] = [
- 0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68,
- 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
- 0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c,
- 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
- 0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa,
- 0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68,
- 0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7,
- 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93,
- 0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc,
- 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00,
- 0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2,
- 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2,
- 0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e,
- 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9,
- 0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c,
- 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5,
- 0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d,
- 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed,
- 0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae,
- 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f,
- 0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874,
- 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4,
- 0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493,
- 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85,
- 0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929,
- 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6,
- 0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617,
- 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6,
- 0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f,
- 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f,
- 0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50,
- 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2,
- 0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c,
- 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b,
- 0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729,
- 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae,
- 0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328,
- 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f,
- 0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475,
- 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18,
- 0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09,
- 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a,
- 0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293,
- 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140,
- 0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661,
- 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9,
- 0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73,
- 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9,
- 0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2,
- 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc,
- 0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49,
- 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b,
- 0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a,
- 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c,
- 0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1,
- 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319,
- 0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1,
- 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb,
- 0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c,
- 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b,
- 0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d,
- 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964,
- 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131,
- 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
-];
-
-impl Chunker {
-
- /// Create a new Chunker instance, which produces and average
- /// chunk size of `chunk_size_avg` (need to be a power of two). We
- /// allow variation from `chunk_size_avg/4` up to a maximum of
- /// `chunk_size_avg*4`.
- pub fn new(chunk_size_avg: usize) -> Self {
- // The chunk cut discriminator. In order to get an average
- // chunk size of avg, we cut whenever for a hash value "h" at
- // byte "i" given the descriminator "d(avg)": h(i) mod d(avg)
- // == d(avg) - 1. Note that the discriminator calculated like
- // this only yields correct results as long as the minimal
- // chunk size is picked as avg/4, and the maximum chunk size
- // as avg*4. If they are picked differently the result might
- // be skewed into either direction.
- let avg = chunk_size_avg as f64;
- let discriminator = (avg / (-1.42888852e-7 * avg + 1.33237515)) as u32;
-
- if chunk_size_avg.count_ones() != 1 {
- panic!("got unexpected chunk size - not a power of two.");
- }
-
- let break_test_mask = (chunk_size_avg*2 - 1) as u32;
- let break_test_minimum = break_test_mask - 2;
-
- Self {
- h: 0,
- window_size: 0,
- chunk_size: 0,
- chunk_size_min: chunk_size_avg>>2,
- chunk_size_max: chunk_size_avg<<2,
- _chunk_size_avg: chunk_size_avg,
- _discriminator: discriminator,
- break_test_mask: break_test_mask,
- break_test_minimum: break_test_minimum,
- window: [0u8; CA_CHUNKER_WINDOW_SIZE],
- }
- }
-
- /// Scans the specified data for a chunk border. Returns 0 if none
- /// was found (and the function should be called with more data
- /// later on), or another value indicating the position of a
- /// border.
- pub fn scan(&mut self, data: &[u8]) -> usize {
-
- let window_len = self.window.len();
- let data_len = data.len();
-
- let mut pos = 0;
-
- if self.window_size < window_len {
- let need = window_len - self.window_size;
- let copy_len = if need < data_len { need } else { data_len };
-
- for _i in 0..copy_len {
- let byte = data[pos];
- self.window[self.window_size] = byte;
- self.h = self.h.rotate_left(1) ^ BUZHASH_TABLE[byte as usize];
- pos += 1;
- self.window_size += 1;
- }
-
- self.chunk_size += copy_len;
-
- // return if window is still not full
- if self.window_size < window_len {
- return 0;
- }
- }
-
- //let mut idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
- let mut idx = self.chunk_size & 0x3f;
-
- while pos < data_len {
- // roll window
- let enter = data[pos];
- let leave = self.window[idx];
- self.h = self.h.rotate_left(1) ^
- //BUZHASH_TABLE[leave as usize].rotate_left(CA_CHUNKER_WINDOW_SIZE as u32) ^
- BUZHASH_TABLE[leave as usize] ^
- BUZHASH_TABLE[enter as usize];
-
- self.chunk_size += 1;
- pos += 1;
-
- self.window[idx] = enter;
-
- if self.shall_break() {
- self.h = 0;
- self.chunk_size = 0;
- self.window_size = 0;
- return pos;
- }
-
- //idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
- idx = self.chunk_size & 0x3f;
- //idx += 1; if idx >= CA_CHUNKER_WINDOW_SIZE { idx = 0 };
- }
-
- 0
- }
-
- // fast implementation avoiding modulo
- // #[inline(always)]
- fn shall_break(&self) -> bool {
-
- if self.chunk_size >= self.chunk_size_max { return true; }
-
- if self.chunk_size < self.chunk_size_min { return false; }
-
- //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
-
- //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
-
- (self.h & self.break_test_mask) >= self.break_test_minimum
- }
-
- // This is the original implementation from casync
- /*
- #[inline(always)]
- fn shall_break_orig(&self) -> bool {
-
- if self.chunk_size >= self.chunk_size_max { return true; }
-
- if self.chunk_size < self.chunk_size_min { return false; }
-
- (self.h % self.discriminator) == (self.discriminator - 1)
- }
- */
-}
-
-#[test]
-fn test_chunker1() {
-
- let mut buffer = Vec::new();
-
- for i in 0..256*1024 {
- for j in 0..4 {
- let byte = ((i >> (j<<3))&0xff) as u8;
- buffer.push(byte);
- }
- }
- let mut chunker = Chunker::new(64*1024);
-
- let mut pos = 0;
- let mut last = 0;
-
- let mut chunks1: Vec<(usize, usize)> = vec![];
- let mut chunks2: Vec<(usize, usize)> = vec![];
-
- // test1: feed single bytes
- while pos < buffer.len() {
- let k = chunker.scan(&buffer[pos..pos+1]);
- pos += 1;
- if k != 0 {
- let prev = last;
- last = pos;
- chunks1.push((prev, pos-prev));
- }
- }
- chunks1.push((last, buffer.len() - last));
-
- let mut chunker = Chunker::new(64*1024);
-
- let mut pos = 0;
-
- // test2: feed with whole buffer
- while pos < buffer.len() {
- let k = chunker.scan(&buffer[pos..]);
- if k != 0 {
- chunks2.push((pos, k));
- pos += k;
- } else {
- break;
- }
- }
-
- chunks2.push((pos, buffer.len() - pos));
-
- if chunks1 != chunks2 {
-
- let mut size1 = 0;
- for (_offset, len) in &chunks1 {
- size1 += len;
- }
- println!("Chunks1:{}\n{:?}\n", size1, chunks1);
-
- let mut size2 = 0;
- for (_offset, len) in &chunks2 {
- size2 += len;
- }
- println!("Chunks2:{}\n{:?}\n", size2, chunks2);
-
- if size1 != 256*4*1024 {
- panic!("wrong size for chunks1");
- }
- if size2 != 256*4*1024 {
- panic!("wrong size for chunks2");
- }
-
- panic!("got different chunks");
- }
-
-}
+++ /dev/null
-use std::borrow::Borrow;
-use std::collections::hash_map;
-use std::collections::{HashMap, HashSet};
-use std::io::{Read, Write};
-use std::mem;
-use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
-
-use endian_trait::Endian;
-use failure::*;
-
-use crate::common;
-use crate::protocol::*;
-use crate::tools::swapped_data_to_buf;
-use crate::{ChunkEntry, FixedChunk, IndexType};
-
-#[derive(Clone, Copy, Eq, PartialEq)]
-#[repr(transparent)]
-pub struct BackupStream(pub(crate) u8);
-
-#[derive(Clone, Copy, Eq, PartialEq)]
-#[repr(transparent)]
-pub struct StreamId(pub(crate) u8);
-
-impl From<BackupStream> for StreamId {
- fn from(v: BackupStream) -> Self {
- Self(v.0)
- }
-}
-
-struct BackupStreamData {
- id: u8,
- index_type: IndexType,
- pos: u64,
- path: Option<String>,
-}
-
-pub enum AckState {
- Waiting, // no ack received yet.
- Received, // already received an ack, but the user hasn't seen it yet.
- Ignore, // client doesn't care.
- AwaitingData, // waiting for something other than an 'Ok' packet
-}
-
-pub struct Client<S>
-where
- S: Read + Write,
-{
- chunks: RwLock<HashSet<FixedChunk>>,
- common: common::Connection<S>,
- handshake_done: bool,
-
- cur_id: u8,
- free_ids: Vec<u8>,
- waiting_ids: HashMap<u8, AckState>,
- hash_download: Option<u8>,
-
- upload_chunk: Option<FixedChunk>,
- upload_id: u8,
- upload_pos: usize,
- upload_state: u8,
-
- streams: HashMap<u8, BackupStreamData>,
-}
-
-type Result<T> = std::result::Result<T, Error>;
-
-impl<S> Client<S>
-where
- S: Read + Write,
-{
- pub fn new(socket: S) -> Self {
- Self {
- chunks: RwLock::new(HashSet::new()),
- common: common::Connection::new(socket),
- handshake_done: false,
-
- cur_id: 1,
- free_ids: Vec::new(),
- waiting_ids: HashMap::new(),
- hash_download: None,
-
- upload_state: 0,
- upload_pos: 0,
- upload_id: 0,
- upload_chunk: None,
-
- streams: HashMap::new(),
- }
- }
-
- pub fn eof(&self) -> bool {
- self.common.eof
- }
-
- pub fn error(&self) -> bool {
- self.common.error
- }
-
- /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
- pub fn clear_err(&mut self) {
- self.common.clear_err()
- }
-
- pub fn wait_for_handshake(&mut self) -> Result<bool> {
- if !self.handshake_done {
- self.poll_read(true)?;
- }
- Ok(self.handshake_done)
- }
-
- pub fn query_hashes(&mut self, file_name: &str) -> Result<()> {
- if self.hash_download.is_some() {
- bail!("hash query already in progress");
- }
-
- let id = self.next_id()?;
- let mut packet = Packet::builder(id, PacketType::GetHashList);
- packet
- .write_data(client::GetHashList {
- name_length: file_name.len() as u16,
- })
- .write_buf(file_name.as_bytes());
- self.common.queue_data(packet.finish())?;
- self.hash_download = Some(id);
- Ok(())
- }
-
- pub fn wait_for_hashes(&mut self) -> Result<bool> {
- while self.hash_download.is_some() {
- if !self.poll_read(true)? {
- break;
- }
- }
- Ok(self.hash_download.is_none())
- }
-
- fn chunk_read_lock(&self) -> Result<RwLockReadGuard<HashSet<FixedChunk>>> {
- self.chunks
- .read()
- .map_err(|_| format_err!("lock poisoned, disconnecting client..."))
- }
-
- pub fn is_chunk_available<T: Borrow<FixedChunk>>(&self, chunk: &T) -> bool {
- self.chunk_read_lock().unwrap().contains(chunk.borrow())
- }
-
- /// Attempts to upload a chunk. Returns an error state only on fatal errors. If the underlying
- /// writer returns a `WouldBlock` error this returns `None` and `continue_upload_chunk` has to
- /// be called until the chunk is uploaded completely. During this time no other operations
- /// should be performed on the this object!
- /// See `continue_upload_chunk()` for a description of the returned value.
- pub fn upload_chunk<T>(&mut self, info: &T, data: &[u8]) -> Result<Option<StreamId>>
- where
- T: Borrow<FixedChunk>,
- {
- if self.upload_chunk.is_some() {
- bail!("cannot concurrently upload multiple chunks");
- }
-
- self.upload_id = self.next_id()?;
- self.upload_chunk = Some(info.borrow().clone());
- self.next_upload_state(0);
- self.continue_upload_chunk(data)
- }
-
- fn next_upload_state(&mut self, state: u8) {
- self.upload_state = state;
- self.upload_pos = 0;
- }
-
- // This is split into a static method not borrowing self so the buffer can point into self...
- fn do_upload_write(
- writer: &mut common::Connection<S>,
- buf: &[u8],
- ) -> Result<Option<(usize, bool)>> {
- match writer.write_some(buf) {
- Ok(put) => Ok(Some((put, put == buf.len()))),
- Err(e) => {
- if e.kind() == std::io::ErrorKind::WouldBlock {
- Ok(None)
- } else {
- Err(e.into())
- }
- }
- }
- }
-
- fn after_upload_write(&mut self, put: Option<(usize, bool)>, next_state: u8) -> bool {
- match put {
- None => return false,
- Some((put, done)) => {
- if done {
- self.next_upload_state(next_state);
- } else {
- self.upload_pos += put;
- }
- done
- }
- }
- }
-
- fn upload_write(&mut self, buf: &[u8], next_state: u8) -> Result<bool> {
- let wrote = Self::do_upload_write(&mut self.common, buf)?;
- Ok(self.after_upload_write(wrote, next_state))
- }
-
- /// If an `upload_chunk()` call returned `Ok(false)` this needs to be used to complete the
- /// upload process as the chunk may have already been partially written to the socket.
- /// This function will return `Ok(false)` on `WouldBlock` errors just like `upload_chunk()`
- /// will, after which the caller should wait for the writer to become write-ready and then
- /// call this method again.
- /// Once the complete chunk packet has been written to the underlying socket, this returns a
- /// packet ID which can be waited upon for completion
- pub fn continue_upload_chunk(&mut self, data: &[u8]) -> Result<Option<StreamId>> {
- loop {
- match self.upload_state {
- // Writing the packet header:
- 0 => {
- let len = mem::size_of::<Packet>()
- + mem::size_of::<client::UploadChunk>()
- + data.len();
- let packet = Packet {
- id: self.upload_id,
- pkttype: PacketType::UploadChunk as _,
- length: len as _,
- }
- .to_le();
- let buf = unsafe { swapped_data_to_buf(&packet) };
- if !self.upload_write(&buf[self.upload_pos..], 1)? {
- return Ok(None);
- }
- }
- // Writing the hash:
- 1 => {
- let chunk = self.upload_chunk.as_ref().unwrap();
- let buf = &chunk.0[self.upload_pos..];
- let wrote = Self::do_upload_write(&mut self.common, buf)?;
- if !self.after_upload_write(wrote, 2) {
- return Ok(None);
- }
- }
- // Writing the data:
- 2 => {
- if !self.upload_write(&data[self.upload_pos..], 3)? {
- return Ok(None);
- }
- }
- // Done:
- 3 => {
- self.upload_chunk = None;
- self.expect_ok_for_id(self.upload_id);
- return Ok(Some(StreamId(self.upload_id)));
- }
- n => bail!("bad chunk upload state: {}", n),
- }
- }
- }
-
- // generic data polling method, returns true if at least one packet was received
- pub fn poll_read(&mut self, one: bool) -> Result<bool> {
- if self.common.eof {
- // polls after EOF are errors:
- bail!("server disconnected");
- }
-
- if !self.common.poll_read()? {
- // On the client side we do not expect a server-side disconnect, so error out!
- if self.common.eof {
- bail!("server disconnected");
- }
- return Ok(false);
- }
-
- loop {
- match self.common.current_packet_type {
- PacketType::Ok => self.recv_ok()?,
- PacketType::Hello => self.recv_hello()?,
- PacketType::HashListPart => self.recv_hash_list()?,
- PacketType::BackupCreated => self.backup_created()?,
- _ => bail!(
- "server sent an unexpected packet of type {}",
- self.common.current_packet_type as u32,
- ),
- }
- self.common.next()?;
- if one || !self.common.poll_read()? {
- break;
- }
- }
- Ok(true)
- }
-
- // None => nothing was queued
- // Some(true) => queue finished
- // Some(false) => queue not finished
- pub fn poll_send(&mut self) -> Result<Option<bool>> {
- self.common.poll_send()
- }
-
- // private helpermethods
-
- fn next_id(&mut self) -> Result<u8> {
- if let Some(id) = self.free_ids.pop() {
- return Ok(id);
- }
- if self.cur_id < 0xff {
- self.cur_id += 1;
- return Ok(self.cur_id - 1);
- }
- bail!("too many concurrent transactions");
- }
-
- fn free_id(&mut self, id: u8) {
- self.free_ids.push(id);
- }
-
- fn expect_ok_for_id(&mut self, id: u8) {
- self.waiting_ids.insert(id, AckState::Waiting);
- }
-
- fn expect_response_for_id(&mut self, id: u8) {
- self.waiting_ids.insert(id, AckState::AwaitingData);
- }
-
- // Methods handling received packets:
-
- fn recv_hello(&mut self) -> Result<()> {
- let hello = self.common.read_unaligned_data::<server::Hello>(0)?;
-
- if hello.magic != server::HELLO_MAGIC {
- bail!("received an invalid hello packet");
- }
-
- let ver = hello.version;
- if ver != server::PROTOCOL_VERSION {
- bail!(
- "hello packet contained incompatible protocol version: {} (!= {})",
- ver,
- server::PROTOCOL_VERSION,
- );
- }
-
- self.handshake_done = true;
- self.free_id(0);
- Ok(())
- }
-
- fn chunk_write_lock(&self) -> Result<RwLockWriteGuard<HashSet<FixedChunk>>> {
- self.chunks
- .write()
- .map_err(|_| format_err!("lock poisoned, disconnecting client..."))
- }
-
- fn recv_hash_list(&mut self) -> Result<()> {
- let data = self.common.packet_data();
- if data.len() == 0 {
- // No more hashes, we're done
- self.hash_download = None;
- return Ok(());
- }
-
- if (data.len() % mem::size_of::<FixedChunk>()) != 0 {
- bail!("hash list contains invalid size");
- }
-
- let chunk_list: &[FixedChunk] = unsafe {
- std::slice::from_raw_parts(
- data.as_ptr() as *const FixedChunk,
- data.len() / mem::size_of::<FixedChunk>(),
- )
- };
-
- let mut my_chunks = self.chunk_write_lock()?;
- for c in chunk_list {
- eprintln!("Got chunk '{}'", c.digest_to_hex());
- my_chunks.insert(c.clone());
- }
-
- Ok(())
- }
-
- fn ack_id(&mut self, id: u8, data_packet: bool) -> Result<()> {
- use hash_map::Entry::*;
-
- match self.waiting_ids.entry(id) {
- Vacant(_) => bail!("received unexpected packet for transaction id {}", id),
- Occupied(mut entry) => match entry.get() {
- AckState::Ignore => {
- entry.remove();
- }
- AckState::Received => bail!("duplicate Ack received for transaction id {}", id),
- AckState::Waiting => {
- if data_packet {
- bail!("received data packet while expecting simple Ok for {}", id);
- }
- *entry.get_mut() = AckState::Received;
- }
- AckState::AwaitingData => {
- if !data_packet {
- bail!(
- "received empty Ok while waiting for data on stream id {}",
- id
- );
- }
- *entry.get_mut() = AckState::Received;
- }
- },
- }
- Ok(())
- }
-
- fn recv_ok(&mut self) -> Result<()> {
- self.ack_id(self.common.current_packet.id, false)
- }
-
- pub fn wait_for_id(&mut self, id: StreamId) -> Result<bool> {
- if !self.waiting_ids.contains_key(&id.0) {
- bail!("wait_for_id() called on unexpected id {}", id.0);
- }
-
- loop {
- if !self.poll_read(true)? {
- return Ok(false);
- }
-
- use hash_map::Entry::*;
- match self.waiting_ids.entry(id.0) {
- Vacant(_) => return Ok(true),
- Occupied(entry) => match entry.get() {
- AckState::Received => {
- entry.remove();
- return Ok(true);
- }
- _ => continue,
- },
- }
- }
- }
-
- pub fn discard_id(&mut self, id: StreamId) -> Result<()> {
- use hash_map::Entry::*;
- match self.waiting_ids.entry(id.0) {
- Vacant(_) => bail!("discard_id called with unknown id {}", id.0),
- Occupied(mut entry) => match entry.get() {
- AckState::Ignore => (),
- AckState::Received => {
- entry.remove();
- }
- AckState::Waiting | AckState::AwaitingData => {
- *entry.get_mut() = AckState::Ignore;
- }
- },
- }
- Ok(())
- }
-
- pub fn create_backup(
- &mut self,
- index_type: IndexType,
- backup_type: &str,
- id: &str,
- timestamp: i64,
- file_name: &str,
- chunk_size: usize,
- file_size: Option<u64>,
- is_new: bool,
- ) -> Result<BackupStream> {
- let backup_type = backup_type::name_to_id(backup_type)?;
-
- if id.len() > 0xff {
- bail!("id too long");
- }
-
- if file_name.len() > 0xff {
- bail!("file name too long");
- }
-
- let mut flags: backup_flags::Type = 0;
- if is_new {
- flags |= backup_flags::EXCL;
- }
- if index_type == IndexType::Dynamic {
- flags |= backup_flags::DYNAMIC_CHUNKS;
- if file_size.is_some() {
- bail!("file size must be None on dynamic backup streams");
- }
- } else if file_size.is_none() {
- bail!("file size is mandatory for fixed backup streams");
- }
-
- let packet_id = self.next_id()?;
- let mut packet = Packet::builder(packet_id, PacketType::CreateBackup);
- packet
- .write_data(client::CreateBackup {
- backup_type,
- id_length: id.len() as _,
- timestamp: timestamp as u64,
- flags,
- name_length: file_name.len() as _,
- chunk_size: chunk_size as _,
- file_size: file_size.unwrap_or(0) as u64,
- })
- .write_buf(id.as_bytes())
- .write_buf(file_name.as_bytes());
-
- self.streams.insert(
- packet_id,
- BackupStreamData {
- id: packet_id,
- index_type,
- pos: 0,
- path: None,
- },
- );
-
- self.expect_response_for_id(packet_id);
- self.common.queue_data(packet.finish())?;
- Ok(BackupStream(packet_id))
- }
-
- fn backup_created(&mut self) -> Result<()> {
- let info = self
- .common
- .read_unaligned_data::<server::BackupCreated>(0)?;
- let data = &self.common.packet_data()[mem::size_of_val(&info)..];
- if data.len() != info.path_length as usize {
- bail!("backup-created packet has invalid length");
- }
- let name = std::str::from_utf8(data)?;
- let pkt_id = self.common.current_packet.id;
- self.streams
- .get_mut(&pkt_id)
- .ok_or_else(|| format_err!("BackupCreated response for invalid stream: {}", pkt_id))?
- .path = Some(name.to_string());
- self.ack_id(pkt_id, true)?;
- Ok(())
- }
-
- pub fn dynamic_chunk(&mut self, stream: BackupStream, entry: &ChunkEntry) -> Result<bool> {
- self.dynamic_data(stream, &entry.hash, entry.size)
- }
-
- pub fn dynamic_data<T: Borrow<FixedChunk>>(
- &mut self,
- stream: BackupStream,
- digest: &T,
- size: u64,
- ) -> Result<bool> {
- let data = self
- .streams
- .get_mut(&stream.0)
- .ok_or_else(|| format_err!("no such active backup stream"))?;
-
- if data.index_type != IndexType::Dynamic {
- bail!("dynamic_data called for stream of static chunks");
- }
-
- let mut packet = Packet::builder(data.id, PacketType::BackupDataDynamic);
- packet
- .write_data(data.pos as u64)
- .write_buf(&digest.borrow().0);
- data.pos += size;
-
- self.common.queue_data(packet.finish())
- }
-
- pub fn fixed_data<T: Borrow<FixedChunk>>(
- &mut self,
- stream: BackupStream,
- index: usize,
- digest: &T,
- ) -> Result<bool> {
- let data = self
- .streams
- .get_mut(&stream.0)
- .ok_or_else(|| format_err!("no such active backup stream"))?;
-
- if data.index_type != IndexType::Fixed {
- bail!("fixed_data called for stream of dynamic chunks");
- }
-
- let mut packet = Packet::builder(data.id, PacketType::BackupDataFixed);
- packet
- .write_data(index as u64)
- .write_buf(&digest.borrow().0);
-
- self.common.queue_data(packet.finish())
- }
-
- pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(String, bool)> {
- let path = self
- .streams
- .remove(&stream.0)
- .ok_or_else(|| format_err!("no such active backup stream"))?
- .path
- .unwrap_or_else(|| "<no remote name received>".to_string());
- let done = self
- .common
- .queue_data(Packet::simple(stream.0, PacketType::BackupFinished))?;
- self.expect_ok_for_id(stream.0);
- Ok((path, done))
- }
-}
+++ /dev/null
-use std::io::{self, Read, Write};
-use std::mem;
-use std::ptr;
-
-use failure::*;
-
-use endian_trait::Endian;
-
-use crate::protocol::*;
-
-type Result<T> = std::result::Result<T, Error>;
-
-pub(crate) struct Connection<S>
-where
- S: Read + Write,
-{
- socket: S,
- pub buffer: Vec<u8>,
- pub current_packet: Packet,
- pub current_packet_type: PacketType,
- pub error: bool,
- pub eof: bool,
- upload_queue: Option<(Vec<u8>, usize)>,
-}
-
-impl<S> Connection<S>
-where
- S: Read + Write,
-{
- pub fn new(socket: S) -> Self {
- Self {
- socket,
- buffer: Vec::new(),
- current_packet: unsafe { mem::zeroed() },
- current_packet_type: PacketType::Error,
- error: false,
- eof: false,
- upload_queue: None,
- }
- }
-
- pub fn write_some(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.socket.write(buf)
- }
-
- /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
- pub fn clear_err(&mut self) {
- self.error = false;
- }
-
- // None => nothing was queued
- // Some(true) => queue finished
- // Some(false) => queue not finished
- pub fn poll_send(&mut self) -> Result<Option<bool>> {
- if let Some((ref data, ref mut pos)) = self.upload_queue {
- loop {
- match self.socket.write(&data[*pos..]) {
- Ok(put) => {
- *pos += put;
- if *pos == data.len() {
- self.upload_queue = None;
- return Ok(Some(true));
- }
- // Keep writing
- continue;
- }
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- return Ok(Some(false));
- }
- Err(e) => return Err(e.into()),
- }
- }
- } else {
- Ok(None)
- }
- }
-
- // Returns true when the data was also sent out, false if the queue is now full.
- // For now we only allow a single dataset to be queued at once.
- pub fn queue_data(&mut self, buf: Vec<u8>) -> Result<bool> {
- if self.upload_queue.is_some() {
- bail!("upload queue clash");
- }
-
- self.upload_queue = Some((buf, 0));
-
- match self.poll_send()? {
- None => unreachable!(), // We literally just set self.upload_queue to Some(value)
- Some(v) => Ok(v),
- }
- }
-
- // Returns 'true' if there's data available, 'false' if there isn't (if the
- // underlying reader returned `WouldBlock` or the `read()` was short).
- // Other errors are propagated.
- pub fn poll_read(&mut self) -> Result<bool> {
- if self.eof {
- return Ok(false);
- }
-
- if self.error {
- eprintln!("refusing to read from a client in error state");
- bail!("client is in error state");
- }
-
- match self.poll_data_do() {
- Ok(has_packet) => Ok(has_packet),
- Err(e) => {
- self.error = true;
- Err(e)
- }
- }
- }
-
- fn poll_data_do(&mut self) -> Result<bool> {
- if !self.read_packet()? {
- return Ok(false);
- }
-
- if self.current_packet.length > MAX_PACKET_SIZE {
- bail!("client tried to send a huge packet");
- }
-
- if !self.fill_packet()? {
- return Ok(false);
- }
-
- Ok(true)
- }
-
- pub fn packet_length(&self) -> usize {
- self.current_packet.length as usize
- }
-
- pub fn packet_data(&self) -> &[u8] {
- let beg = mem::size_of::<Packet>();
- let end = self.packet_length();
- &self.buffer[beg..end]
- }
-
- pub fn next(&mut self) -> Result<()> {
- let pktlen = self.packet_length();
- unsafe {
- if self.buffer.len() != pktlen {
- std::ptr::copy_nonoverlapping(
- &self.buffer[pktlen],
- &mut self.buffer[0],
- self.buffer.len() - pktlen,
- );
- }
- self.buffer.set_len(self.buffer.len() - pktlen);
- }
- Ok(())
- }
-
- // NOTE: After calling this you must `self.buffer.set_len()` when done!
- #[must_use]
- fn buffer_set_min_size(&mut self, size: usize) -> usize {
- if self.buffer.capacity() < size {
- self.buffer.reserve(size - self.buffer.len());
- }
- let start = self.buffer.len();
- unsafe {
- self.buffer.set_len(size);
- }
- start
- }
-
- fn fill_buffer(&mut self, size: usize) -> Result<bool> {
- if self.buffer.len() >= size {
- return Ok(true);
- }
- let mut filled = self.buffer_set_min_size(size);
- loop {
- // We don't use read_exact to not block too long or busy-read on nonblocking sockets...
- match self.socket.read(&mut self.buffer[filled..]) {
- Ok(got) => {
- if got == 0 {
- self.eof = true;
- unsafe {
- self.buffer.set_len(filled);
- }
- return Ok(false);
- }
- filled += got;
- if filled >= size {
- unsafe {
- self.buffer.set_len(filled);
- }
- return Ok(true);
- }
- // reloop
- }
- Err(e) => {
- unsafe {
- self.buffer.set_len(filled);
- }
- return Err(e.into());
- }
- }
- }
- }
-
- fn read_packet_do(&mut self) -> Result<bool> {
- if !self.fill_buffer(mem::size_of::<Packet>())? {
- return Ok(false);
- }
-
- self.current_packet = self.read_unaligned::<Packet>(0)?.from_le();
-
- self.current_packet_type = match PacketType::try_from(self.current_packet.pkttype) {
- Some(t) => t,
- None => bail!("unexpected packet type"),
- };
-
- let length = self.current_packet.length;
- if (length as usize) < mem::size_of::<Packet>() {
- bail!("received packet of bad length ({})", length);
- }
-
- Ok(true)
- }
-
- fn read_packet(&mut self) -> Result<bool> {
- match self.read_packet_do() {
- Ok(b) => Ok(b),
- Err(e) => {
- if let Some(ioe) = e.downcast_ref::<std::io::Error>() {
- if ioe.kind() == io::ErrorKind::WouldBlock {
- return Ok(false);
- }
- }
- Err(e)
- }
- }
- }
-
- fn read_unaligned<T: Endian>(&self, offset: usize) -> Result<T> {
- if offset + mem::size_of::<T>() > self.buffer.len() {
- bail!("buffer underrun");
- }
- Ok(unsafe { ptr::read_unaligned(&self.buffer[offset] as *const _ as *const T) }.from_le())
- }
-
- pub fn read_unaligned_data<T: Endian>(&self, offset: usize) -> Result<T> {
- self.read_unaligned(offset + mem::size_of::<Packet>())
- }
-
- fn fill_packet(&mut self) -> Result<bool> {
- self.fill_buffer(self.current_packet.length as usize)
- }
-
- // convenience helpers:
-
- pub fn assert_size(&self, size: usize) -> Result<()> {
- if self.packet_data().len() != size {
- bail!(
- "protocol error: invalid packet size (type {})",
- self.current_packet.pkttype,
- );
- }
- Ok(())
- }
-
- pub fn assert_atleast(&self, size: usize) -> Result<()> {
- if self.packet_data().len() < size {
- bail!(
- "protocol error: invalid packet size (type {})",
- self.current_packet.pkttype,
- );
- }
- Ok(())
- }
-}
+++ /dev/null
-//! This module provides a `Connector` used to log into a Proxmox Backup API server and connect to
-//! the proxmox protocol via an HTTP Upgrade request.
-
-use std::io::{Read, Write};
-use std::net::TcpStream;
-
-use failure::{bail, format_err, Error};
-use openssl::ssl::{self, SslStream};
-use url::form_urlencoded;
-
-use crate::Client;
-
-enum Authentication {
- Password(String),
- Ticket(String, String),
-}
-
-/// Connector used to log into a Proxmox Backup API server and open a backup protocol connection.
-/// If successful, this will create a `Client` used to communicate over the Proxmox Backup
-/// Protocol.
-pub struct Connector {
- user: String,
- server: String,
- store: String,
- auth: Option<Authentication>,
- certificate_validation: bool,
-}
-
-fn build_login(host: &str, user: &str, pass: &str) -> Vec<u8> {
- let formdata = form_urlencoded::Serializer::new(String::new())
- .append_pair("username", user)
- .append_pair("password", pass)
- .finish();
-
- format!("\
- POST /api2/json/access/ticket HTTP/1.1\r\n\
- host: {}\r\n\
- content-length: {}\r\n\
- content-type: application/x-www-form-urlencoded\r\n\
- \r\n\
- {}",
- host,
- formdata.as_bytes().len(),
- formdata,
- )
- .into_bytes()
-}
-
-fn build_protocol_connect(host: &str, store: &str, ticket: &str, token: &str) -> Vec<u8> {
- format!("\
- GET /api2/json/admin/datastore/{}/test-upload HTTP/1.1\r\n\
- host: {}\r\n\
- connection: upgrade\r\n\
- upgrade: proxmox-backup-protocol-1\r\n\
- cookie: PBSAuthCookie={}\r\n\
- CSRFPreventionToken: {}\r\n\
- \r\n",
- store,
- host,
- ticket,
- token,
- )
- .into_bytes()
-}
-
-// Minimalistic http response reader. The only things we care about here are the status code and
-// the payload...
-fn read_http_response<T: Read>(sock: T) -> Result<(u16, Vec<u8>), Error> {
- use std::io::BufRead;
- let mut reader = std::io::BufReader::new(sock);
-
- let mut status = String::new();
- reader.read_line(&mut status)?;
-
- let status = status.trim_end();
- let mut parts = status.splitn(3, ' ');
- let _version = parts
- .next()
- .ok_or_else(|| format_err!("bad http response (missing version)"))?;
- let code = parts
- .next()
- .ok_or_else(|| format_err!("bad http response (missing status code)"))?;
- let _reason = parts.next();
-
- let code: u16 = code.parse()?;
-
- // We need the payload's length if there is one:
- let mut length: Option<u32> = None;
- let mut line = String::new();
- loop {
- line.clear();
- reader.read_line(&mut line)?;
- let line = line.trim_end();
-
- if line.len() == 0 {
- break;
- }
-
- let parts: Vec<&str> = line.splitn(2, ':').collect();
- if parts.len() != 2 {
- bail!("invalid header in http response");
- }
-
- let name = parts[0].trim().to_lowercase().to_string();
- let value = parts[1].trim();
-
- // The only important header (important to know how much we need to read!)
- if name == "content-length" {
- length = Some(value.parse()?);
- }
-
- // Don't care about any other header contents currently...
- }
-
- match length {
- None => Ok((code, Vec::new())),
- Some(length) => {
- let length = length as usize;
-
- let mut out = Vec::with_capacity(length);
- unsafe {
- out.set_len(length);
- }
-
- reader.read_exact(&mut out)?;
- Ok((code, out))
- },
- }
-}
-
-fn parse_login_response(data: &[u8]) -> Result<(String, String), Error> {
- let value: serde_json::Value = serde_json::from_slice(data)?;
- let ticket = value["data"]["ticket"]
- .as_str()
- .ok_or_else(|| format_err!("no ticket found in login response"))?
- .to_string();
- let token = value["data"]["CSRFPreventionToken"]
- .as_str()
- .ok_or_else(|| format_err!("no token found in login response"))?
- .to_string();
- Ok((ticket, token))
-}
-
-impl Connector {
- /// Create a new connector for a specified user, server and remote backup store.
- pub fn new(user: String, server: String, store: String) -> Self {
- Self {
- user,
- server,
- store,
- auth: None,
- certificate_validation: true,
- }
- }
-
- /// Use a password to authenticate with the remote server.
- pub fn password(mut self, pass: String) -> Self {
- self.auth = Some(Authentication::Password(pass));
- self
- }
-
- /// Use an already existing ticket to connect to the server.
- pub fn ticket(mut self, ticket: String, token: String) -> Self {
- self.auth = Some(Authentication::Ticket(ticket, token));
- self
- }
-
- /// Disable TLS certificate validation.
- pub fn certificate_validation(mut self, on: bool) -> Self {
- self.certificate_validation = on;
- self
- }
-
- pub(crate) fn do_connect(self) -> Result<SslStream<TcpStream>, Error> {
- if self.auth.is_none() {
- bail!("missing authentication");
- }
-
- let stream = TcpStream::connect(&self.server)?;
-
- let mut connector = ssl::SslConnector::builder(ssl::SslMethod::tls())?;
- if !self.certificate_validation {
- connector.set_verify(ssl::SslVerifyMode::NONE);
- }
- let connector = connector.build();
-
- let mut stream = connector.connect(&self.server, stream)?;
- let (ticket, token) = match self.auth {
- None => unreachable!(), // checked above
- Some(Authentication::Password(password)) => {
- let login_request = build_login(&self.server, &self.user, &password);
- stream.write_all(&login_request)?;
-
- let (code, ticket) = read_http_response(&mut stream)?;
- if code != 200 {
- bail!("login failed");
- }
-
- parse_login_response(&ticket)?
- }
- Some(Authentication::Ticket(ticket, token)) => (ticket, token),
- };
-
- let protocol_request = build_protocol_connect(&self.server, &self.store, &ticket, &token);
- stream.write_all(&protocol_request)?;
- let (code, _empty_body) = read_http_response(&mut stream)?;
- if code != 101 {
- bail!("expected 101 Switching Protocol, received code: {}", code);
- }
-
- Ok(stream)
- }
-
- /// This creates creates a synchronous client (via blocking I/O), tries to authenticate with
- /// the server and connect to the protocol endpoint. On success, a `Client` is returned.
- pub fn connect(self) -> Result<Client<SslStream<TcpStream>>, Error> {
- let stream = self.do_connect()?;
-
- let mut client = Client::new(stream);
- if !client.wait_for_handshake()? {
- bail!("handshake failed");
- }
- Ok(client)
- }
-}
+++ /dev/null
-pub(crate) mod common;
-
-pub mod protocol;
-pub mod server;
-pub mod tools;
-
-mod chunk_stream;
-pub use chunk_stream::*;
-
-mod chunker;
-pub use chunker::*;
-
-mod client;
-pub use client::*;
-
-mod connect;
-pub use connect::*;
-
-mod types;
-pub use types::*;
-
-pub mod c_chunker;
-pub mod c_client;
-pub mod c_connector;
+++ /dev/null
-use std::mem;
-
-use endian_trait::Endian;
-
-// There's no reason to have more than that in a single packet...
-pub const MAX_PACKET_SIZE: u32 = 16 * 1024 * 1024;
-
-// Each packet has a transaction ID (eg. when uploading multiple disks each
-// upload is a separate stream).
-#[derive(Endian)]
-#[repr(C, packed)]
-pub struct Packet {
- pub id: u8, // request/command id
- pub pkttype: u8, // packet type
- pub length: u32, // data length before the next packet
-
- // content is attached directly afterwards
-}
-
-impl Packet {
- pub fn builder(id: u8, pkttype: PacketType) -> PacketBuilder {
- PacketBuilder::new(id, pkttype)
- }
-
- pub fn simple(id: u8, pkttype: PacketType) -> Vec<u8> {
- Self::builder(id, pkttype).finish()
- }
-}
-
-#[derive(Endian, Clone, Copy)]
-#[repr(u8)]
-pub enum ErrorId {
- Generic,
- Busy,
-}
-
-#[repr(u8)]
-#[derive(Clone, Copy)]
-pub enum PacketType {
- /// First packet sent by the server.
- Hello,
-
- /// Generic acknowledgement.
- Ok,
-
- /// Error packet sent by the server, this cancels the request for which it is produced.
- Error,
-
- /// The client wants the list of available hashes in order to know which ones require an
- /// upload.
- ///
- /// The body should contain a backup file name of which to retrieve the hash list.
- ///
- /// Server responds with a sequence of ``HashListPart`` packets.
- GetHashList,
-
- /// Array of hashes. The number of hashes in a packet is calculated from the packet length as
- /// provided in the ``Packet`` struct. An empty packet indicates the end of the list. We send a
- /// sequence of such packets because we don't know whether the server will be keeping the list
- /// in memory yet, so it might not know the number in advance and may be iterating through
- /// directories until it hits an end. It can produce the network packets asynchronously while
- /// walking the chunk dir.
- HashListPart,
-
- /// Client requests to download chunks via a hash list from the server. The number of chunks
- /// can be derived from the length of this request, so it works similar to ``HashListPart``,
- /// but there's only 1 chunk list per request ID.
- ///
- /// The server responds with a sequence of ``Chunk`` packets or ``Error``.
- DownloadChunks,
-
- /// The response to ``DownloadChunks``. One packet per requested chunk.
- Chunk,
-
- /// The upload of a chunk can happen independent from the ongoing backup
- /// streams. Server responds with an ``OK``.
- UploadChunk,
-
- /// Create a file in a new or existing backup. Contains all the metadata of
- /// a file.
- ///
- /// The server responds with ``BackupCreated`` or ``Error``. On ``BackupCreated`` the client
- /// may proceed to send as many ``BackupData...`` packets as necessary to fill the file.
- /// The sequence is finished by the client with a ``BackupFinished``.
- CreateBackup,
-
- /// Successful from the server to a client's ``CreateBackup`` packet. Contains the server side
- /// path relative to the store.
- BackupCreated,
-
- /// This packet contains an array of references to fixed sized chunks. Clients should upload
- /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent
- /// chunk is an error.
- ///
- /// The server produces an ``Error`` packet in case of an error.
- BackupDataFixed,
-
- /// This packet contains an array of references to dynamic sized chunks. Clients should upload
- /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent
- /// chunk is an error.
- ///
- /// The server produces an ``Error`` packet in case of an error.
- BackupDataDynamic,
-
- /// This ends a backup file. The server responds with an ``OK`` or an ``Error`` packet.
- BackupFinished,
-}
-
-// Nightly has a std::convert::TryFrom, actually...
-impl PacketType {
- pub fn try_from(v: u8) -> Option<Self> {
- if v <= PacketType::BackupFinished as u8 {
- Some(unsafe { std::mem::transmute(v) })
- } else {
- None
- }
- }
-}
-
-// Not using bitflags! for Endian derive...
-pub mod backup_flags {
- pub type Type = u8;
- /// The backup must not exist yet.
- pub const EXCL: Type = 0x00000001;
- /// The data represents a raw file
- pub const RAW: Type = 0x00000002;
- /// The data uses dynamically sized chunks (catar file)
- pub const DYNAMIC_CHUNKS: Type = 0x00000004;
-}
-
-pub mod backup_type {
- pub type Type = u8;
- pub const VM: Type = 0;
- pub const CT: Type = 1;
- pub const HOST: Type = 2;
-
- use failure::{bail, Error};
- pub fn id_to_name(id: Type) -> Result<&'static str, Error> {
- Ok(match id {
- VM => "vm",
- CT => "ct",
- HOST => "host",
- n => bail!("unknown backup type id: {}", n),
- })
- }
-
- pub fn name_to_id(id: &str) -> Result<Type, Error> {
- Ok(match id {
- "vm" => VM,
- "ct" => CT,
- "host" => HOST,
- n => bail!("unknown backup type name: {}", n),
- })
- }
-}
-
-#[repr(C, packed)]
-#[derive(Endian)]
-pub struct DynamicChunk {
- pub offset: u64,
- pub digest: [u8; 32],
-}
-
-pub mod server {
- use endian_trait::Endian;
-
- pub const PROTOCOL_VERSION: u32 = 1;
-
- pub const HELLO_MAGIC: [u8; 8] = *b"PMXBCKUP";
-
- pub const HELLO_VERSION: u32 = 1; // the current version
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct Hello {
- pub magic: [u8; 8],
- pub version: u32,
- }
-
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct Error {
- pub id: u8,
- }
-
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct Chunk {
- pub hash: super::DynamicChunk,
- // Data follows here...
- }
-
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct BackupCreated {
- pub path_length: u16,
- // path follows here
- }
-}
-
-pub mod client {
- use endian_trait::Endian;
-
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct UploadChunk {
- pub hash: crate::FixedChunk,
- }
-
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct CreateBackup {
- pub backup_type: super::backup_type::Type,
- pub id_length: u8, // length of the ID string
- pub timestamp: u64, // seconds since the epoch
- pub flags: super::backup_flags::Type,
- pub name_length: u8, // file name length
- pub chunk_size: u32, // average or "fixed" chunk size
- pub file_size: u64, // size for fixed size files (must be 0 if DYNAMIC_CHUNKS is set)
-
- // ``id_length`` bytes of ID follow
- // ``name_length`` bytes of file name follow
- // Further packets contain the data or chunks
- }
-
- #[derive(Endian)]
- #[repr(C, packed)]
- pub struct GetHashList {
- pub name_length: u16,
- // name follows as payload
- }
-}
-
-pub struct PacketBuilder {
- data: Vec<u8>,
-}
-
-impl PacketBuilder {
- pub fn new(id: u8, pkttype: PacketType) -> Self {
- let data = Vec::with_capacity(mem::size_of::<Packet>());
- let mut me = Self { data };
- me.write_data(
- Packet {
- id,
- pkttype: pkttype as _,
- length: 0,
- }
- .to_le(),
- );
- me
- }
-
- pub fn reserve(&mut self, more: usize) -> &mut Self {
- self.data.reserve(more);
- self
- }
-
- pub fn write_buf(&mut self, buf: &[u8]) -> &mut Self {
- self.data.extend_from_slice(buf);
- self
- }
-
- pub fn write_data<T: Endian>(&mut self, data: T) -> &mut Self {
- self.write_data_noswap(&data.to_le())
- }
-
- pub fn write_data_noswap<T>(&mut self, data: &T) -> &mut Self {
- self.write_buf(unsafe {
- std::slice::from_raw_parts(data as *const T as *const u8, mem::size_of::<T>())
- })
- }
-
- pub fn finish(mut self) -> Vec<u8> {
- let length = self.data.len();
- assert!(length >= mem::size_of::<Packet>());
- unsafe {
- let head = self.data.as_mut_ptr() as *mut Packet;
- std::ptr::write_unaligned((&mut (*head).length) as *mut u32, (length as u32).to_le());
- }
- self.data
- }
-}
+++ /dev/null
-use std::collections::hash_map::{self, HashMap};
-use std::io::{Read, Write};
-use std::{mem, ptr};
-
-use failure::*;
-
-use endian_trait::Endian;
-
-use crate::common;
-use crate::protocol::*;
-use crate::ChunkEntry;
-use crate::FixedChunk;
-
-type Result<T> = std::result::Result<T, Error>;
-
-pub trait ChunkList: Send {
- fn next(&mut self) -> Result<Option<&[u8; 32]>>;
-}
-
-/// This provides callbacks used by a `Connection` when it receives a packet.
-pub trait HandleClient {
- /// The protocol handler will call this when the client produces an irrecoverable error.
- fn error(&self);
-
- /// The client wants the list of hashes, the provider should provide an iterator over chunk
- /// entries.
- fn get_chunk_list(&self, backup_name: &str) -> Result<Box<dyn ChunkList>>;
-
- /// The client has uploaded a chunk, we should add it to the chunk store. Return whether the
- /// chunk was new.
- fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool>;
-
- /// The client wants to create a backup. Since multiple backup streams can happen in parallel,
- /// this should return a handler used to create the individual streams.
- /// The handler will be informed about success via the ``finish()`` method.
- fn create_backup(
- &self,
- backup_type: &str,
- id: &str,
- timestamp: i64,
- new: bool,
- ) -> Result<Box<dyn HandleBackup + Send>>;
-}
-
-/// A single backup may contain multiple files. Currently we represent this via a hierarchy where
-/// the `HandleBackup` trait is instantiated for each backup, which is responsible for
-/// instantiating the `BackupFile` trait objects.
-pub trait HandleBackup {
- /// All individual streams for this backup have been successfully finished.
- fn finish(&mut self) -> Result<()>;
-
- /// Create a specific file in this backup.
- fn create_file(
- &self,
- name: &str,
- fixed_size: Option<u64>,
- chunk_size: usize,
- ) -> Result<Box<dyn BackupFile + Send>>;
-}
-
-/// This handles backup files created by calling `create_file` on a `Backup`.
-pub trait BackupFile {
- /// Backup use the server-local timestamp formatting, so we want to be able to tell the client
- /// the real remote path:
- fn relative_path(&self) -> &str;
-
- /// The client wants to add a chunk to a fixed index file at a certain position.
- fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>;
-
- /// The client wants to add a chunks to a dynamic index file.
- fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>;
-
- /// This notifies the handler that the backup has finished successfully. This should commit the
- /// data to the store for good. After this the client will receive an "ok".
- ///
- /// If the Drop handler gets called before this method, the backup was aborted due to an error
- /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be
- /// performed.
- fn finish(&mut self) -> Result<()>;
-}
-
-#[derive(Clone, Eq, Hash, PartialEq)]
-struct BackupId(backup_type::Type, String, i64);
-
-/// Associates a socket with the server side of the backup protocol.
-/// The communcation channel should be `Read + Write` and may be non-blocking (provided it
-/// correctly returns `io::ErrorKind::WouldBlock`).
-/// The handler has to implement the `HandleClient` trait to provide callbacks used while
-/// communicating with the client.
-pub struct Connection<S, H>
-where
- S: Read + Write,
- H: HandleClient,
-{
- handler: H,
- common: common::Connection<S>,
-
- // states:
-
- // If this is set we are currently transferring our hash list to the client:
- hash_list: Option<(
- u8, // data stream ID
- Box<dyn ChunkList>,
- )>,
-
- // currently active 'backups' (handlers for a specific BackupDir)
- backups: HashMap<BackupId, Box<dyn HandleBackup + Send>>,
-
- // currently active backup *file* streams
- backup_files: HashMap<u8, Box<dyn BackupFile + Send>>,
-}
-
-impl<S, H> Connection<S, H>
-where
- S: Read + Write,
- H: HandleClient,
-{
- pub fn new(socket: S, handler: H) -> Result<Self> {
- let mut me = Self {
- handler,
- common: common::Connection::new(socket),
- hash_list: None,
- backups: HashMap::new(),
- backup_files: HashMap::new(),
- };
-
- me.send_hello()?;
- Ok(me)
- }
-
- fn send_hello(&mut self) -> Result<()> {
- let mut packet = Packet::builder(0, PacketType::Hello);
- packet.write_data(server::Hello {
- magic: server::HELLO_MAGIC,
- version: server::PROTOCOL_VERSION,
- });
- self.common.queue_data(packet.finish())?;
- Ok(())
- }
-
- pub fn eof(&self) -> bool {
- self.common.eof
- }
-
- /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
- pub fn clear_err(&mut self) {
- self.common.clear_err()
- }
-
- pub fn main(&mut self) -> Result<()> {
- self.poll_read()?;
- self.poll_send()?;
- Ok(())
- }
-
- // If this returns an error it is considered fatal and the connection should be dropped!
- fn poll_read(&mut self) -> Result<()> {
- if self.common.eof {
- // polls after EOF are errors:
- bail!("client disconnected");
- }
-
- if !self.common.poll_read()? {
- // No data available
- if self.common.eof {
- bail!("client disconnected");
- }
- return Ok(());
- }
-
- // we received a packet, handle it:
-
- loop {
- use PacketType::*;
- match self.common.current_packet_type {
- GetHashList => self.hash_list_requested()?,
- UploadChunk => self.receive_chunk()?,
- CreateBackup => self.create_backup()?,
- BackupDataDynamic => self.backup_data_dynamic()?,
- BackupDataFixed => self.backup_data_fixed()?,
- BackupFinished => self.backup_finished()?,
- _ => bail!(
- "client sent an unexpected packet of type {}",
- self.common.current_packet_type as u32,
- ),
- };
- self.common.next()?;
- if !self.common.poll_read()? {
- break;
- }
- }
-
- Ok(())
- }
-
- fn poll_send(&mut self) -> Result<()> {
- if self.common.error {
- eprintln!("refusing to send datato client in error state");
- bail!("client is in error state");
- }
-
- if let Some(false) = self.common.poll_send()? {
- // send queue is not finished, don't add anything else...
- return Ok(());
- }
-
- // Queue has either finished or was empty, see if we should enqueue more data:
- if self.hash_list.is_some() {
- return self.send_hash_list();
- }
- Ok(())
- }
-
- fn hash_list_requested(&mut self) -> Result<()> {
- // Verify protocol: GetHashList is an empty packet.
- let request = self.common.read_unaligned_data::<client::GetHashList>(0)?;
- self.common
- .assert_size(mem::size_of_val(&request) + request.name_length as usize)?;
- let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..];
- let name = std::str::from_utf8(name_bytes)?;
-
- // We support only one active hash list stream:
- if self.hash_list.is_some() {
- return self.respond_error(ErrorId::Busy);
- }
-
- self.hash_list = Some((
- self.common.current_packet.id,
- self.handler.get_chunk_list(name)?,
- ));
-
- Ok(())
- }
-
- fn send_hash_list(&mut self) -> Result<()> {
- loop {
- let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap();
-
- let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::<Packet>())
- / mem::size_of::<FixedChunk>();
-
- let mut packet = Packet::builder(*stream_id, PacketType::HashListPart);
- packet.reserve(mem::size_of::<FixedChunk>() * max_chunks_per_packet);
-
- let mut count = 0;
- for _ in 0..max_chunks_per_packet {
- let entry: &[u8; 32] = match hash_iter.next() {
- Ok(Some(entry)) => entry,
- Ok(None) => break,
- Err(e) => {
- eprintln!("error sending chunk list to client: {}", e);
- continue;
- }
- };
-
- packet.write_buf(entry);
- count += 1;
- }
-
- let can_send_more = self.common.queue_data(packet.finish())?;
-
- if count == 0 {
- // We just sent the EOF packet, clear our iterator state!
- self.hash_list = None;
- break;
- }
-
- if !can_send_more {
- break;
- }
- }
- Ok(())
- }
-
- fn respond_error(&mut self, kind: ErrorId) -> Result<()> {
- self.respond_value(PacketType::Error, kind)?;
- Ok(())
- }
-
- fn respond_value<T: Endian>(&mut self, pkttype: PacketType, data: T) -> Result<()> {
- let mut packet = Packet::builder(self.common.current_packet.id, pkttype);
- packet.write_data(data);
- self.common.queue_data(packet.finish())?;
- Ok(())
- }
-
- fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> {
- self.common
- .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?;
- Ok(())
- }
-
- fn respond_ok(&mut self) -> Result<()> {
- self.respond_empty(PacketType::Ok)
- }
-
- fn receive_chunk(&mut self) -> Result<()> {
- self.common
- .assert_atleast(mem::size_of::<client::UploadChunk>())?;
- let data = self.common.packet_data();
- let (hash, data) = data.split_at(mem::size_of::<FixedChunk>());
- if data.len() == 0 {
- bail!("received an empty chunk");
- }
- let entry = ChunkEntry::from_data(data);
- if entry.hash != hash {
- let cli_hash = crate::tools::digest_to_hex(hash);
- let data_hash = entry.digest_to_hex();
- bail!(
- "client claimed data with digest {} has digest {}",
- data_hash,
- cli_hash
- );
- }
- let _new = self.handler.upload_chunk(&entry, data)?;
- self.respond_ok()
- }
-
- fn create_backup(&mut self) -> Result<()> {
- if self
- .backup_files
- .contains_key(&self.common.current_packet.id)
- {
- bail!("stream id already in use...");
- }
-
- let create_msg = self.common.read_unaligned_data::<client::CreateBackup>(0)?;
-
- // simple data:
- let flags = create_msg.flags;
- let backup_type = create_msg.backup_type;
- let time = create_msg.timestamp as i64;
-
- // text comes from the payload data after the CreateBackup struct:
- let data = self.common.packet_data();
- let payload = &data[mem::size_of_val(&create_msg)..];
-
- // there must be exactly the ID and the file name in the payload:
- let id_len = create_msg.id_length as usize;
- let name_len = create_msg.name_length as usize;
- let expected_len = id_len + name_len;
- if payload.len() < expected_len {
- bail!("client sent incomplete CreateBackup request");
- } else if payload.len() > expected_len {
- bail!("client sent excess data in CreateBackup request");
- }
-
- // id and file name must be utf8:
- let id = std::str::from_utf8(&payload[0..id_len])
- .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?;
- let file_name = std::str::from_utf8(&payload[id_len..])
- .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?;
-
- // Sanity check dynamic vs fixed:
- let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0;
- let file_size = match (is_dynamic, create_msg.file_size) {
- (false, size) => Some(size),
- (true, 0) => None,
- (true, _) => bail!("file size of dynamic streams must be zero"),
- };
-
- // search or create the handler:
- let hashmap_id = BackupId(backup_type, id.to_string(), time);
- let handle = match self.backups.entry(hashmap_id) {
- hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup(
- backup_type::id_to_name(backup_type)?,
- id,
- time,
- (flags & backup_flags::EXCL) != 0,
- )?),
- hash_map::Entry::Occupied(entry) => entry.into_mut(),
- };
- let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?;
-
- let mut response =
- Packet::builder(self.common.current_packet.id, PacketType::BackupCreated);
- let path = file.relative_path();
- if path.len() > 0xffff {
- bail!("path too long");
- }
- response
- .write_data(server::BackupCreated {
- path_length: path.len() as _,
- })
- .write_buf(path.as_bytes());
- self.common.queue_data(response.finish())?;
-
- self.backup_files
- .insert(self.common.current_packet.id, file);
-
- Ok(())
- }
-
- fn backup_data_dynamic(&mut self) -> Result<()> {
- let stream_id = self.common.current_packet.id;
- let file = self
- .backup_files
- .get_mut(&stream_id)
- .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;
-
- let mut data = self.common.packet_data();
- // Data consists of (offset: u64, hash: [u8; 32])
- let entry_len = mem::size_of::<DynamicChunk>();
-
- while data.len() >= entry_len {
- let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) };
- data = &data[entry_len..];
-
- entry.offset = entry.offset.from_le();
- file.add_dynamic_data(&entry)?;
- }
-
- if data.len() != 0 {
- bail!(
- "client sent excess data ({} bytes) after dynamic chunk indices!",
- data.len()
- );
- }
-
- Ok(())
- }
-
- fn backup_data_fixed(&mut self) -> Result<()> {
- let stream_id = self.common.current_packet.id;
- let file = self
- .backup_files
- .get_mut(&stream_id)
- .ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?;
-
- let mut data = self.common.packet_data();
- // Data consists of (index: u64, hash: [u8; 32])
- #[repr(C, packed)]
- struct IndexedChunk {
- index: u64,
- digest: FixedChunk,
- }
- let entry_len = mem::size_of::<IndexedChunk>();
-
- while data.len() >= entry_len {
- let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) };
- data = &data[entry_len..];
-
- entry.index = entry.index.from_le();
- file.add_fixed_data(entry.index, &entry.digest)?;
- }
-
- if data.len() != 0 {
- bail!(
- "client sent excess data ({} bytes) after dynamic chunk indices!",
- data.len()
- );
- }
-
- Ok(())
- }
-
- fn backup_finished(&mut self) -> Result<()> {
- let stream_id = self.common.current_packet.id;
- let mut file = self
- .backup_files
- .remove(&stream_id)
- .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;
- file.finish()?;
- self.respond_ok()
- }
-}
+++ /dev/null
-use failure::{bail, Error};
-
-pub fn digest_to_hex(digest: &[u8]) -> String {
- const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef";
-
- let mut buf = Vec::<u8>::with_capacity(digest.len() * 2);
-
- for i in 0..digest.len() {
- buf.push(HEX_CHARS[(digest[i] >> 4) as usize]);
- buf.push(HEX_CHARS[(digest[i] & 0xf) as usize]);
- }
-
- unsafe { String::from_utf8_unchecked(buf) }
-}
-
-pub unsafe fn swapped_data_to_buf<T>(data: &T) -> &[u8] {
- std::slice::from_raw_parts(data as *const T as *const u8, std::mem::size_of::<T>())
-}
-
-fn hex_nibble(c: u8) -> Result<u8, Error> {
- Ok(match c {
- b'0'..=b'9' => c - b'0',
- b'a'..=b'f' => c - b'a' + 0xa,
- b'A'..=b'F' => c - b'A' + 0xa,
- _ => bail!("not a hex digit: {}", c as char),
- })
-}
-
-#[inline]
-pub fn parse_hex_digest<T: AsRef<[u8]>>(hex: T) -> Result<[u8; 32], Error> {
- let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
-
- let hex = hex.as_ref();
-
- if hex.len() != 64 {
- bail!(
- "invalid hex digest ({} instead of 64 digits long)",
- hex.len()
- );
- }
-
- for i in 0..32 {
- digest[i] = (hex_nibble(hex[i * 2])? << 4) + hex_nibble(hex[i * 2 + 1])?;
- }
-
- Ok(digest)
-}
+++ /dev/null
-use std::borrow::Borrow;
-
-use endian_trait::Endian;
-use failure::*;
-
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub enum IndexType {
- Fixed,
- Dynamic,
-}
-
-#[derive(Endian, Clone, Debug, Eq, Hash, PartialEq)]
-#[repr(transparent)]
-pub struct FixedChunk(pub [u8; 32]);
-
-impl FixedChunk {
- pub fn new(hash: [u8; 32]) -> Self {
- Self(hash)
- }
-
- pub fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Error> {
- Ok(Self::new(crate::tools::parse_hex_digest(hex.as_ref())?))
- }
-
- pub fn from_data(data: &[u8]) -> Self {
- let mut hasher = openssl::sha::Sha256::new();
- hasher.update(data);
- Self::new(hasher.finish())
- }
-
- pub fn digest_to_hex(&self) -> String {
- crate::tools::digest_to_hex(&self.0)
- }
-}
-
-#[derive(Endian, Clone, Copy, Debug, Hash)]
-#[repr(C, packed)]
-pub struct ChunkEntry {
- pub hash: [u8; 32],
- pub size: u64,
-}
-
-impl ChunkEntry {
- pub fn new(hash: [u8; 32], size: u64) -> Self {
- Self { hash, size }
- }
-
- pub fn from_hex<T: AsRef<[u8]>>(hex: T, size: u64) -> Result<Self, Error> {
- Ok(Self::new(
- crate::tools::parse_hex_digest(hex.as_ref())?,
- size,
- ))
- }
-
- pub fn len(&self) -> u64 {
- self.size
- }
-
- pub fn from_data(data: &[u8]) -> Self {
- let mut hasher = openssl::sha::Sha256::new();
- hasher.update(data);
- Self::new(hasher.finish(), data.len() as u64)
- }
-
- pub fn digest_to_hex(&self) -> String {
- crate::tools::digest_to_hex(&self.hash)
- }
-
- pub fn to_fixed(&self) -> FixedChunk {
- FixedChunk(self.hash)
- }
-}
-
-impl PartialEq for ChunkEntry {
- fn eq(&self, other: &Self) -> bool {
- self.size == other.size && self.hash == other.hash
- }
-}
-
-impl Eq for ChunkEntry {}
-
-impl Into<FixedChunk> for ChunkEntry {
- fn into(self) -> FixedChunk {
- FixedChunk(self.hash)
- }
-}
-
-impl Borrow<FixedChunk> for ChunkEntry {
- fn borrow(&self) -> &FixedChunk {
- unsafe { std::mem::transmute(&self.hash) }
- }
-}
-
-impl Borrow<FixedChunk> for [u8; 32] {
- fn borrow(&self) -> &FixedChunk {
- unsafe { std::mem::transmute(self) }
- }
-}
mod checksum_writer;
pub use checksum_writer::*;
+mod chunker;
+pub use chunker::*;
+
mod data_chunk;
pub use data_chunk::*;
mod chunk_stat;
pub use chunk_stat::*;
-pub use proxmox_protocol::Chunker;
-
mod read_chunk;
pub use read_chunk::*;
+use bytes::BytesMut;
use failure::*;
-
-use proxmox_protocol::Chunker;
-use futures::{Async, Poll};
use futures::stream::Stream;
+use futures::{Async, Poll};
-use bytes::BytesMut;
+use super::Chunker;
/// Split input stream into dynamic sized chunks
pub struct ChunkStream<S> {
--- /dev/null
+/// Note: window size 32 or 64, is faster because we can
+/// speedup modulo operations, but always computes hash 0
+/// for constant data streams .. 0,0,0,0,0,0
+/// so we use a modified the chunk boundary test too not
+/// use hash value 0 to detect a boundary.
+const CA_CHUNKER_WINDOW_SIZE: usize = 64;
+
+/// Slinding window chunker (Buzhash)
+///
+/// This is a rewrite of *casync* chunker (cachunker.h) in rust.
+///
+/// Hashing by cyclic polynomial (also called Buzhash) has the benefit
+/// of avoiding multiplications, using barrel shifts instead. For more
+/// information please take a look at the [Rolling
+/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) artikel from
+/// wikipedia.
+
+pub struct Chunker {
+ h: u32,
+ window_size: usize,
+ chunk_size: usize,
+
+ chunk_size_min: usize,
+ chunk_size_max: usize,
+ _chunk_size_avg: usize,
+
+ _discriminator: u32,
+
+ break_test_mask: u32,
+ break_test_minimum: u32,
+
+ window: [u8; CA_CHUNKER_WINDOW_SIZE],
+}
+
+const BUZHASH_TABLE: [u32; 256] = [
+ 0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
+ 0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
+ 0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68,
+ 0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93,
+ 0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00,
+ 0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2,
+ 0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9,
+ 0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5,
+ 0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed,
+ 0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f,
+ 0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4,
+ 0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85,
+ 0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6,
+ 0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6,
+ 0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f,
+ 0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2,
+ 0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b,
+ 0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae,
+ 0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f,
+ 0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18,
+ 0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a,
+ 0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140,
+ 0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9,
+ 0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9,
+ 0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc,
+ 0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b,
+ 0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c,
+ 0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319,
+ 0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb,
+ 0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b,
+ 0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964,
+ 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
+];
+
+impl Chunker {
+ /// Create a new Chunker instance, which produces and average
+ /// chunk size of `chunk_size_avg` (need to be a power of two). We
+ /// allow variation from `chunk_size_avg/4` up to a maximum of
+ /// `chunk_size_avg*4`.
+ pub fn new(chunk_size_avg: usize) -> Self {
+ // The chunk cut discriminator. In order to get an average
+ // chunk size of avg, we cut whenever for a hash value "h" at
+ // byte "i" given the descriminator "d(avg)": h(i) mod d(avg)
+ // == d(avg) - 1. Note that the discriminator calculated like
+ // this only yields correct results as long as the minimal
+ // chunk size is picked as avg/4, and the maximum chunk size
+ // as avg*4. If they are picked differently the result might
+ // be skewed into either direction.
+ let avg = chunk_size_avg as f64;
+ let discriminator = (avg / (-1.42888852e-7 * avg + 1.33237515)) as u32;
+
+ if chunk_size_avg.count_ones() != 1 {
+ panic!("got unexpected chunk size - not a power of two.");
+ }
+
+ let break_test_mask = (chunk_size_avg * 2 - 1) as u32;
+ let break_test_minimum = break_test_mask - 2;
+
+ Self {
+ h: 0,
+ window_size: 0,
+ chunk_size: 0,
+ chunk_size_min: chunk_size_avg >> 2,
+ chunk_size_max: chunk_size_avg << 2,
+ _chunk_size_avg: chunk_size_avg,
+ _discriminator: discriminator,
+ break_test_mask: break_test_mask,
+ break_test_minimum: break_test_minimum,
+ window: [0u8; CA_CHUNKER_WINDOW_SIZE],
+ }
+ }
+
+ /// Scans the specified data for a chunk border. Returns 0 if none
+ /// was found (and the function should be called with more data
+ /// later on), or another value indicating the position of a
+ /// border.
+ pub fn scan(&mut self, data: &[u8]) -> usize {
+ let window_len = self.window.len();
+ let data_len = data.len();
+
+ let mut pos = 0;
+
+ if self.window_size < window_len {
+ let need = window_len - self.window_size;
+ let copy_len = if need < data_len { need } else { data_len };
+
+ for _i in 0..copy_len {
+ let byte = data[pos];
+ self.window[self.window_size] = byte;
+ self.h = self.h.rotate_left(1) ^ BUZHASH_TABLE[byte as usize];
+ pos += 1;
+ self.window_size += 1;
+ }
+
+ self.chunk_size += copy_len;
+
+ // return if window is still not full
+ if self.window_size < window_len {
+ return 0;
+ }
+ }
+
+ //let mut idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
+ let mut idx = self.chunk_size & 0x3f;
+
+ while pos < data_len {
+ // roll window
+ let enter = data[pos];
+ let leave = self.window[idx];
+ self.h = self.h.rotate_left(1) ^
+ //BUZHASH_TABLE[leave as usize].rotate_left(CA_CHUNKER_WINDOW_SIZE as u32) ^
+ BUZHASH_TABLE[leave as usize] ^
+ BUZHASH_TABLE[enter as usize];
+
+ self.chunk_size += 1;
+ pos += 1;
+
+ self.window[idx] = enter;
+
+ if self.shall_break() {
+ self.h = 0;
+ self.chunk_size = 0;
+ self.window_size = 0;
+ return pos;
+ }
+
+ //idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
+ idx = self.chunk_size & 0x3f;
+ //idx += 1; if idx >= CA_CHUNKER_WINDOW_SIZE { idx = 0 };
+ }
+
+ 0
+ }
+
+ // fast implementation avoiding modulo
+ // #[inline(always)]
+ fn shall_break(&self) -> bool {
+ if self.chunk_size >= self.chunk_size_max {
+ return true;
+ }
+
+ if self.chunk_size < self.chunk_size_min {
+ return false;
+ }
+
+ //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
+
+ //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
+
+ (self.h & self.break_test_mask) >= self.break_test_minimum
+ }
+
+ // This is the original implementation from casync
+ /*
+ #[inline(always)]
+ fn shall_break_orig(&self) -> bool {
+
+ if self.chunk_size >= self.chunk_size_max { return true; }
+
+ if self.chunk_size < self.chunk_size_min { return false; }
+
+ (self.h % self.discriminator) == (self.discriminator - 1)
+ }
+ */
+}
+
+#[test]
+fn test_chunker1() {
+ let mut buffer = Vec::new();
+
+ for i in 0..(256 * 1024) {
+ for j in 0..4 {
+ let byte = ((i >> (j << 3)) & 0xff) as u8;
+ buffer.push(byte);
+ }
+ }
+ let mut chunker = Chunker::new(64 * 1024);
+
+ let mut pos = 0;
+ let mut last = 0;
+
+ let mut chunks1: Vec<(usize, usize)> = vec![];
+ let mut chunks2: Vec<(usize, usize)> = vec![];
+
+ // test1: feed single bytes
+ while pos < buffer.len() {
+ let k = chunker.scan(&buffer[pos..pos + 1]);
+ pos += 1;
+ if k != 0 {
+ let prev = last;
+ last = pos;
+ chunks1.push((prev, pos - prev));
+ }
+ }
+ chunks1.push((last, buffer.len() - last));
+
+ let mut chunker = Chunker::new(64 * 1024);
+
+ let mut pos = 0;
+
+ // test2: feed with whole buffer
+ while pos < buffer.len() {
+ let k = chunker.scan(&buffer[pos..]);
+ if k != 0 {
+ chunks2.push((pos, k));
+ pos += k;
+ } else {
+ break;
+ }
+ }
+
+ chunks2.push((pos, buffer.len() - pos));
+
+ if chunks1 != chunks2 {
+ let mut size1 = 0;
+ for (_offset, len) in &chunks1 {
+ size1 += len;
+ }
+ println!("Chunks1:{}\n{:?}\n", size1, chunks1);
+
+ let mut size2 = 0;
+ for (_offset, len) in &chunks2 {
+ size2 += len;
+ }
+ println!("Chunks2:{}\n{:?}\n", size2, chunks2);
+
+ if size1 != 256 * 4 * 1024 {
+ panic!("wrong size for chunks1");
+ }
+ if size2 != 256 * 4 * 1024 {
+ panic!("wrong size for chunks2");
+ }
+
+ panic!("got different chunks");
+ }
+}
use proxmox::tools::io::ReadExt;
use proxmox::tools::vec;
-use proxmox_protocol::Chunker;
+use super::Chunker;
+use super::IndexFile;
use super::chunk_stat::ChunkStat;
use super::chunk_store::ChunkStore;
use super::read_chunk::ReadChunk;
-use super::IndexFile;
use super::{DataChunk, DataChunkBuilder};
use crate::tools;