]>
Commit | Line | Data |
---|---|---|
72b1a166 FG |
1 | /// This module implements the "container" file format that `measureme` uses for |
2 | /// storing things on disk. The format supports storing three independent | |
3 | /// streams of data: one for events, one for string data, and one for string | |
4 | /// index data (in theory it could support an arbitrary number of separate | |
5 | /// streams but three is all we need). The data of each stream is split into | |
6 | /// "pages", where each page has a small header designating what kind of | |
7 | /// data it is (i.e. event, string data, or string index), and the length of | |
8 | /// the page. | |
9 | /// | |
10 | /// Pages of different kinds can be arbitrarily interleaved. The headers allow | |
11 | /// for reconstructing each of the streams later on. An example file might thus | |
12 | /// look like this: | |
13 | /// | |
14 | /// ```ignore | |
15 | /// | file header | page (events) | page (string data) | page (events) | page (string index) | | |
16 | /// ``` | |
17 | /// | |
18 | /// The exact encoding of a page is: | |
19 | /// | |
20 | /// | byte slice | contents | | |
21 | /// |-------------------------|-----------------------------------------| | |
22 | /// | &[0 .. 1] | page tag | | |
23 | /// | &[1 .. 5] | page size as little endian u32 | | |
24 | /// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) | | |
25 | /// | |
26 | /// A page is immediately followed by the next page, without any padding. | |
46de9a89 | 27 | use parking_lot::Mutex; |
72b1a166 FG |
28 | use rustc_hash::FxHashMap; |
29 | use std::cmp::min; | |
30 | use std::convert::TryInto; | |
48663c56 | 31 | use std::error::Error; |
72b1a166 FG |
32 | use std::fmt::Debug; |
33 | use std::fs; | |
34 | use std::io::Write; | |
35 | use std::sync::Arc; | |
48663c56 | 36 | |
72b1a166 FG |
/// The maximum number of content bytes a single page may hold. `write_atomic`
/// requests larger than this are routed through `write_bytes_atomic`, which
/// splits them across multiple pages.
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
43 | ||
/// Identifies which data stream a page belongs to. The discriminant value is
/// stored as the first byte of each page header (see the module docs for the
/// exact page layout).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PageTag {
    Events = 0,
    StringData = 1,
    StringIndex = 2,
}
51 | ||
52 | impl std::convert::TryFrom<u8> for PageTag { | |
53 | type Error = String; | |
54 | ||
55 | fn try_from(value: u8) -> Result<Self, Self::Error> { | |
56 | match value { | |
57 | 0 => Ok(PageTag::Events), | |
58 | 1 => Ok(PageTag::StringData), | |
59 | 2 => Ok(PageTag::StringIndex), | |
60 | _ => Err(format!("Could not convert byte `{}` to PageTag.", value)), | |
61 | } | |
62 | } | |
63 | } | |
64 | ||
/// An address within a data stream. Each data stream has its own address
/// space, i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
// prevent accidental use of `Addr` values with the wrong address space.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u32);

impl Addr {
    /// Returns the raw address value widened to `usize`, e.g. for indexing
    /// into a byte buffer.
    pub fn as_usize(self) -> usize {
        let Addr(raw) = self;
        raw as usize
    }
}
80 | ||
72b1a166 FG |
/// A sink that writes one stream of data (identified by `page_tag`) into the
/// shared backing storage, buffering bytes locally until a page's worth has
/// accumulated.
#[derive(Debug)]
pub struct SerializationSink {
    // Handle to the backing storage shared by all sinks created from the
    // same builder.
    shared_state: SharedState,
    // Locally buffered page data plus the stream-local address counter.
    data: Mutex<SerializationSinkInner>,
    // Tag written into the header of every page this sink emits.
    page_tag: PageTag,
}

