]>
Commit | Line | Data |
---|---|---|
48663c56 XL |
1 | use crate::serialization::{Addr, SerializationSink}; |
2 | use std::error::Error; | |
3 | use std::fs; | |
60c5eb7d | 4 | use std::io::{Write}; |
48663c56 | 5 | use std::path::Path; |
60c5eb7d | 6 | use parking_lot::Mutex; |
48663c56 XL |
7 | |
8 | pub struct FileSerializationSink { | |
60c5eb7d XL |
9 | data: Mutex<Inner>, |
10 | } | |
11 | ||
/// The mutable state of a `FileSerializationSink`.
struct Inner {
    /// The file that receives the serialized bytes.
    file: fs::File,
    /// Staging area where writes are collected before being written to `file`.
    buffer: Vec<u8>,
    /// Number of bytes at the front of `buffer` that have not yet been
    /// written to `file`; the next write into the buffer starts here.
    buf_pos: usize,
    /// The address that will be handed out for the next write. It advances by
    /// the size of every write.
    addr: u32,
}
18 | ||
19 | impl SerializationSink for FileSerializationSink { | |
20 | fn from_path(path: &Path) -> Result<Self, Box<dyn Error>> { | |
21 | fs::create_dir_all(path.parent().unwrap())?; | |
22 | ||
23 | let file = fs::File::create(path)?; | |
24 | ||
25 | Ok(FileSerializationSink { | |
60c5eb7d XL |
26 | data: Mutex::new(Inner { |
27 | file, | |
28 | buffer: vec![0; 1024*512], | |
29 | buf_pos: 0, | |
30 | addr: 0 | |
31 | }), | |
48663c56 XL |
32 | }) |
33 | } | |
34 | ||
35 | #[inline] | |
36 | fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr | |
37 | where | |
38 | W: FnOnce(&mut [u8]), | |
39 | { | |
60c5eb7d XL |
40 | let mut data = self.data.lock(); |
41 | let Inner { | |
42 | ref mut file, | |
43 | ref mut buffer, | |
44 | ref mut buf_pos, | |
45 | ref mut addr | |
46 | } = *data; | |
48663c56 | 47 | |
60c5eb7d XL |
48 | let curr_addr = *addr; |
49 | *addr += num_bytes as u32; | |
48663c56 | 50 | |
60c5eb7d XL |
51 | let buf_start = *buf_pos; |
52 | let buf_end = buf_start + num_bytes; | |
48663c56 | 53 | |
60c5eb7d XL |
54 | if buf_end <= buffer.len() { |
55 | // We have enough space in the buffer, just write the data to it. | |
56 | write(&mut buffer[buf_start .. buf_end]); | |
57 | *buf_pos = buf_end; | |
58 | } else { | |
59 | // We don't have enough space in the buffer, so flush to disk | |
60 | file.write_all(&buffer[..buf_start]).unwrap(); | |
61 | ||
62 | if num_bytes <= buffer.len() { | |
63 | // There's enough space in the buffer, after flushing | |
64 | write(&mut buffer[0 .. num_bytes]); | |
65 | *buf_pos = num_bytes; | |
66 | } else { | |
67 | // Even after flushing the buffer there isn't enough space, so | |
68 | // fall back to dynamic allocation | |
69 | let mut temp_buffer = vec![0; num_bytes]; | |
70 | write(&mut temp_buffer[..]); | |
71 | file.write_all(&temp_buffer[..]).unwrap(); | |
72 | *buf_pos = 0; | |
73 | } | |
74 | } | |
48663c56 XL |
75 | |
76 | Addr(curr_addr) | |
77 | } | |
78 | } | |
60c5eb7d XL |
79 | |
80 | impl Drop for FileSerializationSink { | |
81 | fn drop(&mut self) { | |
82 | let mut data = self.data.lock(); | |
83 | let Inner { | |
84 | ref mut file, | |
85 | ref mut buffer, | |
86 | ref mut buf_pos, | |
87 | addr: _, | |
88 | } = *data; | |
89 | ||
90 | if *buf_pos > 0 { | |
91 | file.write_all(&buffer[..*buf_pos]).unwrap(); | |
92 | } | |
93 | } | |
94 | } |