//! This module implements the "container" file format that `measureme` uses for
//! storing things on disk. The format supports storing three independent
//! streams of data: one for events, one for string data, and one for string
//! index data (in theory it could support an arbitrary number of separate
//! streams but three is all we need). The data of each stream is split into
//! "pages", where each page has a small header designating what kind of
//! data it is (i.e. event, string data, or string index), and the length of
//! the page.
//!
//! Pages of different kinds can be arbitrarily interleaved. The headers allow
//! for reconstructing each of the streams later on. An example file might thus
//! look like this:
//!
//! ```ignore
//! | file header | page (events) | page (string data) | page (events) | page (string index) |
//! ```
//!
//! The exact encoding of a page is:
//!
//! | byte slice              | contents                                |
//! |-------------------------|-----------------------------------------|
//! | &[0 .. 1]               | page tag                                |
//! | &[1 .. 5]               | page size as little endian u32          |
//! | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |
//!
//! A page is immediately followed by the next page, without any padding.
-use parking_lot::Mutex;
-use rustc_hash::FxHashMap;
-use std::cmp::min;
-use std::convert::TryInto;
-use std::error::Error;
-use std::fmt::Debug;
-use std::fs;
-use std::io::Write;
-use std::sync::Arc;
-
/// The maximum size of a single page's contents. Write requests larger than
/// this are split across multiple pages (see `write_bytes_atomic`).
const MAX_PAGE_SIZE: usize = 256 * 1024;

/// The number of bytes we consider enough to warrant their own page when
/// deciding whether to flush a partially full buffer. Actual pages may need
/// to be smaller, e.g. when writing the tail of the data stream.
const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;
-
/// Identifies which data stream a page belongs to. The tag is stored as the
/// first byte of each page's on-disk header (see `write_page`).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PageTag {
    Events = 0,
    StringData = 1,
    StringIndex = 2,
}
-
-impl std::convert::TryFrom<u8> for PageTag {
- type Error = String;
-
- fn try_from(value: u8) -> Result<Self, Self::Error> {
- match value {
- 0 => Ok(PageTag::Events),
- 1 => Ok(PageTag::StringData),
- 2 => Ok(PageTag::StringIndex),
- _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),
- }
- }
-}
-
/// An address within a data stream. Each data stream has its own address space,
/// i.e. the first piece of data written to the events stream will have
/// `Addr(0)` and the first piece of data written to the string data stream
/// will *also* have `Addr(0)`.
//
// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to
// prevent accidental use of `Addr` values with the wrong address space.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Addr(pub u32);

impl Addr {
    /// Returns the address as a `usize`, e.g. for indexing into a byte slice.
    pub fn as_usize(self) -> usize {
        self.0 as usize
    }
}
-
/// A writer for one of the data streams (events, string data, or string
/// index). All sinks created from the same `SerializationSinkBuilder` emit
/// their pages, interleaved, into the same shared backing storage.
#[derive(Debug)]
pub struct SerializationSink {
    // The backing storage shared with all other sinks writing to the same
    // file (or in-memory buffer).
    shared_state: SharedState,
    // The sink-local write buffer and stream-address counter.
    data: Mutex<SerializationSinkInner>,
    // The tag written into the header of every page this sink emits.
    page_tag: PageTag,
}

