RadosTestFixture::TearDown();
}
- inline void set_flush_interval(uint32_t i) {
- m_flush_interval = i;
- }
- inline void set_flush_bytes(uint64_t i) {
- m_flush_bytes = i;
- }
- inline void set_flush_age(double i) {
- m_flush_age = i;
+ inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age, int max_in_flight) {
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+ m_max_in_flight_appends = max_in_flight;
}
journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
journal::ObjectRecorderPtr create_object(const std::string &oid,
uint8_t order, shared_ptr<Mutex> lock) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
- order, m_flush_interval, m_flush_bytes, m_flush_age,
+ m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
m_max_in_flight_appends));
+ {
+ Mutex::Locker locker(*lock);
+ object->set_append_batch_options(m_flush_interval, m_flush_bytes,
+ m_flush_age);
+ }
m_object_recorders.push_back(object);
m_object_recorder_locks.insert(std::make_pair(oid, lock));
m_handler.object_lock = lock;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
+ set_batch_options(0, 0, 0, 0);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
- ASSERT_EQ(1U, object->get_pending_appends());
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
- ASSERT_EQ(2U, object->get_pending_appends());
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
append_buffer2.first->flush(&cond);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_interval(2);
+ set_batch_options(2, 0, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_bytes(10);
+ set_batch_options(0, 10, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_age(0.1);
+ set_batch_options(0, 0, 0.1, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_TRUE(object->append(std::move(append_buffers)));
+ lock->Unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
+ set_batch_options(0, 10, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
+ set_batch_options(0, 10, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
append_buffer.first->wait(&cond);
- lock->Lock();
object->flush(append_buffer.first);
- ASSERT_TRUE(lock->is_locked());
- lock->Unlock();
ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
append_buffer.first->is_complete());
ASSERT_EQ(0, cond.wait());
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
- lock->Lock();
object->flush(append_buffer.first);
- ASSERT_TRUE(lock->is_locked());
- lock->Unlock();
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
// should automatically flush once its attached to the object
C_SaferCond cond;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_interval(2);
+ set_batch_options(2, 0, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
lock->Lock();
shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
- shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
- journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
- std::string payload(2048, '1');
+ std::string payload(1 << 11, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
lock1->Lock();
- ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
+ ASSERT_TRUE(object1->append(std::move(append_buffers)));
+ lock1->Unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(0U, object1->get_pending_appends());
+ bool overflowed = false;
+ {
+ Mutex::Locker locker(m_handler.lock);
+ while (m_handler.overflows == 0) {
+ if (m_handler.cond.WaitInterval(
+ m_handler.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ if (m_handler.overflows != 0) {
+ overflowed = true;
+ m_handler.overflows = 0;
+ }
+ }
+
+ ASSERT_TRUE(overflowed);
+
+ shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
+ journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
+
journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
payload);
append_buffers = {append_buffer3};
-
lock2->Lock();
- ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object2->append(std::move(append_buffers)));
+ lock2->Unlock();
append_buffer3.first->flush(NULL);
- bool overflowed = false;
+ overflowed = false;
{
Mutex::Locker locker(m_handler.lock);
while (m_handler.overflows == 0) {