]>
Commit | Line | Data |
---|---|---|
ac4e349b WB |
1 | use std::collections::hash_map::{self, HashMap}; |
2 | use std::io::{Read, Write}; | |
3 | use std::{mem, ptr}; | |
4 | ||
5 | use failure::*; | |
6 | ||
7 | use endian_trait::Endian; | |
8 | ||
9 | use crate::common; | |
10 | use crate::protocol::*; | |
11 | use crate::ChunkEntry; | |
12 | use crate::FixedChunk; | |
13 | ||
14 | type Result<T> = std::result::Result<T, Error>; | |
15 | ||
/// Iterator-like source of chunk digests (32-byte hashes) known to the server.
/// Used to stream the server's hash list to the client; `Send` because the list
/// is handed to the connection as a boxed trait object.
pub trait ChunkList: Send {
    /// Return the next chunk digest, `Ok(None)` when the list is exhausted, or an error.
    fn next(&mut self) -> Result<Option<&[u8; 32]>>;
}
19 | ||
/// This provides callbacks used by a `Connection` when it receives a packet.
pub trait HandleClient {
    /// The protocol handler will call this when the client produces an irrecoverable error.
    fn error(&self);

    /// The client wants the list of hashes, the provider should provide an iterator over chunk
    /// entries.
    fn get_chunk_list(&self, backup_name: &str) -> Result<Box<dyn ChunkList>>;

    /// The client has uploaded a chunk, we should add it to the chunk store. Return whether the
    /// chunk was new.
    fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool>;

    /// The client wants to create a backup. Since multiple backup streams can happen in parallel,
    /// this should return a handler used to create the individual streams.
    /// The handler will be informed about success via the ``finish()`` method.
    ///
    /// Parameters:
    /// * `backup_type` - textual backup type (produced via `backup_type::id_to_name`).
    /// * `id` - the client-supplied backup id.
    /// * `timestamp` - client-supplied timestamp (interpretation is up to the implementation).
    /// * `new` - `true` when the client requested exclusive creation (`backup_flags::EXCL` set).
    fn create_backup(
        &self,
        backup_type: &str,
        id: &str,
        timestamp: i64,
        new: bool,
    ) -> Result<Box<dyn HandleBackup + Send>>;
}
44 | ||
/// A single backup may contain multiple files. Currently we represent this via a hierarchy where
/// the `HandleBackup` trait is instantiated for each backup, which is responsible for
/// instantiating the `BackupFile` trait objects.
pub trait HandleBackup {
    /// All individual streams for this backup have been successfully finished.
    fn finish(&mut self) -> Result<()>;

    /// Create a specific file in this backup.
    ///
    /// `fixed_size` is `Some(total_size)` for fixed-chunk index files and `None` for
    /// dynamic-chunk files (see the flag handling in the connection's `create_backup`).
    fn create_file(
        &self,
        name: &str,
        fixed_size: Option<u64>,
        chunk_size: usize,
    ) -> Result<Box<dyn BackupFile + Send>>;
}
60 | ||
/// This handles backup files created by calling `create_file` on a `Backup`.
pub trait BackupFile {
    /// Backup use the server-local timestamp formatting, so we want to be able to tell the client
    /// the real remote path:
    fn relative_path(&self) -> &str;

    /// The client wants to add a chunk to a fixed index file at a certain position.
    fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>;

    /// The client wants to add a chunk to a dynamic index file.
    fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>;