/// Factory that ties multiple `SerializationSink`s (one per stream) to a
/// single shared backing storage.
pub struct SerializationSinkBuilder(SharedState);
48663c56 | 89 | |
72b1a166 FG |
90 | impl SerializationSinkBuilder { |
91 | pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> { | |
92 | Ok(Self(SharedState(Arc::new(Mutex::new( | |
93 | BackingStorage::File(file), | |
94 | ))))) | |
95 | } | |
96 | ||
97 | pub fn new_in_memory() -> SerializationSinkBuilder { | |
98 | Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory( | |
99 | Vec::new(), | |
100 | ))))) | |
101 | } | |
46de9a89 | 102 | |
72b1a166 FG |
103 | pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink { |
104 | SerializationSink { | |
105 | data: Mutex::new(SerializationSinkInner { | |
106 | buffer: Vec::with_capacity(MAX_PAGE_SIZE), | |
107 | addr: 0, | |
108 | }), | |
109 | shared_state: self.0.clone(), | |
110 | page_tag, | |
111 | } | |
46de9a89 | 112 | } |
48663c56 XL |
113 | } |
114 | ||
72b1a166 FG |
/// The `BackingStorage` is what the data gets written to. Usually that is a
/// file but for testing purposes it can also be an in-memory vec of bytes.
#[derive(Debug)]
enum BackingStorage {
    // Pages are appended to this file on disk.
    File(fs::File),
    // Pages are appended to this in-memory buffer (testing only).
    Memory(Vec<u8>),
}
48663c56 | 122 | |
72b1a166 FG |
123 | impl Write for BackingStorage { |
124 | #[inline] | |
125 | fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { | |
126 | match *self { | |
127 | BackingStorage::File(ref mut file) => file.write(buf), | |
128 | BackingStorage::Memory(ref mut vec) => vec.write(buf), | |
129 | } | |
130 | } | |
131 | ||
132 | fn flush(&mut self) -> std::io::Result<()> { | |
133 | match *self { | |
134 | BackingStorage::File(ref mut file) => file.flush(), | |
135 | BackingStorage::Memory(_) => { | |
136 | // Nothing to do | |
137 | Ok(()) | |
138 | } | |
48663c56 | 139 | } |
d9bb1a4e | 140 | } |
72b1a166 FG |
141 | } |
142 | ||
143 | /// This struct allows to treat `SerializationSink` as `std::io::Write`. | |
144 | pub struct StdWriteAdapter<'a>(&'a SerializationSink); | |
145 | ||
146 | impl<'a> Write for StdWriteAdapter<'a> { | |
147 | fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { | |
148 | self.0.write_bytes_atomic(buf); | |
149 | Ok(buf.len()) | |
150 | } | |
151 | ||
152 | fn flush(&mut self) -> std::io::Result<()> { | |
153 | let mut data = self.0.data.lock(); | |
154 | let SerializationSinkInner { | |
155 | ref mut buffer, | |
156 | addr: _, | |
157 | } = *data; | |
158 | ||
159 | // First flush the local buffer. | |
160 | self.0.flush(buffer); | |
161 | ||
162 | // Then flush the backing store. | |
163 | self.0.shared_state.0.lock().flush()?; | |
164 | ||
165 | Ok(()) | |
166 | } | |
167 | } | |
168 | ||
/// Per-sink mutable state: the local page buffer and the stream-local
/// address counter.
#[derive(Debug)]
struct SerializationSinkInner {
    // Bytes accumulated for the current, not-yet-written page.
    buffer: Vec<u8>,
    // Stream-local address of the next byte to be written.
    addr: u32,
}
174 | ||
175 | /// This state is shared between all `SerializationSink`s writing to the same | |
176 | /// backing storage (e.g. the same file). | |
177 | #[derive(Clone, Debug)] | |
178 | struct SharedState(Arc<Mutex<BackingStorage>>); | |
179 | ||
180 | impl SharedState { | |
181 | /// Copies out the contents of all pages with the given tag and | |
182 | /// concatenates them into a single byte vec. This method is only meant to | |
183 | /// be used for testing and will panic if the underlying backing storage is | |
184 | /// a file instead of in memory. | |
185 | fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> { | |
186 | let data = self.0.lock(); | |
187 | let data = match *data { | |
188 | BackingStorage::File(_) => panic!(), | |
189 | BackingStorage::Memory(ref data) => data, | |
190 | }; | |
191 | ||
192 | split_streams(data).remove(&page_tag).unwrap_or(Vec::new()) | |
193 | } | |
194 | } | |
195 | ||
196 | /// This function reconstructs the individual data streams from their paged | |
197 | /// version. | |
198 | /// | |
199 | /// For example, if `E` denotes the page header of an events page, `S` denotes | |
200 | /// the header of a string data page, and lower case letters denote page | |
201 | /// contents then a paged stream could look like: | |
202 | /// | |
203 | /// ```ignore | |
204 | /// s = Eabcd_Sopq_Eef_Eghi_Srst | |
205 | /// ``` | |
206 | /// | |
207 | /// and `split_streams` would result in the following set of streams: | |
208 | /// | |
209 | /// ```ignore | |
210 | /// split_streams(s) = { | |
211 | /// events: [abcdefghi], | |
212 | /// string_data: [opqrst], | |
213 | /// } | |
214 | /// ``` | |
215 | pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> { | |
216 | let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default(); | |
217 | ||
218 | let mut pos = 0; | |
219 | while pos < paged_data.len() { | |
220 | let tag = TryInto::try_into(paged_data[pos]).unwrap(); | |
221 | let page_size = | |
222 | u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize; | |
48663c56 | 223 | |
72b1a166 FG |
224 | assert!(page_size > 0); |
225 | ||
226 | result | |
227 | .entry(tag) | |
228 | .or_default() | |
229 | .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]); | |
230 | ||
231 | pos += page_size + 5; | |
48663c56 | 232 | } |
72b1a166 FG |
233 | |
234 | result | |
d9bb1a4e | 235 | } |
48663c56 | 236 | |
72b1a166 FG |
impl SerializationSink {
    /// Writes `bytes` as a single page to the shared backing storage. The
    /// method will first write the page header (consisting of the page tag and
    /// the number of bytes in the page) and then the page contents
    /// (i.e. `bytes`). Empty slices are silently ignored, so zero-length
    /// pages are never emitted.
    fn write_page(&self, bytes: &[u8]) {
        if bytes.len() > 0 {
            // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because
            // `MIN_PAGE_SIZE` is just a recommendation and the last page will
            // often be smaller than that.
            assert!(bytes.len() <= MAX_PAGE_SIZE);

            // Hold the shared-storage lock across all three writes so the
            // header and contents end up contiguous in the output.
            let mut file = self.shared_state.0.lock();

            // Page tag (1 byte).
            file.write_all(&[self.page_tag as u8]).unwrap();

            // Page size as a little endian u32 (4 bytes).
            let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();
            file.write_all(&page_size).unwrap();
            // Page contents (exactly `bytes.len()` bytes).
            file.write_all(&bytes[..]).unwrap();
        }
    }

