]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/journal/test_ObjectRecorder.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / test / journal / test_ObjectRecorder.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectRecorder.h"
5 #include "common/Cond.h"
6 #include "common/Mutex.h"
7 #include "common/Timer.h"
8 #include "gtest/gtest.h"
9 #include "test/librados/test.h"
10 #include "test/journal/RadosTestFixture.h"
11 #include <limits>
12
13 using std::shared_ptr;
14
15 class TestObjectRecorder : public RadosTestFixture {
16 public:
17 TestObjectRecorder()
18 : m_flush_interval(std::numeric_limits<uint32_t>::max()),
19 m_flush_bytes(std::numeric_limits<uint64_t>::max()),
20 m_flush_age(600)
21 {
22 }
23
24 struct Handler : public journal::ObjectRecorder::Handler {
25 Mutex lock;
26 shared_ptr<Mutex> object_lock;
27 Cond cond;
28 bool is_closed = false;
29 uint32_t overflows = 0;
30
31 Handler() : lock("lock") {
32 }
33
34 void closed(journal::ObjectRecorder *object_recorder) override {
35 Mutex::Locker locker(lock);
36 is_closed = true;
37 cond.Signal();
38 }
39 void overflow(journal::ObjectRecorder *object_recorder) override {
40 Mutex::Locker locker(lock);
41 journal::AppendBuffers append_buffers;
42 object_lock->Lock();
43 object_recorder->claim_append_buffers(&append_buffers);
44 object_lock->Unlock();
45
46 ++overflows;
47 cond.Signal();
48 }
49 };
50
51 typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
52 typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
53
54 ObjectRecorders m_object_recorders;
55 ObjectRecorderLocksMap m_object_recorder_locks;
56
57 uint32_t m_flush_interval;
58 uint64_t m_flush_bytes;
59 double m_flush_age;
60 uint64_t m_max_in_flight_appends = 0;
61 Handler m_handler;
62
63 void TearDown() override {
64 for (ObjectRecorders::iterator it = m_object_recorders.begin();
65 it != m_object_recorders.end(); ++it) {
66 C_SaferCond cond;
67 (*it)->flush(&cond);
68 cond.wait();
69 }
70 m_object_recorders.clear();
71
72 RadosTestFixture::TearDown();
73 }
74
75 inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
76 double flush_age, int max_in_flight) {
77 m_flush_interval = flush_interval;
78 m_flush_bytes = flush_bytes;
79 m_flush_age = flush_age;
80 m_max_in_flight_appends = max_in_flight;
81 }
82
83 journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
84 const std::string &payload) {
85 journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
86 456));
87 future->init(journal::FutureImplPtr());
88
89 bufferlist bl;
90 bl.append(payload);
91 return std::make_pair(future, bl);
92 }
93
94 journal::ObjectRecorderPtr create_object(const std::string &oid,
95 uint8_t order, shared_ptr<Mutex> lock) {
96 journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
97 m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
98 m_max_in_flight_appends));
99 {
100 Mutex::Locker locker(*lock);
101 object->set_append_batch_options(m_flush_interval, m_flush_bytes,
102 m_flush_age);
103 }
104 m_object_recorders.push_back(object);
105 m_object_recorder_locks.insert(std::make_pair(oid, lock));
106 m_handler.object_lock = lock;
107 return object;
108 }
109 };
110
111 TEST_F(TestObjectRecorder, Append) {
112 std::string oid = get_temp_oid();
113 ASSERT_EQ(0, create(oid));
114 ASSERT_EQ(0, client_register(oid));
115 journal::JournalMetadataPtr metadata = create_metadata(oid);
116 ASSERT_EQ(0, init_metadata(metadata));
117
118 set_batch_options(0, 0, 0, 0);
119 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
120 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
121
122 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
123 "payload");
124 journal::AppendBuffers append_buffers;
125 append_buffers = {append_buffer1};
126 lock->Lock();
127 ASSERT_FALSE(object->append(std::move(append_buffers)));
128 lock->Unlock();
129 ASSERT_EQ(0U, object->get_pending_appends());
130
131 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
132 "payload");
133 append_buffers = {append_buffer2};
134 lock->Lock();
135 ASSERT_FALSE(object->append(std::move(append_buffers)));
136 lock->Unlock();
137 ASSERT_EQ(0U, object->get_pending_appends());
138
139 C_SaferCond cond;
140 append_buffer2.first->flush(&cond);
141 ASSERT_EQ(0, cond.wait());
142 ASSERT_EQ(0U, object->get_pending_appends());
143 }
144
145 TEST_F(TestObjectRecorder, AppendFlushByCount) {
146 std::string oid = get_temp_oid();
147 ASSERT_EQ(0, create(oid));
148 ASSERT_EQ(0, client_register(oid));
149 journal::JournalMetadataPtr metadata = create_metadata(oid);
150 ASSERT_EQ(0, init_metadata(metadata));
151
152 set_batch_options(2, 0, 0, -1);
153 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
154 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
155
156 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
157 "payload");
158 journal::AppendBuffers append_buffers;
159 append_buffers = {append_buffer1};
160 lock->Lock();
161 ASSERT_FALSE(object->append(std::move(append_buffers)));
162 lock->Unlock();
163 ASSERT_EQ(1U, object->get_pending_appends());
164
165 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
166 "payload");
167 append_buffers = {append_buffer2};
168 lock->Lock();
169 ASSERT_FALSE(object->append(std::move(append_buffers)));
170 lock->Unlock();
171 ASSERT_EQ(0U, object->get_pending_appends());
172
173 C_SaferCond cond;
174 append_buffer2.first->wait(&cond);
175 ASSERT_EQ(0, cond.wait());
176 }
177
178 TEST_F(TestObjectRecorder, AppendFlushByBytes) {
179 std::string oid = get_temp_oid();
180 ASSERT_EQ(0, create(oid));
181 ASSERT_EQ(0, client_register(oid));
182 journal::JournalMetadataPtr metadata = create_metadata(oid);
183 ASSERT_EQ(0, init_metadata(metadata));
184
185 set_batch_options(0, 10, 0, -1);
186 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
187 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
188
189 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
190 "payload");
191 journal::AppendBuffers append_buffers;
192 append_buffers = {append_buffer1};
193 lock->Lock();
194 ASSERT_FALSE(object->append(std::move(append_buffers)));
195 lock->Unlock();
196 ASSERT_EQ(1U, object->get_pending_appends());
197
198 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
199 "payload");
200 append_buffers = {append_buffer2};
201 lock->Lock();
202 ASSERT_FALSE(object->append(std::move(append_buffers)));
203 lock->Unlock();
204 ASSERT_EQ(0U, object->get_pending_appends());
205
206 C_SaferCond cond;
207 append_buffer2.first->wait(&cond);
208 ASSERT_EQ(0, cond.wait());
209 }
210
211 TEST_F(TestObjectRecorder, AppendFlushByAge) {
212 std::string oid = get_temp_oid();
213 ASSERT_EQ(0, create(oid));
214 ASSERT_EQ(0, client_register(oid));
215 journal::JournalMetadataPtr metadata = create_metadata(oid);
216 ASSERT_EQ(0, init_metadata(metadata));
217
218 set_batch_options(0, 0, 0.1, -1);
219 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
220 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
221
222 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
223 "payload");
224 journal::AppendBuffers append_buffers;
225 append_buffers = {append_buffer1};
226 lock->Lock();
227 ASSERT_FALSE(object->append(std::move(append_buffers)));
228 lock->Unlock();
229
230 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
231 "payload");
232 append_buffers = {append_buffer2};
233 lock->Lock();
234 ASSERT_FALSE(object->append(std::move(append_buffers)));
235 lock->Unlock();
236
237 C_SaferCond cond;
238 append_buffer2.first->wait(&cond);
239 ASSERT_EQ(0, cond.wait());
240 ASSERT_EQ(0U, object->get_pending_appends());
241 }
242
243 TEST_F(TestObjectRecorder, AppendFilledObject) {
244 std::string oid = get_temp_oid();
245 ASSERT_EQ(0, create(oid));
246 ASSERT_EQ(0, client_register(oid));
247 journal::JournalMetadataPtr metadata = create_metadata(oid);
248 ASSERT_EQ(0, init_metadata(metadata));
249
250 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
251 journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
252
253 std::string payload(2048, '1');
254 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
255 payload);
256 journal::AppendBuffers append_buffers;
257 append_buffers = {append_buffer1};
258 lock->Lock();
259 ASSERT_FALSE(object->append(std::move(append_buffers)));
260 lock->Unlock();
261
262 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
263 payload);
264 append_buffers = {append_buffer2};
265 lock->Lock();
266 ASSERT_TRUE(object->append(std::move(append_buffers)));
267 lock->Unlock();
268
269 C_SaferCond cond;
270 append_buffer2.first->wait(&cond);
271 ASSERT_EQ(0, cond.wait());
272 ASSERT_EQ(0U, object->get_pending_appends());
273 }
274
275 TEST_F(TestObjectRecorder, Flush) {
276 std::string oid = get_temp_oid();
277 ASSERT_EQ(0, create(oid));
278 ASSERT_EQ(0, client_register(oid));
279 journal::JournalMetadataPtr metadata = create_metadata(oid);
280 ASSERT_EQ(0, init_metadata(metadata));
281
282 set_batch_options(0, 10, 0, -1);
283 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
284 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
285
286 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
287 "payload");
288 journal::AppendBuffers append_buffers;
289 append_buffers = {append_buffer1};
290 lock->Lock();
291 ASSERT_FALSE(object->append(std::move(append_buffers)));
292 lock->Unlock();
293 ASSERT_EQ(1U, object->get_pending_appends());
294
295 C_SaferCond cond1;
296 object->flush(&cond1);
297 ASSERT_EQ(0, cond1.wait());
298
299 C_SaferCond cond2;
300 append_buffer1.first->wait(&cond2);
301 ASSERT_EQ(0, cond2.wait());
302 ASSERT_EQ(0U, object->get_pending_appends());
303 }
304
305 TEST_F(TestObjectRecorder, FlushFuture) {
306 std::string oid = get_temp_oid();
307 ASSERT_EQ(0, create(oid));
308 ASSERT_EQ(0, client_register(oid));
309 journal::JournalMetadataPtr metadata = create_metadata(oid);
310 ASSERT_EQ(0, init_metadata(metadata));
311
312 set_batch_options(0, 10, 0, -1);
313 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
314 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
315
316 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
317 "payload");
318 journal::AppendBuffers append_buffers;
319 append_buffers = {append_buffer};
320 lock->Lock();
321 ASSERT_FALSE(object->append(std::move(append_buffers)));
322 lock->Unlock();
323 ASSERT_EQ(1U, object->get_pending_appends());
324
325 C_SaferCond cond;
326 append_buffer.first->wait(&cond);
327 object->flush(append_buffer.first);
328 ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
329 append_buffer.first->is_complete());
330 ASSERT_EQ(0, cond.wait());
331 }
332
333 TEST_F(TestObjectRecorder, FlushDetachedFuture) {
334 std::string oid = get_temp_oid();
335 ASSERT_EQ(0, create(oid));
336 ASSERT_EQ(0, client_register(oid));
337 journal::JournalMetadataPtr metadata = create_metadata(oid);
338 ASSERT_EQ(0, init_metadata(metadata));
339
340 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
341 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
342
343 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
344 "payload");
345
346 journal::AppendBuffers append_buffers;
347 append_buffers = {append_buffer};
348
349 object->flush(append_buffer.first);
350 ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
351 lock->Lock();
352 ASSERT_FALSE(object->append(std::move(append_buffers)));
353 lock->Unlock();
354
355 // should automatically flush once its attached to the object
356 C_SaferCond cond;
357 append_buffer.first->wait(&cond);
358 ASSERT_EQ(0, cond.wait());
359 }
360
361 TEST_F(TestObjectRecorder, Close) {
362 std::string oid = get_temp_oid();
363 ASSERT_EQ(0, create(oid));
364 ASSERT_EQ(0, client_register(oid));
365 journal::JournalMetadataPtr metadata = create_metadata(oid);
366 ASSERT_EQ(0, init_metadata(metadata));
367
368 set_batch_options(2, 0, 0, -1);
369 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
370 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
371
372 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
373 "payload");
374 journal::AppendBuffers append_buffers;
375 append_buffers = {append_buffer1};
376 lock->Lock();
377 ASSERT_FALSE(object->append(std::move(append_buffers)));
378 lock->Unlock();
379 ASSERT_EQ(1U, object->get_pending_appends());
380
381 lock->Lock();
382 ASSERT_FALSE(object->close());
383 ASSERT_TRUE(lock->is_locked());
384 lock->Unlock();
385
386 {
387 Mutex::Locker locker(m_handler.lock);
388 while (!m_handler.is_closed) {
389 if (m_handler.cond.WaitInterval(
390 m_handler.lock, utime_t(10, 0)) != 0) {
391 break;
392 }
393 }
394 }
395
396 ASSERT_TRUE(m_handler.is_closed);
397 ASSERT_EQ(0U, object->get_pending_appends());
398 }
399
400 TEST_F(TestObjectRecorder, Overflow) {
401 std::string oid = get_temp_oid();
402 ASSERT_EQ(0, create(oid));
403 ASSERT_EQ(0, client_register(oid));
404 journal::JournalMetadataPtr metadata = create_metadata(oid);
405 ASSERT_EQ(0, init_metadata(metadata));
406
407 shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
408 journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
409
410 std::string payload(1 << 11, '1');
411 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
412 payload);
413 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
414 payload);
415 journal::AppendBuffers append_buffers;
416 append_buffers = {append_buffer1, append_buffer2};
417 lock1->Lock();
418 ASSERT_TRUE(object1->append(std::move(append_buffers)));
419 lock1->Unlock();
420
421 C_SaferCond cond;
422 append_buffer2.first->wait(&cond);
423 ASSERT_EQ(0, cond.wait());
424 ASSERT_EQ(0U, object1->get_pending_appends());
425
426 bool overflowed = false;
427 {
428 Mutex::Locker locker(m_handler.lock);
429 while (m_handler.overflows == 0) {
430 if (m_handler.cond.WaitInterval(
431 m_handler.lock, utime_t(10, 0)) != 0) {
432 break;
433 }
434 }
435 if (m_handler.overflows != 0) {
436 overflowed = true;
437 m_handler.overflows = 0;
438 }
439 }
440
441 ASSERT_TRUE(overflowed);
442
443 shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
444 journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
445
446 journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
447 payload);
448 append_buffers = {append_buffer3};
449 lock2->Lock();
450 ASSERT_FALSE(object2->append(std::move(append_buffers)));
451 lock2->Unlock();
452 append_buffer3.first->flush(NULL);
453
454 overflowed = false;
455 {
456 Mutex::Locker locker(m_handler.lock);
457 while (m_handler.overflows == 0) {
458 if (m_handler.cond.WaitInterval(
459 m_handler.lock, utime_t(10, 0)) != 0) {
460 break;
461 }
462 }
463 if (m_handler.overflows != 0) {
464 overflowed = true;
465 }
466 }
467
468 ASSERT_TRUE(overflowed);
469 }