    /// This notifies the handler that the backup has finished successfully. This should commit the
    /// data to the store for good. After this the client will receive an "ok".
    ///
    /// If the Drop handler gets called before this method, the backup was aborted due to an error
    /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be
    /// performed.
    fn finish(&mut self) -> Result<()>;
}
81 | ||
/// Hash-map key identifying one active backup: (backup type, backup id, timestamp).
#[derive(Clone, Eq, Hash, PartialEq)]
struct BackupId(backup_type::Type, String, i64);
84 | ||
/// Associates a socket with the server side of the backup protocol.
/// The communication channel should be `Read + Write` and may be non-blocking (provided it
/// correctly returns `io::ErrorKind::WouldBlock`).
/// The handler has to implement the `HandleClient` trait to provide callbacks used while
/// communicating with the client.
pub struct Connection<S, H>
where
    S: Read + Write,
    H: HandleClient,
{
    // Callbacks into the application (chunk store, backup creation, ...).
    handler: H,
    // Shared packet framing / send-receive machinery over the socket.
    common: common::Connection<S>,

    // states:

    // If this is set we are currently transferring our hash list to the client:
    hash_list: Option<(
        u8, // data stream ID
        Box<dyn ChunkList>,
    )>,

    // currently active 'backups' (handlers for a specific BackupDir)
    backups: HashMap<BackupId, Box<dyn HandleBackup + Send>>,

    // currently active backup *file* streams, keyed by packet stream id
    backup_files: HashMap<u8, Box<dyn BackupFile + Send>>,
}
112 | ||
113 | impl<S, H> Connection<S, H> | |
114 | where | |
115 | S: Read + Write, | |
116 | H: HandleClient, | |
117 | { | |
118 | pub fn new(socket: S, handler: H) -> Result<Self> { | |
119 | let mut me = Self { | |
120 | handler, | |
121 | common: common::Connection::new(socket), | |
122 | hash_list: None, | |
123 | backups: HashMap::new(), | |
124 | backup_files: HashMap::new(), | |
125 | }; | |
126 | ||
127 | me.send_hello()?; | |
128 | Ok(me) | |
129 | } | |
130 | ||
131 | fn send_hello(&mut self) -> Result<()> { | |
132 | let mut packet = Packet::builder(0, PacketType::Hello); | |
133 | packet.write_data(server::Hello { | |
134 | magic: server::HELLO_MAGIC, | |
135 | version: server::PROTOCOL_VERSION, | |
136 | }); | |
137 | self.common.queue_data(packet.finish())?; | |
138 | Ok(()) | |
139 | } | |
140 | ||
141 | pub fn eof(&self) -> bool { | |
142 | self.common.eof | |
143 | } | |
144 | ||
145 | /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. | |
146 | pub fn clear_err(&mut self) { | |
147 | self.common.clear_err() | |
148 | } | |
149 | ||
150 | pub fn main(&mut self) -> Result<()> { | |
151 | self.poll_read()?; | |
152 | self.poll_send()?; | |
153 | Ok(()) | |
154 | } | |
155 | ||
156 | // If this returns an error it is considered fatal and the connection should be dropped! | |
157 | fn poll_read(&mut self) -> Result<()> { | |
158 | if self.common.eof { | |
159 | // polls after EOF are errors: | |
160 | bail!("client disconnected"); | |
161 | } | |
162 | ||
163 | if !self.common.poll_read()? { | |
164 | // No data available | |
165 | if self.common.eof { | |
166 | bail!("client disconnected"); | |
167 | } | |
168 | return Ok(()); | |
169 | } | |
170 | ||
171 | // we received a packet, handle it: | |
172 | ||
173 | loop { | |
174 | use PacketType::*; | |
175 | match self.common.current_packet_type { | |
176 | GetHashList => self.hash_list_requested()?, | |
177 | UploadChunk => self.receive_chunk()?, | |
178 | CreateBackup => self.create_backup()?, | |
179 | BackupDataDynamic => self.backup_data_dynamic()?, | |
180 | BackupDataFixed => self.backup_data_fixed()?, | |
181 | BackupFinished => self.backup_finished()?, | |
182 | _ => bail!( | |
183 | "client sent an unexpected packet of type {}", | |
184 | self.common.current_packet_type as u32, | |
185 | ), | |
186 | }; | |
e4027693 WB |
187 | self.common.next()?; |
188 | if !self.common.poll_read()? { | |
ac4e349b WB |
189 | break; |
190 | } | |
191 | } | |
192 | ||
193 | Ok(()) | |
194 | } | |
195 | ||
196 | fn poll_send(&mut self) -> Result<()> { | |
197 | if self.common.error { | |
198 | eprintln!("refusing to send datato client in error state"); | |
199 | bail!("client is in error state"); | |
200 | } | |
201 | ||
202 | if let Some(false) = self.common.poll_send()? { | |
203 | // send queue is not finished, don't add anything else... | |
204 | return Ok(()); | |
205 | } | |
206 | ||
207 | // Queue has either finished or was empty, see if we should enqueue more data: | |
208 | if self.hash_list.is_some() { | |
209 | return self.send_hash_list(); | |
210 | } | |
211 | Ok(()) | |
212 | } | |
213 | ||
214 | fn hash_list_requested(&mut self) -> Result<()> { | |
215 | // Verify protocol: GetHashList is an empty packet. | |
216 | let request = self.common.read_unaligned_data::<client::GetHashList>(0)?; | |
217 | self.common | |
218 | .assert_size(mem::size_of_val(&request) + request.name_length as usize)?; | |
219 | let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..]; | |
220 | let name = std::str::from_utf8(name_bytes)?; | |
221 | ||
222 | // We support only one active hash list stream: | |
223 | if self.hash_list.is_some() { | |
224 | return self.respond_error(ErrorId::Busy); | |
225 | } | |
226 | ||
227 | self.hash_list = Some(( | |
228 | self.common.current_packet.id, | |
229 | self.handler.get_chunk_list(name)?, | |
230 | )); | |
231 | ||
232 | Ok(()) | |
233 | } | |
234 | ||
235 | fn send_hash_list(&mut self) -> Result<()> { | |
236 | loop { | |
237 | let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap(); | |
238 | ||
239 | let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::<Packet>()) | |
240 | / mem::size_of::<FixedChunk>(); | |
241 | ||
242 | let mut packet = Packet::builder(*stream_id, PacketType::HashListPart); | |
243 | packet.reserve(mem::size_of::<FixedChunk>() * max_chunks_per_packet); | |
244 | ||
245 | let mut count = 0; | |
246 | for _ in 0..max_chunks_per_packet { | |
247 | let entry: &[u8; 32] = match hash_iter.next() { | |
248 | Ok(Some(entry)) => entry, | |
249 | Ok(None) => break, | |
250 | Err(e) => { | |
251 | eprintln!("error sending chunk list to client: {}", e); | |
252 | continue; | |
253 | } | |
254 | }; | |
255 | ||
256 | packet.write_buf(entry); | |
257 | count += 1; | |
258 | } | |
259 | ||
260 | let can_send_more = self.common.queue_data(packet.finish())?; | |
261 | ||
262 | if count == 0 { | |
263 | // We just sent the EOF packet, clear our iterator state! | |
264 | self.hash_list = None; | |
265 | break; | |
266 | } | |
267 | ||
268 | if !can_send_more { | |
269 | break; | |
270 | } | |
271 | } | |
272 | Ok(()) | |
273 | } | |
274 | ||
275 | fn respond_error(&mut self, kind: ErrorId) -> Result<()> { | |
276 | self.respond_value(PacketType::Error, kind)?; | |
277 | Ok(()) | |
278 | } | |
279 | ||
280 | fn respond_value<T: Endian>(&mut self, pkttype: PacketType, data: T) -> Result<()> { | |
281 | let mut packet = Packet::builder(self.common.current_packet.id, pkttype); | |
282 | packet.write_data(data); | |
283 | self.common.queue_data(packet.finish())?; | |
284 | Ok(()) | |
285 | } | |
286 | ||
287 | fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> { | |
288 | self.common | |
289 | .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?; | |
290 | Ok(()) | |
291 | } | |
292 | ||
293 | fn respond_ok(&mut self) -> Result<()> { | |
294 | self.respond_empty(PacketType::Ok) | |
295 | } | |
296 | ||
297 | fn receive_chunk(&mut self) -> Result<()> { | |
298 | self.common | |
299 | .assert_atleast(mem::size_of::<client::UploadChunk>())?; | |
300 | let data = self.common.packet_data(); | |
301 | let (hash, data) = data.split_at(mem::size_of::<FixedChunk>()); | |
302 | if data.len() == 0 { | |
303 | bail!("received an empty chunk"); | |
304 | } | |
305 | let entry = ChunkEntry::from_data(data); | |
306 | if entry.hash != hash { | |
307 | let cli_hash = crate::tools::digest_to_hex(hash); | |
308 | let data_hash = entry.digest_to_hex(); | |
309 | bail!( | |
310 | "client claimed data with digest {} has digest {}", | |
311 | data_hash, | |
312 | cli_hash | |
313 | ); | |
314 | } | |
315 | let _new = self.handler.upload_chunk(&entry, data)?; | |
316 | self.respond_ok() | |
317 | } | |
318 | ||
319 | fn create_backup(&mut self) -> Result<()> { | |
320 | if self | |
321 | .backup_files | |
322 | .contains_key(&self.common.current_packet.id) | |
323 | { | |
324 | bail!("stream id already in use..."); | |
325 | } | |
326 | ||
327 | let create_msg = self.common.read_unaligned_data::<client::CreateBackup>(0)?; | |
328 | ||
329 | // simple data: | |
330 | let flags = create_msg.flags; | |
331 | let backup_type = create_msg.backup_type; | |
332 | let time = create_msg.timestamp as i64; | |
333 | ||
334 | // text comes from the payload data after the CreateBackup struct: | |
335 | let data = self.common.packet_data(); | |
336 | let payload = &data[mem::size_of_val(&create_msg)..]; | |
337 | ||
338 | // there must be exactly the ID and the file name in the payload: | |
339 | let id_len = create_msg.id_length as usize; | |
340 | let name_len = create_msg.name_length as usize; | |
341 | let expected_len = id_len + name_len; | |
342 | if payload.len() < expected_len { | |
343 | bail!("client sent incomplete CreateBackup request"); | |
344 | } else if payload.len() > expected_len { | |
345 | bail!("client sent excess data in CreateBackup request"); | |
346 | } | |
347 | ||
348 | // id and file name must be utf8: | |
349 | let id = std::str::from_utf8(&payload[0..id_len]) | |
350 | .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?; | |
351 | let file_name = std::str::from_utf8(&payload[id_len..]) | |
352 | .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?; | |
353 | ||
354 | // Sanity check dynamic vs fixed: | |
355 | let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0; | |
356 | let file_size = match (is_dynamic, create_msg.file_size) { | |
357 | (false, size) => Some(size), | |
358 | (true, 0) => None, | |
359 | (true, _) => bail!("file size of dynamic streams must be zero"), | |
360 | }; | |
361 | ||
362 | // search or create the handler: | |
363 | let hashmap_id = BackupId(backup_type, id.to_string(), time); | |
364 | let handle = match self.backups.entry(hashmap_id) { | |
365 | hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup( | |
366 | backup_type::id_to_name(backup_type)?, | |
367 | id, | |
368 | time, | |
369 | (flags & backup_flags::EXCL) != 0, | |
370 | )?), | |
371 | hash_map::Entry::Occupied(entry) => entry.into_mut(), | |
372 | }; | |
373 | let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?; | |
374 | ||
375 | let mut response = | |
376 | Packet::builder(self.common.current_packet.id, PacketType::BackupCreated); | |
377 | let path = file.relative_path(); | |
378 | if path.len() > 0xffff { | |
379 | bail!("path too long"); | |
380 | } | |
381 | response | |
382 | .write_data(server::BackupCreated { | |
383 | path_length: path.len() as _, | |
384 | }) | |
385 | .write_buf(path.as_bytes()); | |
386 | self.common.queue_data(response.finish())?; | |
387 | ||
388 | self.backup_files | |
389 | .insert(self.common.current_packet.id, file); | |
390 | ||
391 | Ok(()) | |
392 | } | |
393 | ||
394 | fn backup_data_dynamic(&mut self) -> Result<()> { | |
395 | let stream_id = self.common.current_packet.id; | |
396 | let file = self | |
397 | .backup_files | |
398 | .get_mut(&stream_id) | |
399 | .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?; | |
400 | ||
401 | let mut data = self.common.packet_data(); | |
402 | // Data consists of (offset: u64, hash: [u8; 32]) | |
403 | let entry_len = mem::size_of::<DynamicChunk>(); | |
404 | ||
405 | while data.len() >= entry_len { | |
406 | let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) }; | |
407 | data = &data[entry_len..]; | |
408 | ||
409 | entry.offset = entry.offset.from_le(); | |
410 | file.add_dynamic_data(&entry)?; | |
411 | } | |
412 | ||
413 | if data.len() != 0 { | |
414 | bail!( | |
415 | "client sent excess data ({} bytes) after dynamic chunk indices!", | |
416 | data.len() | |
417 | ); | |
418 | } | |
419 | ||
420 | Ok(()) | |
421 | } | |
422 | ||
423 | fn backup_data_fixed(&mut self) -> Result<()> { | |
424 | let stream_id = self.common.current_packet.id; | |
425 | let file = self | |
426 | .backup_files | |
427 | .get_mut(&stream_id) | |
428 | .ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?; | |
429 | ||
430 | let mut data = self.common.packet_data(); | |
431 | // Data consists of (index: u64, hash: [u8; 32]) | |
432 | #[repr(C, packed)] | |
433 | struct IndexedChunk { | |
434 | index: u64, | |
435 | digest: FixedChunk, | |
436 | } | |
437 | let entry_len = mem::size_of::<IndexedChunk>(); | |
438 | ||
439 | while data.len() >= entry_len { | |
440 | let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) }; | |
441 | data = &data[entry_len..]; | |
442 | ||
443 | entry.index = entry.index.from_le(); | |
444 | file.add_fixed_data(entry.index, &entry.digest)?; | |
445 | } | |
446 | ||
447 | if data.len() != 0 { | |
448 | bail!( | |
449 | "client sent excess data ({} bytes) after dynamic chunk indices!", | |
450 | data.len() | |
451 | ); | |
452 | } | |
453 | ||
454 | Ok(()) | |
455 | } | |
456 | ||
457 | fn backup_finished(&mut self) -> Result<()> { | |
458 | let stream_id = self.common.current_packet.id; | |
459 | let mut file = self | |
460 | .backup_files | |
461 | .remove(&stream_id) | |
462 | .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?; | |
463 | file.finish()?; | |
464 | self.respond_ok() | |
465 | } | |
466 | } |