]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/JournalMetadata.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / journal / JournalMetadata.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalMetadata.h"
5 #include "journal/Utils.h"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "cls/journal/cls_journal_client.h"
9 #include <functional>
10 #include <set>
11
12 #define dout_subsys ceph_subsys_journaler
13 #undef dout_prefix
14 #define dout_prefix *_dout << "JournalMetadata: " << this << " "
15
16 namespace journal {
17
18 using namespace cls::journal;
19
20 namespace {
21
22 struct C_GetClient : public Context {
23 CephContext *cct;
24 librados::IoCtx &ioctx;
25 const std::string &oid;
26 AsyncOpTracker &async_op_tracker;
27 std::string client_id;
28 cls::journal::Client *client;
29 Context *on_finish;
30
31 bufferlist out_bl;
32
33 C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
34 AsyncOpTracker &async_op_tracker, const std::string &client_id,
35 cls::journal::Client *client, Context *on_finish)
36 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
37 client_id(client_id), client(client), on_finish(on_finish) {
38 async_op_tracker.start_op();
39 }
40 ~C_GetClient() override {
41 async_op_tracker.finish_op();
42 }
43
44 virtual void send() {
45 send_get_client();
46 }
47
48 void send_get_client() {
49 ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;
50
51 librados::ObjectReadOperation op;
52 client::get_client_start(&op, client_id);
53
54 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
55 this, nullptr, &utils::rados_state_callback<
56 C_GetClient, &C_GetClient::handle_get_client>);
57
58 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
59 assert(r == 0);
60 comp->release();
61 }
62
63 void handle_get_client(int r) {
64 ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;
65
66 if (r == 0) {
67 bufferlist::iterator it = out_bl.begin();
68 r = client::get_client_finish(&it, client);
69 }
70 complete(r);
71 }
72
73 void finish(int r) override {
74 on_finish->complete(r);
75 }
76 };
77
78 struct C_AllocateTag : public Context {
79 CephContext *cct;
80 librados::IoCtx &ioctx;
81 const std::string &oid;
82 AsyncOpTracker &async_op_tracker;
83 uint64_t tag_class;
84 Tag *tag;
85 Context *on_finish;
86
87 bufferlist out_bl;
88
89 C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
90 const std::string &oid, AsyncOpTracker &async_op_tracker,
91 uint64_t tag_class, const bufferlist &data, Tag *tag,
92 Context *on_finish)
93 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
94 tag_class(tag_class), tag(tag), on_finish(on_finish) {
95 async_op_tracker.start_op();
96 tag->data = data;
97 }
98 ~C_AllocateTag() override {
99 async_op_tracker.finish_op();
100 }
101
102 void send() {
103 send_get_next_tag_tid();
104 }
105
106 void send_get_next_tag_tid() {
107 ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
108
109 librados::ObjectReadOperation op;
110 client::get_next_tag_tid_start(&op);
111
112 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
113 this, nullptr, &utils::rados_state_callback<
114 C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
115
116 out_bl.clear();
117 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
118 assert(r == 0);
119 comp->release();
120 }
121
122 void handle_get_next_tag_tid(int r) {
123 ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
124
125 if (r == 0) {
126 bufferlist::iterator iter = out_bl.begin();
127 r = client::get_next_tag_tid_finish(&iter, &tag->tid);
128 }
129 if (r < 0) {
130 complete(r);
131 return;
132 }
133 send_tag_create();
134 }
135
136 void send_tag_create() {
137 ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
138
139 librados::ObjectWriteOperation op;
140 client::tag_create(&op, tag->tid, tag_class, tag->data);
141
142 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
143 this, nullptr, &utils::rados_state_callback<
144 C_AllocateTag, &C_AllocateTag::handle_tag_create>);
145
146 int r = ioctx.aio_operate(oid, comp, &op);
147 assert(r == 0);
148 comp->release();
149 }
150
151 void handle_tag_create(int r) {
152 ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
153
154 if (r == -ESTALE) {
155 send_get_next_tag_tid();
156 return;
157 } else if (r < 0) {
158 complete(r);
159 return;
160 }
161
162 send_get_tag();
163 }
164
165 void send_get_tag() {
166 ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
167
168 librados::ObjectReadOperation op;
169 client::get_tag_start(&op, tag->tid);
170
171 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
172 this, nullptr, &utils::rados_state_callback<
173 C_AllocateTag, &C_AllocateTag::handle_get_tag>);
174
175 out_bl.clear();
176 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
177 assert(r == 0);
178 comp->release();
179 }
180
181 void handle_get_tag(int r) {
182 ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
183
184 if (r == 0) {
185 bufferlist::iterator iter = out_bl.begin();
186
187 cls::journal::Tag journal_tag;
188 r = client::get_tag_finish(&iter, &journal_tag);
189 if (r == 0) {
190 *tag = journal_tag;
191 }
192 }
193 complete(r);
194 }
195
196 void finish(int r) override {
197 on_finish->complete(r);
198 }
199 };
200
201 struct C_GetTag : public Context {
202 CephContext *cct;
203 librados::IoCtx &ioctx;
204 const std::string &oid;
205 AsyncOpTracker &async_op_tracker;
206 uint64_t tag_tid;
207 JournalMetadata::Tag *tag;
208 Context *on_finish;
209
210 bufferlist out_bl;
211
212 C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
213 AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
214 JournalMetadata::Tag *tag, Context *on_finish)
215 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
216 tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
217 async_op_tracker.start_op();
218 }
219 ~C_GetTag() override {
220 async_op_tracker.finish_op();
221 }
222
223 void send() {
224 send_get_tag();
225 }
226
227 void send_get_tag() {
228 librados::ObjectReadOperation op;
229 client::get_tag_start(&op, tag_tid);
230
231 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
232 this, nullptr, &utils::rados_state_callback<
233 C_GetTag, &C_GetTag::handle_get_tag>);
234
235 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
236 assert(r == 0);
237 comp->release();
238 }
239
240 void handle_get_tag(int r) {
241 if (r == 0) {
242 bufferlist::iterator iter = out_bl.begin();
243 r = client::get_tag_finish(&iter, tag);
244 }
245 complete(r);
246 }
247
248 void finish(int r) override {
249 on_finish->complete(r);
250 }
251 };
252
253 struct C_GetTags : public Context {
254 CephContext *cct;
255 librados::IoCtx &ioctx;
256 const std::string &oid;
257 const std::string &client_id;
258 AsyncOpTracker &async_op_tracker;
259 uint64_t start_after_tag_tid;
260 boost::optional<uint64_t> tag_class;
261 JournalMetadata::Tags *tags;
262 Context *on_finish;
263
264 const uint64_t MAX_RETURN = 64;
265 bufferlist out_bl;
266
267 C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
268 const std::string &client_id, AsyncOpTracker &async_op_tracker,
269 uint64_t start_after_tag_tid,
270 const boost::optional<uint64_t> &tag_class,
271 JournalMetadata::Tags *tags, Context *on_finish)
272 : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
273 async_op_tracker(async_op_tracker),
274 start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
275 tags(tags), on_finish(on_finish) {
276 async_op_tracker.start_op();
277 }
278 ~C_GetTags() override {
279 async_op_tracker.finish_op();
280 }
281
282 void send() {
283 send_tag_list();
284 }
285
286 void send_tag_list() {
287 librados::ObjectReadOperation op;
288 client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
289 tag_class);
290
291 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
292 this, nullptr, &utils::rados_state_callback<
293 C_GetTags, &C_GetTags::handle_tag_list>);
294
295 out_bl.clear();
296 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
297 assert(r == 0);
298 comp->release();
299 }
300
301 void handle_tag_list(int r) {
302 if (r == 0) {
303 std::set<cls::journal::Tag> journal_tags;
304 bufferlist::iterator iter = out_bl.begin();
305 r = client::tag_list_finish(&iter, &journal_tags);
306 if (r == 0) {
307 for (auto &journal_tag : journal_tags) {
308 tags->push_back(journal_tag);
309 start_after_tag_tid = journal_tag.tid;
310 }
311
312 if (journal_tags.size() == MAX_RETURN) {
313 send_tag_list();
314 return;
315 }
316 }
317 }
318 complete(r);
319 }
320
321 void finish(int r) override {
322 on_finish->complete(r);
323 }
324 };
325
326 struct C_FlushCommitPosition : public Context {
327 Context *commit_position_ctx;
328 Context *on_finish;
329
330 C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
331 : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
332 }
333 void finish(int r) override {
334 if (commit_position_ctx != nullptr) {
335 commit_position_ctx->complete(r);
336 }
337 on_finish->complete(r);
338 }
339 };
340
341 struct C_AssertActiveTag : public Context {
342 CephContext *cct;
343 librados::IoCtx &ioctx;
344 const std::string &oid;
345 AsyncOpTracker &async_op_tracker;
346 std::string client_id;
347 uint64_t tag_tid;
348 Context *on_finish;
349
350 bufferlist out_bl;
351
352 C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
353 const std::string &oid, AsyncOpTracker &async_op_tracker,
354 const std::string &client_id, uint64_t tag_tid,
355 Context *on_finish)
356 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
357 client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
358 async_op_tracker.start_op();
359 }
360 ~C_AssertActiveTag() override {
361 async_op_tracker.finish_op();
362 }
363
364 void send() {
365 ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
366
367 librados::ObjectReadOperation op;
368 client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
369
370 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
371 this, nullptr, &utils::rados_state_callback<
372 C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
373
374 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
375 assert(r == 0);
376 comp->release();
377 }
378
379 void handle_send(int r) {
380 ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
381
382 std::set<cls::journal::Tag> tags;
383 if (r == 0) {
384 bufferlist::iterator it = out_bl.begin();
385 r = client::tag_list_finish(&it, &tags);
386 }
387
388 // NOTE: since 0 is treated as an uninitialized list filter, we need to
389 // load to entries and look at the last tid
390 if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
391 r = -ESTALE;
392 }
393 complete(r);
394 }
395
396 void finish(int r) override {
397 on_finish->complete(r);
398 }
399 };
400
401 } // anonymous namespace
402
403 JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
404 Mutex *timer_lock, librados::IoCtx &ioctx,
405 const std::string &oid,
406 const std::string &client_id,
407 const Settings &settings)
408 : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
409 m_client_id(client_id), m_settings(settings), m_order(0),
410 m_splay_width(0), m_pool_id(-1), m_initialized(false),
411 m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
412 m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
413 m_watch_handle(0), m_minimum_set(0), m_active_set(0),
414 m_update_notifications(0), m_commit_position_ctx(NULL),
415 m_commit_position_task_ctx(NULL) {
416 m_ioctx.dup(ioctx);
417 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
418 }
419
420 JournalMetadata::~JournalMetadata() {
421 Mutex::Locker locker(m_lock);
422 assert(!m_initialized);
423 }
424
425 void JournalMetadata::init(Context *on_finish) {
426 {
427 Mutex::Locker locker(m_lock);
428 assert(!m_initialized);
429 m_initialized = true;
430 }
431
432 // chain the init sequence (reverse order)
433 on_finish = utils::create_async_context_callback(
434 this, on_finish);
435 on_finish = new C_ImmutableMetadata(this, on_finish);
436 on_finish = new FunctionContext([this, on_finish](int r) {
437 if (r < 0) {
438 lderr(m_cct) << __func__ << ": failed to watch journal"
439 << cpp_strerror(r) << dendl;
440 Mutex::Locker locker(m_lock);
441 m_watch_handle = 0;
442 on_finish->complete(r);
443 return;
444 }
445
446 get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
447 });
448
449 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
450 on_finish, nullptr, utils::rados_ctx_callback);
451 int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
452 assert(r == 0);
453 comp->release();
454 }
455
456 void JournalMetadata::shut_down(Context *on_finish) {
457
458 ldout(m_cct, 20) << __func__ << dendl;
459
460 uint64_t watch_handle = 0;
461 {
462 Mutex::Locker locker(m_lock);
463 m_initialized = false;
464 std::swap(watch_handle, m_watch_handle);
465 }
466
467 // chain the shut down sequence (reverse order)
468 on_finish = utils::create_async_context_callback(
469 this, on_finish);
470 on_finish = new FunctionContext([this, on_finish](int r) {
471 ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
472 m_async_op_tracker.wait_for_ops(on_finish);
473 });
474 on_finish = new FunctionContext([this, on_finish](int r) {
475 ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
476 librados::Rados rados(m_ioctx);
477 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
478 on_finish, nullptr, utils::rados_ctx_callback);
479 r = rados.aio_watch_flush(comp);
480 assert(r == 0);
481 comp->release();
482 });
483 on_finish = new FunctionContext([this, on_finish](int r) {
484 flush_commit_position(on_finish);
485 });
486 if (watch_handle != 0) {
487 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
488 on_finish, nullptr, utils::rados_ctx_callback);
489 int r = m_ioctx.aio_unwatch(watch_handle, comp);
490 assert(r == 0);
491 comp->release();
492 } else {
493 on_finish->complete(0);
494 }
495 }
496
497 void JournalMetadata::get_immutable_metadata(uint8_t *order,
498 uint8_t *splay_width,
499 int64_t *pool_id,
500 Context *on_finish) {
501 client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
502 on_finish);
503 }
504
505 void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
506 uint64_t *active_set,
507 RegisteredClients *clients,
508 Context *on_finish) {
509 client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients,
510 on_finish);
511 }
512
513 void JournalMetadata::register_client(const bufferlist &data,
514 Context *on_finish) {
515 ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
516 librados::ObjectWriteOperation op;
517 client::client_register(&op, m_client_id, data);
518
519 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
520
521 librados::AioCompletion *comp =
522 librados::Rados::aio_create_completion(ctx, NULL,
523 utils::rados_ctx_callback);
524 int r = m_ioctx.aio_operate(m_oid, comp, &op);
525 assert(r == 0);
526 comp->release();
527 }
528
529 void JournalMetadata::update_client(const bufferlist &data,
530 Context *on_finish) {
531 ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
532 librados::ObjectWriteOperation op;
533 client::client_update_data(&op, m_client_id, data);
534
535 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
536
537 librados::AioCompletion *comp =
538 librados::Rados::aio_create_completion(ctx, NULL,
539 utils::rados_ctx_callback);
540 int r = m_ioctx.aio_operate(m_oid, comp, &op);
541 assert(r == 0);
542 comp->release();
543 }
544
545 void JournalMetadata::unregister_client(Context *on_finish) {
546 assert(!m_client_id.empty());
547
548 ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
549 librados::ObjectWriteOperation op;
550 client::client_unregister(&op, m_client_id);
551
552 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
553
554 librados::AioCompletion *comp =
555 librados::Rados::aio_create_completion(ctx, NULL,
556 utils::rados_ctx_callback);
557 int r = m_ioctx.aio_operate(m_oid, comp, &op);
558 assert(r == 0);
559 comp->release();
560 }
561
562 void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
563 Tag *tag, Context *on_finish) {
564 on_finish = new C_NotifyUpdate(this, on_finish);
565 C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
566 m_async_op_tracker, tag_class,
567 data, tag, on_finish);
568 ctx->send();
569 }
570
571 void JournalMetadata::get_client(const std::string &client_id,
572 cls::journal::Client *client,
573 Context *on_finish) {
574 C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker,
575 client_id, client, on_finish);
576 ctx->send();
577 }
578
579 void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
580 C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker,
581 tag_tid, tag, on_finish);
582 ctx->send();
583 }
584
585 void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
586 const boost::optional<uint64_t> &tag_class,
587 Tags *tags, Context *on_finish) {
588 C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
589 m_async_op_tracker, start_after_tag_tid,
590 tag_class, tags, on_finish);
591 ctx->send();
592 }
593
594 void JournalMetadata::add_listener(JournalMetadataListener *listener) {
595 Mutex::Locker locker(m_lock);
596 while (m_update_notifications > 0) {
597 m_update_cond.Wait(m_lock);
598 }
599 m_listeners.push_back(listener);
600 }
601
602 void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
603 Mutex::Locker locker(m_lock);
604 while (m_update_notifications > 0) {
605 m_update_cond.Wait(m_lock);
606 }
607 m_listeners.remove(listener);
608 }
609
610 void JournalMetadata::set_minimum_set(uint64_t object_set) {
611 Mutex::Locker locker(m_lock);
612
613 ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
614 << ", new=" << object_set << dendl;
615 if (m_minimum_set >= object_set) {
616 return;
617 }
618
619 librados::ObjectWriteOperation op;
620 client::set_minimum_set(&op, object_set);
621
622 C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
623 librados::AioCompletion *comp =
624 librados::Rados::aio_create_completion(ctx, NULL,
625 utils::rados_ctx_callback);
626 int r = m_ioctx.aio_operate(m_oid, comp, &op);
627 assert(r == 0);
628 comp->release();
629
630 m_minimum_set = object_set;
631 }
632
633 int JournalMetadata::set_active_set(uint64_t object_set) {
634 C_SaferCond ctx;
635 set_active_set(object_set, &ctx);
636 return ctx.wait();
637 }
638
639 void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
640 Mutex::Locker locker(m_lock);
641
642 ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
643 << ", new=" << object_set << dendl;
644 if (m_active_set >= object_set) {
645 m_work_queue->queue(on_finish, 0);
646 return;
647 }
648
649 librados::ObjectWriteOperation op;
650 client::set_active_set(&op, object_set);
651
652 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
653 librados::AioCompletion *comp =
654 librados::Rados::aio_create_completion(ctx, NULL,
655 utils::rados_ctx_callback);
656 int r = m_ioctx.aio_operate(m_oid, comp, &op);
657 assert(r == 0);
658 comp->release();
659
660 m_active_set = object_set;
661 }
662
663 void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
664 Mutex::Locker locker(m_lock);
665
666 C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
667 m_async_op_tracker,
668 m_client_id, tag_tid,
669 on_finish);
670 ctx->send();
671 }
672
673 void JournalMetadata::flush_commit_position() {
674 ldout(m_cct, 20) << __func__ << dendl;
675
676 Mutex::Locker timer_locker(*m_timer_lock);
677 Mutex::Locker locker(m_lock);
678 if (m_commit_position_ctx == nullptr) {
679 return;
680 }
681
682 cancel_commit_task();
683 handle_commit_position_task();
684 }
685
686 void JournalMetadata::flush_commit_position(Context *on_safe) {
687 ldout(m_cct, 20) << __func__ << dendl;
688
689 Mutex::Locker timer_locker(*m_timer_lock);
690 Mutex::Locker locker(m_lock);
691 if (m_commit_position_ctx == nullptr) {
692 // nothing to flush
693 if (on_safe != nullptr) {
694 m_work_queue->queue(on_safe, 0);
695 }
696 return;
697 }
698
699 if (on_safe != nullptr) {
700 m_commit_position_ctx = new C_FlushCommitPosition(
701 m_commit_position_ctx, on_safe);
702 }
703 cancel_commit_task();
704 handle_commit_position_task();
705 }
706
707 void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
708 Mutex::Locker locker(m_lock);
709 uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
710 if (allocated_entry_tid <= entry_tid) {
711 allocated_entry_tid = entry_tid + 1;
712 }
713 }
714
715 bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
716 uint64_t *entry_tid) const {
717 Mutex::Locker locker(m_lock);
718
719 AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
720 if (it == m_allocated_entry_tids.end()) {
721 return false;
722 }
723
724 assert(it->second > 0);
725 *entry_tid = it->second - 1;
726 return true;
727 }
728
729 void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
730 if (r < 0) {
731 lderr(m_cct) << "failed to initialize immutable metadata: "
732 << cpp_strerror(r) << dendl;
733 on_init->complete(r);
734 return;
735 }
736
737 ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
738 refresh(on_init);
739 }
740
741 void JournalMetadata::refresh(Context *on_complete) {
742 ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
743 C_Refresh *refresh = new C_Refresh(this, on_complete);
744 get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
745 &refresh->registered_clients, refresh);
746 }
747
// Applies the result of a refresh() to the cached metadata, notifies the
// listeners, and completes refresh->on_finish (when set) with the result.
// If our own client is missing from the registered-client list, the result
// becomes -ENOENT.
void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
  ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
  if (r == 0) {
    Mutex::Locker locker(m_lock);

    // look up our own registration (the lookup key carries only the id)
    Client client(m_client_id, bufferlist());
    RegisteredClients::iterator it = refresh->registered_clients.find(client);
    if (it != refresh->registered_clients.end()) {
      if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
        ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
                        << dendl;
      }
      // object set positions never move backwards in the cache
      m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
      m_active_set = MAX(m_active_set, refresh->active_set);
      m_registered_clients = refresh->registered_clients;
      m_client = *it;

      // Listeners are called with m_lock dropped. The in-progress counter
      // keeps add_listener()/remove_listener() from changing m_listeners
      // until the loop finishes and m_update_cond is signalled.
      ++m_update_notifications;
      m_lock.Unlock();
      for (Listeners::iterator it = m_listeners.begin();
           it != m_listeners.end(); ++it) {
        (*it)->handle_update(this);
      }
      m_lock.Lock();
      if (--m_update_notifications == 0) {
        m_update_cond.Signal();
      }
    } else {
      lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
      r = -ENOENT;
    }
  }

  // on_finish is NULL for refreshes triggered by watch notifications/resets
  if (refresh->on_finish != NULL) {
    refresh->on_finish->complete(r);
  }
}
785
786 void JournalMetadata::cancel_commit_task() {
787 ldout(m_cct, 20) << __func__ << dendl;
788
789 assert(m_timer_lock->is_locked());
790 assert(m_lock.is_locked());
791 assert(m_commit_position_ctx != nullptr);
792 assert(m_commit_position_task_ctx != nullptr);
793
794 m_timer->cancel_event(m_commit_position_task_ctx);
795 m_commit_position_task_ctx = NULL;
796 }
797
798 void JournalMetadata::schedule_commit_task() {
799 ldout(m_cct, 20) << __func__ << dendl;
800
801 assert(m_timer_lock->is_locked());
802 assert(m_lock.is_locked());
803 assert(m_commit_position_ctx != nullptr);
804 if (m_commit_position_task_ctx == NULL) {
805 m_commit_position_task_ctx = new C_CommitPositionTask(this);
806 m_timer->add_event_after(m_settings.commit_interval,
807 m_commit_position_task_ctx);
808 }
809 }
810
811 void JournalMetadata::handle_commit_position_task() {
812 assert(m_timer_lock->is_locked());
813 assert(m_lock.is_locked());
814 ldout(m_cct, 20) << __func__ << ": "
815 << "client_id=" << m_client_id << ", "
816 << "commit_position=" << m_commit_position << dendl;
817
818 librados::ObjectWriteOperation op;
819 client::client_commit(&op, m_client_id, m_commit_position);
820
821 Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
822 m_commit_position_ctx = NULL;
823
824 ctx = schedule_laggy_clients_disconnect(ctx);
825
826 librados::AioCompletion *comp =
827 librados::Rados::aio_create_completion(ctx, NULL,
828 utils::rados_ctx_callback);
829 int r = m_ioctx.aio_operate(m_oid, comp, &op);
830 assert(r == 0);
831 comp->release();
832
833 m_commit_position_task_ctx = NULL;
834 }
835
836 void JournalMetadata::schedule_watch_reset() {
837 assert(m_timer_lock->is_locked());
838 m_timer->add_event_after(1, new C_WatchReset(this));
839 }
840
841 void JournalMetadata::handle_watch_reset() {
842 assert(m_timer_lock->is_locked());
843 if (!m_initialized) {
844 return;
845 }
846
847 int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
848 if (r < 0) {
849 if (r == -ENOENT) {
850 ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
851 } else if (r == -EBLACKLISTED) {
852 ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl;
853 } else {
854 lderr(m_cct) << __func__ << ": failed to watch journal: "
855 << cpp_strerror(r) << dendl;
856 }
857 schedule_watch_reset();
858 } else {
859 ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
860 refresh(NULL);
861 }
862 }
863
864 void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
865 ldout(m_cct, 10) << "journal header updated" << dendl;
866
867 bufferlist bl;
868 m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
869
870 refresh(NULL);
871 }
872
873 void JournalMetadata::handle_watch_error(int err) {
874 if (err == -ENOTCONN) {
875 ldout(m_cct, 5) << "journal watch error: header removed" << dendl;
876 } else if (err == -EBLACKLISTED) {
877 lderr(m_cct) << "journal watch error: client blacklisted" << dendl;
878 } else {
879 lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
880 }
881
882 Mutex::Locker timer_locker(*m_timer_lock);
883 Mutex::Locker locker(m_lock);
884
885 // release old watch on error
886 if (m_watch_handle != 0) {
887 m_ioctx.unwatch2(m_watch_handle);
888 m_watch_handle = 0;
889 }
890
891 if (m_initialized && err != -ENOENT) {
892 schedule_watch_reset();
893 }
894 }
895
896 uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
897 uint64_t tag_tid,
898 uint64_t entry_tid) {
899 Mutex::Locker locker(m_lock);
900 uint64_t commit_tid = ++m_commit_tid;
901 m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
902 entry_tid);
903
904 ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
905 << "object_num=" << object_num << ", "
906 << "tag_tid=" << tag_tid << ", "
907 << "entry_tid=" << entry_tid << "]"
908 << dendl;
909 return commit_tid;
910 }
911
912 void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
913 uint64_t object_num) {
914 Mutex::Locker locker(m_lock);
915
916 auto it = m_pending_commit_tids.find(commit_tid);
917 assert(it != m_pending_commit_tids.end());
918 assert(it->second.object_num < object_num);
919
920 ldout(m_cct, 20) << __func__ << ": "
921 << "commit_tid=" << commit_tid << ", "
922 << "old_object_num=" << it->second.object_num << ", "
923 << "new_object_num=" << object_num << dendl;
924 it->second.object_num = object_num;
925 }
926
927 void JournalMetadata::get_commit_entry(uint64_t commit_tid,
928 uint64_t *object_num,
929 uint64_t *tag_tid, uint64_t *entry_tid) {
930 Mutex::Locker locker(m_lock);
931
932 auto it = m_pending_commit_tids.find(commit_tid);
933 assert(it != m_pending_commit_tids.end());
934
935 *object_num = it->second.object_num;
936 *tag_tid = it->second.tag_tid;
937 *entry_tid = it->second.entry_tid;
938 }
939
// Marks `commit_tid` as committed and folds every committed entry at the
// head of m_pending_commit_tids into a new commit position. It keeps one
// object position per splay offset and schedules the header write. Any
// previously pending on-safe context is replaced and completed with -ESTALE
// after the locks are released.
void JournalMetadata::committed(uint64_t commit_tid,
                                const CreateContext &create_context) {
  ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;

  ObjectSetPosition commit_position;
  Context *stale_ctx = nullptr;
  {
    Mutex::Locker timer_locker(*m_timer_lock);
    Mutex::Locker locker(m_lock);
    assert(commit_tid > m_commit_position_tid);

    // start from the most recent position we know about
    if (!m_commit_position.object_positions.empty()) {
      // in-flight commit position update
      commit_position = m_commit_position;
    } else {
      // safe commit position
      commit_position = m_client.commit_position;
    }

    CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
    assert(it != m_pending_commit_tids.end());

    CommitEntry &commit_entry = it->second;
    commit_entry.committed = true;

    // Consume the run of committed entries from the front of the pending
    // container, stopping at the first uncommitted one. NOTE(review): this
    // assumes begin() is the lowest commit tid (ordered map); confirm.
    bool update_commit_position = false;
    while (!m_pending_commit_tids.empty()) {
      CommitTids::iterator it = m_pending_commit_tids.begin();
      CommitEntry &commit_entry = it->second;
      if (!commit_entry.committed) {
        break;
      }

      commit_position.object_positions.emplace_front(
        commit_entry.object_num, commit_entry.tag_tid,
        commit_entry.entry_tid);
      m_pending_commit_tids.erase(it);
      update_commit_position = true;
    }

    // an earlier tid is still outstanding: nothing new to persist yet
    if (!update_commit_position) {
      return;
    }

    // prune the position to have one position per splay offset
    // (the first, i.e. most recently emplaced, position per offset is kept)
    std::set<uint8_t> in_use_splay_offsets;
    ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
    while (ob_it != commit_position.object_positions.end()) {
      uint8_t splay_offset = ob_it->object_number % m_splay_width;
      if (!in_use_splay_offsets.insert(splay_offset).second) {
        ob_it = commit_position.object_positions.erase(ob_it);
      } else {
        ++ob_it;
      }
    }

    // swap in a fresh on-safe context; the old one is superseded
    stale_ctx = m_commit_position_ctx;
    m_commit_position_ctx = create_context();
    m_commit_position = commit_position;
    m_commit_position_tid = commit_tid;

    ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
                     << "on_safe=" << m_commit_position_ctx << dendl;
    schedule_commit_task();
  }


  // completed outside the locks
  if (stale_ctx != nullptr) {
    ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
                     << dendl;
    stale_ctx->complete(-ESTALE);
  }
}
1013
1014 void JournalMetadata::notify_update() {
1015 ldout(m_cct, 10) << "notifying journal header update" << dendl;
1016
1017 bufferlist bl;
1018 m_ioctx.notify2(m_oid, bl, 5000, NULL);
1019 }
1020
1021 void JournalMetadata::async_notify_update(Context *on_safe) {
1022 ldout(m_cct, 10) << "async notifying journal header update" << dendl;
1023
1024 C_AioNotify *ctx = new C_AioNotify(this, on_safe);
1025 librados::AioCompletion *comp =
1026 librados::Rados::aio_create_completion(ctx, NULL,
1027 utils::rados_ctx_callback);
1028
1029 bufferlist bl;
1030 int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
1031 assert(r == 0);
1032
1033 comp->release();
1034 }
1035
1036 void JournalMetadata::wait_for_ops() {
1037 C_SaferCond ctx;
1038 m_async_op_tracker.wait_for_ops(&ctx);
1039 ctx.wait();
1040 }
1041
1042 void JournalMetadata::handle_notified(int r) {
1043 ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
1044 }
1045
1046 Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
1047 assert(m_lock.is_locked());
1048
1049 ldout(m_cct, 20) << __func__ << dendl;
1050
1051 if (m_settings.max_concurrent_object_sets <= 0) {
1052 return on_finish;
1053 }
1054
1055 Context *ctx = on_finish;
1056
1057 for (auto &c : m_registered_clients) {
1058 if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
1059 c.id == m_client_id ||
1060 m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
1061 continue;
1062 }
1063 const std::string &client_id = c.id;
1064 uint64_t object_set = 0;
1065 if (!c.commit_position.object_positions.empty()) {
1066 auto &position = *(c.commit_position.object_positions.begin());
1067 object_set = position.object_number / m_splay_width;
1068 }
1069
1070 if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
1071 ldout(m_cct, 1) << __func__ << ": " << client_id
1072 << ": scheduling disconnect" << dendl;
1073
1074 ctx = new FunctionContext([this, client_id, ctx](int r1) {
1075 ldout(m_cct, 10) << __func__ << ": " << client_id
1076 << ": flagging disconnected" << dendl;
1077
1078 librados::ObjectWriteOperation op;
1079 client::client_update_state(&op, client_id,
1080 cls::journal::CLIENT_STATE_DISCONNECTED);
1081
1082 librados::AioCompletion *comp =
1083 librados::Rados::aio_create_completion(ctx, nullptr,
1084 utils::rados_ctx_callback);
1085 int r = m_ioctx.aio_operate(m_oid, comp, &op);
1086 assert(r == 0);
1087 comp->release();
1088 });
1089 }
1090 }
1091
1092 if (ctx == on_finish) {
1093 ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
1094 }
1095
1096 return ctx;
1097 }
1098
1099 std::ostream &operator<<(std::ostream &os,
1100 const JournalMetadata::RegisteredClients &clients) {
1101 os << "[";
1102 for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin();
1103 c != clients.end(); ++c) {
1104 os << (c == clients.begin() ? "" : ", " ) << *c;
1105 }
1106 os << "]";
1107 return os;
1108 }
1109
1110 std::ostream &operator<<(std::ostream &os,
1111 const JournalMetadata &jm) {
1112 Mutex::Locker locker(jm.m_lock);
1113 os << "[oid=" << jm.m_oid << ", "
1114 << "initialized=" << jm.m_initialized << ", "
1115 << "order=" << (int)jm.m_order << ", "
1116 << "splay_width=" << (int)jm.m_splay_width << ", "
1117 << "pool_id=" << jm.m_pool_id << ", "
1118 << "minimum_set=" << jm.m_minimum_set << ", "
1119 << "active_set=" << jm.m_active_set << ", "
1120 << "client_id=" << jm.m_client_id << ", "
1121 << "commit_tid=" << jm.m_commit_tid << ", "
1122 << "commit_interval=" << jm.m_settings.commit_interval << ", "
1123 << "commit_position=" << jm.m_commit_position << ", "
1124 << "registered_clients=" << jm.m_registered_clients << "]";
1125 return os;
1126 }
1127
1128 } // namespace journal