]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/journal/test_ObjectRecorder.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / journal / test_ObjectRecorder.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/ObjectRecorder.h"
5#include "common/Cond.h"
9f95a23c 6#include "common/ceph_mutex.h"
7c673cae
FG
7#include "common/Timer.h"
8#include "gtest/gtest.h"
9#include "test/librados/test.h"
10#include "test/journal/RadosTestFixture.h"
11#include <limits>
12
13using std::shared_ptr;
14
15class TestObjectRecorder : public RadosTestFixture {
16public:
9f95a23c 17 TestObjectRecorder() = default;
7c673cae
FG
18
19 struct Handler : public journal::ObjectRecorder::Handler {
9f95a23c
TL
20 ceph::mutex lock = ceph::make_mutex("lock");
21 ceph::mutex* object_lock = nullptr;
22 ceph::condition_variable cond;
7c673cae
FG
23 bool is_closed = false;
24 uint32_t overflows = 0;
25
9f95a23c 26 Handler() = default;
7c673cae
FG
27
28 void closed(journal::ObjectRecorder *object_recorder) override {
9f95a23c 29 std::lock_guard locker{lock};
7c673cae 30 is_closed = true;
9f95a23c 31 cond.notify_all();
7c673cae
FG
32 }
33 void overflow(journal::ObjectRecorder *object_recorder) override {
9f95a23c 34 std::lock_guard locker{lock};
7c673cae 35 journal::AppendBuffers append_buffers;
9f95a23c 36 object_lock->lock();
7c673cae 37 object_recorder->claim_append_buffers(&append_buffers);
9f95a23c 38 object_lock->unlock();
7c673cae
FG
39
40 ++overflows;
9f95a23c 41 cond.notify_all();
7c673cae
FG
42 }
43 };
44
9f95a23c
TL
45 // flush the pending buffers in dtor
46 class ObjectRecorderFlusher {
47 public:
48 ObjectRecorderFlusher(librados::IoCtx& ioctx,
49 ContextWQ* work_queue)
50 : m_ioctx{ioctx},
51 m_work_queue{work_queue}
52 {}
53 ObjectRecorderFlusher(librados::IoCtx& ioctx,
54 ContextWQ* work_queue,
55 uint32_t flush_interval,
56 uint16_t flush_bytes,
57 double flush_age,
58 int max_in_flight)
59 : m_ioctx{ioctx},
60 m_work_queue{work_queue},
61 m_flush_interval{flush_interval},
62 m_flush_bytes{flush_bytes},
63 m_flush_age{flush_age},
64 m_max_in_flight_appends{max_in_flight < 0 ?
65 std::numeric_limits<uint64_t>::max() :
66 static_cast<uint64_t>(max_in_flight)}
67 {}
68 ~ObjectRecorderFlusher() {
69 for (auto& [object_recorder, m] : m_object_recorders) {
70 C_SaferCond cond;
71 object_recorder->flush(&cond);
72 cond.wait();
73 std::scoped_lock l{*m};
74 if (!object_recorder->is_closed()) {
75 object_recorder->close();
76 }
77 }
7c673cae 78 }
9f95a23c
TL
79 auto create_object(std::string_view oid, uint8_t order, ceph::mutex* lock) {
80 auto object = ceph::make_ref<journal::ObjectRecorder>(
81 m_ioctx, oid, 0, lock, m_work_queue, &m_handler,
82 order, m_max_in_flight_appends);
83 {
84 std::lock_guard locker{*lock};
85 object->set_append_batch_options(m_flush_interval,
86 m_flush_bytes,
87 m_flush_age);
88 }
89 m_object_recorders.emplace_back(object, lock);
90 m_handler.object_lock = lock;
91 return object;
92 }
93 bool wait_for_closed() {
94 std::unique_lock locker{m_handler.lock};
95 return m_handler.cond.wait_for(locker, 10s,
96 [this] { return m_handler.is_closed; });
97 }
98 bool wait_for_overflow() {
99 std::unique_lock locker{m_handler.lock};
100 if (m_handler.cond.wait_for(locker, 10s,
101 [this] { return m_handler.overflows > 0; })) {
102 m_handler.overflows = 0;
103 return true;
104 } else {
105 return false;
106 }
107 }
108 private:
109 librados::IoCtx& m_ioctx;
110 ContextWQ *m_work_queue;
111 uint32_t m_flush_interval = std::numeric_limits<uint32_t>::max();
112 uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
113 double m_flush_age = 600;
114 uint64_t m_max_in_flight_appends = 0;
115 using ObjectRecorders =
116 std::list<std::pair<ceph::ref_t<journal::ObjectRecorder>, ceph::mutex*>>;
117 ObjectRecorders m_object_recorders;
118 Handler m_handler;
119 };
7c673cae 120
9f95a23c
TL
121 journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
122 uint64_t entry_tid,
7c673cae 123 const std::string &payload) {
9f95a23c
TL
124 auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
125 future->init(ceph::ref_t<journal::FutureImpl>());
7c673cae
FG
126
127 bufferlist bl;
128 bl.append(payload);
129 return std::make_pair(future, bl);
130 }
7c673cae
FG
131};
132
133TEST_F(TestObjectRecorder, Append) {
134 std::string oid = get_temp_oid();
135 ASSERT_EQ(0, create(oid));
136 ASSERT_EQ(0, client_register(oid));
9f95a23c 137 auto metadata = create_metadata(oid);
7c673cae
FG
138 ASSERT_EQ(0, init_metadata(metadata));
139
9f95a23c
TL
140 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
141 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0);
142 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
143
144 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
145 "payload");
146 journal::AppendBuffers append_buffers;
147 append_buffers = {append_buffer1};
9f95a23c 148 lock.lock();
494da23a 149 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 150 lock.unlock();
494da23a 151 ASSERT_EQ(0U, object->get_pending_appends());
7c673cae
FG
152
153 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
154 "payload");
155 append_buffers = {append_buffer2};
9f95a23c 156 lock.lock();
494da23a 157 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 158 lock.unlock();
494da23a 159 ASSERT_EQ(0U, object->get_pending_appends());
7c673cae
FG
160
161 C_SaferCond cond;
162 append_buffer2.first->flush(&cond);
163 ASSERT_EQ(0, cond.wait());
164 ASSERT_EQ(0U, object->get_pending_appends());
165}
166
167TEST_F(TestObjectRecorder, AppendFlushByCount) {
168 std::string oid = get_temp_oid();
169 ASSERT_EQ(0, create(oid));
170 ASSERT_EQ(0, client_register(oid));
9f95a23c 171 auto metadata = create_metadata(oid);
7c673cae
FG
172 ASSERT_EQ(0, init_metadata(metadata));
173
9f95a23c
TL
174 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
175 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
176 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
177
178 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
179 "payload");
180 journal::AppendBuffers append_buffers;
181 append_buffers = {append_buffer1};
9f95a23c 182 lock.lock();
494da23a 183 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 184 lock.unlock();
7c673cae
FG
185 ASSERT_EQ(1U, object->get_pending_appends());
186
187 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
188 "payload");
189 append_buffers = {append_buffer2};
9f95a23c 190 lock.lock();
494da23a 191 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 192 lock.unlock();
7c673cae
FG
193 ASSERT_EQ(0U, object->get_pending_appends());
194
195 C_SaferCond cond;
196 append_buffer2.first->wait(&cond);
197 ASSERT_EQ(0, cond.wait());
198}
199
200TEST_F(TestObjectRecorder, AppendFlushByBytes) {
201 std::string oid = get_temp_oid();
202 ASSERT_EQ(0, create(oid));
203 ASSERT_EQ(0, client_register(oid));
9f95a23c 204 auto metadata = create_metadata(oid);
7c673cae
FG
205 ASSERT_EQ(0, init_metadata(metadata));
206
9f95a23c
TL
207 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
208 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
209 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
210
211 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
212 "payload");
213 journal::AppendBuffers append_buffers;
214 append_buffers = {append_buffer1};
9f95a23c 215 lock.lock();
494da23a 216 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 217 lock.unlock();
7c673cae
FG
218 ASSERT_EQ(1U, object->get_pending_appends());
219
220 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
221 "payload");
222 append_buffers = {append_buffer2};
9f95a23c 223 lock.lock();
494da23a 224 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 225 lock.unlock();
7c673cae
FG
226 ASSERT_EQ(0U, object->get_pending_appends());
227
228 C_SaferCond cond;
229 append_buffer2.first->wait(&cond);
230 ASSERT_EQ(0, cond.wait());
231}
232
233TEST_F(TestObjectRecorder, AppendFlushByAge) {
234 std::string oid = get_temp_oid();
235 ASSERT_EQ(0, create(oid));
236 ASSERT_EQ(0, client_register(oid));
9f95a23c 237 auto metadata = create_metadata(oid);
7c673cae
FG
238 ASSERT_EQ(0, init_metadata(metadata));
239
9f95a23c
TL
240 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
241 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
242 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
243
244 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
245 "payload");
246 journal::AppendBuffers append_buffers;
247 append_buffers = {append_buffer1};
9f95a23c 248 lock.lock();
494da23a 249 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 250 lock.unlock();
7c673cae 251
9f95a23c
TL
252 uint32_t offset = 0;
253 journal::AppendBuffer append_buffer2;
254 while (!append_buffer1.first->is_flush_in_progress() &&
255 !append_buffer1.first->is_complete()) {
256 usleep(1000);
257
258 append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
259 ++offset;
260 append_buffers = {append_buffer2};
261
262 lock.lock();
263 ASSERT_FALSE(object->append(std::move(append_buffers)));
264 lock.unlock();
265 }
7c673cae
FG
266
267 C_SaferCond cond;
268 append_buffer2.first->wait(&cond);
269 ASSERT_EQ(0, cond.wait());
270 ASSERT_EQ(0U, object->get_pending_appends());
271}
272
273TEST_F(TestObjectRecorder, AppendFilledObject) {
274 std::string oid = get_temp_oid();
275 ASSERT_EQ(0, create(oid));
276 ASSERT_EQ(0, client_register(oid));
9f95a23c 277 auto metadata = create_metadata(oid);
7c673cae
FG
278 ASSERT_EQ(0, init_metadata(metadata));
279
9f95a23c
TL
280 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
281 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
282 auto object = flusher.create_object(oid, 12, &lock);
7c673cae
FG
283
284 std::string payload(2048, '1');
285 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
286 payload);
287 journal::AppendBuffers append_buffers;
288 append_buffers = {append_buffer1};
9f95a23c 289 lock.lock();
494da23a 290 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 291 lock.unlock();
7c673cae
FG
292
293 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
294 payload);
295 append_buffers = {append_buffer2};
9f95a23c 296 lock.lock();
494da23a 297 ASSERT_TRUE(object->append(std::move(append_buffers)));
9f95a23c 298 lock.unlock();
7c673cae
FG
299
300 C_SaferCond cond;
301 append_buffer2.first->wait(&cond);
302 ASSERT_EQ(0, cond.wait());
303 ASSERT_EQ(0U, object->get_pending_appends());
304}
305
306TEST_F(TestObjectRecorder, Flush) {
307 std::string oid = get_temp_oid();
308 ASSERT_EQ(0, create(oid));
309 ASSERT_EQ(0, client_register(oid));
9f95a23c 310 auto metadata = create_metadata(oid);
7c673cae
FG
311 ASSERT_EQ(0, init_metadata(metadata));
312
9f95a23c
TL
313 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
314 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
315 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
316
317 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
318 "payload");
319 journal::AppendBuffers append_buffers;
320 append_buffers = {append_buffer1};
9f95a23c 321 lock.lock();
494da23a 322 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 323 lock.unlock();
7c673cae
FG
324 ASSERT_EQ(1U, object->get_pending_appends());
325
326 C_SaferCond cond1;
327 object->flush(&cond1);
328 ASSERT_EQ(0, cond1.wait());
329
330 C_SaferCond cond2;
331 append_buffer1.first->wait(&cond2);
332 ASSERT_EQ(0, cond2.wait());
333 ASSERT_EQ(0U, object->get_pending_appends());
334}
335
336TEST_F(TestObjectRecorder, FlushFuture) {
337 std::string oid = get_temp_oid();
338 ASSERT_EQ(0, create(oid));
339 ASSERT_EQ(0, client_register(oid));
9f95a23c 340 auto metadata = create_metadata(oid);
7c673cae
FG
341 ASSERT_EQ(0, init_metadata(metadata));
342
9f95a23c
TL
343 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
344 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1);
345 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
346
347 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
348 "payload");
349 journal::AppendBuffers append_buffers;
350 append_buffers = {append_buffer};
9f95a23c 351 lock.lock();
494da23a 352 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 353 lock.unlock();
7c673cae
FG
354 ASSERT_EQ(1U, object->get_pending_appends());
355
356 C_SaferCond cond;
357 append_buffer.first->wait(&cond);
7c673cae 358 object->flush(append_buffer.first);
7c673cae
FG
359 ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
360 append_buffer.first->is_complete());
361 ASSERT_EQ(0, cond.wait());
362}
363
364TEST_F(TestObjectRecorder, FlushDetachedFuture) {
365 std::string oid = get_temp_oid();
366 ASSERT_EQ(0, create(oid));
367 ASSERT_EQ(0, client_register(oid));
9f95a23c 368 auto metadata = create_metadata(oid);
7c673cae
FG
369 ASSERT_EQ(0, init_metadata(metadata));
370
9f95a23c
TL
371 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
372 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
373 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
374
375 journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
376 "payload");
377
378 journal::AppendBuffers append_buffers;
379 append_buffers = {append_buffer};
380
7c673cae 381 object->flush(append_buffer.first);
7c673cae 382 ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
9f95a23c 383 lock.lock();
494da23a 384 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 385 lock.unlock();
7c673cae
FG
386
387 // should automatically flush once its attached to the object
388 C_SaferCond cond;
389 append_buffer.first->wait(&cond);
390 ASSERT_EQ(0, cond.wait());
391}
392
393TEST_F(TestObjectRecorder, Close) {
394 std::string oid = get_temp_oid();
395 ASSERT_EQ(0, create(oid));
396 ASSERT_EQ(0, client_register(oid));
9f95a23c 397 auto metadata = create_metadata(oid);
7c673cae
FG
398 ASSERT_EQ(0, init_metadata(metadata));
399
9f95a23c
TL
400 ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
401 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1);
402 auto object = flusher.create_object(oid, 24, &lock);
7c673cae
FG
403
404 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
405 "payload");
406 journal::AppendBuffers append_buffers;
407 append_buffers = {append_buffer1};
9f95a23c 408 lock.lock();
494da23a 409 ASSERT_FALSE(object->append(std::move(append_buffers)));
9f95a23c 410 lock.unlock();
7c673cae
FG
411 ASSERT_EQ(1U, object->get_pending_appends());
412
9f95a23c 413 lock.lock();
7c673cae 414 ASSERT_FALSE(object->close());
9f95a23c
TL
415 ASSERT_TRUE(ceph_mutex_is_locked(lock));
416 lock.unlock();
417
418 ASSERT_TRUE(flusher.wait_for_closed());
7c673cae 419
7c673cae
FG
420 ASSERT_EQ(0U, object->get_pending_appends());
421}
422
423TEST_F(TestObjectRecorder, Overflow) {
424 std::string oid = get_temp_oid();
425 ASSERT_EQ(0, create(oid));
426 ASSERT_EQ(0, client_register(oid));
9f95a23c 427 auto metadata = create_metadata(oid);
7c673cae
FG
428 ASSERT_EQ(0, init_metadata(metadata));
429
9f95a23c
TL
430 ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1");
431 ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2");
432
433 ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
434 auto object1 = flusher.create_object(oid, 12, &lock1);
7c673cae 435
494da23a 436 std::string payload(1 << 11, '1');
7c673cae
FG
437 journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
438 payload);
439 journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
440 payload);
441 journal::AppendBuffers append_buffers;
442 append_buffers = {append_buffer1, append_buffer2};
9f95a23c 443 lock1.lock();
494da23a 444 ASSERT_TRUE(object1->append(std::move(append_buffers)));
9f95a23c 445 lock1.unlock();
7c673cae
FG
446
447 C_SaferCond cond;
448 append_buffer2.first->wait(&cond);
449 ASSERT_EQ(0, cond.wait());
450 ASSERT_EQ(0U, object1->get_pending_appends());
451
9f95a23c 452 auto object2 = flusher.create_object(oid, 12, &lock2);
494da23a 453
7c673cae
FG
454 journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
455 payload);
456 append_buffers = {append_buffer3};
9f95a23c 457 lock2.lock();
494da23a 458 ASSERT_FALSE(object2->append(std::move(append_buffers)));
9f95a23c 459 lock2.unlock();
7c673cae
FG
460 append_buffer3.first->flush(NULL);
461
9f95a23c 462 ASSERT_TRUE(flusher.wait_for_overflow());
7c673cae 463}