]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/obj_bencher.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / common / obj_bencher.cc
index de73e70b15bdb362c5f907b380d7d89bdeae3435..e94fd7c3e346c0872e3d70af1292d00ec02ebec5 100644 (file)
@@ -17,7 +17,8 @@
  */
 #include "include/compat.h"
 #include <pthread.h>
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
+#include "common/Clock.h"
 #include "obj_bencher.h"
 
 const std::string BENCH_LASTRUN_METADATA = "benchmark_last_metadata";
@@ -92,14 +93,14 @@ void *ObjBencher::status_printer(void *_bencher) {
   bench_data& data = bencher->data;
   Formatter *formatter = bencher->formatter;
   ostream *outstream = bencher->outstream;
-  Cond cond;
+  ceph::condition_variable cond;
   int i = 0;
   int previous_writes = 0;
   int cycleSinceChange = 0;
   double bandwidth;
   int iops = 0;
   mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
-  bencher->lock.lock();
+  std::unique_lock locker{bencher->lock};
   if (formatter)
     formatter->open_array_section("datas");
   while(!data.done) {
@@ -222,7 +223,7 @@ void *ObjBencher::status_printer(void *_bencher) {
     }
     ++i;
     ++cycleSinceChange;
-    cond.WaitInterval(bencher->lock, ONE_SECOND);
+    cond.wait_for(locker, ONE_SECOND);
   }
   if (formatter)
     formatter->close_section(); //datas
@@ -230,7 +231,6 @@ void *ObjBencher::status_printer(void *_bencher) {
     std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
     data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
   }
-  bencher->lock.unlock();
   return NULL;
 }
 
