]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/journal/test_ObjectRecorder.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / journal / test_ObjectRecorder.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/ObjectRecorder.h"
5#include "common/Cond.h"
9f95a23c 6#include "common/ceph_mutex.h"
7c673cae
FG
7#include "common/Timer.h"
8#include "gtest/gtest.h"
9#include "test/librados/test.h"
10#include "test/journal/RadosTestFixture.h"
11#include <limits>
12
20effc67 13using namespace std::chrono_literals;
7c673cae
FG
14using std::shared_ptr;
15
16class TestObjectRecorder : public RadosTestFixture {
17public:
9f95a23c 18 TestObjectRecorder() = default;
7c673cae
FG
19
20 struct Handler : public journal::ObjectRecorder::Handler {
9f95a23c
TL
21 ceph::mutex lock = ceph::make_mutex("lock");
22 ceph::mutex* object_lock = nullptr;
23 ceph::condition_variable cond;
7c673cae
FG
24 bool is_closed = false;
25 uint32_t overflows = 0;
26
9f95a23c 27 Handler() = default;
7c673cae
FG
28
29 void closed(journal::ObjectRecorder *object_recorder) override {
9f95a23c 30 std::lock_guard locker{lock};
7c673cae 31 is_closed = true;
9f95a23c 32 cond.notify_all();
7c673cae
FG
33 }
34 void overflow(journal::ObjectRecorder *object_recorder) override {
9f95a23c 35 std::lock_guard locker{lock};
7c673cae 36 journal::AppendBuffers append_buffers;
9f95a23c 37 object_lock->lock();
7c673cae 38 object_recorder->claim_append_buffers(&append_buffers);
9f95a23c 39 object_lock->unlock();
7c673cae
FG
40
41 ++overflows;
9f95a23c 42 cond.notify_all();
7c673cae
FG
43 }
44 };
45
9f95a23c
TL
46 // flush the pending buffers in dtor
47 class ObjectRecorderFlusher {
48 public:
49 ObjectRecorderFlusher(librados::IoCtx& ioctx,
50 ContextWQ* work_queue)
51 : m_ioctx{ioctx},
52 m_work_queue{work_queue}
53 {}
54 ObjectRecorderFlusher(librados::IoCtx& ioctx,
55 ContextWQ* work_queue,
56 uint32_t flush_interval,
57 uint16_t flush_bytes,
58 double flush_age,
59 int max_in_flight)
60 : m_ioctx{ioctx},
61 m_work_queue{work_queue},
62 m_flush_interval{flush_interval},
63 m_flush_bytes{flush_bytes},
64 m_flush_age{flush_age},
65 m_max_in_flight_appends{max_in_flight < 0 ?
66 std::numeric_limits<uint64_t>::max() :
67 static_cast<uint64_t>(max_in_flight)}
68 {}
69 ~ObjectRecorderFlusher() {
70 for (auto& [object_recorder, m] : m_object_recorders) {
71 C_SaferCond cond;
72 object_recorder->flush(&cond);
73 cond.wait();
74 std::scoped_lock l{*m};
75 if (!object_recorder->is_closed()) {
76 object_recorder->close();
77 }
78 }
7c673cae 79 }
9f95a23c
TL
80 auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) {
81 auto object = ceph::make_ref<journal::ObjectRecorder>(
82 m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
83 order, m_max_in_flight_appends);
84 {
85 std::lock_guard locker{*lock};
86 object->set_append_batch_options(m_flush_interval,
87 m_flush_bytes,
88 m_flush_age);
89 }
90 m_object_recorders.emplace_back(object, lock);
91 m_handler.object_lock = lock;
92 return object;
93 }
94 bool wait_for_closed() {
95 std::unique_lock locker{m_handler.lock};
96 return m_handler.cond.wait_for(locker, 10s,
97 [this] { return m_handler.is_closed; });
98 }
99 bool wait_for_overflow() {
100 std::unique_lock locker{m_handler.lock};
101 if (m_handler.cond.wait_for(locker, 10s,
102 [this] { return m_handler.overflows > 0; })) {
103 m_handler.overflows = 0;
104 return true;
105 } else {
106 return false;
107 }
108 }
109 private:
110 librados::IoCtx& m_ioctx;
111 ContextWQ *m_work_queue;
112 uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max();
113 uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
114 double m_flush_age = 600;
115 uint64_t m_max_in_flight_appends = 0;
116 using ObjectRecorders =
117 std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>;
118 ObjectRecorders m_object_recorders;
119 Handler m_handler;
120 };
7c673cae 121
9f95a23c
TL
122 journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
123 uint64_t entry_tid,
7c673cae 124 const std::string &payload) {
9f95a23c
TL
125 auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
126 future->init(ceph::ref_t<journal::FutureImpl>());
7c673cae
FG
127
128 bufferlist bl;
129 bl.append(payload);
130 return std::make_pair(future, bl);
131 }
7c673cae
FG
132};
133
134TEST_F(TestObjectRecorder, Append) {
135 std::string oid = get_temp_oid();
136 ASSERT_EQ(0, create(oid));
137 ASSERT_EQ(0, client_register(oid));
9f95a23c 138 auto metadata = create_metadata(oid);
7c673cae
FG
139 ASSERT_EQ(0, init_metadata(metadata));
140
9f95a23c
TL
141 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
142 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
143 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
144
145 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
146 "payload");
147 journal::AppendBuffers append_buffers;
148 append_buffers = {append_buffer1};
9f95a23c 149 lock.lock();
494da23a 150 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 151 lock.unlock();
494da23a 152 ASSERT_EQ(0U, object->get_pending_appends());
7c673cae
FG
153
154 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
155 "payload");
156 append_buffers = {append_buffer2};
9f95a23c 157 lock.lock();
494da23a 158 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 159 lock.unlock();
494da23a 160 ASSERT_EQ(0U, object->get_pending_appends());
7c673cae
FG
161
162 C_SaferCond cond;
163 append_buffer2.first->flush(&cond);
164 ASSERT_EQ(0, cond.wait());
165 ASSERT_EQ(0U, object->get_pending_appends());
166}
167
168TEST_F(TestObjectRecorder, AppendFlushByCount) {
169 std::string oid = get_temp_oid();
170 ASSERT_EQ(0, create(oid));
171 ASSERT_EQ(0, client_register(oid));
9f95a23c 172 auto metadata = create_metadata(oid);
7c673cae
FG
173 ASSERT_EQ(0, init_metadata(metadata));
174
9f95a23c
TL
175 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
176 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
177 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
178
179 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
180 "payload");
181 journal::AppendBuffers append_buffers;
182 append_buffers = {append_buffer1};
9f95a23c 183 lock.lock();
494da23a 184 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 185 lock.unlock();
7c673cae
FG
186 ASSERT_EQ(1U, object->get_pending_appends());
187
188 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
189 "payload");
190 append_buffers = {append_buffer2};
9f95a23c 191 lock.lock();
494da23a 192 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 193 lock.unlock();
7c673cae
FG
194 ASSERT_EQ(0U, object->get_pending_appends());
195
196 C_SaferCond cond;
197 append_buffer2.first->wait(&cond);
198 ASSERT_EQ(0, cond.wait());
199}
200
201TEST_F(TestObjectRecorder, AppendFlushByBytes) {
202 std::string oid = get_temp_oid();
203 ASSERT_EQ(0, create(oid));
204 ASSERT_EQ(0, client_register(oid));
9f95a23c 205 auto metadata = create_metadata(oid);
7c673cae
FG
206 ASSERT_EQ(0, init_metadata(metadata));
207
9f95a23c
TL
208 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
209 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
210 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
211
212 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
213 "payload");
214 journal::AppendBuffers append_buffers;
215 append_buffers = {append_buffer1};
9f95a23c 216 lock.lock();
494da23a 217 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 218 lock.unlock();
7c673cae
FG
219 ASSERT_EQ(1U, object->get_pending_appends());
220
221 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
222 "payload");
223 append_buffers = {append_buffer2};
9f95a23c 224 lock.lock();
494da23a 225 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 226 lock.unlock();
7c673cae
FG
227 ASSERT_EQ(0U, object->get_pending_appends());
228
229 C_SaferCond cond;
230 append_buffer2.first->wait(&cond);
231 ASSERT_EQ(0, cond.wait());
232}
233
234TEST_F(TestObjectRecorder, AppendFlushByAge) {
235 std::string oid = get_temp_oid();
236 ASSERT_EQ(0, create(oid));
237 ASSERT_EQ(0, client_register(oid));
9f95a23c 238 auto metadata = create_metadata(oid);
7c673cae
FG
239 ASSERT_EQ(0, init_metadata(metadata));
240
9f95a23c
TL
241 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
242 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
243 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
244
245 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
246 "payload");
247 journal::AppendBuffers append_buffers;
248 append_buffers = {append_buffer1};
9f95a23c 249 lock.lock();
494da23a 250 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 251 lock.unlock();
7c673cae 252
9f95a23c
TL
253 uint32_t offset = 0;
254 journal::AppendBuffer append_buffer2;
255 while (!append_buffer1.first->is_flush_in_progress() &&
256 !append_buffer1.first->is_complete()) {
257 usleep(1000);
258
259 append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
260 ++offset;
261 append_buffers = {append_buffer2};
262
263 lock.lock();
264 ASSERT_FALSE(object->append(std::move(append_buffers)));
265 lock.unlock();
266 }
7c673cae
FG
267
268 C_SaferCond cond;
269 append_buffer2.first->wait(&cond);
270 ASSERT_EQ(0, cond.wait());
271 ASSERT_EQ(0U, object->get_pending_appends());
272}
273
274TEST_F(TestObjectRecorder, AppendFilledObject) {
275 std::string oid = get_temp_oid();
276 ASSERT_EQ(0, create(oid));
277 ASSERT_EQ(0, client_register(oid));
9f95a23c 278 auto metadata = create_metadata(oid);
7c673cae
FG
279 ASSERT_EQ(0, init_metadata(metadata));
280
9f95a23c
TL
281 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
282 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
283 auto object = flusher.create_object(oid, 12, &lock);
7c673cae
FG
284
285 std::string payload(2048, '1');
286 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
287 payload);
288 journal::AppendBuffers append_buffers;
289 append_buffers = {append_buffer1};
9f95a23c 290 lock.lock();
494da23a 291 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 292 lock.unlock();
7c673cae
FG
293
294 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
295 payload);
296 append_buffers = {append_buffer2};
9f95a23c 297 lock.lock();
494da23a 298 ASSERT_TRUE(object->append(std::move(append_buffers)));
9f95a23c 299 lock.unlock();
7c673cae
FG
300
301 C_SaferCond cond;
302 append_buffer2.first->wait(&cond);
303 ASSERT_EQ(0, cond.wait());
304 ASSERT_EQ(0U, object->get_pending_appends());
305}
306
307TEST_F(TestObjectRecorder, Flush) {
308 std::string oid = get_temp_oid();
309 ASSERT_EQ(0, create(oid));
310 ASSERT_EQ(0, client_register(oid));
9f95a23c 311 auto metadata = create_metadata(oid);
7c673cae
FG
312 ASSERT_EQ(0, init_metadata(metadata));
313
9f95a23c
TL
314 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
315 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
316 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
317
318 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
319 "payload");
320 journal::AppendBuffers append_buffers;
321 append_buffers = {append_buffer1};
9f95a23c 322 lock.lock();
494da23a 323 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 324 lock.unlock();
7c673cae
FG
325 ASSERT_EQ(1U, object->get_pending_appends());
326
327 C_SaferCond cond1;
328 object->flush(&cond1);
329 ASSERT_EQ(0, cond1.wait());
330
331 C_SaferCond cond2;
332 append_buffer1.first->wait(&cond2);
333 ASSERT_EQ(0, cond2.wait());
334 ASSERT_EQ(0U, object->get_pending_appends());
335}
336
337TEST_F(TestObjectRecorder, FlushFuture) {
338 std::string oid = get_temp_oid();
339 ASSERT_EQ(0, create(oid));
340 ASSERT_EQ(0, client_register(oid));
9f95a23c 341 auto metadata = create_metadata(oid);
7c673cae
FG
342 ASSERT_EQ(0, init_metadata(metadata));
343
9f95a23c
TL
344 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
345 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
346 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
347
348 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
349 "payload");
350 journal::AppendBuffers append_buffers;
351 append_buffers = {append_buffer};
9f95a23c 352 lock.lock();
494da23a 353 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 354 lock.unlock();
7c673cae
FG
355 ASSERT_EQ(1U, object->get_pending_appends());
356
357 C_SaferCond cond;
358 append_buffer.first->wait(&cond);
7c673cae 359 object->flush(append_buffer.first);
7c673cae
FG
360 ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
361 append_buffer.first->is_complete());
362 ASSERT_EQ(0, cond.wait());
363}
364
365TEST_F(TestObjectRecorder, FlushDetachedFuture) {
366 std::string oid = get_temp_oid();
367 ASSERT_EQ(0, create(oid));
368 ASSERT_EQ(0, client_register(oid));
9f95a23c 369 auto metadata = create_metadata(oid);
7c673cae
FG
370 ASSERT_EQ(0, init_metadata(metadata));
371
9f95a23c
TL
372 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
373 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
374 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
375
376 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
377 "payload");
378
379 journal::AppendBuffers append_buffers;
380 append_buffers = {append_buffer};
381
7c673cae 382 object->flush(append_buffer.first);
7c673cae 383 ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
9f95a23c 384 lock.lock();
494da23a 385 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 386 lock.unlock();
7c673cae
FG
387
388 // should automatically flush once its attached to the object
389 C_SaferCond cond;
390 append_buffer.first->wait(&cond);
391 ASSERT_EQ(0, cond.wait());
392}
393
394TEST_F(TestObjectRecorder, Close) {
395 std::string oid = get_temp_oid();
396 ASSERT_EQ(0, create(oid));
397 ASSERT_EQ(0, client_register(oid));
9f95a23c 398 auto metadata = create_metadata(oid);
7c673cae
FG
399 ASSERT_EQ(0, init_metadata(metadata));
400
9f95a23c
TL
401 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
402 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
403 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
404
405 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
406 "payload");
407 journal::AppendBuffers append_buffers;
408 append_buffers = {append_buffer1};
9f95a23c 409 lock.lock();
494da23a 410 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 411 lock.unlock();
7c673cae
FG
412 ASSERT_EQ(1U, object->get_pending_appends());
413
9f95a23c 414 lock.lock();
7c673cae 415 ASSERT_FALSE(object->close());
9f95a23c
TL
416 ASSERT_TRUE(ceph_mutex_is_locked(lock));
417 lock.unlock();
418
419 ASSERT_TRUE(flusher.wait_for_closed());
7c673cae 420
7c673cae
FG
421 ASSERT_EQ(0U, object->get_pending_appends());
422}
423
424TEST_F(TestObjectRecorder, Overflow) {
425 std::string oid = get_temp_oid();
426 ASSERT_EQ(0, create(oid));
427 ASSERT_EQ(0, client_register(oid));
9f95a23c 428 auto metadata = create_metadata(oid);
7c673cae
FG
429 ASSERT_EQ(0, init_metadata(metadata));
430
9f95a23c
TL
431 ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
432 ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
433
434 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
435 auto object1 = flusher.create_object(oid, 12, &lock1);
7c673cae 436
494da23a 437 std::string payload(1 << 11, '1');
7c673cae
FG
438 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
439 payload);
440 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
441 payload);
442 journal::AppendBuffers append_buffers;
443 append_buffers = {append_buffer1, append_buffer2};
9f95a23c 444 lock1.lock();
494da23a 445 ASSERT_TRUE(object1->append(std::move(append_buffers)));
9f95a23c 446 lock1.unlock();
7c673cae
FG
447
448 C_SaferCond cond;
449 append_buffer2.first->wait(&cond);
450 ASSERT_EQ(0, cond.wait());
451 ASSERT_EQ(0U, object1->get_pending_appends());
452
9f95a23c 453 auto object2 = flusher.create_object(oid, 12, &lock2);
494da23a 454
7c673cae
FG
455 journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
456 payload);
457 append_buffers = {append_buffer3};
9f95a23c 458 lock2.lock();
494da23a 459 ASSERT_FALSE(object2->append(std::move(append_buffers)));
9f95a23c 460 lock2.unlock();
7c673cae
FG
461 append_buffer3.first->flush(NULL);
462
9f95a23c 463 ASSERT_TRUE(flusher.wait_for_overflow());
7c673cae 464}