]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/journal/test_ObjectRecorder.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / journal / test_ObjectRecorder.cc
index 3cc8e893cfe0c93bd7e4cb0669d9d8334a8b139e..ac110a23e6fbe50b737a76ddf8d77a5d7080ad5a 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "journal/ObjectRecorder.h"
 #include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/Timer.h"
 #include "gtest/gtest.h"
 #include "test/librados/test.h"
@@ -14,126 +14,148 @@ using std::shared_ptr;
 
 class TestObjectRecorder : public RadosTestFixture {
 public:
-  TestObjectRecorder()
-    : m_flush_interval(std::numeric_limits<uint32_t>::max()),
-      m_flush_bytes(std::numeric_limits<uint64_t>::max()),
-      m_flush_age(600)
-  {
-  }
+  TestObjectRecorder() = default;
 
   struct Handler : public journal::ObjectRecorder::Handler {
-    Mutex lock;
-    shared_ptr<Mutex> object_lock;
-    Cond cond;
+    ceph::mutex lock = ceph::make_mutex("lock");
+    ceph::mutex* object_lock = nullptr;
+    ceph::condition_variable cond;
     bool is_closed = false;
     uint32_t overflows = 0;
 
-    Handler() : lock("lock") {
-    }
+    Handler() = default;
 
     void closed(journal::ObjectRecorder *object_recorder) override {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       is_closed = true;
-      cond.Signal();
+      cond.notify_all();
     }
     void overflow(journal::ObjectRecorder *object_recorder) override {
-      Mutex::Locker locker(lock);
+      std::lock_guard locker{lock};
       journal::AppendBuffers append_buffers;
-      object_lock->Lock();
+      object_lock->lock();
       object_recorder->claim_append_buffers(&append_buffers);
-      object_lock->Unlock();
+      object_lock->unlock();
 
       ++overflows;
-      cond.Signal();
+      cond.notify_all();
     }
   };
 
-  typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
-  typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
-
-  ObjectRecorders m_object_recorders;
-  ObjectRecorderLocksMap m_object_recorder_locks;
-
-  uint32_t m_flush_interval;
-  uint64_t m_flush_bytes;
-  double m_flush_age;
-  uint64_t m_max_in_flight_appends = 0;
-  Handler m_handler;
-
-  void TearDown() override {
-    for (ObjectRecorders::iterator it = m_object_recorders.begin();
-         it != m_object_recorders.end(); ++it) {
-      C_SaferCond cond;
-      (*it)->flush(&cond);
-      cond.wait();
+  // flush the pending buffers in dtor
+  class ObjectRecorderFlusher {
+  public:
+    ObjectRecorderFlusher(librados::IoCtx& ioctx,
+                         ContextWQ* work_queue)
+      : m_ioctx{ioctx},
+       m_work_queue{work_queue}
+    {}
+    ObjectRecorderFlusher(librados::IoCtx& ioctx,
+                         ContextWQ* work_queue,
+                         uint32_t flush_interval,
+                         uint16_t flush_bytes,
+                         double flush_age,
+                         int max_in_flight)
+      : m_ioctx{ioctx},
+       m_work_queue{work_queue},
+       m_flush_interval{flush_interval},
+       m_flush_bytes{flush_bytes},
+       m_flush_age{flush_age},
+       m_max_in_flight_appends{max_in_flight < 0 ?
+                               std::numeric_limits<uint64_t>::max() :
+                               static_cast<uint64_t>(max_in_flight)}
+    {}
+    ~ObjectRecorderFlusher() {
+      for (auto& [object_recorder, m] : m_object_recorders) {
+       C_SaferCond cond;
+       object_recorder->flush(&cond);
+       cond.wait();
+       std::scoped_lock l{*m};
+       if (!object_recorder->is_closed()) {
+         object_recorder->close();
+       }
+      }
     }
-    m_object_recorders.clear();
-
-    RadosTestFixture::TearDown();
-  }
-
-  inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
-                                double flush_age, int max_in_flight) {
-    m_flush_interval = flush_interval;
-    m_flush_bytes = flush_bytes;
-    m_flush_age = flush_age;
-    m_max_in_flight_appends = max_in_flight;
-  }
+    auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) {
+      auto object = ceph::make_ref<journal::ObjectRecorder>(
+        m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
+       order, m_max_in_flight_appends);
+      {
+       std::lock_guard locker{*lock};
+       object->set_append_batch_options(m_flush_interval,
+                                        m_flush_bytes,
+                                        m_flush_age);
+      }
+      m_object_recorders.emplace_back(object, lock);
+      m_handler.object_lock = lock;
+      return object;
+    }
+    bool wait_for_closed() {
+      std::unique_lock locker{m_handler.lock};
+      return m_handler.cond.wait_for(locker, 10s,
+                                    [this] { return m_handler.is_closed; });
+    }
+    bool wait_for_overflow() {
+      std::unique_lock locker{m_handler.lock};
+      if (m_handler.cond.wait_for(locker, 10s,
+                                 [this] { return m_handler.overflows > 0; })) {
+       m_handler.overflows = 0;
+       return true;
+      } else {
+       return false;
+      }
+    }
+  private:
+    librados::IoCtx& m_ioctx;
+    ContextWQ *m_work_queue;
+    uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max();
+    uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
+    double m_flush_age = 600;
+    uint64_t m_max_in_flight_appends = 0;
+    using ObjectRecorders =
+      std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>;
+    ObjectRecorders m_object_recorders;
+    Handler m_handler;
+  };
 