@@ -245,6 +245,7 @@ int ObjBencher::aio_bench(
   if (concurrentios <= 0)
     return -EINVAL;
 
+  int num_ops = 0;
   int num_objects = 0;
   int r = 0;
   int prev_pid = 0;
@@ -257,7 +258,7 @@ int ObjBencher::aio_bench(
   if (operation != OP_WRITE || reuse_bench) {
     uint64_t prev_op_size, prev_object_size;
     r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
-                            &num_objects, &prev_pid);
+                            &num_ops, &num_objects, &prev_pid);
     if (r < 0) {
       if (r == -ENOENT) {
         if (reuse_bench)
@@ -298,17 +299,17 @@ int ObjBencher::aio_bench(
     if (r != 0) goto out;
   }
   else if (OP_SEQ_READ == operation) {
-    r = seq_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
+    r = seq_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
     if (r != 0) goto out;
   }
   else if (OP_RAND_READ == operation) {
-    r = rand_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
+    r = rand_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
     if (r != 0) goto out;
   }
 
   if (OP_WRITE == operation && cleanup) {
     r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
-                            &num_objects, &prev_pid);
+                            &num_ops, &num_objects, &prev_pid);
     if (r < 0) {
       if (r == -ENOENT)
         cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
@@ -340,21 +341,21 @@ int ObjBencher::aio_bench(
 }
 
 struct lock_cond {
-  explicit lock_cond(Mutex *_lock) : lock(_lock) {}
-  Mutex *lock;
-  Cond cond;
+  explicit lock_cond(ceph::mutex *_lock) : lock(_lock) {}
+  ceph::mutex *lock;
+  ceph::condition_variable cond;
 };
 
 void _aio_cb(void *cb, void *arg) {
   struct lock_cond *lc = (struct lock_cond *)arg;
   lc->lock->lock();
-  lc->cond.Signal();
+  lc->cond.notify_all();
   lc->lock->unlock();
 }
 
 int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
                                     uint64_t *op_size, uint64_t* object_size,
-                                    int* num_objects, int* prevPid) {
+                                    int* num_ops, int* num_objects, int* prevPid) {
   int r = 0;
   bufferlist object_data;
 
@@ -369,13 +370,19 @@ int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
   }
   auto p = object_data.cbegin();
   decode(*object_size, p);
-  decode(*num_objects, p);
+  decode(*num_ops, p);
   decode(*prevPid, p);
   if (!p.end()) {
     decode(*op_size, p);
   } else {
     *op_size = *object_size;
   }
+  unsigned ops_per_object = 1;
+  // make sure *op_size value is reasonable
+  if (*op_size > 0 && *object_size > *op_size) {
+    ops_per_object = *object_size / *op_size;
+  }
+  *num_objects = (*num_ops + ops_per_object - 1) / ops_per_object;
 
   return 0;
 }
@@ -383,9 +390,9 @@ int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
 int ObjBencher::write_bench(int secondsToRun,
                            int concurrentios, const string& run_name_meta,
                            unsigned max_objects, int prev_pid) {
-  if (concurrentios <= 0) 
+  if (concurrentios <= 0)
     return -EINVAL;
-  
+
   if (!formatter) {
     out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
              << data.op_size << " bytes to objects of size "
@@ -437,10 +444,10 @@ int ObjBencher::write_bench(int secondsToRun,
 
   pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
   ceph_pthread_setname(print_thread, "write_stat");
-  lock.lock();
+  std::unique_lock locker{lock};
   data.finished = 0;
   data.start_time = mono_clock::now();
-  lock.unlock();
+  locker.unlock();
   for (int i = 0; i<concurrentios; ++i) {
     start_times[i] = mono_clock::now();
     r = create_completion(i, _aio_cb, (void *)&lc);
@@ -451,22 +458,21 @@ int ObjBencher::write_bench(int secondsToRun,
     if (r < 0) {
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
   }
 
   //keep on adding new writes as old ones complete until we've passed minimum time
   int slot;
-  int num_objects;
 
   //don't need locking for reads because other thread doesn't write
 
   stopTime = data.start_time + std::chrono::seconds(secondsToRun);
   slot = 0;
-  lock.lock();
-  while (secondsToRun && mono_clock::now() < stopTime) {
+  locker.lock();
+  while (data.finished < data.started) {
     bool found = false;
     while (1) {
       int old_slot = slot;
@@ -482,21 +488,15 @@ int ObjBencher::write_bench(int secondsToRun,
       } while (slot != old_slot);
       if (found)
         break;
-      lc.cond.Wait(lock);
+      lc.cond.wait(locker);
     }
-    lock.unlock();
-    //create new contents and name on the heap, and fill them
-    newName = generate_object_name_fast(data.started / writes_per_object);
-    newContents = contents[slot].get();
-    snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
-    // we wrote to buffer, going around internal crc cache, so invalidate it now.
-    newContents->invalidate_crc();
+    locker.unlock();
 
     completion_wait(slot);
-    lock.lock();
+    locker.lock();
     r = completion_ret(slot);
     if (r != 0) {
-      lock.unlock();
+      locker.unlock();
       goto ERR;
     }
     data.cur_latency = mono_clock::now() - start_times[slot];
@@ -510,10 +510,31 @@ int ObjBencher::write_bench(int secondsToRun,
     data.avg_latency = total_latency / data.finished;
     data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
     --data.in_flight;
-    lock.unlock();
+    locker.unlock();
     release_completion(slot);
 
+    if (!secondsToRun || mono_clock::now() >= stopTime) {
+      locker.lock();
+      continue;
+    }
+
+    if (data.op_size && max_objects &&
+        data.started >=
+            (int)((data.object_size * max_objects + data.op_size - 1) /
+                  data.op_size)) {
+      locker.lock();
+      continue;
+    }
+
     //write new stuff to backend
+
+    //create new contents and name on the heap, and fill them
+    newName = generate_object_name_fast(data.started / writes_per_object);
+    newContents = contents[slot].get();
+    snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
+    // we wrote to buffer, going around internal crc cache, so invalidate it now.
+    newContents->invalidate_crc();
+
     start_times[slot] = mono_clock::now();
     r = create_completion(slot, _aio_cb, &lc);
     if (r < 0)
@@ -524,46 +545,16 @@ int ObjBencher::write_bench(int secondsToRun,
       goto ERR;
     }
     name[slot] = newName;
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    if (data.op_size) {
-      if (max_objects &&
-         data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
-                              data.op_size))
-        break;
-    }
-  }
-  lock.unlock();
-
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    lock.lock();
-    r = completion_ret(slot);
-    if (r != 0) {
-      lock.unlock();
-      goto ERR;
-    }
-    data.cur_latency = mono_clock::now() - start_times[slot];
-    total_latency += data.cur_latency.count();
-    if (data.cur_latency.count() > data.max_latency)
-      data.max_latency = data.cur_latency.count();
-    if (data.cur_latency.count() < data.min_latency)
-      data.min_latency = data.cur_latency.count();
-    ++data.finished;
-    double delta = data.cur_latency.count() - data.avg_latency;
-    data.avg_latency = total_latency / data.finished;
-    data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
-    --data.in_flight;
-    lock.unlock();
-    release_completion(slot);
   }
+  locker.unlock();
 
   timePassed = mono_clock::now() - data.start_time;
-  lock.lock();
+  locker.lock();
   data.done = true;
-  lock.unlock();
+  locker.unlock();
 
   pthread_join(print_thread, NULL);
 
@@ -595,7 +586,7 @@ int ObjBencher::write_bench(int secondsToRun,
     out(cout) << "Total time run:         " << timePassed.count() << std::endl
        << "Total writes made:      " << data.finished << std::endl
        << "Write size:             " << data.op_size << std::endl
-       << "Object size:            " << data.object_size << std::endl      
+       << "Object size:            " << data.object_size << std::endl
        << "Bandwidth (MB/sec):     " << setprecision(6) << bandwidth << std::endl
        << "Stddev Bandwidth:       " << bandwidth_stddev << std::endl
        << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
@@ -628,8 +619,7 @@ int ObjBencher::write_bench(int secondsToRun,
   }
   //write object size/number data for read benchmarks
   encode(data.object_size, b_write);
-  num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
-  encode(num_objects, b_write);
+  encode(data.finished, b_write);
   encode(prev_pid ? prev_pid : getpid(),  b_write);
   encode(data.op_size, b_write);
 
@@ -641,17 +631,20 @@ int ObjBencher::write_bench(int secondsToRun,
   return 0;
 
  ERR:
-  lock.lock();
+  locker.lock();
   data.done = 1;
-  lock.unlock();
+  locker.unlock();
   pthread_join(print_thread, NULL);
   return r;
 }
 
-int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
+int ObjBencher::seq_read_bench(
+  int seconds_to_run, int num_ops, int num_objects,
+  int concurrentios, int pid, bool no_verify) {
+
   lock_cond lc(&lock);
 
-  if (concurrentios <= 0) 
+  if (concurrentios <= 0)
     return -EINVAL;
 
   std::vector<string> name(concurrentios);
@@ -681,10 +674,10 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
     contents[i] = std::make_unique<bufferlist>();
   }
 
-  lock.lock();
+  std::unique_lock locker{lock};
   data.finished = 0;
   data.start_time = mono_clock::now();
-  lock.unlock();
+  locker.unlock();
 
   pthread_t print_thread;
   pthread_create(&print_thread, NULL, status_printer, (void *)this);
@@ -702,10 +695,10 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
       cerr << "r = " << r << std::endl;
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
   }
 
   //keep on adding new reads as old ones complete
@@ -713,9 +706,8 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
   bufferlist *cur_contents;
 
   slot = 0;
-  while ((seconds_to_run && mono_clock::now() < finish_time) &&
-        num_objects > data.started) {
-    lock.lock();
+  while (data.finished < data.started) {
+    locker.lock();
     int old_slot = slot;
     bool found = false;
     while (1) {
@@ -732,7 +724,7 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
       if (found) {
         break;
       }
-      lc.cond.Wait(lock);
+      lc.cond.wait(locker);
     }
 
     // calculate latency here, so memcmp doesn't inflate it
@@ -740,28 +732,33 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
 
     cur_contents = contents[slot].get();
     int current_index = index[slot];
-    
+
     // invalidate internal crc cache
     cur_contents->invalidate_crc();
-  
+
     if (!no_verify) {
       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
-      if ( (cur_contents->length() != data.op_size) || 
+      if ( (cur_contents->length() != data.op_size) ||
            (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
         cerr << name[slot] << " is not correct!" << std::endl;
         ++errors;
       }
     }
 
-    newName = generate_object_name_fast(data.started / reads_per_object, pid);
-    index[slot] = data.started;
-    lock.unlock();
+    bool start_new_read = (seconds_to_run && mono_clock::now() < finish_time) &&
+                          num_ops > data.started;
+    if (start_new_read) {
+      newName = generate_object_name_fast(data.started / reads_per_object, pid);
+      index[slot] = data.started;
+    }
+
+    locker.unlock();
     completion_wait(slot);
-    lock.lock();
+    locker.lock();
     r = completion_ret(slot);
     if (r < 0) {
       cerr << "read got " << r << std::endl;
-      lock.unlock();
+      locker.unlock();
       goto ERR;
     }
     total_latency += data.cur_latency.count();
@@ -772,9 +769,12 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
     ++data.finished;
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
-    lock.unlock();
+    locker.unlock();
     release_completion(slot);
 
+    if (!start_new_read)
+      continue;
+
     //start new read and check data if requested
     start_times[slot] = mono_clock::now();
     create_completion(slot, _aio_cb, (void *)&lc);
@@ -783,58 +783,24 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
     if (r < 0) {
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
     name[slot] = newName;
   }
 
-  //wait for final reads to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    lock.lock();
-    r = completion_ret(slot);
-    if (r < 0) {
-      cerr << "read got " << r << std::endl;
-      lock.unlock();
-      goto ERR;
-    }
-    data.cur_latency = mono_clock::now() - start_times[slot];
-    total_latency += data.cur_latency.count();
-    if (data.cur_latency.count() > data.max_latency)
-      data.max_latency = data.cur_latency.count();
-    if (data.cur_latency.count() < data.min_latency)
-      data.min_latency = data.cur_latency.count();
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    release_completion(slot);
-    if (!no_verify) {
-      snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
-      lock.unlock();
-      if ((contents[slot]->length() != data.op_size) || 
-         (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
-        cerr << name[slot] << " is not correct!" << std::endl;
-        ++errors;
-      }
-    } else {
-        lock.unlock();
-    }
-  }
-
   timePassed = mono_clock::now() - data.start_time;
-  lock.lock();
+  locker.lock();
   data.done = true;
-  lock.unlock();
+  locker.unlock();
 
   pthread_join(print_thread, NULL);
 
   double bandwidth;
   bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-  
+
   double iops_stddev;
   if (data.idata.iops_cycles > 1) {
     iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
@@ -875,15 +841,17 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
   return (errors > 0 ? -EIO : 0);
 
  ERR:
-  lock.lock();
+  locker.lock();
   data.done = 1;
-  lock.unlock();
+  locker.unlock();
   pthread_join(print_thread, NULL);
   return r;
 }
 
-int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
-{
+int ObjBencher::rand_read_bench(
+  int seconds_to_run, int num_ops, int num_objects,
+  int concurrentios, int pid, bool no_verify) {
+
   lock_cond lc(&lock);
 
   if (concurrentios <= 0)
@@ -918,10 +886,10 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
     contents[i] = std::make_unique<bufferlist>();
   }
 
-  lock.lock();
+  unique_lock locker{lock};
   data.finished = 0;
   data.start_time = mono_clock::now();
-  lock.unlock();
+  locker.unlock();
 
   pthread_t print_thread;
   pthread_create(&print_thread, NULL, status_printer, (void *)this);
@@ -939,10 +907,10 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
       cerr << "r = " << r << std::endl;
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
   }
 
   //keep on adding new reads as old ones complete
@@ -951,8 +919,8 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
   int rand_id;
 
   slot = 0;
-  while ((seconds_to_run && mono_clock::now() < finish_time)) {
-    lock.lock();
+  while (data.finished < data.started) {
+    locker.lock();
     int old_slot = slot;
     bool found = false;
     while (1) {
@@ -969,22 +937,22 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
       if (found) {
         break;
       }
-      lc.cond.Wait(lock);
+      lc.cond.wait(locker);
     }
 
     // calculate latency here, so memcmp doesn't inflate it
     data.cur_latency = mono_clock::now() - start_times[slot];
 
-    lock.unlock();
+    locker.unlock();
 
     int current_index = index[slot];
     cur_contents = contents[slot].get();
     completion_wait(slot);
-    lock.lock();
+    locker.lock();
     r = completion_ret(slot);
     if (r < 0) {
       cerr << "read got " << r << std::endl;
-      lock.unlock();
+      locker.unlock();
       goto ERR;
     }
 
@@ -996,26 +964,31 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
     ++data.finished;
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
-    lock.unlock();
-    
+
     if (!no_verify) {
       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
-      if ((cur_contents->length() != data.op_size) || 
+      if ((cur_contents->length() != data.op_size) ||
           (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
         cerr << name[slot] << " is not correct!" << std::endl;
         ++errors;
       }
-    } 
+    }
+
+    locker.unlock();
+    release_completion(slot);
+
+    if (!seconds_to_run || mono_clock::now() >= finish_time)
+      continue;
+
+    //start new read and check data if requested
 
-    rand_id = rand() % num_objects;
+    rand_id = rand() % num_ops;
     newName = generate_object_name_fast(rand_id / reads_per_object, pid);
     index[slot] = rand_id;
-    release_completion(slot);
 
     // invalidate internal crc cache
     cur_contents->invalidate_crc();
 
-    //start new read and check data if requested
     start_times[slot] = mono_clock::now();
     create_completion(slot, _aio_cb, (void *)&lc);
     r = aio_read(newName, slot, contents[slot].get(), data.op_size,
@@ -1023,59 +996,24 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
     if (r < 0) {
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
     name[slot] = newName;
   }
 
-
-  //wait for final reads to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    lock.lock();
-    r = completion_ret(slot);
-    if (r < 0) {
-      cerr << "read got " << r << std::endl;
-      lock.unlock();
-      goto ERR;
-    }
-    data.cur_latency = mono_clock::now() - start_times[slot];
-    total_latency += data.cur_latency.count();
-    if (data.cur_latency.count() > data.max_latency)
-      data.max_latency = data.cur_latency.count();
-    if (data.cur_latency.count() < data.min_latency)
-      data.min_latency = data.cur_latency.count();
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    release_completion(slot);
-    if (!no_verify) {
-      snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
-      lock.unlock();
-      if ((contents[slot]->length() != data.op_size) || 
-          (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
-        cerr << name[slot] << " is not correct!" << std::endl;
-        ++errors;
-      }
-    } else {
-        lock.unlock();
-    }
-  }
-
   timePassed = mono_clock::now() - data.start_time;
-  lock.lock();
+  locker.lock();
   data.done = true;
-  lock.unlock();
+  locker.unlock();
 
   pthread_join(print_thread, NULL);
 
   double bandwidth;
   bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-  
+
   double iops_stddev;
   if (data.idata.iops_cycles > 1) {
     iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
@@ -1115,9 +1053,9 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
   return (errors > 0 ? -EIO : 0);
 
  ERR:
-  lock.lock();
+  locker.lock();
   data.done = 1;
-  lock.unlock();
+  locker.unlock();
   pthread_join(print_thread, NULL);
   return r;
 }
@@ -1125,7 +1063,7 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
 int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
   int r = 0;
   uint64_t op_size, object_size;
-  int num_objects;
+  int num_ops, num_objects;
   int prevPid;
 
   // default meta object if user does not specify one
@@ -1172,7 +1110,7 @@ int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, cons
       continue;
     }
 
-    r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
+    r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_ops, &num_objects, &prevPid);
     if (r < 0) {
       return r;
     }
@@ -1189,8 +1127,8 @@ int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, cons
 
 int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
   lock_cond lc(&lock);
-  
-  if (concurrentios <= 0) 
+
+  if (concurrentios <= 0)
     return -EINVAL;
 
   std::vector<string> name(concurrentios);
@@ -1198,12 +1136,12 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
   int r = 0;
   int slot = 0;
 
-  lock.lock();
+  unique_lock locker{lock};
   data.done = false;
   data.in_flight = 0;
   data.started = 0;
   data.finished = 0;
-  lock.unlock();
+  locker.unlock();
 
   // don't start more completions than files
   if (num_objects == 0) {
@@ -1229,15 +1167,15 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
       cerr << "r = " << r << std::endl;
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
   }
 
   //keep on adding new removes as old ones complete
-  while (data.started < num_objects) {
-    lock.lock();
+  while (data.finished < data.started) {
+    locker.lock();
     int old_slot = slot;
     bool found = false;
     while (1) {
@@ -1254,56 +1192,42 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
       if (found) {
         break;
       }
-      lc.cond.Wait(lock);
+      lc.cond.wait(locker);
     }
-    lock.unlock();
-    newName = generate_object_name_fast(data.started, prevPid);
+    locker.unlock();
     completion_wait(slot);
-    lock.lock();
+    locker.lock();
     r = completion_ret(slot);
     if (r != 0 && r != -ENOENT) { // file does not exist
       cerr << "remove got " << r << std::endl;
-      lock.unlock();
+      locker.unlock();
       goto ERR;
     }
     ++data.finished;
     --data.in_flight;
-    lock.unlock();
+    locker.unlock();
     release_completion(slot);
 
+    if (data.started >= num_objects)
+      continue;
+
     //start new remove and check data if requested
+    newName = generate_object_name_fast(data.started, prevPid);
     create_completion(slot, _aio_cb, (void *)&lc);
     r = aio_remove(newName, slot);
     if (r < 0) {
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
     name[slot] = newName;
   }
 
-  //wait for final removes to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    lock.lock();
-    r = completion_ret(slot);
-    if (r != 0 && r != -ENOENT) { // file does not exist
-      cerr << "remove got " << r << std::endl;
-      lock.unlock();
-      goto ERR;
-    }
-    ++data.finished;
-    --data.in_flight;
-    release_completion(slot);
-    lock.unlock();
-  }
-
-  lock.lock();
+  locker.lock();
   data.done = true;
-  lock.unlock();
+  locker.unlock();
 
   completions_done();
 
@@ -1312,9 +1236,9 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
   return 0;
 
  ERR:
-  lock.lock();
+  locker.lock();
   data.done = 1;
-  lock.unlock();
+  locker.unlock();
   return r;
 }
 
@@ -1364,12 +1288,12 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
   std::list<Object> objects;
   bool objects_remain = true;
 
-  lock.lock();
+  std::unique_lock locker{lock};
   data.done = false;
   data.in_flight = 0;
   data.started = 0;
   data.finished = 0;
-  lock.unlock();
+  locker.unlock();
 
   out(cout) << "Warning: using slow linear search" << std::endl;
 
@@ -1402,15 +1326,15 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
       cerr << "r = " << r << std::endl;
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
   }
 
   //keep on adding new removes as old ones complete
   while (objects_remain) {
-    lock.lock();
+    locker.lock();
     int old_slot = slot;
     bool found = false;
     while (1) {
@@ -1427,9 +1351,9 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
       if (found) {
         break;
       }
-      lc.cond.Wait(lock);
+      lc.cond.wait(locker);
     }
-    lock.unlock();
+    locker.unlock();
 
     // get more objects if necessary
     if (objects.empty()) {
@@ -1445,16 +1369,16 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
     objects.pop_front();
 
     completion_wait(slot);
-    lock.lock();
+    locker.lock();
     r = completion_ret(slot);
     if (r != 0 && r != -ENOENT) { // file does not exist
       cerr << "remove got " << r << std::endl;
-      lock.unlock();
+      locker.unlock();
       goto ERR;
     }
     ++data.finished;
     --data.in_flight;
-    lock.unlock();
+    locker.unlock();
     release_completion(slot);
 
     //start new remove and check data if requested
@@ -1464,10 +1388,10 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
     if (r < 0) {
       goto ERR;
     }
-    lock.lock();
+    locker.lock();
     ++data.started;
     ++data.in_flight;
-    lock.unlock();
+    locker.unlock();
     name[slot] = newName;
   }
 
@@ -1475,22 +1399,22 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
   while (data.finished < data.started) {
     slot = data.finished % concurrentios;
     completion_wait(slot);
-    lock.lock();
+    locker.lock();
     r = completion_ret(slot);
     if (r != 0 && r != -ENOENT) { // file does not exist
       cerr << "remove got " << r << std::endl;
-      lock.unlock();
+      locker.unlock();
       goto ERR;
     }
     ++data.finished;
     --data.in_flight;
     release_completion(slot);
-    lock.unlock();
+    locker.unlock();
   }
 
-  lock.lock();
+  locker.lock();
   data.done = true;
-  lock.unlock();
+  locker.unlock();
 
   completions_done();
 
@@ -1499,8 +1423,8 @@ int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
   return 0;
 
  ERR:
-  lock.lock();
+  locker.lock();
   data.done = 1;
-  lock.unlock();
+  locker.unlock();
   return -EIO;
 }