// vendor/measureme/src/serialization.rs (vendored in rustc.git; origin: git.proxmox.com mirror)
//! This module implements the "container" file format that `measureme` uses for
//! storing things on disk. The format supports storing three independent
//! streams of data: one for events, one for string data, and one for string
//! index data (in theory it could support an arbitrary number of separate
//! streams but three is all we need). The data of each stream is split into
//! "pages", where each page has a small header designating what kind of
//! data it is (i.e. event, string data, or string index), and the length of
//! the page.
//!
//! Pages of different kinds can be arbitrarily interleaved. The headers allow
//! for reconstructing each of the streams later on. An example file might thus
//! look like this:
//!
//! ```ignore
//! | file header | page (events) | page (string data) | page (events) | page (string index) |
//! ```
//!
//! The exact encoding of a page is:
//!
//! | byte slice              | contents                                |
//! |-------------------------|-----------------------------------------|
//! | &[0 .. 1]               | page tag                                |
//! | &[1 .. 5]               | page size as little endian u32          |
//! | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |
//!
//! A page is immediately followed by the next page, without any padding.
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use std::cmp::min;
use std::convert::TryInto;
use std::error::Error;
use std::fs;
use std::io::Write;
use std::sync::Arc;
/// Maximum number of bytes a single page may contain; `write_page` asserts
/// this bound and `write_atomic` flushes the buffer before it would be
/// exceeded.
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
/// Tag stored in every page header, identifying which data stream the page
/// belongs to. The explicit discriminants are the on-disk byte values and
/// must stay in sync with the `TryFrom<u8>` impl below.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum PageTag {
    Events = 0,
    StringData = 1,
    StringIndex = 2,
}

impl std::convert::TryFrom<u8> for PageTag {
    type Error = String;

    /// Decodes a page-tag byte as read from a page header. Unknown byte
    /// values yield a descriptive error instead of panicking, so corrupted
    /// files can be reported gracefully.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(PageTag::Events),
            1 => Ok(PageTag::StringData),
            2 => Ok(PageTag::StringIndex),
            _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
        }
    }
}
/// An address within a data stream. Each data stream has its own address space,
/// i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
// prevent accidental use of `Addr` values with the wrong address space.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u32);