/// Creates `SerializationSink`s that all write to the same backing storage.
pub struct SerializationSinkBuilder(SharedState);
-
-impl SerializationSinkBuilder {
- pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> {
- Ok(Self(SharedState(Arc::new(Mutex::new(
- BackingStorage::File(file),
- )))))
- }
-
- pub fn new_in_memory() -> SerializationSinkBuilder {
- Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory(
- Vec::new(),
- )))))
- }
-
- pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink {
- SerializationSink {
- data: Mutex::new(SerializationSinkInner {
- buffer: Vec::with_capacity(MAX_PAGE_SIZE),
- addr: 0,
- }),
- shared_state: self.0.clone(),
- page_tag,
- }
- }
-}
-
/// The `BackingStorage` is what the data gets written to. Usually that is a
/// file but for testing purposes it can also be an in-memory vec of bytes.
#[derive(Debug)]
enum BackingStorage {
    // A file on disk — the normal case.
    File(fs::File),
    // An in-memory buffer, used by the unit tests below.
    Memory(Vec<u8>),
}
-
-impl Write for BackingStorage {
- #[inline]
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- match *self {
- BackingStorage::File(ref mut file) => file.write(buf),
- BackingStorage::Memory(ref mut vec) => vec.write(buf),
- }
- }
-
- fn flush(&mut self) -> std::io::Result<()> {
- match *self {
- BackingStorage::File(ref mut file) => file.flush(),
- BackingStorage::Memory(_) => {
- // Nothing to do
- Ok(())
- }
- }
- }
-}
-
-/// This struct allows to treat `SerializationSink` as `std::io::Write`.
-pub struct StdWriteAdapter<'a>(&'a SerializationSink);
-
-impl<'a> Write for StdWriteAdapter<'a> {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.0.write_bytes_atomic(buf);
- Ok(buf.len())
- }
-
- fn flush(&mut self) -> std::io::Result<()> {
- let mut data = self.0.data.lock();
- let SerializationSinkInner {
- ref mut buffer,
- addr: _,
- } = *data;
-
- // First flush the local buffer.
- self.0.flush(buffer);
-
- // Then flush the backing store.
- self.0.shared_state.0.lock().flush()?;
-
- Ok(())
- }
-}
-
/// The mutable, per-sink part of a `SerializationSink`.
#[derive(Debug)]
struct SerializationSinkInner {
    // Data waiting to be written out as a page once enough has accumulated
    // (or the sink is flushed/dropped).
    buffer: Vec<u8>,
    // The stream address that the next write to this sink will be assigned.
    addr: u32,
}
-
-/// This state is shared between all `SerializationSink`s writing to the same
-/// backing storage (e.g. the same file).
-#[derive(Clone, Debug)]
-struct SharedState(Arc<Mutex<BackingStorage>>);
-
-impl SharedState {
- /// Copies out the contents of all pages with the given tag and
- /// concatenates them into a single byte vec. This method is only meant to
- /// be used for testing and will panic if the underlying backing storage is
- /// a file instead of in memory.
- fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> {
- let data = self.0.lock();
- let data = match *data {
- BackingStorage::File(_) => panic!(),
- BackingStorage::Memory(ref data) => data,
- };
-
- split_streams(data).remove(&page_tag).unwrap_or(Vec::new())
- }
-}
-
-/// This function reconstructs the individual data streams from their paged
-/// version.
-///
-/// For example, if `E` denotes the page header of an events page, `S` denotes
-/// the header of a string data page, and lower case letters denote page
-/// contents then a paged stream could look like:
-///
-/// ```ignore
-/// s = Eabcd_Sopq_Eef_Eghi_Srst
-/// ```
-///
-/// and `split_streams` would result in the following set of streams:
-///
-/// ```ignore
-/// split_streams(s) = {
-/// events: [abcdefghi],
-/// string_data: [opqrst],
-/// }
-/// ```
-pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> {
- let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default();
-
- let mut pos = 0;
- while pos < paged_data.len() {
- let tag = TryInto::try_into(paged_data[pos]).unwrap();
- let page_size =
- u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize;
-
- assert!(page_size > 0);
-
- result
- .entry(tag)
- .or_default()
- .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]);
-
- pos += page_size + 5;
- }
-
- result
-}
-
impl SerializationSink {
    /// Writes `bytes` as a single page to the shared backing storage. The
    /// method will first write the page header (consisting of the page tag and
    /// the number of bytes in the page) and then the page contents
    /// (i.e. `bytes`). Empty pages are elided entirely.
    fn write_page(&self, bytes: &[u8]) {
        if bytes.len() > 0 {
            // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because
            // `MIN_PAGE_SIZE` is just a recommendation and the last page will
            // often be smaller than that.
            assert!(bytes.len() <= MAX_PAGE_SIZE);

            // Hold the backing-storage lock across all three writes so the
            // header and contents of this page cannot be interleaved with
            // pages from other sinks.
            let mut file = self.shared_state.0.lock();

            file.write_all(&[self.page_tag as u8]).unwrap();

            let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();
            file.write_all(&page_size).unwrap();
            file.write_all(&bytes[..]).unwrap();
        }
    }

    /// Flushes `buffer` by writing its contents as a new page to the backing
    /// storage and then clearing it.
    fn flush(&self, buffer: &mut Vec<u8>) {
        self.write_page(&buffer[..]);
        buffer.clear();
    }

