]>
Commit | Line | Data |
---|---|---|
48663c56 | 1 | use crate::serialization::{Addr, SerializationSink}; |
dfeec247 | 2 | use parking_lot::Mutex; |
48663c56 XL |
3 | use std::error::Error; |
4 | use std::fs; | |
dfeec247 | 5 | use std::io::Write; |
48663c56 | 6 | use std::path::Path; |
48663c56 XL |
7 | |
8 | pub struct FileSerializationSink { | |
60c5eb7d XL |
9 | data: Mutex<Inner>, |
10 | } | |
11 | ||
12 | struct Inner { | |
13 | file: fs::File, | |
14 | buffer: Vec<u8>, | |
15 | buf_pos: usize, | |
16 | addr: u32, | |
48663c56 XL |
17 | } |
18 | ||
19 | impl SerializationSink for FileSerializationSink { | |
20 | fn from_path(path: &Path) -> Result<Self, Box<dyn Error>> { | |
21 | fs::create_dir_all(path.parent().unwrap())?; | |
22 | ||
23 | let file = fs::File::create(path)?; | |
24 | ||
25 | Ok(FileSerializationSink { | |
60c5eb7d XL |
26 | data: Mutex::new(Inner { |
27 | file, | |
dfeec247 | 28 | buffer: vec![0; 1024 * 512], |
60c5eb7d | 29 | buf_pos: 0, |
dfeec247 | 30 | addr: 0, |
60c5eb7d | 31 | }), |
48663c56 XL |
32 | }) |
33 | } | |
34 | ||
35 | #[inline] | |
36 | fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr | |
37 | where | |
38 | W: FnOnce(&mut [u8]), | |
39 | { | |
60c5eb7d XL |
40 | let mut data = self.data.lock(); |
41 | let Inner { | |
42 | ref mut file, | |
43 | ref mut buffer, | |
44 | ref mut buf_pos, | |
dfeec247 | 45 | ref mut addr, |
60c5eb7d | 46 | } = *data; |
48663c56 | 47 | |
60c5eb7d XL |
48 | let curr_addr = *addr; |
49 | *addr += num_bytes as u32; | |
48663c56 | 50 | |
60c5eb7d XL |
51 | let buf_start = *buf_pos; |
52 | let buf_end = buf_start + num_bytes; | |
48663c56 | 53 | |
60c5eb7d XL |
54 | if buf_end <= buffer.len() { |
55 | // We have enough space in the buffer, just write the data to it. | |
dfeec247 | 56 | write(&mut buffer[buf_start..buf_end]); |
60c5eb7d XL |
57 | *buf_pos = buf_end; |
58 | } else { | |
59 | // We don't have enough space in the buffer, so flush to disk | |
60 | file.write_all(&buffer[..buf_start]).unwrap(); | |
61 | ||
62 | if num_bytes <= buffer.len() { | |
63 | // There's enough space in the buffer, after flushing | |
dfeec247 | 64 | write(&mut buffer[0..num_bytes]); |
60c5eb7d XL |
65 | *buf_pos = num_bytes; |
66 | } else { | |
67 | // Even after flushing the buffer there isn't enough space, so | |
68 | // fall back to dynamic allocation | |
69 | let mut temp_buffer = vec![0; num_bytes]; | |
70 | write(&mut temp_buffer[..]); | |
71 | file.write_all(&temp_buffer[..]).unwrap(); | |
72 | *buf_pos = 0; | |
73 | } | |
74 | } | |
48663c56 XL |
75 | |
76 | Addr(curr_addr) | |
77 | } | |
dfeec247 XL |
78 | |
79 | fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr { | |
80 | if bytes.len() < 128 { | |
81 | // For "small" pieces of data, use the regular implementation so we | |
82 | // don't repeatedly flush an almost empty buffer to disk. | |
83 | return self.write_atomic(bytes.len(), |sink| sink.copy_from_slice(bytes)); | |
84 | } | |
85 | ||
86 | let mut data = self.data.lock(); | |
87 | let Inner { | |
88 | ref mut file, | |
89 | ref mut buffer, | |
90 | ref mut buf_pos, | |
91 | ref mut addr, | |
92 | } = *data; | |
93 | ||
94 | let curr_addr = *addr; | |
95 | *addr += bytes.len() as u32; | |
96 | ||
97 | if *buf_pos > 0 { | |
98 | // There's something in the buffer, flush it to disk | |
99 | file.write_all(&buffer[..*buf_pos]).unwrap(); | |
100 | *buf_pos = 0; | |
101 | } | |
102 | ||
103 | // Now write the whole input to disk, skipping the write buffer | |
104 | file.write_all(bytes).unwrap(); | |
105 | ||
106 | Addr(curr_addr) | |
107 | } | |
48663c56 | 108 | } |
60c5eb7d XL |
109 | |
110 | impl Drop for FileSerializationSink { | |
111 | fn drop(&mut self) { | |
112 | let mut data = self.data.lock(); | |
113 | let Inner { | |
114 | ref mut file, | |
115 | ref mut buffer, | |
116 | ref mut buf_pos, | |
117 | addr: _, | |
118 | } = *data; | |
119 | ||
120 | if *buf_pos > 0 { | |
121 | file.write_all(&buffer[..*buf_pos]).unwrap(); | |
122 | } | |
123 | } | |
124 | } |