// pbs-datastore/src/chunker.rs
use std::sync::mpsc::Receiver;

/// Note: a window size of 32 or 64 is faster because we can speed up
/// the modulo operations, but it always computes hash value 0 for
/// constant data streams (0, 0, 0, ...), so we use a modified chunk
/// boundary test that does not use hash value 0 to detect a boundary.
const CA_CHUNKER_WINDOW_SIZE: usize = 64;

/// Additional context for the chunker to find possible boundaries in payload streams
#[derive(Default)]
pub struct Context {
    /// Already consumed bytes of the chunk stream consumer
    pub base: u64,
    /// Total size currently buffered
    pub total: u64,
}

pub trait Chunker {
    fn scan(&mut self, data: &[u8], ctx: &Context) -> usize;
    fn reset(&mut self);
}

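// Illustrative usage sketch (an added example, not part of the upstream API
// surface): a consumer feeds the not-yet-consumed tail of its buffer to
// `scan()`. A non-zero return value is the chunk length, counted from the
// start of the passed slice; 0 means "no boundary yet, call again with more
// data". The test name and the synthetic data pattern are assumptions made
// for this sketch.
#[test]
fn test_chunker_trait_usage_sketch() {
    // pseudo-varied data so the hash based boundary test actually fires
    let data: Vec<u8> = (0u32..1024 * 1024).map(|i| (i % 251) as u8).collect();

    let mut chunker = ChunkerImpl::new(64 * 1024);
    let ctx = Context::default();

    let mut pos = 0;
    let mut covered = 0;
    while pos < data.len() {
        let boundary = chunker.scan(&data[pos..], &ctx);
        if boundary == 0 {
            // no further boundary; the remaining bytes form the last chunk
            covered += data.len() - pos;
            break;
        }
        // a cut can never come later than the maximum chunk size (4 * 64 KiB)
        assert!(boundary <= 256 * 1024);
        covered += boundary;
        pos += boundary;
    }
    assert_eq!(covered, data.len());
}
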
/// Sliding window chunker (Buzhash)
///
/// This is a rewrite of the *casync* chunker (cachunker.h) in Rust.
///
/// Hashing by cyclic polynomial (also called Buzhash) has the benefit
/// of avoiding multiplications, using barrel shifts instead. For more
/// information please take a look at the [Rolling
/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) article on
/// Wikipedia.
pub struct ChunkerImpl {
    h: u32,
    window_size: usize,
    chunk_size: usize,

    chunk_size_min: usize,
    chunk_size_max: usize,
    _chunk_size_avg: usize,

    _discriminator: u32,

    break_test_mask: u32,
    break_test_minimum: u32,

    window: [u8; CA_CHUNKER_WINDOW_SIZE],
}

/// Sliding window chunker (Buzhash) with boundary suggestions
///
/// Suggests chunking at a given boundary instead of the regular chunk boundary for better
/// alignment with file payload boundaries.
pub struct PayloadChunker {
    chunker: ChunkerImpl,
    current_suggested: Option<u64>,
    suggested_boundaries: Receiver<u64>,
}

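/// Substitution table for the Buzhash rolling hash: one 32-bit word per
/// possible byte value (see the casync chunker this module is a rewrite of).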
const BUZHASH_TABLE: [u32; 256] = [
    0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
    0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
    0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68,
    0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93,
    0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00,
    0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2,
    0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9,
    0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5,
    0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed,
    0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f,
    0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4,
    0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85,
    0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6,
    0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6,
    0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f,
    0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2,
    0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b,
    0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae,
    0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f,
    0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18,
    0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a,
    0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140,
    0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9,
    0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9,
    0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc,
    0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b,
    0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c,
    0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319,
    0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb,
    0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b,
    0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964,
    0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
];

impl ChunkerImpl {
    /// Create a new Chunker instance, which produces an average
    /// chunk size of `chunk_size_avg` (must be a power of two). We
    /// allow variation from `chunk_size_avg/4` up to a maximum of
    /// `chunk_size_avg*4`.
    pub fn new(chunk_size_avg: usize) -> Self {
        // The chunk cut discriminator. In order to get an average
        // chunk size of avg, we cut whenever for a hash value "h" at
        // byte "i" given the discriminator "d(avg)": h(i) mod d(avg)
        // == d(avg) - 1. Note that the discriminator calculated like
        // this only yields correct results as long as the minimal
        // chunk size is picked as avg/4, and the maximum chunk size
        // as avg*4. If they are picked differently the result might
        // be skewed into either direction.
        let avg = chunk_size_avg as f64;
        let discriminator = (avg / (-1.42888852e-7 * avg + 1.33237515)) as u32;

        if chunk_size_avg.count_ones() != 1 {
            panic!("got unexpected chunk size - not a power of two.");
        }

        let break_test_mask = (chunk_size_avg * 2 - 1) as u32;
        let break_test_minimum = break_test_mask - 2;

        Self {
            h: 0,
            window_size: 0,
            chunk_size: 0,
            chunk_size_min: chunk_size_avg >> 2,
            chunk_size_max: chunk_size_avg << 2,
            _chunk_size_avg: chunk_size_avg,
            _discriminator: discriminator,
            break_test_mask,
            break_test_minimum,
            window: [0u8; CA_CHUNKER_WINDOW_SIZE],
        }
    }

    // fast implementation avoiding modulo
    // #[inline(always)]
    fn shall_break(&self) -> bool {
        if self.chunk_size >= self.chunk_size_max {
            return true;
        }

        if self.chunk_size < self.chunk_size_min {
            return false;
        }

        //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!

        //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams

        (self.h & self.break_test_mask) >= self.break_test_minimum
    }

    // This is the original implementation from casync
    /*
    #[inline(always)]
    fn shall_break_orig(&self) -> bool {

        if self.chunk_size >= self.chunk_size_max { return true; }

        if self.chunk_size < self.chunk_size_min { return false; }

        (self.h % self.discriminator) == (self.discriminator - 1)
    }
    */
}

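// A small added sanity check (illustrative, not from the original test suite):
// it spells out what `new()` derives for the 64 KiB average used elsewhere in
// this file. With `break_test_mask = 2 * avg - 1` and
// `break_test_minimum = break_test_mask - 2`, the hash based break test in
// `shall_break()` fires for three of the `2 * avg` possible masked hash
// values (assuming uniformly distributed hashes), once the minimum chunk
// size has been reached.
#[test]
fn test_chunk_size_parameters() {
    let chunker = ChunkerImpl::new(64 * 1024);
    assert_eq!(chunker.chunk_size_min, 16 * 1024);
    assert_eq!(chunker.chunk_size_max, 256 * 1024);
    assert_eq!(chunker.break_test_mask, 0x1ffff);
    assert_eq!(chunker.break_test_minimum, 0x1fffd);
}
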
impl Chunker for ChunkerImpl {
    /// Scans the specified data for a chunk border. Returns 0 if none
    /// was found (and the function should be called with more data
    /// later on), or another value indicating the position of a
    /// border.
    fn scan(&mut self, data: &[u8], _ctx: &Context) -> usize {
        let window_len = self.window.len();
        let data_len = data.len();

        let mut pos = 0;

        if self.window_size < window_len {
            let need = window_len - self.window_size;
            let copy_len = if need < data_len { need } else { data_len };

            for _i in 0..copy_len {
                let byte = data[pos];
                self.window[self.window_size] = byte;
                self.h = self.h.rotate_left(1) ^ BUZHASH_TABLE[byte as usize];
                pos += 1;
                self.window_size += 1;
            }

            self.chunk_size += copy_len;

            // return if window is still not full
            if self.window_size < window_len {
                return 0;
            }
        }

        //let mut idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
        let mut idx = self.chunk_size & 0x3f;

        while pos < data_len {
            // roll window
            let enter = data[pos];
            let leave = self.window[idx];
            self.h = self.h.rotate_left(1) ^
                //BUZHASH_TABLE[leave as usize].rotate_left(CA_CHUNKER_WINDOW_SIZE as u32) ^
                BUZHASH_TABLE[leave as usize] ^
                BUZHASH_TABLE[enter as usize];

            self.chunk_size += 1;
            pos += 1;

            self.window[idx] = enter;

            if self.shall_break() {
                self.h = 0;
                self.chunk_size = 0;
                self.window_size = 0;
                return pos;
            }

            //idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
            idx = self.chunk_size & 0x3f;
            //idx += 1; if idx >= CA_CHUNKER_WINDOW_SIZE { idx = 0 };
        }

        0
    }

    fn reset(&mut self) {
        self.h = 0;
        self.chunk_size = 0;
        self.window_size = 0;
    }
}

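// Added note on the window size trick used in `scan()` above: the rolling
// update would normally have to remove `BUZHASH_TABLE[leave]` rotated by the
// window size (see the commented-out line in `scan()`), but rotating a `u32`
// by 64 bit positions is the identity, so the plain table value can be XORed
// out directly. This is the speed benefit of a window size of 32 or 64
// mentioned at the top of this file. The tiny test below is an illustrative
// addition that just demonstrates the identity.
#[test]
fn test_rotate_by_window_size_is_identity() {
    for &v in &[0u32, 1, 0xdead_beef, u32::MAX] {
        assert_eq!(v.rotate_left(CA_CHUNKER_WINDOW_SIZE as u32), v);
    }
}
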
impl PayloadChunker {
    /// Create a new PayloadChunker instance, which produces an average
    /// chunk size of `chunk_size_avg` (must be a power of two) if no
    /// suggested boundaries are provided.
    /// Suggested boundaries are used instead whenever the resulting
    /// chunk size is within the min - max range.
    pub fn new(chunk_size_avg: usize, suggested_boundaries: Receiver<u64>) -> Self {
        Self {
            chunker: ChunkerImpl::new(chunk_size_avg),
            current_suggested: None,
            suggested_boundaries,
        }
    }
}

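// Added usage sketch for the suggested-boundary path (illustrative, not part
// of the original tests): boundaries received on the channel are absolute
// stream offsets. `ctx.base` is what has already been consumed and
// `ctx.total` is what is currently buffered, so a suggestion that falls
// inside the buffered range and yields a chunk between the minimum and
// maximum size is returned directly, relative to the start of the passed
// slice. The test name and values are assumptions made for this sketch.
#[test]
fn test_payload_chunker_single_suggestion() {
    let (tx, rx) = std::sync::mpsc::channel();
    let mut chunker = PayloadChunker::new(64 * 1024, rx);

    // 256 KiB of zeroes; the content is irrelevant here because the suggested
    // boundary is taken before any hash based scan happens
    let data = vec![0u8; 256 * 1024];
    // suggest an absolute boundary at 100 KiB, within the 16 KiB .. 256 KiB range
    tx.send(100 * 1024).unwrap();

    let ctx = Context {
        base: 0,
        total: data.len() as u64,
    };
    assert_eq!(chunker.scan(&data, &ctx), 100 * 1024);
}
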
impl Chunker for PayloadChunker {
    fn scan(&mut self, data: &[u8], ctx: &Context) -> usize {
        assert!(ctx.total >= data.len() as u64);
        let pos = ctx.total - data.len() as u64;

        loop {
            if let Some(boundary) = self.current_suggested {
                if boundary < ctx.base + pos {
                    log::debug!("Boundary {boundary} in past");
                    // ignore boundaries already in the past
                    self.current_suggested = None;
                    continue;
                }

                if boundary > ctx.base + ctx.total {
                    log::debug!("Boundary {boundary} in future");
                    // boundary in future, cannot decide yet
                    return self.chunker.scan(data, ctx);
                }

                let chunk_size = (boundary - ctx.base) as usize;
                if chunk_size < self.chunker.chunk_size_min {
                    log::debug!("Chunk size {chunk_size} below minimum chunk size");
                    // chunk too small, ignore boundary
                    self.current_suggested = None;
                    continue;
                }

                if chunk_size <= self.chunker.chunk_size_max {
                    self.current_suggested = None;
                    // calculate boundary relative to start of given data buffer
                    let len = chunk_size - pos as usize;
                    if len == 0 {
                        // passed this one, previous scan did not know about boundary just yet
                        return self.chunker.scan(data, ctx);
                    }
                    self.chunker.reset();
                    log::debug!(
                        "Chunk at suggested boundary: {boundary}, chunk size: {chunk_size}"
                    );
                    return len;
                }

                log::debug!("Chunk {chunk_size} too big, regular scan");
                // chunk too big, cannot decide yet
                // scan for hash based chunk boundary instead
                return self.chunker.scan(data, ctx);
            }

            if let Ok(boundary) = self.suggested_boundaries.try_recv() {
                self.current_suggested = Some(boundary);
            } else {
                log::debug!("No suggested boundary, regular scan");
                return self.chunker.scan(data, ctx);
            }
        }
    }

    fn reset(&mut self) {
        self.chunker.reset();
    }
}

#[test]
fn test_chunker1() {
    let mut buffer = Vec::new();

    for i in 0..(256 * 1024) {
        for j in 0..4 {
            let byte = ((i >> (j << 3)) & 0xff) as u8;
            buffer.push(byte);
        }
    }
    let mut chunker = ChunkerImpl::new(64 * 1024);

    let mut pos = 0;
    let mut last = 0;

    let mut chunks1: Vec<(usize, usize)> = vec![];
    let mut chunks2: Vec<(usize, usize)> = vec![];
    let ctx = Context::default();

    // test1: feed single bytes
    while pos < buffer.len() {
        let k = chunker.scan(&buffer[pos..pos + 1], &ctx);
        pos += 1;
        if k != 0 {
            let prev = last;
            last = pos;
            chunks1.push((prev, pos - prev));
        }
    }
    chunks1.push((last, buffer.len() - last));

    let mut chunker = ChunkerImpl::new(64 * 1024);

    let mut pos = 0;

    // test2: feed with whole buffer
    while pos < buffer.len() {
        let k = chunker.scan(&buffer[pos..], &ctx);
        if k != 0 {
            chunks2.push((pos, k));
            pos += k;
        } else {
            break;
        }
    }

    chunks2.push((pos, buffer.len() - pos));

    if chunks1 != chunks2 {
        let mut size1 = 0;
        for (_offset, len) in &chunks1 {
            size1 += len;
        }
        println!("Chunks1:{}\n{:?}\n", size1, chunks1);

        let mut size2 = 0;
        for (_offset, len) in &chunks2 {
            size2 += len;
        }
        println!("Chunks2:{}\n{:?}\n", size2, chunks2);

        if size1 != 256 * 4 * 1024 {
            panic!("wrong size for chunks1");
        }
        if size2 != 256 * 4 * 1024 {
            panic!("wrong size for chunks2");
        }

        panic!("got different chunks");
    }
}

#[test]
fn test_suggested_boundary() {
    let mut buffer = Vec::new();

    for i in 0..(256 * 1024) {
        for j in 0..4 {
            let byte = ((i >> (j << 3)) & 0xff) as u8;
            buffer.push(byte);
        }
    }
    let (tx, rx) = std::sync::mpsc::channel();
    let mut chunker = PayloadChunker::new(64 * 1024, rx);

    // Suggest chunk boundary within regular chunk
    tx.send(32 * 1024).unwrap();
    // Suggest chunk boundary within regular chunk, resulting chunk being 0
    tx.send(32 * 1024).unwrap();
    // Suggest chunk boundary in the past, must be ignored
    tx.send(0).unwrap();
    // Suggest chunk boundary aligned with regular boundary
    tx.send(405521).unwrap();

    let mut pos = 0;
    let mut last = 0;

    let mut chunks1: Vec<(usize, usize)> = vec![];
    let mut chunks2: Vec<(usize, usize)> = vec![];
    let mut ctx = Context::default();

    // test1: feed single bytes with suggested boundary
    while pos < buffer.len() {
        ctx.total += 1;
        let k = chunker.scan(&buffer[pos..pos + 1], &ctx);
        pos += 1;
        if k != 0 {
            let prev = last;
            last = pos;
            ctx.base += pos as u64;
            ctx.total = 0;
            chunks1.push((prev, pos - prev));
        }
    }
    chunks1.push((last, buffer.len() - last));

    let mut pos = 0;
    let mut ctx = Context::default();
    ctx.total = buffer.len() as u64;
    chunker.reset();
    // Suggest chunk boundary within regular chunk
    tx.send(32 * 1024).unwrap();
    // Suggest chunk boundary within regular chunk,
    // resulting chunk being too small and therefore ignored
    tx.send(32 * 1024).unwrap();
    // Suggest chunk boundary in the past, must be ignored
    tx.send(0).unwrap();
    // Suggest chunk boundary aligned with regular boundary
    tx.send(405521).unwrap();

    while pos < buffer.len() {
        let k = chunker.scan(&buffer[pos..], &ctx);
        if k != 0 {
            chunks2.push((pos, k));
            pos += k;
            ctx.base += pos as u64;
            ctx.total = (buffer.len() - pos) as u64;
        } else {
            break;
        }
    }

    chunks2.push((pos, buffer.len() - pos));

    if chunks1 != chunks2 {
        let mut size1 = 0;
        for (_offset, len) in &chunks1 {
            size1 += len;
        }
        println!("Chunks1: {size1}\n{chunks1:?}\n");

        let mut size2 = 0;
        for (_offset, len) in &chunks2 {
            size2 += len;
        }
        println!("Chunks2: {size2}\n{chunks2:?}\n");

        panic!("got different chunks");
    }

    let expected_sizes = [32768, 110609, 229376, 32768, 262144, 262144, 118767];
    for ((_, chunk_size), expected) in chunks1.iter().zip(expected_sizes.iter()) {
        assert_eq!(chunk_size, expected);
    }
}