1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectRecorder.h"
5 #include "common/Cond.h"
6 #include "common/Mutex.h"
7 #include "common/Timer.h"
8 #include "gtest/gtest.h"
9 #include "test/librados/test.h"
10 #include "test/journal/RadosTestFixture.h"
13 using std::shared_ptr
;
15 class TestObjectRecorder
: public RadosTestFixture
{
18 : m_flush_interval(std::numeric_limits
<uint32_t>::max()),
19 m_flush_bytes(std::numeric_limits
<uint64_t>::max()),
24 struct Handler
: public journal::ObjectRecorder::Handler
{
26 shared_ptr
<Mutex
> object_lock
;
28 bool is_closed
= false;
29 uint32_t overflows
= 0;
31 Handler() : lock("lock") {
34 void closed(journal::ObjectRecorder
*object_recorder
) override
{
35 Mutex::Locker
locker(lock
);
39 void overflow(journal::ObjectRecorder
*object_recorder
) override
{
40 Mutex::Locker
locker(lock
);
41 journal::AppendBuffers append_buffers
;
43 object_recorder
->claim_append_buffers(&append_buffers
);
44 object_lock
->Unlock();
51 typedef std::list
<journal::ObjectRecorderPtr
> ObjectRecorders
;
52 typedef std::map
<std::string
, shared_ptr
<Mutex
>> ObjectRecorderLocksMap
;
54 ObjectRecorders m_object_recorders
;
55 ObjectRecorderLocksMap m_object_recorder_locks
;
57 uint32_t m_flush_interval
;
58 uint64_t m_flush_bytes
;
60 uint64_t m_max_in_flight_appends
= 0;
63 void TearDown() override
{
64 for (ObjectRecorders::iterator it
= m_object_recorders
.begin();
65 it
!= m_object_recorders
.end(); ++it
) {
70 m_object_recorders
.clear();
72 RadosTestFixture::TearDown();
75 inline void set_batch_options(uint32_t flush_interval
, uint64_t flush_bytes
,
76 double flush_age
, int max_in_flight
) {
77 m_flush_interval
= flush_interval
;
78 m_flush_bytes
= flush_bytes
;
79 m_flush_age
= flush_age
;
80 m_max_in_flight_appends
= max_in_flight
;
83 journal::AppendBuffer
create_append_buffer(uint64_t tag_tid
, uint64_t entry_tid
,
84 const std::string
&payload
) {
85 journal::FutureImplPtr
future(new journal::FutureImpl(tag_tid
, entry_tid
,
87 future
->init(journal::FutureImplPtr());
91 return std::make_pair(future
, bl
);
94 journal::ObjectRecorderPtr
create_object(const std::string
&oid
,
95 uint8_t order
, shared_ptr
<Mutex
> lock
) {
96 journal::ObjectRecorderPtr
object(new journal::ObjectRecorder(
97 m_ioctx
, oid
, 0, lock
, m_work_queue
, &m_handler
, order
,
98 m_max_in_flight_appends
));
100 Mutex::Locker
locker(*lock
);
101 object
->set_append_batch_options(m_flush_interval
, m_flush_bytes
,
104 m_object_recorders
.push_back(object
);
105 m_object_recorder_locks
.insert(std::make_pair(oid
, lock
));
106 m_handler
.object_lock
= lock
;
111 TEST_F(TestObjectRecorder
, Append
) {
112 std::string oid
= get_temp_oid();
113 ASSERT_EQ(0, create(oid
));
114 ASSERT_EQ(0, client_register(oid
));
115 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
116 ASSERT_EQ(0, init_metadata(metadata
));
118 set_batch_options(0, 0, 0, 0);
119 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
120 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
122 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
124 journal::AppendBuffers append_buffers
;
125 append_buffers
= {append_buffer1
};
127 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
129 ASSERT_EQ(0U, object
->get_pending_appends());
131 journal::AppendBuffer append_buffer2
= create_append_buffer(234, 124,
133 append_buffers
= {append_buffer2
};
135 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
137 ASSERT_EQ(0U, object
->get_pending_appends());
140 append_buffer2
.first
->flush(&cond
);
141 ASSERT_EQ(0, cond
.wait());
142 ASSERT_EQ(0U, object
->get_pending_appends());
145 TEST_F(TestObjectRecorder
, AppendFlushByCount
) {
146 std::string oid
= get_temp_oid();
147 ASSERT_EQ(0, create(oid
));
148 ASSERT_EQ(0, client_register(oid
));
149 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
150 ASSERT_EQ(0, init_metadata(metadata
));
152 set_batch_options(2, 0, 0, -1);
153 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
154 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
156 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
158 journal::AppendBuffers append_buffers
;
159 append_buffers
= {append_buffer1
};
161 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
163 ASSERT_EQ(1U, object
->get_pending_appends());
165 journal::AppendBuffer append_buffer2
= create_append_buffer(234, 124,
167 append_buffers
= {append_buffer2
};
169 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
171 ASSERT_EQ(0U, object
->get_pending_appends());
174 append_buffer2
.first
->wait(&cond
);
175 ASSERT_EQ(0, cond
.wait());
178 TEST_F(TestObjectRecorder
, AppendFlushByBytes
) {
179 std::string oid
= get_temp_oid();
180 ASSERT_EQ(0, create(oid
));
181 ASSERT_EQ(0, client_register(oid
));
182 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
183 ASSERT_EQ(0, init_metadata(metadata
));
185 set_batch_options(0, 10, 0, -1);
186 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
187 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
189 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
191 journal::AppendBuffers append_buffers
;
192 append_buffers
= {append_buffer1
};
194 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
196 ASSERT_EQ(1U, object
->get_pending_appends());
198 journal::AppendBuffer append_buffer2
= create_append_buffer(234, 124,
200 append_buffers
= {append_buffer2
};
202 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
204 ASSERT_EQ(0U, object
->get_pending_appends());
207 append_buffer2
.first
->wait(&cond
);
208 ASSERT_EQ(0, cond
.wait());
211 TEST_F(TestObjectRecorder
, AppendFlushByAge
) {
212 std::string oid
= get_temp_oid();
213 ASSERT_EQ(0, create(oid
));
214 ASSERT_EQ(0, client_register(oid
));
215 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
216 ASSERT_EQ(0, init_metadata(metadata
));
218 set_batch_options(0, 0, 0.1, -1);
219 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
220 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
222 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
224 journal::AppendBuffers append_buffers
;
225 append_buffers
= {append_buffer1
};
227 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
230 journal::AppendBuffer append_buffer2
= create_append_buffer(234, 124,
232 append_buffers
= {append_buffer2
};
234 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
238 append_buffer2
.first
->wait(&cond
);
239 ASSERT_EQ(0, cond
.wait());
240 ASSERT_EQ(0U, object
->get_pending_appends());
243 TEST_F(TestObjectRecorder
, AppendFilledObject
) {
244 std::string oid
= get_temp_oid();
245 ASSERT_EQ(0, create(oid
));
246 ASSERT_EQ(0, client_register(oid
));
247 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
248 ASSERT_EQ(0, init_metadata(metadata
));
250 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
251 journal::ObjectRecorderPtr object
= create_object(oid
, 12, lock
);
253 std::string
payload(2048, '1');
254 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
256 journal::AppendBuffers append_buffers
;
257 append_buffers
= {append_buffer1
};
259 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
262 journal::AppendBuffer append_buffer2
= create_append_buffer(234, 124,
264 append_buffers
= {append_buffer2
};
266 ASSERT_TRUE(object
->append(std::move(append_buffers
)));
270 append_buffer2
.first
->wait(&cond
);
271 ASSERT_EQ(0, cond
.wait());
272 ASSERT_EQ(0U, object
->get_pending_appends());
275 TEST_F(TestObjectRecorder
, Flush
) {
276 std::string oid
= get_temp_oid();
277 ASSERT_EQ(0, create(oid
));
278 ASSERT_EQ(0, client_register(oid
));
279 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
280 ASSERT_EQ(0, init_metadata(metadata
));
282 set_batch_options(0, 10, 0, -1);
283 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
284 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
286 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
288 journal::AppendBuffers append_buffers
;
289 append_buffers
= {append_buffer1
};
291 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
293 ASSERT_EQ(1U, object
->get_pending_appends());
296 object
->flush(&cond1
);
297 ASSERT_EQ(0, cond1
.wait());
300 append_buffer1
.first
->wait(&cond2
);
301 ASSERT_EQ(0, cond2
.wait());
302 ASSERT_EQ(0U, object
->get_pending_appends());
305 TEST_F(TestObjectRecorder
, FlushFuture
) {
306 std::string oid
= get_temp_oid();
307 ASSERT_EQ(0, create(oid
));
308 ASSERT_EQ(0, client_register(oid
));
309 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
310 ASSERT_EQ(0, init_metadata(metadata
));
312 set_batch_options(0, 10, 0, -1);
313 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
314 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
316 journal::AppendBuffer append_buffer
= create_append_buffer(234, 123,
318 journal::AppendBuffers append_buffers
;
319 append_buffers
= {append_buffer
};
321 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
323 ASSERT_EQ(1U, object
->get_pending_appends());
326 append_buffer
.first
->wait(&cond
);
327 object
->flush(append_buffer
.first
);
328 ASSERT_TRUE(append_buffer
.first
->is_flush_in_progress() ||
329 append_buffer
.first
->is_complete());
330 ASSERT_EQ(0, cond
.wait());
333 TEST_F(TestObjectRecorder
, FlushDetachedFuture
) {
334 std::string oid
= get_temp_oid();
335 ASSERT_EQ(0, create(oid
));
336 ASSERT_EQ(0, client_register(oid
));
337 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
338 ASSERT_EQ(0, init_metadata(metadata
));
340 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
341 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
343 journal::AppendBuffer append_buffer
= create_append_buffer(234, 123,
346 journal::AppendBuffers append_buffers
;
347 append_buffers
= {append_buffer
};
349 object
->flush(append_buffer
.first
);
350 ASSERT_FALSE(append_buffer
.first
->is_flush_in_progress());
352 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
355 // should automatically flush once its attached to the object
357 append_buffer
.first
->wait(&cond
);
358 ASSERT_EQ(0, cond
.wait());
361 TEST_F(TestObjectRecorder
, Close
) {
362 std::string oid
= get_temp_oid();
363 ASSERT_EQ(0, create(oid
));
364 ASSERT_EQ(0, client_register(oid
));
365 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
366 ASSERT_EQ(0, init_metadata(metadata
));
368 set_batch_options(2, 0, 0, -1);
369 shared_ptr
<Mutex
> lock(new Mutex("object_recorder_lock"));
370 journal::ObjectRecorderPtr object
= create_object(oid
, 24, lock
);
372 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
374 journal::AppendBuffers append_buffers
;
375 append_buffers
= {append_buffer1
};
377 ASSERT_FALSE(object
->append(std::move(append_buffers
)));
379 ASSERT_EQ(1U, object
->get_pending_appends());
382 ASSERT_FALSE(object
->close());
383 ASSERT_TRUE(lock
->is_locked());
387 Mutex::Locker
locker(m_handler
.lock
);
388 while (!m_handler
.is_closed
) {
389 if (m_handler
.cond
.WaitInterval(
390 m_handler
.lock
, utime_t(10, 0)) != 0) {
396 ASSERT_TRUE(m_handler
.is_closed
);
397 ASSERT_EQ(0U, object
->get_pending_appends());
400 TEST_F(TestObjectRecorder
, Overflow
) {
401 std::string oid
= get_temp_oid();
402 ASSERT_EQ(0, create(oid
));
403 ASSERT_EQ(0, client_register(oid
));
404 journal::JournalMetadataPtr metadata
= create_metadata(oid
);
405 ASSERT_EQ(0, init_metadata(metadata
));
407 shared_ptr
<Mutex
> lock1(new Mutex("object_recorder_lock_1"));
408 journal::ObjectRecorderPtr object1
= create_object(oid
, 12, lock1
);
410 std::string
payload(1 << 11, '1');
411 journal::AppendBuffer append_buffer1
= create_append_buffer(234, 123,
413 journal::AppendBuffer append_buffer2
= create_append_buffer(234, 124,
415 journal::AppendBuffers append_buffers
;
416 append_buffers
= {append_buffer1
, append_buffer2
};
418 ASSERT_TRUE(object1
->append(std::move(append_buffers
)));
422 append_buffer2
.first
->wait(&cond
);
423 ASSERT_EQ(0, cond
.wait());
424 ASSERT_EQ(0U, object1
->get_pending_appends());
426 bool overflowed
= false;
428 Mutex::Locker
locker(m_handler
.lock
);
429 while (m_handler
.overflows
== 0) {
430 if (m_handler
.cond
.WaitInterval(
431 m_handler
.lock
, utime_t(10, 0)) != 0) {
435 if (m_handler
.overflows
!= 0) {
437 m_handler
.overflows
= 0;
441 ASSERT_TRUE(overflowed
);
443 shared_ptr
<Mutex
> lock2(new Mutex("object_recorder_lock_2"));
444 journal::ObjectRecorderPtr object2
= create_object(oid
, 12, lock2
);
446 journal::AppendBuffer append_buffer3
= create_append_buffer(456, 123,
448 append_buffers
= {append_buffer3
};
450 ASSERT_FALSE(object2
->append(std::move(append_buffers
)));
452 append_buffer3
.first
->flush(NULL
);
456 Mutex::Locker
locker(m_handler
.lock
);
457 while (m_handler
.overflows
== 0) {
458 if (m_handler
.cond
.WaitInterval(
459 m_handler
.lock
, utime_t(10, 0)) != 0) {
463 if (m_handler
.overflows
!= 0) {
468 ASSERT_TRUE(overflowed
);