    /// Flushes `buffer` by writing its contents as a new page to the backing
    /// storage and then clearing it.
    fn flush(&self, buffer: &mut Vec<u8>) {
        self.write_page(&buffer[..]);
        buffer.clear();
    }

    /// Creates a copy of all data written so far. This method is meant to be
    /// used for writing unit tests. It will panic if the underlying
    /// `BackingStorage` is a file.
    pub fn into_bytes(mut self) -> Vec<u8> {
        // Swap out the contents of `self` with something that can safely be
        // dropped without side effects. After the swap, the `Drop` impl only
        // sees an empty buffer and so won't write a spurious page.
        let mut data = Mutex::new(SerializationSinkInner {
            buffer: Vec::new(),
            addr: 0,
        });
        std::mem::swap(&mut self.data, &mut data);

        // Extract the data from the mutex.
        let SerializationSinkInner {
            ref mut buffer,
            addr: _,
        } = data.into_inner();

        // Make sure we write the current contents of the buffer to the
        // backing storage before proceeding.
        self.flush(buffer);

        self.shared_state.copy_bytes_with_page_tag(self.page_tag)
    }

    /// Atomically writes `num_bytes` of data to this `SerializationSink`.
    /// Atomic means the data is guaranteed to be written as a contiguous range
    /// of bytes.
    ///
    /// The buffer provided to the `write` callback is guaranteed to be of size
    /// `num_bytes` and `write` is supposed to completely fill it with the data
    /// to be written.
    ///
    /// The return value is the address of the data written and can be used to
    /// refer to the data later on.
    pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
    where
        W: FnOnce(&mut [u8]),
    {
        if num_bytes > MAX_PAGE_SIZE {
            // The data does not fit into a single page: materialize it into a
            // temporary buffer and let `write_bytes_atomic` do the paging.
            let mut bytes = vec![0u8; num_bytes];
            write(&mut bytes[..]);
            return self.write_bytes_atomic(&bytes[..]);
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        // If the data would overflow the current page, flush first so the
        // write stays contiguous within a single page.
        if buffer.len() + num_bytes > MAX_PAGE_SIZE {
            self.flush(buffer);
            assert!(buffer.is_empty());
        }

        let curr_addr = *addr;

        // Reserve `num_bytes` zeroed bytes at the end of the buffer and let
        // the callback fill them in.
        let buf_start = buffer.len();
        let buf_end = buf_start + num_bytes;
        buffer.resize(buf_end, 0u8);
        write(&mut buffer[buf_start..buf_end]);

        *addr += num_bytes as u32;

        Addr(curr_addr)
    }

    /// Atomically writes the data in `bytes` to this `SerializationSink`.
    /// Atomic means the data is guaranteed to be written as a contiguous range
    /// of bytes.
    ///
    /// This method may perform better than `write_atomic` because it may be
    /// able to skip the sink's internal buffer. Use this method if the data to
    /// be written is already available as a `&[u8]`.
    ///
    /// The return value is the address of the data written and can be used to
    /// refer to the data later on.
    pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
        // For "small" data we go to the buffered version immediately.
        if bytes.len() <= 128 {
            return self.write_atomic(bytes.len(), |sink| {
                sink.copy_from_slice(bytes);
            });
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        // The address is assigned up front; the bytes may end up spread over
        // several pages but remain contiguous within the stream.
        let curr_addr = Addr(*addr);
        *addr += bytes.len() as u32;

        let mut bytes_left = bytes;

        // Do we have too little data in the buffer? If so, fill up the buffer
        // to the minimum page size.
        if buffer.len() < MIN_PAGE_SIZE {
            let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());
            buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);
            bytes_left = &bytes_left[num_bytes_to_take..];
        }