    /// Creates a copy of all data written so far. This method is meant to be
    /// used for writing unit tests. It will panic if the underlying
    /// `BackingStorage` is a file.
    pub fn into_bytes(mut self) -> Vec<u8> {
        // Swap out the contents of `self` with something that can safely be
        // dropped without side effects. (Fields cannot be moved out of a type
        // that implements `Drop`, hence the swap.)
        let mut data = Mutex::new(SerializationSinkInner {
            buffer: Vec::new(),
            addr: 0,
        });
        std::mem::swap(&mut self.data, &mut data);

        // Extract the data from the mutex.
        let SerializationSinkInner {
            ref mut buffer,
            addr: _,
        } = data.into_inner();

        // Make sure we write the current contents of the buffer to the
        // backing storage before proceeding.
        self.flush(buffer);

        self.shared_state.copy_bytes_with_page_tag(self.page_tag)
    }

    /// Atomically writes `num_bytes` of data to this `SerializationSink`.
    /// Atomic means the data is guaranteed to be written as a contiguous range
    /// of bytes.
    ///
    /// The buffer provided to the `write` callback is guaranteed to be of size
    /// `num_bytes` and `write` is supposed to completely fill it with the data
    /// to be written.
    ///
    /// The return value is the address of the data written and can be used to
    /// refer to the data later on.
    pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
    where
        W: FnOnce(&mut [u8]),
    {
        if num_bytes > MAX_PAGE_SIZE {
            // The data does not fit into a single page: materialize it in a
            // temporary buffer and let `write_bytes_atomic` split it into
            // multiple pages.
            let mut bytes = vec![0u8; num_bytes];
            write(&mut bytes[..]);
            return self.write_bytes_atomic(&bytes[..]);
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        // If the data would overflow the current page, start a new one by
        // flushing the buffer first.
        if buffer.len() + num_bytes > MAX_PAGE_SIZE {
            self.flush(buffer);
            assert!(buffer.is_empty());
        }

        let curr_addr = *addr;

        // Reserve space at the end of the buffer and let the callback fill
        // it in place, avoiding an intermediate allocation.
        let buf_start = buffer.len();
        let buf_end = buf_start + num_bytes;
        buffer.resize(buf_end, 0u8);
        write(&mut buffer[buf_start..buf_end]);

        *addr += num_bytes as u32;

        Addr(curr_addr)
    }

    /// Atomically writes the data in `bytes` to this `SerializationSink`.
    /// Atomic means the data is guaranteed to be written as a contiguous range
    /// of bytes.
    ///
    /// This method may perform better than `write_atomic` because it may be
    /// able to skip the sink's internal buffer. Use this method if the data to
    /// be written is already available as a `&[u8]`.
    ///
    /// The return value is the address of the data written and can be used to
    /// refer to the data later on.
    pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
        // For "small" data we go to the buffered version immediately.
        if bytes.len() <= 128 {
            return self.write_atomic(bytes.len(), |sink| {
                sink.copy_from_slice(bytes);
            });
        }

        let mut data = self.data.lock();
        let SerializationSinkInner {
            ref mut buffer,
            ref mut addr,
        } = *data;

        // Reserve this write's address range up front; the rest of the
        // method only decides how the bytes get laid out into pages.
        let curr_addr = Addr(*addr);
        *addr += bytes.len() as u32;

        let mut bytes_left = bytes;

        // Do we have too little data in the buffer? If so, fill up the buffer
        // to the minimum page size.
        if buffer.len() < MIN_PAGE_SIZE {
            let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());
            buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);
            bytes_left = &bytes_left[num_bytes_to_take..];
        }

        if bytes_left.is_empty() {
            return curr_addr;
        }

        // Make sure we flush the buffer before writing out any other pages.
        self.flush(buffer);

        for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {
            if chunk.len() == MAX_PAGE_SIZE {
                // This chunk has the maximum size. It might or might not be the
                // last one. In either case we want to write it to disk
                // immediately because there is no reason to copy it to the
                // buffer first.
                self.write_page(chunk);
            } else {
                // This chunk is less than the chunk size that we requested, so
                // it must be the last one. If it is big enough to warrant its
                // own page, we write it to disk immediately. Otherwise, we copy
                // it to the buffer.
                if chunk.len() >= MIN_PAGE_SIZE {
                    self.write_page(chunk);
                } else {
                    // The buffer was flushed above, so appending here keeps
                    // this write's bytes contiguous within the next page.
                    debug_assert!(buffer.is_empty());
                    buffer.extend_from_slice(chunk);
                }
            }
        }

        curr_addr
    }

    /// Returns an adapter that implements `std::io::Write` on top of this
    /// sink.
    pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {
        StdWriteAdapter(self)
    }
}
-
-impl Drop for SerializationSink {
- fn drop(&mut self) {
- let mut data = self.data.lock();
- let SerializationSinkInner {
- ref mut buffer,
- addr: _,
- } = *data;
-
- self.flush(buffer);
- }
-}
-
#[cfg(test)]
mod tests {
    use super::*;

