]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/journal/test_ObjectRecorder.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / journal / test_ObjectRecorder.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/ObjectRecorder.h"
5#include "common/Cond.h"
6#include "common/Mutex.h"
7#include "common/Timer.h"
8#include "gtest/gtest.h"
9#include "test/librados/test.h"
10#include "test/journal/RadosTestFixture.h"
11#include <limits>
12
13using std::shared_ptr;
14
15class TestObjectRecorder : public RadosTestFixture {
16public:
17 TestObjectRecorder()
18 : m_flush_interval(std::numeric_limits<uint32_t>::max()),
19 m_flush_bytes(std::numeric_limits<uint64_t>::max()),
20 m_flush_age(600)
21 {
22 }
23
24 struct Handler : public journal::ObjectRecorder::Handler {
25 Mutex lock;
26 shared_ptr<Mutex> object_lock;
27 Cond cond;
28 bool is_closed = false;
29 uint32_t overflows = 0;
30
31 Handler() : lock("lock") {
32 }
33
34 void closed(journal::ObjectRecorder *object_recorder) override {
35 Mutex::Locker locker(lock);
36 is_closed = true;
37 cond.Signal();
38 }
39 void overflow(journal::ObjectRecorder *object_recorder) override {
40 Mutex::Locker locker(lock);
41 journal::AppendBuffers append_buffers;
42 object_lock->Lock();
43 object_recorder->claim_append_buffers(&append_buffers);
44 object_lock->Unlock();
45
46 ++overflows;
47 cond.Signal();
48 }
49 };
50
51 typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
52 typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
53
54 ObjectRecorders m_object_recorders;
55 ObjectRecorderLocksMap m_object_recorder_locks;
56
57 uint32_t m_flush_interval;
58 uint64_t m_flush_bytes;
59 double m_flush_age;
60 Handler m_handler;
61
62 void TearDown() override {
63 for (ObjectRecorders::iterator it = m_object_recorders.begin();
64 it != m_object_recorders.end(); ++it) {
65 C_SaferCond cond;
66 (*it)->flush(&cond);
67 cond.wait();
68 }
69 m_object_recorders.clear();
70
71 RadosTestFixture::TearDown();
72 }
73
74 inline void set_flush_interval(uint32_t i) {
75 m_flush_interval = i;
76 }
77 inline void set_flush_bytes(uint64_t i) {
78 m_flush_bytes = i;
79 }
80 inline void set_flush_age(double i) {
81 m_flush_age = i;
82 }
83
84 journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
85 const std::string &payload) {
86 journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
87 456));
88 future->init(journal::FutureImplPtr());
89
90 bufferlist bl;
91 bl.append(payload);
92 return std::make_pair(future, bl);
93 }
94
95 journal::ObjectRecorderPtr create_object(const std::string &oid,
96 uint8_t order, shared_ptr<Mutex> lock) {
97 journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
98 m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
99 order, m_flush_interval, m_flush_bytes, m_flush_age));
100 m_object_recorders.push_back(object);
101 m_object_recorder_locks.insert(std::make_pair(oid, lock));
102 m_handler.object_lock = lock;
103 return object;
104 }
105};
106
107TEST_F(TestObjectRecorder, Append) {
108 std::string oid = get_temp_oid();
109 ASSERT_EQ(0, create(oid));
110 ASSERT_EQ(0, client_register(oid));
111 journal::JournalMetadataPtr metadata = create_metadata(oid);
112 ASSERT_EQ(0, init_metadata(metadata));
113
114 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
115 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
116
117 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
118 "payload");
119 journal::AppendBuffers append_buffers;
120 append_buffers = {append_buffer1};
121 lock->Lock();
122 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
123 ASSERT_EQ(1U, object->get_pending_appends());
124
125 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
126 "payload");
127 append_buffers = {append_buffer2};
128 lock->Lock();
129 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
130 ASSERT_EQ(2U, object->get_pending_appends());
131
132 C_SaferCond cond;
133 append_buffer2.first->flush(&cond);
134 ASSERT_EQ(0, cond.wait());
135 ASSERT_EQ(0U, object->get_pending_appends());
136}
137
138TEST_F(TestObjectRecorder, AppendFlushByCount) {
139 std::string oid = get_temp_oid();
140 ASSERT_EQ(0, create(oid));
141 ASSERT_EQ(0, client_register(oid));
142 journal::JournalMetadataPtr metadata = create_metadata(oid);
143 ASSERT_EQ(0, init_metadata(metadata));
144
145 set_flush_interval(2);
146 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
147 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
148
149 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
150 "payload");
151 journal::AppendBuffers append_buffers;
152 append_buffers = {append_buffer1};
153 lock->Lock();
154 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
155 ASSERT_EQ(1U, object->get_pending_appends());
156
157 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
158 "payload");
159 append_buffers = {append_buffer2};
160 lock->Lock();
161 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
162 ASSERT_EQ(0U, object->get_pending_appends());
163
164 C_SaferCond cond;
165 append_buffer2.first->wait(&cond);
166 ASSERT_EQ(0, cond.wait());
167}
168
169TEST_F(TestObjectRecorder, AppendFlushByBytes) {
170 std::string oid = get_temp_oid();
171 ASSERT_EQ(0, create(oid));
172 ASSERT_EQ(0, client_register(oid));
173 journal::JournalMetadataPtr metadata = create_metadata(oid);
174 ASSERT_EQ(0, init_metadata(metadata));
175
176 set_flush_bytes(10);
177 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
178 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
179
180 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
181 "payload");
182 journal::AppendBuffers append_buffers;
183 append_buffers = {append_buffer1};
184 lock->Lock();
185 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
186 ASSERT_EQ(1U, object->get_pending_appends());
187
188 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
189 "payload");
190 append_buffers = {append_buffer2};
191 lock->Lock();
192 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
193 ASSERT_EQ(0U, object->get_pending_appends());
194
195 C_SaferCond cond;
196 append_buffer2.first->wait(&cond);
197 ASSERT_EQ(0, cond.wait());
198}
199
200TEST_F(TestObjectRecorder, AppendFlushByAge) {
201 std::string oid = get_temp_oid();
202 ASSERT_EQ(0, create(oid));
203 ASSERT_EQ(0, client_register(oid));
204 journal::JournalMetadataPtr metadata = create_metadata(oid);
205 ASSERT_EQ(0, init_metadata(metadata));
206
207 set_flush_age(0.1);
208 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
209 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
210
211 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
212 "payload");
213 journal::AppendBuffers append_buffers;
214 append_buffers = {append_buffer1};
215 lock->Lock();
216 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
217
218 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
219 "payload");
220 append_buffers = {append_buffer2};
221 lock->Lock();
222 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
223
224 C_SaferCond cond;
225 append_buffer2.first->wait(&cond);
226 ASSERT_EQ(0, cond.wait());
227 ASSERT_EQ(0U, object->get_pending_appends());
228}
229
230TEST_F(TestObjectRecorder, AppendFilledObject) {
231 std::string oid = get_temp_oid();
232 ASSERT_EQ(0, create(oid));
233 ASSERT_EQ(0, client_register(oid));
234 journal::JournalMetadataPtr metadata = create_metadata(oid);
235 ASSERT_EQ(0, init_metadata(metadata));
236
237 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
238 journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
239
240 std::string payload(2048, '1');
241 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
242 payload);
243 journal::AppendBuffers append_buffers;
244 append_buffers = {append_buffer1};
245 lock->Lock();
246 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
247
248 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
249 payload);
250 append_buffers = {append_buffer2};
251 lock->Lock();
252 ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
253
254 C_SaferCond cond;
255 append_buffer2.first->wait(&cond);
256 ASSERT_EQ(0, cond.wait());
257 ASSERT_EQ(0U, object->get_pending_appends());
258}
259
260TEST_F(TestObjectRecorder, Flush) {
261 std::string oid = get_temp_oid();
262 ASSERT_EQ(0, create(oid));
263 ASSERT_EQ(0, client_register(oid));
264 journal::JournalMetadataPtr metadata = create_metadata(oid);
265 ASSERT_EQ(0, init_metadata(metadata));
266
267 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
268 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
269
270 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
271 "payload");
272 journal::AppendBuffers append_buffers;
273 append_buffers = {append_buffer1};
274 lock->Lock();
275 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
276 ASSERT_EQ(1U, object->get_pending_appends());
277
278 C_SaferCond cond1;
279 object->flush(&cond1);
280 ASSERT_EQ(0, cond1.wait());
281
282 C_SaferCond cond2;
283 append_buffer1.first->wait(&cond2);
284 ASSERT_EQ(0, cond2.wait());
285 ASSERT_EQ(0U, object->get_pending_appends());
286}
287
288TEST_F(TestObjectRecorder, FlushFuture) {
289 std::string oid = get_temp_oid();
290 ASSERT_EQ(0, create(oid));
291 ASSERT_EQ(0, client_register(oid));
292 journal::JournalMetadataPtr metadata = create_metadata(oid);
293 ASSERT_EQ(0, init_metadata(metadata));
294
295 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
296 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
297
298 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
299 "payload");
300 journal::AppendBuffers append_buffers;
301 append_buffers = {append_buffer};
302 lock->Lock();
303 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
304 ASSERT_EQ(1U, object->get_pending_appends());
305
306 C_SaferCond cond;
307 append_buffer.first->wait(&cond);
308 lock->Lock();
309 object->flush(append_buffer.first);
310 ASSERT_TRUE(lock->is_locked());
311 lock->Unlock();
312 ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
313 append_buffer.first->is_complete());
314 ASSERT_EQ(0, cond.wait());
315}
316
317TEST_F(TestObjectRecorder, FlushDetachedFuture) {
318 std::string oid = get_temp_oid();
319 ASSERT_EQ(0, create(oid));
320 ASSERT_EQ(0, client_register(oid));
321 journal::JournalMetadataPtr metadata = create_metadata(oid);
322 ASSERT_EQ(0, init_metadata(metadata));
323
324 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
325 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
326
327 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
328 "payload");
329
330 journal::AppendBuffers append_buffers;
331 append_buffers = {append_buffer};
332
333 lock->Lock();
334 object->flush(append_buffer.first);
335 ASSERT_TRUE(lock->is_locked());
336 lock->Unlock();
337 ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
338 lock->Lock();
339 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
340
341 // should automatically flush once its attached to the object
342 C_SaferCond cond;
343 append_buffer.first->wait(&cond);
344 ASSERT_EQ(0, cond.wait());
345}
346
347TEST_F(TestObjectRecorder, Close) {
348 std::string oid = get_temp_oid();
349 ASSERT_EQ(0, create(oid));
350 ASSERT_EQ(0, client_register(oid));
351 journal::JournalMetadataPtr metadata = create_metadata(oid);
352 ASSERT_EQ(0, init_metadata(metadata));
353
354 set_flush_interval(2);
355 shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
356 journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
357
358 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
359 "payload");
360 journal::AppendBuffers append_buffers;
361 append_buffers = {append_buffer1};
362 lock->Lock();
363 ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
364 ASSERT_EQ(1U, object->get_pending_appends());
365
366 lock->Lock();
367 ASSERT_FALSE(object->close());
368 ASSERT_TRUE(lock->is_locked());
369 lock->Unlock();
370
371 {
372 Mutex::Locker locker(m_handler.lock);
373 while (!m_handler.is_closed) {
374 if (m_handler.cond.WaitInterval(
375 m_handler.lock, utime_t(10, 0)) != 0) {
376 break;
377 }
378 }
379 }
380
381 ASSERT_TRUE(m_handler.is_closed);
382 ASSERT_EQ(0U, object->get_pending_appends());
383}
384
385TEST_F(TestObjectRecorder, Overflow) {
386 std::string oid = get_temp_oid();
387 ASSERT_EQ(0, create(oid));
388 ASSERT_EQ(0, client_register(oid));
389 journal::JournalMetadataPtr metadata = create_metadata(oid);
390 ASSERT_EQ(0, init_metadata(metadata));
391
392 shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
393 journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
394 shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
395 journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
396
397 std::string payload(2048, '1');
398 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
399 payload);
400 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
401 payload);
402 journal::AppendBuffers append_buffers;
403 append_buffers = {append_buffer1, append_buffer2};
404 lock1->Lock();
405 ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
406
407 C_SaferCond cond;
408 append_buffer2.first->wait(&cond);
409 ASSERT_EQ(0, cond.wait());
410 ASSERT_EQ(0U, object1->get_pending_appends());
411
412 journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
413 payload);
414 append_buffers = {append_buffer3};
415
416 lock2->Lock();
417 ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
418 append_buffer3.first->flush(NULL);
419
420 bool overflowed = false;
421 {
422 Mutex::Locker locker(m_handler.lock);
423 while (m_handler.overflows == 0) {
424 if (m_handler.cond.WaitInterval(
425 m_handler.lock, utime_t(10, 0)) != 0) {
426 break;
427 }
428 }
429 if (m_handler.overflows != 0) {
430 overflowed = true;
431 }
432 }
433
434 ASSERT_TRUE(overflowed);
435}