}
}
+fn rename_corrupted_chunk(
+ datastore: Arc<DataStore>,
+ digest: &[u8;32],
+ worker: Arc<WorkerTask>,
+) {
+ let (path, digest_str) = datastore.chunk_path(digest);
+
+ let mut counter = 0;
+ let mut new_path = path.clone();
+ new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
+ while new_path.exists() && counter < 9 {
+ counter += 1;
+ new_path.set_file_name(format!("{}.{}.bad", digest_str, counter));
+ }
+
+ match std::fs::rename(&path, &new_path) {
+ Ok(_) => {
+ worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+ },
+ Err(err) => {
+ match err.kind() {
+ std::io::ErrorKind::NotFound => { /* ignored */ },
+ _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+ }
+ }
+ };
+}
+
// We use a separate thread to read/load chunks, so that we can do
// load and verify in parallel to increase performance.
fn chunk_reader_thread(
corrupt_chunks.lock().unwrap().insert(info.digest);
worker.log(format!("can't verify chunk, load failed - {}", err));
errors.fetch_add(1, Ordering::SeqCst);
+ rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
continue;
}
Ok(chunk) => {
let start_time = Instant::now();
let chunk_channel = chunk_reader_thread(
- datastore,
+ datastore.clone(),
index,
verified_chunks.clone(),
corrupt_chunks.clone(),
corrupt_chunks.lock().unwrap().insert(digest);
worker.log(format!("{}", err));
errors.fetch_add(1, Ordering::SeqCst);
+ rename_corrupted_chunk(datastore.clone(), &digest, worker.clone());
} else {
verified_chunks.lock().unwrap().insert(digest);
}