    // This function writes `chunk_count` byte-slices of size `chunk_size` to
    // three `SerializationSinks` that all map to the same underlying stream,
    // so we get interleaved pages with different tags.
    // It then extracts the data out again and asserts that it is the same as
    // has been written.
    fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)
    where
        W: Fn(&SerializationSink, &[u8]) -> Addr,
    {
        let sink_builder = SerializationSinkBuilder::new_in_memory();
        let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];
        // 239 is prime, so the pattern never lines up with page boundaries.
        let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();

        // Scope the sinks so they are dropped (and thus flushed) before we
        // read the data back out below.
        {
            let sinks: Vec<SerializationSink> =
                tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();

            for chunk_index in 0..chunk_count {
                let expected_addr = Addr((chunk_index * chunk_size) as u32);
                for sink in sinks.iter() {
                    assert_eq!(write(sink, &expected_chunk[..]), expected_addr);
                }
            }
        }

        let streams: Vec<Vec<u8>> = tags
            .iter()
            .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))
            .collect();

        for stream in streams {
            for chunk in stream.chunks(chunk_size) {
                assert_eq!(chunk, expected_chunk);
            }
        }
    }

    // Writes via the closure-based `write_atomic` entry point.
    fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))
    }

    // Writes via the slice-based `write_bytes_atomic` entry point.
    fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {
        sink.write_bytes_atomic(bytes)
    }

    // Creates two roundtrip tests, one using `SerializationSink::write_atomic`
    // and one using `SerializationSink::write_bytes_atomic`.
    macro_rules! mk_roundtrip_test {
        ($name:ident, $chunk_size:expr, $chunk_count:expr) => {
            mod $name {
                use super::*;

                #[test]
                fn write_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_closure);
                }

                #[test]
                fn write_bytes_atomic() {
                    test_roundtrip($chunk_size, $chunk_count, write_slice);
                }
            }
        };
    }

    // Exercise both the buffered path and the page-size boundary conditions.
    mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);
    mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);

    mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);
    mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);

    mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);
    mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);
    mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);
}
+/// This module implements the "container" file format that `measureme` uses for\r
+/// storing things on disk. The format supports storing three independent\r
+/// streams of data: one for events, one for string data, and one for string\r
+/// index data (in theory it could support an arbitrary number of separate\r
+/// streams but three is all we need). The data of each stream is split into\r
+/// "pages", where each page has a small header designating what kind of\r
+/// data it is (i.e. event, string data, or string index), and the length of\r
+/// the page.\r
+///\r
+/// Pages of different kinds can be arbitrarily interleaved. The headers allow\r
+/// for reconstructing each of the streams later on. An example file might thus\r
+/// look like this:\r
+///\r
+/// ```ignore\r
+/// | file header | page (events) | page (string data) | page (events) | page (string index) |\r
+/// ```\r
+///\r
+/// The exact encoding of a page is:\r
+///\r
+/// | byte slice | contents |\r
+/// |-------------------------|-----------------------------------------|\r
+/// | &[0 .. 1] | page tag |\r
+/// | &[1 .. 5] | page size as little endian u32 |\r
+/// | &[5 .. (5 + page_size)] | page contents (exactly page_size bytes) |\r
+///\r
+/// A page is immediately followed by the next page, without any padding.\r
+use parking_lot::Mutex;\r
+use rustc_hash::FxHashMap;\r
+use std::cmp::min;\r
+use std::convert::TryInto;\r
+use std::error::Error;\r
+use std::fmt::Debug;\r
+use std::fs;\r
+use std::io::Write;\r
+use std::sync::Arc;\r
+\r
+const MAX_PAGE_SIZE: usize = 256 * 1024;\r
+\r
+/// The number of bytes we consider enough to warrant their own page when\r
+/// deciding whether to flush a partially full buffer. Actual pages may need\r
+/// to be smaller, e.g. when writing the tail of the data stream.\r
+const MIN_PAGE_SIZE: usize = MAX_PAGE_SIZE / 2;\r
+\r
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]\r
+#[repr(u8)]\r
+pub enum PageTag {\r
+ Events = 0,\r
+ StringData = 1,\r
+ StringIndex = 2,\r
+}\r
+\r
+impl std::convert::TryFrom<u8> for PageTag {\r
+ type Error = String;\r
+\r
+ fn try_from(value: u8) -> Result<Self, Self::Error> {\r
+ match value {\r
+ 0 => Ok(PageTag::Events),\r
+ 1 => Ok(PageTag::StringData),\r
+ 2 => Ok(PageTag::StringIndex),\r
+ _ => Err(format!("Could not convert byte `{}` to PageTag.", value)),\r
+ }\r
+ }\r
+}\r
+\r
+/// An address within a data stream. Each data stream has its own address space,\r
+/// i.e. the first piece of data written to the events stream will have\r
+/// `Addr(0)` and the first piece of data written to the string data stream\r
+/// will *also* have `Addr(0)`.\r
+//\r
+// TODO: Evaluate if it makes sense to add a type tag to `Addr` in order to\r
+// prevent accidental use of `Addr` values with the wrong address space.\r
+#[derive(Clone, Copy, Eq, PartialEq, Debug)]\r
+pub struct Addr(pub u32);\r
+\r
+impl Addr {\r
+ pub fn as_usize(self) -> usize {\r
+ self.0 as usize\r
+ }\r
+}\r
+\r
+#[derive(Debug)]\r
+pub struct SerializationSink {\r
+ shared_state: SharedState,\r
+ data: Mutex<SerializationSinkInner>,\r
+ page_tag: PageTag,\r
+}\r
+\r
+pub struct SerializationSinkBuilder(SharedState);\r
+\r
+impl SerializationSinkBuilder {\r
+ pub fn new_from_file(file: fs::File) -> Result<Self, Box<dyn Error + Send + Sync>> {\r
+ Ok(Self(SharedState(Arc::new(Mutex::new(\r
+ BackingStorage::File(file),\r
+ )))))\r
+ }\r
+\r
+ pub fn new_in_memory() -> SerializationSinkBuilder {\r
+ Self(SharedState(Arc::new(Mutex::new(BackingStorage::Memory(\r
+ Vec::new(),\r
+ )))))\r
+ }\r
+\r
+ pub fn new_sink(&self, page_tag: PageTag) -> SerializationSink {\r
+ SerializationSink {\r
+ data: Mutex::new(SerializationSinkInner {\r
+ buffer: Vec::with_capacity(MAX_PAGE_SIZE),\r
+ addr: 0,\r
+ }),\r
+ shared_state: self.0.clone(),\r
+ page_tag,\r
+ }\r
+ }\r
+}\r
+\r
+/// The `BackingStorage` is what the data gets written to. Usually that is a\r
+/// file but for testing purposes it can also be an in-memory vec of bytes.\r
+#[derive(Debug)]\r
+enum BackingStorage {\r
+ File(fs::File),\r
+ Memory(Vec<u8>),\r
+}\r
+\r
+impl Write for BackingStorage {\r
+ #[inline]\r
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {\r
+ match *self {\r
+ BackingStorage::File(ref mut file) => file.write(buf),\r
+ BackingStorage::Memory(ref mut vec) => vec.write(buf),\r
+ }\r
+ }\r
+\r
+ fn flush(&mut self) -> std::io::Result<()> {\r
+ match *self {\r
+ BackingStorage::File(ref mut file) => file.flush(),\r
+ BackingStorage::Memory(_) => {\r
+ // Nothing to do\r
+ Ok(())\r
+ }\r
+ }\r
+ }\r
+}\r
+\r
+/// This struct allows to treat `SerializationSink` as `std::io::Write`.\r
+pub struct StdWriteAdapter<'a>(&'a SerializationSink);\r
+\r
+impl<'a> Write for StdWriteAdapter<'a> {\r
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {\r
+ self.0.write_bytes_atomic(buf);\r
+ Ok(buf.len())\r
+ }\r
+\r
+ fn flush(&mut self) -> std::io::Result<()> {\r
+ let mut data = self.0.data.lock();\r
+ let SerializationSinkInner {\r
+ ref mut buffer,\r
+ addr: _,\r
+ } = *data;\r
+\r
+ // First flush the local buffer.\r
+ self.0.flush(buffer);\r
+\r
+ // Then flush the backing store.\r
+ self.0.shared_state.0.lock().flush()?;\r
+\r
+ Ok(())\r
+ }\r
+}\r
+\r
+#[derive(Debug)]\r
+struct SerializationSinkInner {\r
+ buffer: Vec<u8>,\r
+ addr: u32,\r
+}\r
+\r
+/// This state is shared between all `SerializationSink`s writing to the same\r
+/// backing storage (e.g. the same file).\r
+#[derive(Clone, Debug)]\r
+struct SharedState(Arc<Mutex<BackingStorage>>);\r
+\r
+impl SharedState {\r
+ /// Copies out the contents of all pages with the given tag and\r
+ /// concatenates them into a single byte vec. This method is only meant to\r
+ /// be used for testing and will panic if the underlying backing storage is\r
+ /// a file instead of in memory.\r
+ fn copy_bytes_with_page_tag(&self, page_tag: PageTag) -> Vec<u8> {\r
+ let data = self.0.lock();\r
+ let data = match *data {\r
+ BackingStorage::File(_) => panic!(),\r
+ BackingStorage::Memory(ref data) => data,\r
+ };\r
+\r
+ split_streams(data).remove(&page_tag).unwrap_or(Vec::new())\r
+ }\r
+}\r
+\r
+/// This function reconstructs the individual data streams from their paged\r
+/// version.\r
+///\r
+/// For example, if `E` denotes the page header of an events page, `S` denotes\r
+/// the header of a string data page, and lower case letters denote page\r
+/// contents then a paged stream could look like:\r
+///\r
+/// ```ignore\r
+/// s = Eabcd_Sopq_Eef_Eghi_Srst\r
+/// ```\r
+///\r
+/// and `split_streams` would result in the following set of streams:\r
+///\r
+/// ```ignore\r
+/// split_streams(s) = {\r
+/// events: [abcdefghi],\r
+/// string_data: [opqrst],\r
+/// }\r
+/// ```\r
+pub fn split_streams(paged_data: &[u8]) -> FxHashMap<PageTag, Vec<u8>> {\r
+ let mut result: FxHashMap<PageTag, Vec<u8>> = FxHashMap::default();\r
+\r
+ let mut pos = 0;\r
+ while pos < paged_data.len() {\r
+ let tag = TryInto::try_into(paged_data[pos]).unwrap();\r
+ let page_size =\r
+ u32::from_le_bytes(paged_data[pos + 1..pos + 5].try_into().unwrap()) as usize;\r
+\r
+ assert!(page_size > 0);\r
+\r
+ result\r
+ .entry(tag)\r
+ .or_default()\r
+ .extend_from_slice(&paged_data[pos + 5..pos + 5 + page_size]);\r
+\r
+ pos += page_size + 5;\r
+ }\r
+\r
+ result\r
+}\r
+\r
+impl SerializationSink {\r
+ /// Writes `bytes` as a single page to the shared backing storage. The\r
+ /// method will first write the page header (consisting of the page tag and\r
+ /// the number of bytes in the page) and then the page contents\r
+ /// (i.e. `bytes`).\r
+ fn write_page(&self, bytes: &[u8]) {\r
+ if bytes.len() > 0 {\r
+ // We explicitly don't assert `bytes.len() >= MIN_PAGE_SIZE` because\r
+ // `MIN_PAGE_SIZE` is just a recommendation and the last page will\r
+ // often be smaller than that.\r
+ assert!(bytes.len() <= MAX_PAGE_SIZE);\r
+\r
+ let mut file = self.shared_state.0.lock();\r
+\r
+ file.write_all(&[self.page_tag as u8]).unwrap();\r
+\r
+ let page_size: [u8; 4] = (bytes.len() as u32).to_le_bytes();\r
+ file.write_all(&page_size).unwrap();\r
+ file.write_all(&bytes[..]).unwrap();\r
+ }\r
+ }\r
+\r
+ /// Flushes `buffer` by writing its contents as a new page to the backing\r
+ /// storage and then clearing it.\r
+ fn flush(&self, buffer: &mut Vec<u8>) {\r
+ self.write_page(&buffer[..]);\r
+ buffer.clear();\r
+ }\r
+\r
+ /// Creates a copy of all data written so far. This method is meant to be\r
+ /// used for writing unit tests. It will panic if the underlying\r
+ /// `BackingStorage` is a file.\r
+ pub fn into_bytes(mut self) -> Vec<u8> {\r
+ // Swap out the contains of `self` with something that can safely be\r
+ // dropped without side effects.\r
+ let mut data = Mutex::new(SerializationSinkInner {\r
+ buffer: Vec::new(),\r
+ addr: 0,\r
+ });\r
+ std::mem::swap(&mut self.data, &mut data);\r
+\r
+ // Extract the data from the mutex.\r
+ let SerializationSinkInner {\r
+ ref mut buffer,\r
+ addr: _,\r
+ } = data.into_inner();\r
+\r
+ // Make sure we write the current contents of the buffer to the\r
+ // backing storage before proceeding.\r
+ self.flush(buffer);\r
+\r
+ self.shared_state.copy_bytes_with_page_tag(self.page_tag)\r
+ }\r
+\r
+ /// Atomically writes `num_bytes` of data to this `SerializationSink`.\r
+ /// Atomic means the data is guaranteed to be written as a contiguous range\r
+ /// of bytes.\r
+ ///\r
+ /// The buffer provided to the `write` callback is guaranteed to be of size\r
+ /// `num_bytes` and `write` is supposed to completely fill it with the data\r
+ /// to be written.\r
+ ///\r
+ /// The return value is the address of the data written and can be used to\r
+ /// refer to the data later on.\r
+ pub fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr\r
+ where\r
+ W: FnOnce(&mut [u8]),\r
+ {\r
+ if num_bytes > MAX_PAGE_SIZE {\r
+ let mut bytes = vec![0u8; num_bytes];\r
+ write(&mut bytes[..]);\r
+ return self.write_bytes_atomic(&bytes[..]);\r
+ }\r
+\r
+ let mut data = self.data.lock();\r
+ let SerializationSinkInner {\r
+ ref mut buffer,\r
+ ref mut addr,\r
+ } = *data;\r
+\r
+ if buffer.len() + num_bytes > MAX_PAGE_SIZE {\r
+ self.flush(buffer);\r
+ assert!(buffer.is_empty());\r
+ }\r
+\r
+ let curr_addr = *addr;\r
+\r
+ let buf_start = buffer.len();\r
+ let buf_end = buf_start + num_bytes;\r
+ buffer.resize(buf_end, 0u8);\r
+ write(&mut buffer[buf_start..buf_end]);\r
+\r
+ *addr += num_bytes as u32;\r
+\r
+ Addr(curr_addr)\r
+ }\r
+\r
+ /// Atomically writes the data in `bytes` to this `SerializationSink`.\r
+ /// Atomic means the data is guaranteed to be written as a contiguous range\r
+ /// of bytes.\r
+ ///\r
+ /// This method may perform better than `write_atomic` because it may be\r
+ /// able to skip the sink's internal buffer. Use this method if the data to\r
+ /// be written is already available as a `&[u8]`.\r
+ ///\r
+ /// The return value is the address of the data written and can be used to\r
+ /// refer to the data later on.\r
+ pub fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {\r
+ // For "small" data we go to the buffered version immediately.\r
+ if bytes.len() <= 128 {\r
+ return self.write_atomic(bytes.len(), |sink| {\r
+ sink.copy_from_slice(bytes);\r
+ });\r
+ }\r
+\r
+ let mut data = self.data.lock();\r
+ let SerializationSinkInner {\r
+ ref mut buffer,\r
+ ref mut addr,\r
+ } = *data;\r
+\r
+ let curr_addr = Addr(*addr);\r
+ *addr += bytes.len() as u32;\r
+\r
+ let mut bytes_left = bytes;\r
+\r
+ // Do we have too little data in the buffer? If so, fill up the buffer\r
+ // to the minimum page size.\r
+ if buffer.len() < MIN_PAGE_SIZE {\r
+ let num_bytes_to_take = min(MIN_PAGE_SIZE - buffer.len(), bytes_left.len());\r
+ buffer.extend_from_slice(&bytes_left[..num_bytes_to_take]);\r
+ bytes_left = &bytes_left[num_bytes_to_take..];\r
+ }\r
+\r
+ if bytes_left.is_empty() {\r
+ return curr_addr;\r
+ }\r
+\r
+ // Make sure we flush the buffer before writing out any other pages.\r
+ self.flush(buffer);\r
+\r
+ for chunk in bytes_left.chunks(MAX_PAGE_SIZE) {\r
+ if chunk.len() == MAX_PAGE_SIZE {\r
+ // This chunk has the maximum size. It might or might not be the\r
+ // last one. In either case we want to write it to disk\r
+ // immediately because there is no reason to copy it to the\r
+ // buffer first.\r
+ self.write_page(chunk);\r
+ } else {\r
+ // This chunk is less than the chunk size that we requested, so\r
+ // it must be the last one. If it is big enough to warrant its\r
+ // own page, we write it to disk immediately. Otherwise, we copy\r
+ // it to the buffer.\r
+ if chunk.len() >= MIN_PAGE_SIZE {\r
+ self.write_page(chunk);\r
+ } else {\r
+ debug_assert!(buffer.is_empty());\r
+ buffer.extend_from_slice(chunk);\r
+ }\r
+ }\r
+ }\r
+\r
+ curr_addr\r
+ }\r
+\r
+ pub fn as_std_write<'a>(&'a self) -> impl Write + 'a {\r
+ StdWriteAdapter(self)\r
+ }\r
+}\r
+\r
+impl Drop for SerializationSink {\r
+ fn drop(&mut self) {\r
+ let mut data = self.data.lock();\r
+ let SerializationSinkInner {\r
+ ref mut buffer,\r
+ addr: _,\r
+ } = *data;\r
+\r
+ self.flush(buffer);\r
+ }\r
+}\r
+\r
+#[cfg(test)]\r
+mod tests {\r
+ use super::*;\r
+\r
+ // This function writes `chunk_count` byte-slices of size `chunk_size` to\r
+ // three `SerializationSinks` that all map to the same underlying stream,\r
+ // so we get interleaved pages with different tags.\r
+ // It then extracts the data out again and asserts that it is the same as\r
+ // has been written.\r
+ fn test_roundtrip<W>(chunk_size: usize, chunk_count: usize, write: W)\r
+ where\r
+ W: Fn(&SerializationSink, &[u8]) -> Addr,\r
+ {\r
+ let sink_builder = SerializationSinkBuilder::new_in_memory();\r
+ let tags = [PageTag::Events, PageTag::StringData, PageTag::StringIndex];\r
+ let expected_chunk: Vec<u8> = (0..chunk_size).map(|x| (x % 239) as u8).collect();\r
+\r
+ {\r
+ let sinks: Vec<SerializationSink> =\r
+ tags.iter().map(|&tag| sink_builder.new_sink(tag)).collect();\r
+\r
+ for chunk_index in 0..chunk_count {\r
+ let expected_addr = Addr((chunk_index * chunk_size) as u32);\r
+ for sink in sinks.iter() {\r
+ assert_eq!(write(sink, &expected_chunk[..]), expected_addr);\r
+ }\r
+ }\r
+ }\r
+\r
+ let streams: Vec<Vec<u8>> = tags\r
+ .iter()\r
+ .map(|&tag| sink_builder.0.copy_bytes_with_page_tag(tag))\r
+ .collect();\r
+\r
+ for stream in streams {\r
+ for chunk in stream.chunks(chunk_size) {\r
+ assert_eq!(chunk, expected_chunk);\r
+ }\r
+ }\r
+ }\r
+\r
+ fn write_closure(sink: &SerializationSink, bytes: &[u8]) -> Addr {\r
+ sink.write_atomic(bytes.len(), |dest| dest.copy_from_slice(bytes))\r
+ }\r
+\r
+ fn write_slice(sink: &SerializationSink, bytes: &[u8]) -> Addr {\r
+ sink.write_bytes_atomic(bytes)\r
+ }\r
+\r
+ // Creates two roundtrip tests, one using `SerializationSink::write_atomic`\r
+ // and one using `SerializationSink::write_bytes_atomic`.\r
+ macro_rules! mk_roundtrip_test {\r
+ ($name:ident, $chunk_size:expr, $chunk_count:expr) => {\r
+ mod $name {\r
+ use super::*;\r
+\r
+ #[test]\r
+ fn write_atomic() {\r
+ test_roundtrip($chunk_size, $chunk_count, write_closure);\r
+ }\r
+\r
+ #[test]\r
+ fn write_bytes_atomic() {\r
+ test_roundtrip($chunk_size, $chunk_count, write_slice);\r
+ }\r
+ }\r
+ };\r
+ }\r
+\r
+ mk_roundtrip_test!(small_data, 10, (90 * MAX_PAGE_SIZE) / 100);\r
+ mk_roundtrip_test!(huge_data, MAX_PAGE_SIZE * 10, 5);\r
+\r
+ mk_roundtrip_test!(exactly_max_page_size, MAX_PAGE_SIZE, 10);\r
+ mk_roundtrip_test!(max_page_size_plus_one, MAX_PAGE_SIZE + 1, 10);\r
+ mk_roundtrip_test!(max_page_size_minus_one, MAX_PAGE_SIZE - 1, 10);\r
+\r
+ mk_roundtrip_test!(exactly_min_page_size, MIN_PAGE_SIZE, 10);\r
+ mk_roundtrip_test!(min_page_size_plus_one, MIN_PAGE_SIZE + 1, 10);\r
+ mk_roundtrip_test!(min_page_size_minus_one, MIN_PAGE_SIZE - 1, 10);\r
+}\r