#include "journal/ObjectRecorder.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Timer.h"
#include "gtest/gtest.h"
#include "test/librados/test.h"
class TestObjectRecorder : public RadosTestFixture {
public:
- TestObjectRecorder()
- : m_flush_interval(std::numeric_limits<uint32_t>::max()),
- m_flush_bytes(std::numeric_limits<uint64_t>::max()),
- m_flush_age(600)
- {
- }
+ TestObjectRecorder() = default;
struct Handler : public journal::ObjectRecorder::Handler {
- Mutex lock;
- shared_ptr<Mutex> object_lock;
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("lock");
+ ceph::mutex* object_lock = nullptr;
+ ceph::condition_variable cond;
bool is_closed = false;
uint32_t overflows = 0;
- Handler() : lock("lock") {
- }
+ Handler() = default;
void closed(journal::ObjectRecorder *object_recorder) override {
- Mutex::Locker locker(lock);
+ std::lock_guard locker{lock};
is_closed = true;
- cond.Signal();
+ cond.notify_all();
}
void overflow(journal::ObjectRecorder *object_recorder) override {
- Mutex::Locker locker(lock);
+ std::lock_guard locker{lock};
journal::AppendBuffers append_buffers;
- object_lock->Lock();
+ object_lock->lock();
object_recorder->claim_append_buffers(&append_buffers);
- object_lock->Unlock();
+ object_lock->unlock();
++overflows;
- cond.Signal();
+ cond.notify_all();
}
};
- typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
- typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
-
- ObjectRecorders m_object_recorders;
- ObjectRecorderLocksMap m_object_recorder_locks;
-
- uint32_t m_flush_interval;
- uint64_t m_flush_bytes;
- double m_flush_age;
- uint64_t m_max_in_flight_appends = 0;
- Handler m_handler;
-
- void TearDown() override {
- for (ObjectRecorders::iterator it = m_object_recorders.begin();
- it != m_object_recorders.end(); ++it) {
- C_SaferCond cond;
- (*it)->flush(&cond);
- cond.wait();
+ // flush the pending buffers in dtor
+ class ObjectRecorderFlusher {
+ public:
+ ObjectRecorderFlusher(librados::IoCtx& ioctx,
+ ContextWQ* work_queue)
+ : m_ioctx{ioctx},
+ m_work_queue{work_queue}
+ {}
+ ObjectRecorderFlusher(librados::IoCtx& ioctx,
+ ContextWQ* work_queue,
+ uint32_t flush_interval,
+ uint16_t flush_bytes,
+ double flush_age,
+ int max_in_flight)
+ : m_ioctx{ioctx},
+ m_work_queue{work_queue},
+ m_flush_interval{flush_interval},
+ m_flush_bytes{flush_bytes},
+ m_flush_age{flush_age},
+ m_max_in_flight_appends{max_in_flight < 0 ?
+ std::numeric_limits<uint64_t>::max() :
+ static_cast<uint64_t>(max_in_flight)}
+ {}
+ ~ObjectRecorderFlusher() {
+ for (auto& [object_recorder, m] : m_object_recorders) {
+ C_SaferCond cond;
+ object_recorder->flush(&cond);
+ cond.wait();
+ std::scoped_lock l{*m};
+ if (!object_recorder->is_closed()) {
+ object_recorder->close();
+ }
+ }
}
- m_object_recorders.clear();
-
- RadosTestFixture::TearDown();
- }
-
- inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age, int max_in_flight) {
- m_flush_interval = flush_interval;
- m_flush_bytes = flush_bytes;
- m_flush_age = flush_age;
- m_max_in_flight_appends = max_in_flight;
- }
+ auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) {
+ auto object = ceph::make_ref<journal::ObjectRecorder>(
+ m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
+ order, m_max_in_flight_appends);
+ {
+ std::lock_guard locker{*lock};
+ object->set_append_batch_options(m_flush_interval,
+ m_flush_bytes,
+ m_flush_age);
+ }
+ m_object_recorders.emplace_back(object, lock);
+ m_handler.object_lock = lock;
+ return object;
+ }
+ bool wait_for_closed() {
+ std::unique_lock locker{m_handler.lock};
+ return m_handler.cond.wait_for(locker, 10s,
+ [this] { return m_handler.is_closed; });
+ }
+ bool wait_for_overflow() {
+ std::unique_lock locker{m_handler.lock};
+ if (m_handler.cond.wait_for(locker, 10s,
+ [this] { return m_handler.overflows > 0; })) {
+ m_handler.overflows = 0;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ private:
+ librados::IoCtx& m_ioctx;
+ ContextWQ *m_work_queue;
+ uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max();
+ uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
+ double m_flush_age = 600;
+ uint64_t m_max_in_flight_appends = 0;
+ using ObjectRecorders =
+ std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>;
+ ObjectRecorders m_object_recorders;
+ Handler m_handler;
+ };
- journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
+ journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
+ uint64_t entry_tid,
const std::string &payload) {
- journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
- 456));
- future->init(journal::FutureImplPtr());
+ auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
+ future->init(ceph::ref_t<journal::FutureImpl>());
bufferlist bl;
bl.append(payload);
return std::make_pair(future, bl);
}
-
- journal::ObjectRecorderPtr create_object(const std::string &oid,
- uint8_t order, shared_ptr<Mutex> lock) {
- journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
- m_max_in_flight_appends));
- {
- Mutex::Locker locker(*lock);
- object->set_append_batch_options(m_flush_interval, m_flush_bytes,
- m_flush_age);
- }
- m_object_recorders.push_back(object);
- m_object_recorder_locks.insert(std::make_pair(oid, lock));
- m_handler.object_lock = lock;
- return object;
- }
};
TEST_F(TestObjectRecorder, Append) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(0, 0, 0, 0);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(0U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(2, 0, 0, -1);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(0, 10, 0, -1);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(0, 0, 0.1, -1);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
- journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
- "payload");
- append_buffers = {append_buffer2};
- lock->Lock();
- ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ uint32_t offset = 0;
+ journal::AppendBuffer append_buffer2;
+ while (!append_buffer1.first->is_flush_in_progress() &&
+ !append_buffer1.first->is_complete()) {
+ usleep(1000);
+
+ append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
+ ++offset;
+ append_buffers = {append_buffer2};
+
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ }
C_SaferCond cond;
append_buffer2.first->wait(&cond);
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
+ auto object = flusher.create_object(oid, 12, &lock);
std::string payload(2048, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
- lock->Lock();
+ lock.lock();
ASSERT_TRUE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(0, 10, 0, -1);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(0, 10, 0, -1);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
object->flush(append_buffer.first);
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
// should automatically flush once its attached to the object
C_SaferCond cond;
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_batch_options(2, 0, 0, -1);
- shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
- journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+ ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
+ auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock->Unlock();
+ lock.unlock();
ASSERT_EQ(1U, object->get_pending_appends());
- lock->Lock();
+ lock.lock();
ASSERT_FALSE(object->close());
- ASSERT_TRUE(lock->is_locked());
- lock->Unlock();
-
- {
- Mutex::Locker locker(m_handler.lock);
- while (!m_handler.is_closed) {
- if (m_handler.cond.WaitInterval(
- m_handler.lock, utime_t(10, 0)) != 0) {
- break;
- }
- }
- }
+ ASSERT_TRUE(ceph_mutex_is_locked(lock));
+ lock.unlock();
+
+ ASSERT_TRUE(flusher.wait_for_closed());
- ASSERT_TRUE(m_handler.is_closed);
ASSERT_EQ(0U, object->get_pending_appends());
}
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
- journal::JournalMetadataPtr metadata = create_metadata(oid);
+ auto metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
- journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
+ ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
+ ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
+
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+ auto object1 = flusher.create_object(oid, 12, &lock1);
std::string payload(1 << 11, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
- lock1->Lock();
+ lock1.lock();
ASSERT_TRUE(object1->append(std::move(append_buffers)));
- lock1->Unlock();
+ lock1.unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(0U, object1->get_pending_appends());
- bool overflowed = false;
- {
- Mutex::Locker locker(m_handler.lock);
- while (m_handler.overflows == 0) {
- if (m_handler.cond.WaitInterval(
- m_handler.lock, utime_t(10, 0)) != 0) {
- break;
- }
- }
- if (m_handler.overflows != 0) {
- overflowed = true;
- m_handler.overflows = 0;
- }
- }
-
- ASSERT_TRUE(overflowed);
-
- shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
- journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
+ auto object2 = flusher.create_object(oid, 12, &lock2);
journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
payload);
append_buffers = {append_buffer3};
- lock2->Lock();
+ lock2.lock();
ASSERT_FALSE(object2->append(std::move(append_buffers)));
- lock2->Unlock();
+ lock2.unlock();
append_buffer3.first->flush(NULL);
- overflowed = false;
- {
- Mutex::Locker locker(m_handler.lock);
- while (m_handler.overflows == 0) {
- if (m_handler.cond.WaitInterval(
- m_handler.lock, utime_t(10, 0)) != 0) {
- break;
- }
- }
- if (m_handler.overflows != 0) {
- overflowed = true;
- }
- }
-
- ASSERT_TRUE(overflowed);
+ ASSERT_TRUE(flusher.wait_for_overflow());
}