use std::collections::hash_map::{self, HashMap};
use std::io::{Read, Write};
use std::{mem, ptr};

use failure::*;

use endian_trait::Endian;

use crate::common;
use crate::protocol::*;
use crate::ChunkEntry;
use crate::FixedChunk;

type Result<T> = std::result::Result<T, Error>;

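/// Iterator-style source of chunk digests for an existing backup. The server walks this in
/// `send_hash_list()` to stream the list of already-known chunks back to the client.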
pub trait ChunkList: Send {
    fn next(&mut self) -> Result<Option<&[u8; 32]>>;
}

/// This provides callbacks used by a `Connection` when it receives a packet.
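///
/// A minimal sketch of an implementing type (the `MyStore` type and its helper methods used in
/// the example are hypothetical and not part of this crate):
///
/// ```ignore
/// struct MyStore { /* chunk store handle, base path, ... */ }
///
/// impl HandleClient for MyStore {
///     fn error(&self) {
///         eprintln!("client entered an unrecoverable error state");
///     }
///
///     fn get_chunk_list(&self, backup_name: &str) -> Result<Box<dyn ChunkList>> {
///         // Hypothetical helper: iterate the digests already stored for `backup_name`.
///         Ok(Box::new(self.open_digest_iterator(backup_name)?))
///     }
///
///     fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool> {
///         // Hypothetical helper: returns true if the chunk was not yet in the store.
///         self.insert_chunk(chunk, data)
///     }
///
///     fn create_backup(
///         &self,
///         backup_type: &str,
///         id: &str,
///         timestamp: i64,
///         new: bool,
///     ) -> Result<Box<dyn HandleBackup + Send>> {
///         Ok(Box::new(self.open_backup_dir(backup_type, id, timestamp, new)?))
///     }
/// }
/// ```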
pub trait HandleClient {
    /// The protocol handler will call this when the client produces an irrecoverable error.
    fn error(&self);

    /// The client wants the list of hashes; the provider should return an iterator over chunk
    /// entries.
    fn get_chunk_list(&self, backup_name: &str) -> Result<Box<dyn ChunkList>>;

    /// The client has uploaded a chunk; we should add it to the chunk store. Returns whether the
    /// chunk was new.
    fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool>;

    /// The client wants to create a backup. Since multiple backup streams can happen in parallel,
    /// this should return a handler used to create the individual streams.
    /// The handler will be informed about success via the `finish()` method.
    fn create_backup(
        &self,
        backup_type: &str,
        id: &str,
        timestamp: i64,
        new: bool,
    ) -> Result<Box<dyn HandleBackup + Send>>;
}

/// A single backup may contain multiple files. Currently we represent this via a hierarchy where
/// the `HandleBackup` trait is instantiated for each backup, which is responsible for
/// instantiating the `BackupFile` trait objects.
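///
/// A rough sketch of the call sequence as driven by the protocol handler (the backup type, id,
/// file name and size values below are placeholders, not fixed by this crate):
///
/// ```ignore
/// // CreateBackup packet for the first file of a backup directory:
/// let mut backup = handler.create_backup("vm", "100", timestamp, /* new: */ true)?;
/// let mut file = backup.create_file("disk-0.img.fidx", Some(disk_size), chunk_size)?;
///
/// // BackupDataFixed (or BackupDataDynamic) packets add chunk references:
/// file.add_fixed_data(index, &digest)?;
///
/// // BackupFinished ends the file stream; once all streams are done the backup is committed:
/// file.finish()?;
/// backup.finish()?;
/// ```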
pub trait HandleBackup {
    /// All individual streams for this backup have been successfully finished.
    fn finish(&mut self) -> Result<()>;

    /// Create a specific file in this backup.
    fn create_file(
        &self,
        name: &str,
        fixed_size: Option<u64>,
        chunk_size: usize,
    ) -> Result<Box<dyn BackupFile + Send>>;
}

/// This handles backup files created by calling `create_file` on a `HandleBackup`.
pub trait BackupFile {
    /// Backups use the server-local timestamp formatting, so we want to be able to tell the
    /// client the real remote path:
    fn relative_path(&self) -> &str;

    /// The client wants to add a chunk to a fixed index file at a certain position.
    fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>;

    /// The client wants to add a chunk to a dynamic index file.
    fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>;

    /// This notifies the handler that the backup has finished successfully. This should commit the
    /// data to the store for good. After this the client will receive an "ok".
    ///
    /// If the Drop handler gets called before this method, the backup was aborted due to an error
    /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be
    /// performed.
    fn finish(&mut self) -> Result<()>;
}

#[derive(Clone, Eq, Hash, PartialEq)]
struct BackupId(backup_type::Type, String, i64);

/// Associates a socket with the server side of the backup protocol.
/// The communication channel should be `Read + Write` and may be non-blocking (provided it
/// correctly returns `io::ErrorKind::WouldBlock`).
/// The handler has to implement the `HandleClient` trait to provide callbacks used while
/// communicating with the client.
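///
/// A minimal sketch of driving a connection over a blocking socket (`stream` and `handler` stand
/// in for a concrete `Read + Write` channel and a `HandleClient` implementation):
///
/// ```ignore
/// let mut connection = Connection::new(stream, handler)?;
/// while !connection.eof() {
///     // With a non-blocking socket this would instead be integrated into a poll loop.
///     connection.main()?;
/// }
/// ```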
pub struct Connection<S, H>
where
    S: Read + Write,
    H: HandleClient,
{
    handler: H,
    common: common::Connection<S>,

    // states:

    // If this is set we are currently transferring our hash list to the client:
    hash_list: Option<(
        u8, // data stream ID
        Box<dyn ChunkList>,
    )>,

    // currently active 'backups' (handlers for a specific BackupDir)
    backups: HashMap<BackupId, Box<dyn HandleBackup + Send>>,

    // currently active backup *file* streams
    backup_files: HashMap<u8, Box<dyn BackupFile + Send>>,
}

impl<S, H> Connection<S, H>
where
    S: Read + Write,
    H: HandleClient,
{
    pub fn new(socket: S, handler: H) -> Result<Self> {
        let mut me = Self {
            handler,
            common: common::Connection::new(socket),
            hash_list: None,
            backups: HashMap::new(),
            backup_files: HashMap::new(),
        };

        me.send_hello()?;
        Ok(me)
    }

    fn send_hello(&mut self) -> Result<()> {
        let mut packet = Packet::builder(0, PacketType::Hello);
        packet.write_data(server::Hello {
            magic: server::HELLO_MAGIC,
            version: server::PROTOCOL_VERSION,
        });
        self.common.queue_data(packet.finish())?;
        Ok(())
    }

    pub fn eof(&self) -> bool {
        self.common.eof
    }

    /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
    pub fn clear_err(&mut self) {
        self.common.clear_err()
    }

    pub fn main(&mut self) -> Result<()> {
        self.poll_read()?;
        self.poll_send()?;
        Ok(())
    }

    // If this returns an error it is considered fatal and the connection should be dropped!
    fn poll_read(&mut self) -> Result<()> {
        if self.common.eof {
            // polls after EOF are errors:
            bail!("client disconnected");
        }

        if !self.common.poll_read()? {
            // No data available
            if self.common.eof {
                bail!("client disconnected");
            }
            return Ok(());
        }

        // we received a packet, handle it:

        loop {
            use PacketType::*;
            match self.common.current_packet_type {
                GetHashList => self.hash_list_requested()?,
                UploadChunk => self.receive_chunk()?,
                CreateBackup => self.create_backup()?,
                BackupDataDynamic => self.backup_data_dynamic()?,
                BackupDataFixed => self.backup_data_fixed()?,
                BackupFinished => self.backup_finished()?,
                _ => bail!(
                    "client sent an unexpected packet of type {}",
                    self.common.current_packet_type as u32,
                ),
            };
            self.common.next()?;
            if !self.common.poll_read()? {
                break;
            }
        }

        Ok(())
    }

    fn poll_send(&mut self) -> Result<()> {
        if self.common.error {
            eprintln!("refusing to send data to client in error state");
            bail!("client is in error state");
        }

        if let Some(false) = self.common.poll_send()? {
            // send queue is not finished, don't add anything else...
            return Ok(());
        }

        // Queue has either finished or was empty, see if we should enqueue more data:
        if self.hash_list.is_some() {
            return self.send_hash_list();
        }
        Ok(())
    }

    fn hash_list_requested(&mut self) -> Result<()> {
        // Read the GetHashList request and the backup name following it:
        let request = self.common.read_unaligned_data::<client::GetHashList>(0)?;
        self.common
            .assert_size(mem::size_of_val(&request) + request.name_length as usize)?;
        let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..];
        let name = std::str::from_utf8(name_bytes)?;

        // We support only one active hash list stream:
        if self.hash_list.is_some() {
            return self.respond_error(ErrorId::Busy);
        }

        self.hash_list = Some((
            self.common.current_packet.id,
            self.handler.get_chunk_list(name)?,
        ));

        Ok(())
    }

    fn send_hash_list(&mut self) -> Result<()> {
        loop {
            let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap();

            let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::<Packet>())
                / mem::size_of::<FixedChunk>();
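            // Each HashListPart carries the packet header followed by as many chunk digests as
            // fit below MAX_PACKET_SIZE; an empty part (count == 0 below) marks the end of the
            // list.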

            let mut packet = Packet::builder(*stream_id, PacketType::HashListPart);
            packet.reserve(mem::size_of::<FixedChunk>() * max_chunks_per_packet);

            let mut count = 0;
            for _ in 0..max_chunks_per_packet {
                let entry: &[u8; 32] = match hash_iter.next() {
                    Ok(Some(entry)) => entry,
                    Ok(None) => break,
                    Err(e) => {
                        eprintln!("error sending chunk list to client: {}", e);
                        continue;
                    }
                };

                packet.write_buf(entry);
                count += 1;
            }

            let can_send_more = self.common.queue_data(packet.finish())?;

            if count == 0 {
                // We just sent the EOF packet, clear our iterator state!
                self.hash_list = None;
                break;
            }

            if !can_send_more {
                break;
            }
        }
        Ok(())
    }


    fn respond_error(&mut self, kind: ErrorId) -> Result<()> {
        self.respond_value(PacketType::Error, kind)?;
        Ok(())
    }

    fn respond_value<T: Endian>(&mut self, pkttype: PacketType, data: T) -> Result<()> {
        let mut packet = Packet::builder(self.common.current_packet.id, pkttype);
        packet.write_data(data);
        self.common.queue_data(packet.finish())?;
        Ok(())
    }

    fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> {
        self.common
            .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?;
        Ok(())
    }

    fn respond_ok(&mut self) -> Result<()> {
        self.respond_empty(PacketType::Ok)
    }

    fn receive_chunk(&mut self) -> Result<()> {
        self.common
            .assert_atleast(mem::size_of::<client::UploadChunk>())?;
        let data = self.common.packet_data();
        let (hash, data) = data.split_at(mem::size_of::<FixedChunk>());
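        // The UploadChunk payload is the claimed digest followed directly by the chunk data, so
        // `hash` is what the client claims and `data` is what we actually hash below.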
        if data.len() == 0 {
            bail!("received an empty chunk");
        }
        let entry = ChunkEntry::from_data(data);
        if entry.hash != hash {
            let cli_hash = crate::tools::digest_to_hex(hash);
            let data_hash = entry.digest_to_hex();
            bail!(
                "client claimed data has digest {} but it is {}",
                cli_hash,
                data_hash
            );
        }
        let _new = self.handler.upload_chunk(&entry, data)?;
        self.respond_ok()
    }

    fn create_backup(&mut self) -> Result<()> {
        if self
            .backup_files
            .contains_key(&self.common.current_packet.id)
        {
            bail!("stream id already in use...");
        }

        let create_msg = self.common.read_unaligned_data::<client::CreateBackup>(0)?;

        // simple data:
        let flags = create_msg.flags;
        let backup_type = create_msg.backup_type;
        let time = create_msg.timestamp as i64;

        // text comes from the payload data after the CreateBackup struct:
        let data = self.common.packet_data();
        let payload = &data[mem::size_of_val(&create_msg)..];

        // there must be exactly the ID and the file name in the payload:
        let id_len = create_msg.id_length as usize;
        let name_len = create_msg.name_length as usize;
        let expected_len = id_len + name_len;
        if payload.len() < expected_len {
            bail!("client sent incomplete CreateBackup request");
        } else if payload.len() > expected_len {
            bail!("client sent excess data in CreateBackup request");
        }

        // id and file name must be utf8:
        let id = std::str::from_utf8(&payload[0..id_len])
            .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?;
        let file_name = std::str::from_utf8(&payload[id_len..])
            .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?;

        // Sanity check dynamic vs fixed:
        let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0;
        let file_size = match (is_dynamic, create_msg.file_size) {
            (false, size) => Some(size),
            (true, 0) => None,
            (true, _) => bail!("file size of dynamic streams must be zero"),
        };

        // search or create the handler:
        let hashmap_id = BackupId(backup_type, id.to_string(), time);
        let handle = match self.backups.entry(hashmap_id) {
            hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup(
                backup_type::id_to_name(backup_type)?,
                id,
                time,
                (flags & backup_flags::EXCL) != 0,
            )?),
            hash_map::Entry::Occupied(entry) => entry.into_mut(),
        };
        let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?;

        let mut response =
            Packet::builder(self.common.current_packet.id, PacketType::BackupCreated);
        let path = file.relative_path();
        if path.len() > 0xffff {
            bail!("path too long");
        }
        response
            .write_data(server::BackupCreated {
                path_length: path.len() as _,
            })
            .write_buf(path.as_bytes());
        self.common.queue_data(response.finish())?;

        self.backup_files
            .insert(self.common.current_packet.id, file);

        Ok(())
    }

    fn backup_data_dynamic(&mut self) -> Result<()> {
        let stream_id = self.common.current_packet.id;
        let file = self
            .backup_files
            .get_mut(&stream_id)
            .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;

        let mut data = self.common.packet_data();
        // Data consists of (offset: u64, hash: [u8; 32])
        let entry_len = mem::size_of::<DynamicChunk>();

        while data.len() >= entry_len {
            let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) };
            data = &data[entry_len..];

            entry.offset = entry.offset.from_le();
            file.add_dynamic_data(&entry)?;
        }

        if data.len() != 0 {
            bail!(
                "client sent excess data ({} bytes) after dynamic chunk indices!",
                data.len()
            );
        }

        Ok(())
    }

    fn backup_data_fixed(&mut self) -> Result<()> {
        let stream_id = self.common.current_packet.id;
        let file = self
            .backup_files
            .get_mut(&stream_id)
            .ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?;

        let mut data = self.common.packet_data();
        // Data consists of (index: u64, hash: [u8; 32])
        #[repr(C, packed)]
        struct IndexedChunk {
            index: u64,
            digest: FixedChunk,
        }
        let entry_len = mem::size_of::<IndexedChunk>();

        while data.len() >= entry_len {
            let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) };
            data = &data[entry_len..];

            entry.index = entry.index.from_le();
            file.add_fixed_data(entry.index, &entry.digest)?;
        }

        if data.len() != 0 {
            bail!(
                "client sent excess data ({} bytes) after fixed chunk indices!",
                data.len()
            );
        }

        Ok(())
    }

    fn backup_finished(&mut self) -> Result<()> {
        let stream_id = self.common.current_packet.id;
        let mut file = self
            .backup_files
            .remove(&stream_id)
            .ok_or_else(|| format_err!("BackupFinished for invalid stream id {}", stream_id))?;
        file.finish()?;
        self.respond_ok()
    }
}