        if bytes_left.is_empty() {
            return curr_addr;
        }

        // Make sure we flush the buffer before writing out any other pages.
        self.flush(buffer);

        for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {
            if chunk.len() == MAX_PAGE_SIZE {
                // This chunk has the maximum size. It might or might not be the
                // last one. In either case we want to write it to disk
                // immediately because there is no reason to copy it to the
                // buffer first.
                self.write_page(chunk);
            } else {
                // This chunk is less than the chunk size that we requested, so
                // it must be the last one. If it is big enough to warrant its
                // own page, we write it to disk immediately. Otherwise, we copy
                // it to the buffer.
                if chunk.len() >= MIN_PAGE_SIZE {
                    self.write_page(chunk);
                } else {
                    debug_assert!(buffer.is_empty());
                    buffer.extend_from_slice(chunk);
                }
            }
        }

        curr_addr
    }

    /// Returns an adapter that implements `std::io::Write` on top of this
    /// sink.
    pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {
        StdWriteAdapter(self)
    }
}
48663c56 | 406 | |
72b1a166 FG |
407 | impl Drop for SerializationSink { |
408 | fn drop(&mut self) { | |
409 | let mut data = self.data.lock(); | |
410 | let SerializationSinkInner { | |
411 | ref mut buffer, | |
412 | addr: _, | |
413 | } = *data; | |
414 | ||
415 | self.flush(buffer); | |
48663c56 XL |
416 | } |
417 | } | |
72b1a166 FG |
418 | |
#[cfg(test)]
mod tests {
    use super::*;

    // This function writes `chunk_count` byte-slices of size `chunk_size` to
    // three `SerializationSinks` that all map to the same underlying stream,
    // so we get interleaved pages with different tags.
    // It then extracts the data out again and asserts that it is the same as
    // has been written.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let sink_builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        // Each chunk carries the repeating byte pattern 0..239.
        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        {
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                let expected_addr = Addr((chunk_index * chunk_size) as u32);
                for sink in sinks.iter() {
                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
                }
            }
            // The sinks are dropped here, which flushes their remaining
            // buffers to the backing storage.
        }

        let streams: Vec<Vec<u8>> = tags
            .iter()
            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
            .collect();

        for stream in streams {
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, expected_chunk);
            }
        }
    }

    // Writes via the closure-based `write_atomic` API.
    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    // Writes via the slice-based `write_bytes_atomic` API.
    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
    // and one using `SerializationSink::write_bytes_atomic`.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);

    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);

    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}