impl Addr {
    /// Widens the 32-bit stream address to `usize` for slice indexing.
    pub fn as_usize(self) -> usize {
        self.0 as usize
    }
}
82 pub struct SerializationSink
{
83 shared_state
: SharedState
,
84 data
: Mutex
<SerializationSinkInner
>,
88 pub struct SerializationSinkBuilder(SharedState
);
90 impl SerializationSinkBuilder
{
91 pub fn new_from_file(file
: fs
::File
) -> Result
<Self, Box
<dyn Error
+ Send
+ Sync
>> {
92 Ok(Self(SharedState(Arc
::new(Mutex
::new(
93 BackingStorage
::File(file
),
97 pub fn new_in_memory() -> SerializationSinkBuilder
{
98 Self(SharedState(Arc
::new(Mutex
::new(BackingStorage
::Memory(
103 pub fn new_sink(&self, page_tag
: PageTag
) -> SerializationSink
{
105 data
: Mutex
::new(SerializationSinkInner
{
106 buffer
: Vec
::with_capacity(MAX_PAGE_SIZE
),
109 shared_state
: self.0.clone(),
/// The `BackingStorage` is what the data gets written to. Usually that is a
/// file but for testing purposes it can also be an in-memory vec of bytes.
#[derive(Debug)]
enum BackingStorage {
    File(fs::File),
    Memory(Vec<u8>),
}

impl Write for BackingStorage {
    /// Forwards the write to the underlying file or in-memory buffer.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match *self {
            BackingStorage::File(ref mut file) => file.write(buf),
            BackingStorage::Memory(ref mut vec) => vec.write(buf),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match *self {
            BackingStorage::File(ref mut file) => file.flush(),
            // Writes to a Vec are already "durable"; nothing to flush.
            BackingStorage::Memory(_) => Ok(()),
        }
    }
}
143 /// This struct allows to treat `SerializationSink` as `std::io::Write`.
144 pub struct StdWriteAdapter
<'a
>(&'a SerializationSink
);
146 impl<'a
> Write
for StdWriteAdapter
<'a
> {
147 fn write(&mut self, buf
: &[u8]) -> std
::io
::Result
<usize> {
148 self.0.write_bytes_atomic(buf
);
152 fn flush(&mut self) -> std
::io
::Result
<()> {
153 let mut data
= self.0.data
.lock();
154 let SerializationSinkInner
{
159 // First flush the local buffer.
160 self.0.flush
(buffer
);
162 // Then flush the backing store.
163 self.0.shared_state
.0.lock().flush()?
;
170 struct SerializationSinkInner
{
175 /// This state is shared between all `SerializationSink`s writing to the same
176 /// backing storage (e.g. the same file).
177 #[derive(Clone, Debug)]
178 struct SharedState(Arc
<Mutex
<BackingStorage
>>);
181 /// Copies out the contents of all pages with the given tag and
182 /// concatenates them into a single byte vec. This method is only meant to
183 /// be used for testing and will panic if the underlying backing storage is
184 /// a file instead of in memory.
185 fn copy_bytes_with_page_tag(&self, page_tag
: PageTag
) -> Vec
<u8> {
186 let data
= self.0.lock();
187 let data
= match *data
{
188 BackingStorage
::File(_
) => panic
!(),
189 BackingStorage
::Memory(ref data
) => data
,
192 split_streams(data
).remove(&page_tag
).unwrap_or(Vec
::new())
196 /// This function reconstructs the individual data streams from their paged
199 /// For example, if `E` denotes the page header of an events page, `S` denotes
200 /// the header of a string data page, and lower case letters denote page
201 /// contents then a paged stream could look like:
204 /// s = Eabcd_Sopq_Eef_Eghi_Srst
207 /// and `split_streams` would result in the following set of streams:
210 /// split_streams(s) = {
211 /// events: [abcdefghi],
212 /// string_data: [opqrst],
215 pub fn split_streams(paged_data
: &[u8]) -> FxHashMap
<PageTag
, Vec
<u8>> {
216 let mut result
: FxHashMap
<PageTag
, Vec
<u8>> = FxHashMap
::default();
219 while pos
< paged_data
.len() {
220 let tag
= TryInto
::try_into(paged_data
[pos
]).unwrap();
222 u32::from_le_bytes(paged_data
[pos
+ 1..pos
+ 5].try_into().unwrap()) as usize;
224 assert
!(page_size
> 0);
229 .extend_from_slice(&paged_data
[pos
+ 5..pos
+ 5 + page_size
]);
231 pos
+= page_size
+ 5;
237 impl SerializationSink
{
238 /// Writes `bytes` as a single page to the shared backing storage. The
239 /// method will first write the page header (consisting of the page tag and
240 /// the number of bytes in the page) and then the page contents
242 fn write_page(&self, bytes
: &[u8]) {
244 // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because
245 // `MIN_PAGE_SIZE` is just a recommendation and the last page will
246 // often be smaller than that.
247 assert
!(bytes
.len() <= MAX_PAGE_SIZE
);
249 let mut file
= self.shared_state
.0.lock();
251 file
.write_all(&[self.page_tag
as u8]).unwrap();
253 let page_size
: [u8; 4] = (bytes
.len() as u32).to_le_bytes();
254 file
.write_all(&page_size
).unwrap();
255 file
.write_all(&bytes
[..]).unwrap();
259 /// Flushes `buffer` by writing its contents as a new page to the backing
260 /// storage and then clearing it.
261 fn flush(&self, buffer
: &mut Vec
<u8>) {
262 self.write_page(&buffer
[..]);
266 /// Creates a copy of all data written so far. This method is meant to be
267 /// used for writing unit tests. It will panic if the underlying
268 /// `BackingStorage` is a file.
269 pub fn into_bytes(mut self) -> Vec
<u8> {
270 // Swap out the contains of `self` with something that can safely be
271 // dropped without side effects.
272 let mut data
= Mutex
::new(SerializationSinkInner
{
276 std
::mem
::swap(&mut self.data
, &mut data
);
278 // Extract the data from the mutex.
279 let SerializationSinkInner
{
282 } = data
.into_inner();
284 // Make sure we write the current contents of the buffer to the
285 // backing storage before proceeding.
288 self.shared_state
.copy_bytes_with_page_tag(self.page_tag
)
291 /// Atomically writes `num_bytes` of data to this `SerializationSink`.
292 /// Atomic means the data is guaranteed to be written as a contiguous range
295 /// The buffer provided to the `write` callback is guaranteed to be of size
296 /// `num_bytes` and `write` is supposed to completely fill it with the data
299 /// The return value is the address of the data written and can be used to
300 /// refer to the data later on.
301 pub fn write_atomic
<W
>(&self, num_bytes
: usize, write
: W
) -> Addr
303 W
: FnOnce(&mut [u8]),
305 if num_bytes
> MAX_PAGE_SIZE
{
306 let mut bytes
= vec
![0u8; num_bytes
];
307 write(&mut bytes
[..]);
308 return self.write_bytes_atomic(&bytes
[..]);
311 let mut data
= self.data
.lock();
312 let SerializationSinkInner
{
317 if buffer
.len() + num_bytes
> MAX_PAGE_SIZE
{
319 assert
!(buffer
.is_empty());
322 let curr_addr
= *addr
;
324 let buf_start
= buffer
.len();
325 let buf_end
= buf_start
+ num_bytes
;
326 buffer
.resize(buf_end
, 0u8);
327 write(&mut buffer
[buf_start
..buf_end
]);
329 *addr
+= num_bytes
as u32;
334 /// Atomically writes the data in `bytes` to this `SerializationSink`.
335 /// Atomic means the data is guaranteed to be written as a contiguous range
338 /// This method may perform better than `write_atomic` because it may be
339 /// able to skip the sink's internal buffer. Use this method if the data to
340 /// be written is already available as a `&[u8]`.
342 /// The return value is the address of the data written and can be used to
343 /// refer to the data later on.
344 pub fn write_bytes_atomic(&self, bytes
: &[u8]) -> Addr
{
345 // For "small" data we go to the buffered version immediately.
346 if bytes
.len() <= 128 {
347 return self.write_atomic(bytes
.len(), |sink
| {
348 sink
.copy_from_slice(bytes
);
352 let mut data
= self.data
.lock();
353 let SerializationSinkInner
{
358 let curr_addr
= Addr(*addr
);
359 *addr
+= bytes
.len() as u32;
361 let mut bytes_left
= bytes
;
363 // Do we have too little data in the buffer? If so, fill up the buffer
364 // to the minimum page size.
365 if buffer
.len() < MIN_PAGE_SIZE
{
366 let num_bytes_to_take
= min(MIN_PAGE_SIZE
- buffer
.len(), bytes_left
.len());
367 buffer
.extend_from_slice(&bytes_left
[..num_bytes_to_take
]);
368 bytes_left
= &bytes_left
[num_bytes_to_take
..];
371 if bytes_left
.is_empty() {
375 // Make sure we flush the buffer before writing out any other pages.
378 for chunk
in bytes_left
.chunks(MAX_PAGE_SIZE
) {
379 if chunk
.len() == MAX_PAGE_SIZE
{
380 // This chunk has the maximum size. It might or might not be the
381 // last one. In either case we want to write it to disk
382 // immediately because there is no reason to copy it to the
384 self.write_page(chunk
);
386 // This chunk is less than the chunk size that we requested, so
387 // it must be the last one. If it is big enough to warrant its
388 // own page, we write it to disk immediately. Otherwise, we copy
390 if chunk
.len() >= MIN_PAGE_SIZE
{
391 self.write_page(chunk
);
393 debug_assert
!(buffer
.is_empty());
394 buffer
.extend_from_slice(chunk
);
402 pub fn as_std_write
<'a
>(&'a
self) -> impl Write
+ 'a
{
403 StdWriteAdapter(self)
407 impl Drop
for SerializationSink
{
409 let mut data
= self.data
.lock();
410 let SerializationSinkInner
{
#[cfg(test)]
mod tests {
    use super::*;

    // This function writes `chunk_count` byte-slices of size `chunk_size` to
    // three `SerializationSinks` that all map to the same underlying stream,
    // so we get interleaved pages with different tags.
    // It then extracts the data out again and asserts that it is the same as
    // the data that was written.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let sink_builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        {
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                let expected_addr = Addr((chunk_index * chunk_size) as u32);
                for sink in sinks.iter() {
                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
                }
            }
            // Sinks are dropped here, flushing their remaining buffers.
        }

        let streams: Vec<Vec<u8>> = tags
            .iter()
            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
            .collect();

        for stream in streams {
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, expected_chunk);
            }
        }
    }

    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
    // and one using `SerializationSink::write_bytes_atomic`.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);
    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);
    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}