-  journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
+  journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
+                                             uint64_t entry_tid,
                                              const std::string &payload) {
-    journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
-                                                          456));
-    future->init(journal::FutureImplPtr());
+    auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
+    future->init(ceph::ref_t<journal::FutureImpl>());
 
     bufferlist bl;
     bl.append(payload);
     return std::make_pair(future, bl);
   }
-
-  journal::ObjectRecorderPtr create_object(const std::string &oid,
-                                           uint8_t order, shared_ptr<Mutex> lock) {
-    journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
-      m_max_in_flight_appends));
-    {
-      Mutex::Locker locker(*lock);
-      object->set_append_batch_options(m_flush_interval, m_flush_bytes,
-                                       m_flush_age);
-    }
-    m_object_recorders.push_back(object);
-    m_object_recorder_locks.insert(std::make_pair(oid, lock));
-    m_handler.object_lock = lock;
-    return object;
-  }
 };
 
 TEST_F(TestObjectRecorder, Append) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 0, 0, 0);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -146,28 +168,28 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(2, 0, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -179,28 +201,28 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 10, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -212,27 +234,35 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 0, 0.1, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
-  journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
-                                                              "payload");
-  append_buffers = {append_buffer2};
-  lock->Lock();
-  ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  uint32_t offset  = 0;
+  journal::AppendBuffer append_buffer2;
+  while (!append_buffer1.first->is_flush_in_progress() &&
+         !append_buffer1.first->is_complete()) {
+    usleep(1000);
+
+    append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
+    ++offset;
+    append_buffers = {append_buffer2};
+
+    lock.lock();
+    ASSERT_FALSE(object->append(std::move(append_buffers)));
+    lock.unlock();
+  }
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -244,27 +274,28 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
+  auto object = flusher.create_object(oid, 12, &lock);
 
   std::string payload(2048, '1');
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               payload);
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               payload);
   append_buffers = {append_buffer2};
-  lock->Lock();
+  lock.lock();
   ASSERT_TRUE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -276,20 +307,20 @@ TEST_F(TestObjectRecorder, Flush) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 10, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond1;
@@ -306,20 +337,20 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(0, 10, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
                                                              "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -334,11 +365,12 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
                                                              "payload");
@@ -348,9 +380,9 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
 
   object->flush(append_buffer.first);
   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
 
   // should automatically flush once its attached to the object
   C_SaferCond cond;
@@ -362,38 +394,29 @@ TEST_F(TestObjectRecorder, Close) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_batch_options(2, 0, 0, -1);
-  shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
-  journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+  ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+  auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
                                                               "payload");
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock->Unlock();
+  lock.unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
-  lock->Lock();
+  lock.lock();
   ASSERT_FALSE(object->close());
-  ASSERT_TRUE(lock->is_locked());
-  lock->Unlock();
-
-  {
-    Mutex::Locker locker(m_handler.lock);
-    while (!m_handler.is_closed) {
-      if (m_handler.cond.WaitInterval(
-            m_handler.lock, utime_t(10, 0)) != 0) {
-        break;
-      }
-    }
-  }
+  ASSERT_TRUE(ceph_mutex_is_locked(lock));
+  lock.unlock();
+
+  ASSERT_TRUE(flusher.wait_for_closed());
 
-  ASSERT_TRUE(m_handler.is_closed);
   ASSERT_EQ(0U, object->get_pending_appends());
 }
 
@@ -401,11 +424,14 @@ TEST_F(TestObjectRecorder, Overflow) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
   ASSERT_EQ(0, client_register(oid));
-  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  auto metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
-  journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
+  ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
+  ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
+
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+  auto object1 = flusher.create_object(oid, 12, &lock1);
 
   std::string payload(1 << 11, '1');
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
@@ -414,56 +440,24 @@ TEST_F(TestObjectRecorder, Overflow) {
                                                               payload);
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1, append_buffer2};
-  lock1->Lock();
+  lock1.lock();
   ASSERT_TRUE(object1->append(std::move(append_buffers)));
-  lock1->Unlock();
+  lock1.unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
   ASSERT_EQ(0, cond.wait());
   ASSERT_EQ(0U, object1->get_pending_appends());
 
-  bool overflowed = false;
-  {
-    Mutex::Locker locker(m_handler.lock);
-    while (m_handler.overflows == 0) {
-      if (m_handler.cond.WaitInterval(
-            m_handler.lock, utime_t(10, 0)) != 0) {
-        break;
-      }
-    }
-    if (m_handler.overflows != 0) {
-      overflowed = true;
-      m_handler.overflows = 0;
-    }
-  }
-
-  ASSERT_TRUE(overflowed);
-
-  shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
-  journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
+  auto object2 = flusher.create_object(oid, 12, &lock2);
 
   journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
                                                               payload);
   append_buffers = {append_buffer3};
-  lock2->Lock();
+  lock2.lock();
   ASSERT_FALSE(object2->append(std::move(append_buffers)));
-  lock2->Unlock();
+  lock2.unlock();
   append_buffer3.first->flush(NULL);
 
-  overflowed = false;
-  {
-    Mutex::Locker locker(m_handler.lock);
-    while (m_handler.overflows == 0) {
-      if (m_handler.cond.WaitInterval(
-            m_handler.lock, utime_t(10, 0)) != 0) {
-        break;
-      }
-    }
-    if (m_handler.overflows != 0) {
-      overflowed = true;
-    }
-  }
-
-  ASSERT_TRUE(overflowed);
+  ASSERT_TRUE(flusher.wait_for_overflow());
 }