]> git.proxmox.com Git - rustc.git/blobdiff - vendor/measureme/src/serialization.rs
Merge tag 'debian/1.52.1+dfsg1-1_exp2' into proxmox/buster
[rustc.git] / vendor / measureme / src / serialization.rs
index 28bfd96c0342ac1c88c690c9b2c53a7971ac609b..6dcc51d39b9061c810b617a0d26c482a79647c19 100644 (file)
-/// This module implements the "container" file format that `measureme` uses for
-/// storing things on disk. The format supports storing three independent
-/// streams of data: one for events, one for string data, and one for string
-/// index data (in theory it could support an arbitrary number of separate
-/// streams but three is all we need). The data of each stream is split into
-/// "pages", where each page has a small header designating what kind of
-/// data it is (i.e. event, string data, or string index), and the length of
-/// the page.
-///
-/// Pages of different kinds can be arbitrarily interleaved. The headers allow
-/// for reconstructing each of the streams later on. An example file might thus
-/// look like this:
-///
-/// ```ignore
-/// | file header | page (events) | page (string data) | page (events) | page (string index) |
-/// ```
-///
-/// The exact encoding of a page is:
-///
-/// | byte slice              | contents                                |
-/// |-------------------------|-----------------------------------------|
-/// | &[0 .. 1]               | page tag                                |
-/// | &[1 .. 5]               | page size as little endian u32          |
-/// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |
-///
-/// A page is immediately followed by the next page, without any padding.
-use parking_lot::Mutex;
-use rustc_hash::FxHashMap;
-use std::cmp::min;
-use std::convert::TryInto;
-use std::error::Error;
-use std::fmt::Debug;
-use std::fs;
-use std::io::Write;
-use std::sync::Arc;
-
-const MAX_PAGE_SIZE: usize = 256 * 1024;
-
-/// The number of bytes we consider enough to warrant their own page when
-/// deciding whether to flush a partially full buffer. Actual pages may need
-/// to be smaller, e.g. when writing the tail of the data stream.
-const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
-#[repr(u8)]
-pub enum PageTag {
-    Events = 0,
-    StringData = 1,
-    StringIndex = 2,
-}
-
-impl std::convert::TryFrom<u8> for PageTag {
-    type Error = String;
-
-    fn try_from(value: u8) -> Result<Self, Self::Error> {
-        match value {
-            0 => Ok(PageTag::Events),
-            1 => Ok(PageTag::StringData),
-            2 => Ok(PageTag::StringIndex),
-            _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
-        }
-    }
-}
-
-/// An address within a data stream. Each data stream has its own address space,
-/// i.e. the first piece of data written to the events stream will have
-/// `Addr(0)` and the first piece of data written to the string data stream
-/// will *also* have `Addr(0)`.
-//
-// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
-//       prevent accidental use of `Addr` values with the wrong address space.
-#[derive(Clone, Copy, Eq, PartialEq, Debug)]
-pub struct Addr(pub u32);
-
-impl Addr {
-    pub fn as_usize(self) -> usize {
-        self.0 as usize
-    }
-}
-
-#[derive(Debug)]
-pub struct SerializationSink {
-    shared_state: SharedState,
-    data: Mutex<SerializationSinkInner>,
-    page_tag: PageTag,
-}
-
-pub struct SerializationSinkBuilder(SharedState);
-
-impl SerializationSinkBuilder {
-    pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> {
-        Ok(Self(SharedState(Arc::new(Mutex::new(
-            BackingStorage::File(file),
-        )))))
-    }
-
-    pub fn new_in_memory() -> SerializationSinkBuilder {
-        Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory(
-            Vec::new(),
-        )))))
-    }
-
-    pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink {
-        SerializationSink {
-            data: Mutex::new(SerializationSinkInner {
-                buffer: Vec::with_capacity(MAX_PAGE_SIZE),
-                addr: 0,
-            }),
-            shared_state: self.0.clone(),
-            page_tag,
-        }
-    }
-}
-
-/// The `BackingStorage` is what the data gets written to. Usually that is a
-/// file but for testing purposes it can also be an in-memory vec of bytes.
-#[derive(Debug)]
-enum BackingStorage {
-    File(fs::File),
-    Memory(Vec<u8>),
-}
-
-impl Write for BackingStorage {
-    #[inline]
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        match *self {
-            BackingStorage::File(ref mut file) => file.write(buf),
-            BackingStorage::Memory(ref mut vec) => vec.write(buf),
-        }
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        match *self {
-            BackingStorage::File(ref mut file) => file.flush(),
-            BackingStorage::Memory(_) => {
-                // Nothing to do
-                Ok(())
-            }
-        }
-    }
-}
-
-/// This struct allows to treat `SerializationSink` as `std::io::Write`.
-pub struct StdWriteAdapter<'a>(&'a SerializationSink);
-
-impl<'a> Write for StdWriteAdapter<'a> {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        self.0.write_bytes_atomic(buf);
-        Ok(buf.len())
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        let mut data = self.0.data.lock();
-        let SerializationSinkInner {
-            ref mut buffer,
-            addr: _,
-        } = *data;
-
-        // First flush the local buffer.
-        self.0.flush(buffer);
-
-        // Then flush the backing store.
-        self.0.shared_state.0.lock().flush()?;
-
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-struct SerializationSinkInner {
-    buffer: Vec<u8>,
-    addr: u32,
-}
-
-/// This state is shared between all `SerializationSink`s writing to the same
-/// backing storage (e.g. the same file).
-#[derive(Clone, Debug)]
-struct SharedState(Arc<Mutex<BackingStorage>>);
-
-impl SharedState {
-    /// Copies out the contents of all pages with the given tag and
-    /// concatenates them into a single byte vec. This method is only meant to
-    /// be used for testing and will panic if the underlying backing storage is
-    /// a file instead of in memory.
-    fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> {
-        let data = self.0.lock();
-        let data = match *data {
-            BackingStorage::File(_) => panic!(),
-            BackingStorage::Memory(ref data) => data,
-        };
-
-        split_streams(data).remove(&page_tag).unwrap_or(Vec::new())
-    }
-}
-
-/// This function reconstructs the individual data streams from their paged
-/// version.
-///
-/// For example, if `E` denotes the page header of an events page, `S` denotes
-/// the header of a string data page, and lower case letters denote page
-/// contents then a paged stream could look like:
-///
-/// ```ignore
-/// s = Eabcd_Sopq_Eef_Eghi_Srst
-/// ```
-///
-/// and `split_streams` would result in the following set of streams:
-///
-/// ```ignore
-/// split_streams(s) = {
-///     events: [abcdefghi],
-///     string_data: [opqrst],
-/// }
-/// ```
-pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> {
-    let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default();
-
-    let mut pos = 0;
-    while pos < paged_data.len() {
-        let tag = TryInto::try_into(paged_data[pos]).unwrap();
-        let page_size =
-            u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize;
-
-        assert!(page_size > 0);
-
-        result
-            .entry(tag)
-            .or_default()
-            .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]);
-
-        pos += page_size + 5;
-    }
-
-    result
-}
-
-impl SerializationSink {
-    /// Writes `bytes` as a single page to the shared backing storage. The
-    /// method will first write the page header (consisting of the page tag and
-    /// the number of bytes in the page) and then the page contents
-    /// (i.e. `bytes`).
-    fn write_page(&self, bytes: &[u8]) {
-        if bytes.len() > 0 {
-            // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because
-            // `MIN_PAGE_SIZE` is just a recommendation and the last page will
-            // often be smaller than that.
-            assert!(bytes.len() <= MAX_PAGE_SIZE);
-
-            let mut file = self.shared_state.0.lock();
-
-            file.write_all(&[self.page_tag as u8]).unwrap();
-
-            let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();
-            file.write_all(&page_size).unwrap();
-            file.write_all(&bytes[..]).unwrap();
-        }
-    }
-
-    /// Flushes `buffer` by writing its contents as a new page to the backing
-    /// storage and then clearing it.
-    fn flush(&self, buffer: &mut Vec<u8>) {
-        self.write_page(&buffer[..]);
-        buffer.clear();
-    }
-
-    /// Creates a copy of all data written so far. This method is meant to be
-    /// used for writing unit tests. It will panic if the underlying
-    /// `BackingStorage` is a file.
-    pub fn into_bytes(mut self) -> Vec<u8> {
-        // Swap out the contains of `self` with something that can safely be
-        // dropped without side effects.
-        let mut data = Mutex::new(SerializationSinkInner {
-            buffer: Vec::new(),
-            addr: 0,
-        });
-        std::mem::swap(&mut self.data, &mut data);
-
-        // Extract the data from the mutex.
-        let SerializationSinkInner {
-            ref mut buffer,
-            addr: _,
-        } = data.into_inner();
-
-        // Make sure we write the current contents of the buffer to the
-        // backing storage before proceeding.
-        self.flush(buffer);
-
-        self.shared_state.copy_bytes_with_page_tag(self.page_tag)
-    }
-
-    /// Atomically writes `num_bytes` of data to this `SerializationSink`.
-    /// Atomic means the data is guaranteed to be written as a contiguous range
-    /// of bytes.
-    ///
-    /// The buffer provided to the `write` callback is guaranteed to be of size
-    /// `num_bytes` and `write` is supposed to completely fill it with the data
-    /// to be written.
-    ///
-    /// The return value is the address of the data written and can be used to
-    /// refer to the data later on.
-    pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
-    where
-        W: FnOnce(&mut [u8]),
-    {
-        if num_bytes > MAX_PAGE_SIZE {
-            let mut bytes = vec![0u8; num_bytes];
-            write(&mut bytes[..]);
-            return self.write_bytes_atomic(&bytes[..]);
-        }
-
-        let mut data = self.data.lock();
-        let SerializationSinkInner {
-            ref mut buffer,
-            ref mut addr,
-        } = *data;
-
-        if buffer.len() + num_bytes > MAX_PAGE_SIZE {
-            self.flush(buffer);
-            assert!(buffer.is_empty());
-        }
-
-        let curr_addr = *addr;
-
-        let buf_start = buffer.len();
-        let buf_end = buf_start + num_bytes;
-        buffer.resize(buf_end, 0u8);
-        write(&mut buffer[buf_start..buf_end]);
-
-        *addr += num_bytes as u32;
-
-        Addr(curr_addr)
-    }
-
-    /// Atomically writes the data in `bytes` to this `SerializationSink`.
-    /// Atomic means the data is guaranteed to be written as a contiguous range
-    /// of bytes.
-    ///
-    /// This method may perform better than `write_atomic` because it may be
-    /// able to skip the sink's internal buffer. Use this method if the data to
-    /// be written is already available as a `&[u8]`.
-    ///
-    /// The return value is the address of the data written and can be used to
-    /// refer to the data later on.
-    pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
-        // For "small" data we go to the buffered version immediately.
-        if bytes.len() <= 128 {
-            return self.write_atomic(bytes.len(), |sink| {
-                sink.copy_from_slice(bytes);
-            });
-        }
-
-        let mut data = self.data.lock();
-        let SerializationSinkInner {
-            ref mut buffer,
-            ref mut addr,
-        } = *data;
-
-        let curr_addr = Addr(*addr);
-        *addr += bytes.len() as u32;
-
-        let mut bytes_left = bytes;
-
-        // Do we have too little data in the buffer? If so, fill up the buffer
-        // to the minimum page size.
-        if buffer.len() < MIN_PAGE_SIZE {
-            let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());
-            buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);
-            bytes_left = &bytes_left[num_bytes_to_take..];
-        }
-
-        if bytes_left.is_empty() {
-            return curr_addr;
-        }
-
-        // Make sure we flush the buffer before writing out any other pages.
-        self.flush(buffer);
-
-        for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {
-            if chunk.len() == MAX_PAGE_SIZE {
-                // This chunk has the maximum size. It might or might not be the
-                // last one. In either case we want to write it to disk
-                // immediately because there is no reason to copy it to the
-                // buffer first.
-                self.write_page(chunk);
-            } else {
-                // This chunk is less than the chunk size that we requested, so
-                // it must be the last one. If it is big enough to warrant its
-                // own page, we write it to disk immediately. Otherwise, we copy
-                // it to the buffer.
-                if chunk.len() >= MIN_PAGE_SIZE {
-                    self.write_page(chunk);
-                } else {
-                    debug_assert!(buffer.is_empty());
-                    buffer.extend_from_slice(chunk);
-                }
-            }
-        }
-
-        curr_addr
-    }
-
-    pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {
-        StdWriteAdapter(self)
-    }
-}
-
-impl Drop for SerializationSink {
-    fn drop(&mut self) {
-        let mut data = self.data.lock();
-        let SerializationSinkInner {
-            ref mut buffer,
-            addr: _,
-        } = *data;
-
-        self.flush(buffer);
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    // This function writes `chunk_count` byte-slices of size `chunk_size` to
-    // three `SerializationSinks` that all map to the same underlying stream,
-    // so we get interleaved pages with different tags.
-    // It then extracts the data out again and asserts that it is the same as
-    // has been written.
-    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
-    where
-        W: Fn(&SerializationSink, &[u8]) -> Addr,
-    {
-        let sink_builder = SerializationSinkBuilder::new_in_memory();
-        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
-        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();
-
-        {
-            let sinks: Vec<SerializationSink> =
-                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();
-
-            for chunk_index in 0..chunk_count {
-                let expected_addr = Addr((chunk_index * chunk_size) as u32);
-                for sink in sinks.iter() {
-                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
-                }
-            }
-        }
-
-        let streams: Vec<Vec<u8>> = tags
-            .iter()
-            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
-            .collect();
-
-        for stream in streams {
-            for chunk in stream.chunks(chunk_size) {
-                assert_eq!(chunk, expected_chunk);
-            }
-        }
-    }
-
-    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
-        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
-    }
-
-    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
-        sink.write_bytes_atomic(bytes)
-    }
-
-    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
-    // and one using `SerializationSink::write_bytes_atomic`.
-    macro_rules! mk_roundtrip_test {
-        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
-            mod $name {
-                use super::*;
-
-                #[test]
-                fn write_atomic() {
-                    test_roundtrip($chunk_size, $chunk_count, write_closure);
-                }
-
-                #[test]
-                fn write_bytes_atomic() {
-                    test_roundtrip($chunk_size, $chunk_count, write_slice);
-                }
-            }
-        };
-    }
-
-    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
-    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);
-
-    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
-    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
-    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);
-
-    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
-    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
-    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
-}
+/// This module implements the "container" file format that `measureme` uses for\r
+/// storing things on disk. The format supports storing three independent\r
+/// streams of data: one for events, one for string data, and one for string\r
+/// index data (in theory it could support an arbitrary number of separate\r
+/// streams but three is all we need). The data of each stream is split into\r
+/// "pages", where each page has a small header designating what kind of\r
+/// data it is (i.e. event, string data, or string index), and the length of\r
+/// the page.\r
+///\r
+/// Pages of different kinds can be arbitrarily interleaved. The headers allow\r
+/// for reconstructing each of the streams later on. An example file might thus\r
+/// look like this:\r
+///\r
+/// ```ignore\r
+/// | file header | page (events) | page (string data) | page (events) | page (string index) |\r
+/// ```\r
+///\r
+/// The exact encoding of a page is:\r
+///\r
+/// | byte slice              | contents                                |\r
+/// |-------------------------|-----------------------------------------|\r
+/// | &[0 .. 1]               | page tag                                |\r
+/// | &[1 .. 5]               | page size as little endian u32          |\r
+/// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |\r
+///\r
+/// A page is immediately followed by the next page, without any padding.\r
+use parking_lot::Mutex;\r
+use rustc_hash::FxHashMap;\r
+use std::cmp::min;\r
+use std::convert::TryInto;\r
+use std::error::Error;\r
+use std::fmt::Debug;\r
+use std::fs;\r
+use std::io::Write;\r
+use std::sync::Arc;\r
+\r
+const MAX_PAGE_SIZE: usize = 256 * 1024;\r
+\r
+/// The number of bytes we consider enough to warrant their own page when\r
+/// deciding whether to flush a partially full buffer. Actual pages may need\r
+/// to be smaller, e.g. when writing the tail of the data stream.\r
+const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;\r
+\r
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]\r
+#[repr(u8)]\r
+pub enum PageTag {\r
+    Events = 0,\r
+    StringData = 1,\r
+    StringIndex = 2,\r
+}\r
+\r
+impl std::convert::TryFrom<u8> for PageTag {\r
+    type Error = String;\r
+\r
+    fn try_from(value: u8) -> Result<Self, Self::Error> {\r
+        match value {\r
+            0 => Ok(PageTag::Events),\r
+            1 => Ok(PageTag::StringData),\r
+            2 => Ok(PageTag::StringIndex),\r
+            _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),\r
+        }\r
+    }\r
+}\r
+\r
+/// An address within a data stream. Each data stream has its own address space,\r
+/// i.e. the first piece of data written to the events stream will have\r
+/// `Addr(0)` and the first piece of data written to the string data stream\r
+/// will *also* have `Addr(0)`.\r
+//\r
+// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to\r
+//       prevent accidental use of `Addr` values with the wrong address space.\r
+#[derive(Clone, Copy, Eq, PartialEq, Debug)]\r
+pub struct Addr(pub u32);\r
+\r
+impl Addr {\r
+    pub fn as_usize(self) -> usize {\r
+        self.0 as usize\r
+    }\r
+}\r
+\r
+#[derive(Debug)]\r
+pub struct SerializationSink {\r
+    shared_state: SharedState,\r
+    data: Mutex<SerializationSinkInner>,\r
+    page_tag: PageTag,\r
+}\r
+\r
+pub struct SerializationSinkBuilder(SharedState);\r
+\r
+impl SerializationSinkBuilder {\r
+    pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> {\r
+        Ok(Self(SharedState(Arc::new(Mutex::new(\r
+            BackingStorage::File(file),\r
+        )))))\r
+    }\r
+\r
+    pub fn new_in_memory() -> SerializationSinkBuilder {\r
+        Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory(\r
+            Vec::new(),\r
+        )))))\r
+    }\r
+\r
+    pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink {\r
+        SerializationSink {\r
+            data: Mutex::new(SerializationSinkInner {\r
+                buffer: Vec::with_capacity(MAX_PAGE_SIZE),\r
+                addr: 0,\r
+            }),\r
+            shared_state: self.0.clone(),\r
+            page_tag,\r
+        }\r
+    }\r
+}\r
+\r
+/// The `BackingStorage` is what the data gets written to. Usually that is a\r
+/// file but for testing purposes it can also be an in-memory vec of bytes.\r
+#[derive(Debug)]\r
+enum BackingStorage {\r
+    File(fs::File),\r
+    Memory(Vec<u8>),\r
+}\r
+\r
+impl Write for BackingStorage {\r
+    #[inline]\r
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {\r
+        match *self {\r
+            BackingStorage::File(ref mut file) => file.write(buf),\r
+            BackingStorage::Memory(ref mut vec) => vec.write(buf),\r
+        }\r
+    }\r
+\r
+    fn flush(&mut self) -> std::io::Result<()> {\r
+        match *self {\r
+            BackingStorage::File(ref mut file) => file.flush(),\r
+            BackingStorage::Memory(_) => {\r
+                // Nothing to do\r
+                Ok(())\r
+            }\r
+        }\r
+    }\r
+}\r
+\r
+/// This struct allows to treat `SerializationSink` as `std::io::Write`.\r
+pub struct StdWriteAdapter<'a>(&'a SerializationSink);\r
+\r
+impl<'a> Write for StdWriteAdapter<'a> {\r
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {\r
+        self.0.write_bytes_atomic(buf);\r
+        Ok(buf.len())\r
+    }\r
+\r
+    fn flush(&mut self) -> std::io::Result<()> {\r
+        let mut data = self.0.data.lock();\r
+        let SerializationSinkInner {\r
+            ref mut buffer,\r
+            addr: _,\r
+        } = *data;\r
+\r
+        // First flush the local buffer.\r
+        self.0.flush(buffer);\r
+\r
+        // Then flush the backing store.\r
+        self.0.shared_state.0.lock().flush()?;\r
+\r
+        Ok(())\r
+    }\r
+}\r
+\r
+#[derive(Debug)]\r
+struct SerializationSinkInner {\r
+    buffer: Vec<u8>,\r
+    addr: u32,\r
+}\r
+\r
+/// This state is shared between all `SerializationSink`s writing to the same\r
+/// backing storage (e.g. the same file).\r
+#[derive(Clone, Debug)]\r
+struct SharedState(Arc<Mutex<BackingStorage>>);\r
+\r
+impl SharedState {\r
+    /// Copies out the contents of all pages with the given tag and\r
+    /// concatenates them into a single byte vec. This method is only meant to\r
+    /// be used for testing and will panic if the underlying backing storage is\r
+    /// a file instead of in memory.\r
+    fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> {\r
+        let data = self.0.lock();\r
+        let data = match *data {\r
+            BackingStorage::File(_) => panic!(),\r
+            BackingStorage::Memory(ref data) => data,\r
+        };\r
+\r
+        split_streams(data).remove(&page_tag).unwrap_or(Vec::new())\r
+    }\r
+}\r
+\r
+/// This function reconstructs the individual data streams from their paged\r
+/// version.\r
+///\r
+/// For example, if `E` denotes the page header of an events page, `S` denotes\r
+/// the header of a string data page, and lower case letters denote page\r
+/// contents then a paged stream could look like:\r
+///\r
+/// ```ignore\r
+/// s = Eabcd_Sopq_Eef_Eghi_Srst\r
+/// ```\r
+///\r
+/// and `split_streams` would result in the following set of streams:\r
+///\r
+/// ```ignore\r
+/// split_streams(s) = {\r
+///     events: [abcdefghi],\r
+///     string_data: [opqrst],\r
+/// }\r
+/// ```\r
+pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> {\r
+    let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default();\r
+\r
+    let mut pos = 0;\r
+    while pos < paged_data.len() {\r
+        let tag = TryInto::try_into(paged_data[pos]).unwrap();\r
+        let page_size =\r
+            u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize;\r
+\r
+        assert!(page_size > 0);\r
+\r
+        result\r
+            .entry(tag)\r
+            .or_default()\r
+            .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]);\r
+\r
+        pos += page_size + 5;\r
+    }\r
+\r
+    result\r
+}\r
+\r
+impl SerializationSink {\r
+    /// Writes `bytes` as a single page to the shared backing storage. The\r
+    /// method will first write the page header (consisting of the page tag and\r
+    /// the number of bytes in the page) and then the page contents\r
+    /// (i.e. `bytes`).\r
+    fn write_page(&self, bytes: &[u8]) {\r
+        if bytes.len() > 0 {\r
+            // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because\r
+            // `MIN_PAGE_SIZE` is just a recommendation and the last page will\r
+            // often be smaller than that.\r
+            assert!(bytes.len() <= MAX_PAGE_SIZE);\r
+\r
+            let mut file = self.shared_state.0.lock();\r
+\r
+            file.write_all(&[self.page_tag as u8]).unwrap();\r
+\r
+            let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();\r
+            file.write_all(&page_size).unwrap();\r
+            file.write_all(&bytes[..]).unwrap();\r
+        }\r
+    }\r
+\r
+    /// Flushes `buffer` by writing its contents as a new page to the backing\r
+    /// storage and then clearing it.\r
+    fn flush(&self, buffer: &mut Vec<u8>) {\r
+        self.write_page(&buffer[..]);\r
+        buffer.clear();\r
+    }\r
+\r
+    /// Creates a copy of all data written so far. This method is meant to be\r
+    /// used for writing unit tests. It will panic if the underlying\r
+    /// `BackingStorage` is a file.\r
+    pub fn into_bytes(mut self) -> Vec<u8> {\r
+        // Swap out the contains of `self` with something that can safely be\r
+        // dropped without side effects.\r
+        let mut data = Mutex::new(SerializationSinkInner {\r
+            buffer: Vec::new(),\r
+            addr: 0,\r
+        });\r
+        std::mem::swap(&mut self.data, &mut data);\r
+\r
+        // Extract the data from the mutex.\r
+        let SerializationSinkInner {\r
+            ref mut buffer,\r
+            addr: _,\r
+        } = data.into_inner();\r
+\r
+        // Make sure we write the current contents of the buffer to the\r
+        // backing storage before proceeding.\r
+        self.flush(buffer);\r
+\r
+        self.shared_state.copy_bytes_with_page_tag(self.page_tag)\r
+    }\r
+\r
+    /// Atomically writes `num_bytes` of data to this `SerializationSink`.\r
+    /// Atomic means the data is guaranteed to be written as a contiguous range\r
+    /// of bytes.\r
+    ///\r
+    /// The buffer provided to the `write` callback is guaranteed to be of size\r
+    /// `num_bytes` and `write` is supposed to completely fill it with the data\r
+    /// to be written.\r
+    ///\r
+    /// The return value is the address of the data written and can be used to\r
+    /// refer to the data later on.\r
+    pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr\r
+    where\r
+        W: FnOnce(&mut [u8]),\r
+    {\r
+        if num_bytes > MAX_PAGE_SIZE {\r
+            let mut bytes = vec![0u8; num_bytes];\r
+            write(&mut bytes[..]);\r
+            return self.write_bytes_atomic(&bytes[..]);\r
+        }\r
+\r
+        let mut data = self.data.lock();\r
+        let SerializationSinkInner {\r
+            ref mut buffer,\r
+            ref mut addr,\r
+        } = *data;\r
+\r
+        if buffer.len() + num_bytes > MAX_PAGE_SIZE {\r
+            self.flush(buffer);\r
+            assert!(buffer.is_empty());\r
+        }\r
+\r
+        let curr_addr = *addr;\r
+\r
+        let buf_start = buffer.len();\r
+        let buf_end = buf_start + num_bytes;\r
+        buffer.resize(buf_end, 0u8);\r
+        write(&mut buffer[buf_start..buf_end]);\r
+\r
+        *addr += num_bytes as u32;\r
+\r
+        Addr(curr_addr)\r
+    }\r
+\r
+    /// Atomically writes the data in `bytes` to this `SerializationSink`.\r
+    /// Atomic means the data is guaranteed to be written as a contiguous range\r
+    /// of bytes.\r
+    ///\r
+    /// This method may perform better than `write_atomic` because it may be\r
+    /// able to skip the sink's internal buffer. Use this method if the data to\r
+    /// be written is already available as a `&[u8]`.\r
+    ///\r
+    /// The return value is the address of the data written and can be used to\r
+    /// refer to the data later on.\r
+    pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {\r
+        // For "small" data we go to the buffered version immediately.\r
+        if bytes.len() <= 128 {\r
+            return self.write_atomic(bytes.len(), |sink| {\r
+                sink.copy_from_slice(bytes);\r
+            });\r
+        }\r
+\r
+        let mut data = self.data.lock();\r
+        let SerializationSinkInner {\r
+            ref mut buffer,\r
+            ref mut addr,\r
+        } = *data;\r
+\r
+        let curr_addr = Addr(*addr);\r
+        *addr += bytes.len() as u32;\r
+\r
+        let mut bytes_left = bytes;\r
+\r
+        // Do we have too little data in the buffer? If so, fill up the buffer\r
+        // to the minimum page size.\r
+        if buffer.len() < MIN_PAGE_SIZE {\r
+            let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());\r
+            buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);\r
+            bytes_left = &bytes_left[num_bytes_to_take..];\r
+        }\r
+\r
+        if bytes_left.is_empty() {\r
+            return curr_addr;\r
+        }\r
+\r
+        // Make sure we flush the buffer before writing out any other pages.\r
+        self.flush(buffer);\r
+\r
+        for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {\r
+            if chunk.len() == MAX_PAGE_SIZE {\r
+                // This chunk has the maximum size. It might or might not be the\r
+                // last one. In either case we want to write it to disk\r
+                // immediately because there is no reason to copy it to the\r
+                // buffer first.\r
+                self.write_page(chunk);\r
+            } else {\r
+                // This chunk is less than the chunk size that we requested, so\r
+                // it must be the last one. If it is big enough to warrant its\r
+                // own page, we write it to disk immediately. Otherwise, we copy\r
+                // it to the buffer.\r
+                if chunk.len() >= MIN_PAGE_SIZE {\r
+                    self.write_page(chunk);\r
+                } else {\r
+                    debug_assert!(buffer.is_empty());\r
+                    buffer.extend_from_slice(chunk);\r
+                }\r
+            }\r
+        }\r
+\r
+        curr_addr\r
+    }\r
+\r
+    pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {\r
+        StdWriteAdapter(self)\r
+    }\r
+}\r
+\r
+impl Drop for SerializationSink {\r
+    fn drop(&mut self) {\r
+        let mut data = self.data.lock();\r
+        let SerializationSinkInner {\r
+            ref mut buffer,\r
+            addr: _,\r
+        } = *data;\r
+\r
+        self.flush(buffer);\r
+    }\r
+}\r
+\r
+#[cfg(test)]\r
+mod tests {\r
+    use super::*;\r
+\r
+    // This function writes `chunk_count` byte-slices of size `chunk_size` to\r
+    // three `SerializationSinks` that all map to the same underlying stream,\r
+    // so we get interleaved pages with different tags.\r
+    // It then extracts the data out again and asserts that it is the same as\r
+    // has been written.\r
+    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)\r
+    where\r
+        W: Fn(&SerializationSink, &[u8]) -> Addr,\r
+    {\r
+        let sink_builder = SerializationSinkBuilder::new_in_memory();\r
+        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];\r
+        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();\r
+\r
+        {\r
+            let sinks: Vec<SerializationSink> =\r
+                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();\r
+\r
+            for chunk_index in 0..chunk_count {\r
+                let expected_addr = Addr((chunk_index * chunk_size) as u32);\r
+                for sink in sinks.iter() {\r
+                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);\r
+                }\r
+            }\r
+        }\r
+\r
+        let streams: Vec<Vec<u8>> = tags\r
+            .iter()\r
+            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))\r
+            .collect();\r
+\r
+        for stream in streams {\r
+            for chunk in stream.chunks(chunk_size) {\r
+                assert_eq!(chunk, expected_chunk);\r
+            }\r
+        }\r
+    }\r
+\r
+    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {\r
+        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))\r
+    }\r
+\r
+    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {\r
+        sink.write_bytes_atomic(bytes)\r
+    }\r
+\r
+    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`\r
+    // and one using `SerializationSink::write_bytes_atomic`.\r
+    macro_rules! mk_roundtrip_test {\r
+        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {\r
+            mod $name {\r
+                use super::*;\r
+\r
+                #[test]\r
+                fn write_atomic() {\r
+                    test_roundtrip($chunk_size, $chunk_count, write_closure);\r
+                }\r
+\r
+                #[test]\r
+                fn write_bytes_atomic() {\r
+                    test_roundtrip($chunk_size, $chunk_count, write_slice);\r
+                }\r
+            }\r
+        };\r
+    }\r
+\r
+    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);\r
+    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);\r
+\r
+    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);\r
+    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);\r
+    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);\r
+\r
+    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);\r
+    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);\r
+    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);\r
+}\r