]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/journal/test_ObjectRecorder.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / journal / test_ObjectRecorder.cc
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
3
4#include "journal/ObjectRecorder.h"
5#include "common/Cond.h"
6#include "common/Mutex.h"
7#include "common/Timer.h"
8#include "gtest/gtest.h"
9#include "test/librados/test.h"
10#include "test/journal/RadosTestFixture.h"
11#include <limits>
12
13using std::shared_ptr;
14
15class TestObjectRecorder : public RadosTestFixture {
16public:
17 TestObjectRecorder()
18 : m_flush_interval(std::numeric_limits<uint32_t>::max()),
19 m_flush_bytes(std::numeric_limits<uint64_t>::max()),
20 m_flush_age(600)
21 {
22 }
23
24 struct Handler : public journal::ObjectRecorder::Handler {
25 Mutex lock;
26 shared_ptr<Mutex> object_lock;
27 Cond cond;
28 bool is_closed = false;
29 uint32_t overflows = 0;
30
31 Handler() : lock("lock") {
32 }
33
34 void closed(journal::ObjectRecorder *object_recorder) override {
35 Mutex::Locker locker(lock);
36 is_closed = true;
37 cond.Signal();
38 }
39 void overflow(journal::ObjectRecorder *object_recorder) override {
40 Mutex::Locker locker(lock);
41 journal::AppendBuffers append_buffers;
42 object_lock->Lock();
43 object_recorder->claim_append_buffers(&append_buffers);
44 object_lock->Unlock();
45
46 ++overflows;
47 cond.Signal();
48 }
49 };
50
51 typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
52 typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
53
54 ObjectRecorders m_object_recorders;
55 ObjectRecorderLocksMap m_object_recorder_locks;
56
57 uint32_t m_flush_interval;
58 uint64_t m_flush_bytes;
59 double m_flush_age;
11fdf7f2 60 uint64_t m_max_in_flight_appends = 0;
7c673cae
FG
61 Handler m_handler;
62
63 void TearDown() override {
64 for (ObjectRecorders::iterator it = m_object_recorders.begin();
65 it != m_object_recorders.end(); ++it) {
66 C_SaferCond cond;
67 (*it)->flush(&cond);
68 cond.wait();
69 }
70 m_object_recorders.clear();
71
72 RadosTestFixture::TearDown();
73 }
74
75 inline void set_flush_interval(uint32_t i) {
76 m_flush_interval = i;
77 }
78 inline void set_flush_bytes(uint64_t i) {
79 m_flush_bytes = i;
80 }
81 inline void set_flush_age(double i) {
82 m_flush_age = i;
83 }
84
85 journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
86 const std::string &payload) {
87 journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
88 456));
89 future->init(journal::FutureImplPtr());
90
91 bufferlist bl;
92 bl.append(payload);
93 return std::make_pair(future, bl);
94 }
95
96 journal::ObjectRecorderPtr create_object(const std::string &oid,
97 uint8_t order, shared_ptr<Mutex> lock) {
98 journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
99 m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
11fdf7f2
TL
100 order, m_flush_interval, m_flush_bytes, m_flush_age,
101 m_max_in_flight_appends));
7c673cae
FG
102 m_object_recorders.push_back(object);
103 m_object_recorder_locks.insert(std::make_pair(oid, lock));
104 m_handler.object_lock = lock;
105 return object;
106 }
107};
108
109TEST_F(TestObjectRecorder, Append) {
110 std::string oid = get_temp_oid();
111 ASSERT_EQ(0, create(oid));
112 ASSERT_EQ(0, client_register(oid));
113 journal::JournalMetadataPtr metadata = create_metadata(oid);
114 ASSERT_EQ(0, init_metadata(metadata));
115
116 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
117 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
118
119 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
120 "payload");
121 journal::AppendBuffers append_buffers;
122 append_buffers = {append_buffer1};
123 lock->Lock();
124 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
125 ASSERT_EQ(1U, object->get_pending_appends());
126
127 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
128 "payload");
129 append_buffers = {append_buffer2};
130 lock->Lock();
131 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
132 ASSERT_EQ(2U, object->get_pending_appends());
133
134 C_SaferCond cond;
135 append_buffer2.first->flush(&cond);
136 ASSERT_EQ(0, cond.wait());
137 ASSERT_EQ(0U, object->get_pending_appends());
138}
139
140TEST_F(TestObjectRecorder, AppendFlushByCount) {
141 std::string oid = get_temp_oid();
142 ASSERT_EQ(0, create(oid));
143 ASSERT_EQ(0, client_register(oid));
144 journal::JournalMetadataPtr metadata = create_metadata(oid);
145 ASSERT_EQ(0, init_metadata(metadata));
146
147 set_flush_interval(2);
148 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
149 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
150
151 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
152 "payload");
153 journal::AppendBuffers append_buffers;
154 append_buffers = {append_buffer1};
155 lock->Lock();
156 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
157 ASSERT_EQ(1U, object->get_pending_appends());
158
159 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
160 "payload");
161 append_buffers = {append_buffer2};
162 lock->Lock();
163 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
164 ASSERT_EQ(0U, object->get_pending_appends());
165
166 C_SaferCond cond;
167 append_buffer2.first->wait(&cond);
168 ASSERT_EQ(0, cond.wait());
169}
170
171TEST_F(TestObjectRecorder, AppendFlushByBytes) {
172 std::string oid = get_temp_oid();
173 ASSERT_EQ(0, create(oid));
174 ASSERT_EQ(0, client_register(oid));
175 journal::JournalMetadataPtr metadata = create_metadata(oid);
176 ASSERT_EQ(0, init_metadata(metadata));
177
178 set_flush_bytes(10);
179 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
180 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
181
182 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
183 "payload");
184 journal::AppendBuffers append_buffers;
185 append_buffers = {append_buffer1};
186 lock->Lock();
187 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
188 ASSERT_EQ(1U, object->get_pending_appends());
189
190 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
191 "payload");
192 append_buffers = {append_buffer2};
193 lock->Lock();
194 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
195 ASSERT_EQ(0U, object->get_pending_appends());
196
197 C_SaferCond cond;
198 append_buffer2.first->wait(&cond);
199 ASSERT_EQ(0, cond.wait());
200}
201
202TEST_F(TestObjectRecorder, AppendFlushByAge) {
203 std::string oid = get_temp_oid();
204 ASSERT_EQ(0, create(oid));
205 ASSERT_EQ(0, client_register(oid));
206 journal::JournalMetadataPtr metadata = create_metadata(oid);
207 ASSERT_EQ(0, init_metadata(metadata));
208
209 set_flush_age(0.1);
210 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
211 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
212
213 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
214 "payload");
215 journal::AppendBuffers append_buffers;
216 append_buffers = {append_buffer1};
217 lock->Lock();
218 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
219
220 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
221 "payload");
222 append_buffers = {append_buffer2};
223 lock->Lock();
224 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
225
226 C_SaferCond cond;
227 append_buffer2.first->wait(&cond);
228 ASSERT_EQ(0, cond.wait());
229 ASSERT_EQ(0U, object->get_pending_appends());
230}
231
232TEST_F(TestObjectRecorder, AppendFilledObject) {
233 std::string oid = get_temp_oid();
234 ASSERT_EQ(0, create(oid));
235 ASSERT_EQ(0, client_register(oid));
236 journal::JournalMetadataPtr metadata = create_metadata(oid);
237 ASSERT_EQ(0, init_metadata(metadata));
238
239 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
240 journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
241
242 std::string payload(2048, '1');
243 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
244 payload);
245 journal::AppendBuffers append_buffers;
246 append_buffers = {append_buffer1};
247 lock->Lock();
248 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
249
250 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
251 payload);
252 append_buffers = {append_buffer2};
253 lock->Lock();
254 ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
255
256 C_SaferCond cond;
257 append_buffer2.first->wait(&cond);
258 ASSERT_EQ(0, cond.wait());
259 ASSERT_EQ(0U, object->get_pending_appends());
260}
261
262TEST_F(TestObjectRecorder, Flush) {
263 std::string oid = get_temp_oid();
264 ASSERT_EQ(0, create(oid));
265 ASSERT_EQ(0, client_register(oid));
266 journal::JournalMetadataPtr metadata = create_metadata(oid);
267 ASSERT_EQ(0, init_metadata(metadata));
268
269 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
270 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
271
272 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
273 "payload");
274 journal::AppendBuffers append_buffers;
275 append_buffers = {append_buffer1};
276 lock->Lock();
277 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
278 ASSERT_EQ(1U, object->get_pending_appends());
279
280 C_SaferCond cond1;
281 object->flush(&cond1);
282 ASSERT_EQ(0, cond1.wait());
283
284 C_SaferCond cond2;
285 append_buffer1.first->wait(&cond2);
286 ASSERT_EQ(0, cond2.wait());
287 ASSERT_EQ(0U, object->get_pending_appends());
288}
289
290TEST_F(TestObjectRecorder, FlushFuture) {
291 std::string oid = get_temp_oid();
292 ASSERT_EQ(0, create(oid));
293 ASSERT_EQ(0, client_register(oid));
294 journal::JournalMetadataPtr metadata = create_metadata(oid);
295 ASSERT_EQ(0, init_metadata(metadata));
296
297 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
298 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
299
300 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
301 "payload");
302 journal::AppendBuffers append_buffers;
303 append_buffers = {append_buffer};
304 lock->Lock();
305 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
306 ASSERT_EQ(1U, object->get_pending_appends());
307
308 C_SaferCond cond;
309 append_buffer.first->wait(&cond);
310 lock->Lock();
311 object->flush(append_buffer.first);
312 ASSERT_TRUE(lock->is_locked());
313 lock->Unlock();
314 ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
315 append_buffer.first->is_complete());
316 ASSERT_EQ(0, cond.wait());
317}
318
319TEST_F(TestObjectRecorder, FlushDetachedFuture) {
320 std::string oid = get_temp_oid();
321 ASSERT_EQ(0, create(oid));
322 ASSERT_EQ(0, client_register(oid));
323 journal::JournalMetadataPtr metadata = create_metadata(oid);
324 ASSERT_EQ(0, init_metadata(metadata));
325
326 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
327 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
328
329 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
330 "payload");
331
332 journal::AppendBuffers append_buffers;
333 append_buffers = {append_buffer};
334
335 lock->Lock();
336 object->flush(append_buffer.first);
337 ASSERT_TRUE(lock->is_locked());
338 lock->Unlock();
339 ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
340 lock->Lock();
341 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
342
343 // should automatically flush once its attached to the object
344 C_SaferCond cond;
345 append_buffer.first->wait(&cond);
346 ASSERT_EQ(0, cond.wait());
347}
348
349TEST_F(TestObjectRecorder, Close) {
350 std::string oid = get_temp_oid();
351 ASSERT_EQ(0, create(oid));
352 ASSERT_EQ(0, client_register(oid));
353 journal::JournalMetadataPtr metadata = create_metadata(oid);
354 ASSERT_EQ(0, init_metadata(metadata));
355
356 set_flush_interval(2);
357 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
358 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
359
360 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
361 "payload");
362 journal::AppendBuffers append_buffers;
363 append_buffers = {append_buffer1};
364 lock->Lock();
365 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
366 ASSERT_EQ(1U, object->get_pending_appends());
367
368 lock->Lock();
369 ASSERT_FALSE(object->close());
370 ASSERT_TRUE(lock->is_locked());
371 lock->Unlock();
372
373 {
374 Mutex::Locker locker(m_handler.lock);
375 while (!m_handler.is_closed) {
376 if (m_handler.cond.WaitInterval(
377 m_handler.lock, utime_t(10, 0)) != 0) {
378 break;
379 }
380 }
381 }
382
383 ASSERT_TRUE(m_handler.is_closed);
384 ASSERT_EQ(0U, object->get_pending_appends());
385}
386
387TEST_F(TestObjectRecorder, Overflow) {
388 std::string oid = get_temp_oid();
389 ASSERT_EQ(0, create(oid));
390 ASSERT_EQ(0, client_register(oid));
391 journal::JournalMetadataPtr metadata = create_metadata(oid);
392 ASSERT_EQ(0, init_metadata(metadata));
393
394 shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
395 journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
396 shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
397 journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
398
399 std::string payload(2048, '1');
400 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
401 payload);
402 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
403 payload);
404 journal::AppendBuffers append_buffers;
405 append_buffers = {append_buffer1, append_buffer2};
406 lock1->Lock();
407 ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
408
409 C_SaferCond cond;
410 append_buffer2.first->wait(&cond);
411 ASSERT_EQ(0, cond.wait());
412 ASSERT_EQ(0U, object1->get_pending_appends());
413
414 journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
415 payload);
416 append_buffers = {append_buffer3};
417
418 lock2->Lock();
419 ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
420 append_buffer3.first->flush(NULL);
421
422 bool overflowed = false;
423 {
424 Mutex::Locker locker(m_handler.lock);
425 while (m_handler.overflows == 0) {
426 if (m_handler.cond.WaitInterval(
427 m_handler.lock, utime_t(10, 0)) != 0) {
428 break;
429 }
430 }
431 if (m_handler.overflows != 0) {
432 overflowed = true;
433 }
434 }
435
436 ASSERT_TRUE(overflowed);
437}