]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/JournalMetadata.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / journal / JournalMetadata.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/JournalMetadata.h"
5#include "journal/Utils.h"
6#include "common/errno.h"
7#include "common/Timer.h"
8#include "cls/journal/cls_journal_client.h"
9#include <functional>
10#include <set>
11
12#define dout_subsys ceph_subsys_journaler
13#undef dout_prefix
14#define dout_prefix *_dout << "JournalMetadata: " << this << " "
15
16namespace journal {
17
18using namespace cls::journal;
19
20namespace {
21
22struct C_GetClient : public Context {
23 CephContext *cct;
24 librados::IoCtx &ioctx;
25 const std::string &oid;
26 AsyncOpTracker &async_op_tracker;
27 std::string client_id;
28 cls::journal::Client *client;
29 Context *on_finish;
30
31 bufferlist out_bl;
32
33 C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
34 AsyncOpTracker &async_op_tracker, const std::string &client_id,
35 cls::journal::Client *client, Context *on_finish)
36 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
37 client_id(client_id), client(client), on_finish(on_finish) {
38 async_op_tracker.start_op();
39 }
40 ~C_GetClient() override {
41 async_op_tracker.finish_op();
42 }
43
44 virtual void send() {
45 send_get_client();
46 }
47
48 void send_get_client() {
49 ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;
50
51 librados::ObjectReadOperation op;
52 client::get_client_start(&op, client_id);
53
54 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 55 this, &utils::rados_state_callback<
7c673cae
FG
56 C_GetClient, &C_GetClient::handle_get_client>);
57
58 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
11fdf7f2 59 ceph_assert(r == 0);
7c673cae
FG
60 comp->release();
61 }
62
63 void handle_get_client(int r) {
64 ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;
65
66 if (r == 0) {
11fdf7f2 67 auto it = out_bl.cbegin();
7c673cae
FG
68 r = client::get_client_finish(&it, client);
69 }
70 complete(r);
71 }
72
73 void finish(int r) override {
74 on_finish->complete(r);
75 }
76};
77
78struct C_AllocateTag : public Context {
79 CephContext *cct;
80 librados::IoCtx &ioctx;
81 const std::string &oid;
82 AsyncOpTracker &async_op_tracker;
83 uint64_t tag_class;
84 Tag *tag;
85 Context *on_finish;
86
87 bufferlist out_bl;
88
89 C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
90 const std::string &oid, AsyncOpTracker &async_op_tracker,
91 uint64_t tag_class, const bufferlist &data, Tag *tag,
92 Context *on_finish)
93 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
94 tag_class(tag_class), tag(tag), on_finish(on_finish) {
95 async_op_tracker.start_op();
96 tag->data = data;
97 }
98 ~C_AllocateTag() override {
99 async_op_tracker.finish_op();
100 }
101
102 void send() {
103 send_get_next_tag_tid();
104 }
105
106 void send_get_next_tag_tid() {
107 ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
108
109 librados::ObjectReadOperation op;
110 client::get_next_tag_tid_start(&op);
111
112 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 113 this, &utils::rados_state_callback<
7c673cae
FG
114 C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
115
116 out_bl.clear();
117 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
11fdf7f2 118 ceph_assert(r == 0);
7c673cae
FG
119 comp->release();
120 }
121
122 void handle_get_next_tag_tid(int r) {
123 ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
124
125 if (r == 0) {
11fdf7f2 126 auto iter = out_bl.cbegin();
7c673cae
FG
127 r = client::get_next_tag_tid_finish(&iter, &tag->tid);
128 }
129 if (r < 0) {
130 complete(r);
131 return;
132 }
133 send_tag_create();
134 }
135
136 void send_tag_create() {
137 ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
138
139 librados::ObjectWriteOperation op;
140 client::tag_create(&op, tag->tid, tag_class, tag->data);
141
142 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 143 this, &utils::rados_state_callback<
7c673cae
FG
144 C_AllocateTag, &C_AllocateTag::handle_tag_create>);
145
146 int r = ioctx.aio_operate(oid, comp, &op);
11fdf7f2 147 ceph_assert(r == 0);
7c673cae
FG
148 comp->release();
149 }
150
151 void handle_tag_create(int r) {
152 ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
153
154 if (r == -ESTALE) {
155 send_get_next_tag_tid();
156 return;
157 } else if (r < 0) {
158 complete(r);
159 return;
160 }
161
162 send_get_tag();
163 }
164
165 void send_get_tag() {
166 ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
167
168 librados::ObjectReadOperation op;
169 client::get_tag_start(&op, tag->tid);
170
171 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 172 this, &utils::rados_state_callback<
7c673cae
FG
173 C_AllocateTag, &C_AllocateTag::handle_get_tag>);
174
175 out_bl.clear();
176 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
11fdf7f2 177 ceph_assert(r == 0);
7c673cae
FG
178 comp->release();
179 }
180
181 void handle_get_tag(int r) {
182 ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
183
184 if (r == 0) {
11fdf7f2 185 auto iter = out_bl.cbegin();
7c673cae
FG
186
187 cls::journal::Tag journal_tag;
188 r = client::get_tag_finish(&iter, &journal_tag);
189 if (r == 0) {
190 *tag = journal_tag;
191 }
192 }
193 complete(r);
194 }
195
196 void finish(int r) override {
197 on_finish->complete(r);
198 }
199};
200
201struct C_GetTag : public Context {
202 CephContext *cct;
203 librados::IoCtx &ioctx;
204 const std::string &oid;
205 AsyncOpTracker &async_op_tracker;
206 uint64_t tag_tid;
207 JournalMetadata::Tag *tag;
208 Context *on_finish;
209
210 bufferlist out_bl;
211
212 C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
213 AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
214 JournalMetadata::Tag *tag, Context *on_finish)
215 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
216 tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
217 async_op_tracker.start_op();
218 }
219 ~C_GetTag() override {
220 async_op_tracker.finish_op();
221 }
222
223 void send() {
224 send_get_tag();
225 }
226
227 void send_get_tag() {
228 librados::ObjectReadOperation op;
229 client::get_tag_start(&op, tag_tid);
230
231 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 232 this, &utils::rados_state_callback<
7c673cae
FG
233 C_GetTag, &C_GetTag::handle_get_tag>);
234
235 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
11fdf7f2 236 ceph_assert(r == 0);
7c673cae
FG
237 comp->release();
238 }
239
240 void handle_get_tag(int r) {
241 if (r == 0) {
11fdf7f2 242 auto iter = out_bl.cbegin();
7c673cae
FG
243 r = client::get_tag_finish(&iter, tag);
244 }
245 complete(r);
246 }
247
248 void finish(int r) override {
249 on_finish->complete(r);
250 }
251};
252
253struct C_GetTags : public Context {
254 CephContext *cct;
255 librados::IoCtx &ioctx;
256 const std::string &oid;
257 const std::string &client_id;
258 AsyncOpTracker &async_op_tracker;
259 uint64_t start_after_tag_tid;
260 boost::optional<uint64_t> tag_class;
261 JournalMetadata::Tags *tags;
262 Context *on_finish;
263
264 const uint64_t MAX_RETURN = 64;
265 bufferlist out_bl;
266
267 C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
268 const std::string &client_id, AsyncOpTracker &async_op_tracker,
269 uint64_t start_after_tag_tid,
270 const boost::optional<uint64_t> &tag_class,
271 JournalMetadata::Tags *tags, Context *on_finish)
272 : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
273 async_op_tracker(async_op_tracker),
274 start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
275 tags(tags), on_finish(on_finish) {
276 async_op_tracker.start_op();
277 }
278 ~C_GetTags() override {
279 async_op_tracker.finish_op();
280 }
281
282 void send() {
283 send_tag_list();
284 }
285
286 void send_tag_list() {
287 librados::ObjectReadOperation op;
288 client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
289 tag_class);
290
291 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 292 this, &utils::rados_state_callback<
7c673cae
FG
293 C_GetTags, &C_GetTags::handle_tag_list>);
294
295 out_bl.clear();
296 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
11fdf7f2 297 ceph_assert(r == 0);
7c673cae
FG
298 comp->release();
299 }
300
301 void handle_tag_list(int r) {
302 if (r == 0) {
303 std::set<cls::journal::Tag> journal_tags;
11fdf7f2 304 auto iter = out_bl.cbegin();
7c673cae
FG
305 r = client::tag_list_finish(&iter, &journal_tags);
306 if (r == 0) {
307 for (auto &journal_tag : journal_tags) {
308 tags->push_back(journal_tag);
309 start_after_tag_tid = journal_tag.tid;
310 }
311
312 if (journal_tags.size() == MAX_RETURN) {
313 send_tag_list();
314 return;
315 }
316 }
317 }
318 complete(r);
319 }
320
321 void finish(int r) override {
322 on_finish->complete(r);
323 }
324};
325
326struct C_FlushCommitPosition : public Context {
327 Context *commit_position_ctx;
328 Context *on_finish;
329
330 C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
331 : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
332 }
333 void finish(int r) override {
334 if (commit_position_ctx != nullptr) {
335 commit_position_ctx->complete(r);
336 }
337 on_finish->complete(r);
338 }
339};
340
341struct C_AssertActiveTag : public Context {
342 CephContext *cct;
343 librados::IoCtx &ioctx;
344 const std::string &oid;
345 AsyncOpTracker &async_op_tracker;
346 std::string client_id;
347 uint64_t tag_tid;
348 Context *on_finish;
349
350 bufferlist out_bl;
351
352 C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
353 const std::string &oid, AsyncOpTracker &async_op_tracker,
354 const std::string &client_id, uint64_t tag_tid,
355 Context *on_finish)
356 : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
357 client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
358 async_op_tracker.start_op();
359 }
360 ~C_AssertActiveTag() override {
361 async_op_tracker.finish_op();
362 }
363
364 void send() {
365 ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
366
367 librados::ObjectReadOperation op;
368 client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
369
370 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 371 this, &utils::rados_state_callback<
7c673cae
FG
372 C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
373
374 int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
11fdf7f2 375 ceph_assert(r == 0);
7c673cae
FG
376 comp->release();
377 }
378
379 void handle_send(int r) {
380 ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
381
382 std::set<cls::journal::Tag> tags;
383 if (r == 0) {
11fdf7f2 384 auto it = out_bl.cbegin();
7c673cae
FG
385 r = client::tag_list_finish(&it, &tags);
386 }
387
388 // NOTE: since 0 is treated as an uninitialized list filter, we need to
389 // load to entries and look at the last tid
390 if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
391 r = -ESTALE;
392 }
393 complete(r);
394 }
395
396 void finish(int r) override {
397 on_finish->complete(r);
398 }
399};
400
401} // anonymous namespace
402
403JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
9f95a23c 404 ceph::mutex *timer_lock, librados::IoCtx &ioctx,
7c673cae
FG
405 const std::string &oid,
406 const std::string &client_id,
407 const Settings &settings)
9f95a23c
TL
408 : m_oid(oid),
409 m_client_id(client_id), m_settings(settings),
7c673cae 410 m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
9f95a23c
TL
411 m_watch_ctx(this)
412{
7c673cae
FG
413 m_ioctx.dup(ioctx);
414 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
415}
416
417JournalMetadata::~JournalMetadata() {
9f95a23c 418 std::lock_guard locker{m_lock};
11fdf7f2 419 ceph_assert(!m_initialized);
7c673cae
FG
420}
421
422void JournalMetadata::init(Context *on_finish) {
423 {
9f95a23c 424 std::lock_guard locker{m_lock};
11fdf7f2 425 ceph_assert(!m_initialized);
7c673cae
FG
426 m_initialized = true;
427 }
428
429 // chain the init sequence (reverse order)
430 on_finish = utils::create_async_context_callback(
431 this, on_finish);
432 on_finish = new C_ImmutableMetadata(this, on_finish);
9f95a23c 433 on_finish = new LambdaContext([this, on_finish](int r) {
7c673cae
FG
434 if (r < 0) {
435 lderr(m_cct) << __func__ << ": failed to watch journal"
436 << cpp_strerror(r) << dendl;
9f95a23c 437 std::lock_guard locker{m_lock};
7c673cae
FG
438 m_watch_handle = 0;
439 on_finish->complete(r);
440 return;
441 }
442
443 get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
444 });
445
446 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 447 on_finish, utils::rados_ctx_callback);
7c673cae 448 int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
11fdf7f2 449 ceph_assert(r == 0);
7c673cae
FG
450 comp->release();
451}
452
453void JournalMetadata::shut_down(Context *on_finish) {
454
455 ldout(m_cct, 20) << __func__ << dendl;
456
457 uint64_t watch_handle = 0;
458 {
9f95a23c 459 std::lock_guard locker{m_lock};
7c673cae
FG
460 m_initialized = false;
461 std::swap(watch_handle, m_watch_handle);
462 }
463
464 // chain the shut down sequence (reverse order)
465 on_finish = utils::create_async_context_callback(
466 this, on_finish);
9f95a23c 467 on_finish = new LambdaContext([this, on_finish](int r) {
7c673cae
FG
468 ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
469 m_async_op_tracker.wait_for_ops(on_finish);
470 });
9f95a23c 471 on_finish = new LambdaContext([this, on_finish](int r) {
7c673cae
FG
472 ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
473 librados::Rados rados(m_ioctx);
474 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 475 on_finish, utils::rados_ctx_callback);
7c673cae 476 r = rados.aio_watch_flush(comp);
11fdf7f2 477 ceph_assert(r == 0);
7c673cae
FG
478 comp->release();
479 });
9f95a23c 480 on_finish = new LambdaContext([this, on_finish](int r) {
7c673cae
FG
481 flush_commit_position(on_finish);
482 });
483 if (watch_handle != 0) {
484 librados::AioCompletion *comp = librados::Rados::aio_create_completion(
9f95a23c 485 on_finish, utils::rados_ctx_callback);
7c673cae 486 int r = m_ioctx.aio_unwatch(watch_handle, comp);
11fdf7f2 487 ceph_assert(r == 0);
7c673cae
FG
488 comp->release();
489 } else {
490 on_finish->complete(0);
491 }
492}
493
494void JournalMetadata::get_immutable_metadata(uint8_t *order,
495 uint8_t *splay_width,
496 int64_t *pool_id,
497 Context *on_finish) {
498 client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
499 on_finish);
500}
501
502void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
503 uint64_t *active_set,
504 RegisteredClients *clients,
505 Context *on_finish) {
506 client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients,
507 on_finish);
508}
509
510void JournalMetadata::register_client(const bufferlist &data,
511 Context *on_finish) {
512 ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
513 librados::ObjectWriteOperation op;
514 client::client_register(&op, m_client_id, data);
515
516 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
517
518 librados::AioCompletion *comp =
9f95a23c 519 librados::Rados::aio_create_completion(ctx,
7c673cae
FG
520 utils::rados_ctx_callback);
521 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 522 ceph_assert(r == 0);
7c673cae
FG
523 comp->release();
524}
525
526void JournalMetadata::update_client(const bufferlist &data,
527 Context *on_finish) {
528 ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
529 librados::ObjectWriteOperation op;
530 client::client_update_data(&op, m_client_id, data);
531
532 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
533
534 librados::AioCompletion *comp =
9f95a23c 535 librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
7c673cae 536 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 537 ceph_assert(r == 0);
7c673cae
FG
538 comp->release();
539}
540
541void JournalMetadata::unregister_client(Context *on_finish) {
11fdf7f2 542 ceph_assert(!m_client_id.empty());
7c673cae
FG
543
544 ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
545 librados::ObjectWriteOperation op;
546 client::client_unregister(&op, m_client_id);
547
548 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
549
550 librados::AioCompletion *comp =
9f95a23c 551 librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
7c673cae 552 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 553 ceph_assert(r == 0);
7c673cae
FG
554 comp->release();
555}
556
557void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
558 Tag *tag, Context *on_finish) {
559 on_finish = new C_NotifyUpdate(this, on_finish);
560 C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
561 m_async_op_tracker, tag_class,
562 data, tag, on_finish);
563 ctx->send();
564}
565
566void JournalMetadata::get_client(const std::string &client_id,
567 cls::journal::Client *client,
568 Context *on_finish) {
569 C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker,
570 client_id, client, on_finish);
571 ctx->send();
572}
573
574void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
575 C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker,
576 tag_tid, tag, on_finish);
577 ctx->send();
578}
579
580void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
581 const boost::optional<uint64_t> &tag_class,
582 Tags *tags, Context *on_finish) {
583 C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
584 m_async_op_tracker, start_after_tag_tid,
585 tag_class, tags, on_finish);
586 ctx->send();
587}
588
589void JournalMetadata::add_listener(JournalMetadataListener *listener) {
9f95a23c
TL
590 std::unique_lock locker{m_lock};
591 m_update_cond.wait(locker, [this] {
592 return m_update_notifications <= 0;
593 });
7c673cae
FG
594 m_listeners.push_back(listener);
595}
596
597void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
9f95a23c
TL
598 std::unique_lock locker{m_lock};
599 m_update_cond.wait(locker, [this] {
600 return m_update_notifications <= 0;
601 });
7c673cae
FG
602 m_listeners.remove(listener);
603}
604
605void JournalMetadata::set_minimum_set(uint64_t object_set) {
9f95a23c 606 std::lock_guard locker{m_lock};
7c673cae
FG
607
608 ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
609 << ", new=" << object_set << dendl;
610 if (m_minimum_set >= object_set) {
611 return;
612 }
613
614 librados::ObjectWriteOperation op;
615 client::set_minimum_set(&op, object_set);
616
617 C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
618 librados::AioCompletion *comp =
9f95a23c 619 librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
7c673cae 620 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 621 ceph_assert(r == 0);
7c673cae
FG
622 comp->release();
623
624 m_minimum_set = object_set;
625}
626
627int JournalMetadata::set_active_set(uint64_t object_set) {
628 C_SaferCond ctx;
629 set_active_set(object_set, &ctx);
630 return ctx.wait();
631}
632
633void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
9f95a23c 634 std::lock_guard locker{m_lock};
7c673cae
FG
635
636 ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
637 << ", new=" << object_set << dendl;
638 if (m_active_set >= object_set) {
639 m_work_queue->queue(on_finish, 0);
640 return;
641 }
642
643 librados::ObjectWriteOperation op;
644 client::set_active_set(&op, object_set);
645
646 C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
647 librados::AioCompletion *comp =
9f95a23c 648 librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
7c673cae 649 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 650 ceph_assert(r == 0);
7c673cae
FG
651 comp->release();
652
653 m_active_set = object_set;
654}
655
656void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
9f95a23c 657 std::lock_guard locker{m_lock};
7c673cae
FG
658
659 C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
660 m_async_op_tracker,
661 m_client_id, tag_tid,
662 on_finish);
663 ctx->send();
664}
665
666void JournalMetadata::flush_commit_position() {
667 ldout(m_cct, 20) << __func__ << dendl;
668
94b18763
FG
669 C_SaferCond ctx;
670 flush_commit_position(&ctx);
671 ctx.wait();
7c673cae
FG
672}
673
674void JournalMetadata::flush_commit_position(Context *on_safe) {
675 ldout(m_cct, 20) << __func__ << dendl;
676
9f95a23c 677 std::scoped_lock locker{*m_timer_lock, m_lock};
94b18763 678 if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
7c673cae
FG
679 // nothing to flush
680 if (on_safe != nullptr) {
681 m_work_queue->queue(on_safe, 0);
682 }
683 return;
684 }
685
686 if (on_safe != nullptr) {
94b18763
FG
687 m_flush_commit_position_ctxs.push_back(on_safe);
688 }
689 if (m_commit_position_ctx == nullptr) {
690 return;
7c673cae 691 }
94b18763 692
7c673cae
FG
693 cancel_commit_task();
694 handle_commit_position_task();
695}
696
697void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
9f95a23c 698 std::lock_guard locker{m_lock};
7c673cae
FG
699 uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
700 if (allocated_entry_tid <= entry_tid) {
701 allocated_entry_tid = entry_tid + 1;
702 }
703}
704
705bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
706 uint64_t *entry_tid) const {
9f95a23c 707 std::lock_guard locker{m_lock};
7c673cae
FG
708
709 AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
710 if (it == m_allocated_entry_tids.end()) {
711 return false;
712 }
713
11fdf7f2 714 ceph_assert(it->second > 0);
7c673cae
FG
715 *entry_tid = it->second - 1;
716 return true;
717}
718
719void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
720 if (r < 0) {
721 lderr(m_cct) << "failed to initialize immutable metadata: "
722 << cpp_strerror(r) << dendl;
723 on_init->complete(r);
724 return;
725 }
726
727 ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
728 refresh(on_init);
729}
730
731void JournalMetadata::refresh(Context *on_complete) {
732 ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
94b18763
FG
733
734 {
9f95a23c 735 std::lock_guard locker{m_lock};
94b18763
FG
736 if (on_complete != nullptr) {
737 m_refresh_ctxs.push_back(on_complete);
738 }
739 ++m_refreshes_in_progress;
740 }
741
742 auto refresh = new C_Refresh(this);
7c673cae
FG
743 get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
744 &refresh->registered_clients, refresh);
745}
746
747void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
748 ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
7c673cae 749
9f95a23c 750 m_lock.lock();
94b18763 751 if (r == 0) {
7c673cae
FG
752 Client client(m_client_id, bufferlist());
753 RegisteredClients::iterator it = refresh->registered_clients.find(client);
754 if (it != refresh->registered_clients.end()) {
755 if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
756 ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
757 << dendl;
758 }
11fdf7f2
TL
759 m_minimum_set = std::max(m_minimum_set, refresh->minimum_set);
760 m_active_set = std::max(m_active_set, refresh->active_set);
7c673cae
FG
761 m_registered_clients = refresh->registered_clients;
762 m_client = *it;
763
764 ++m_update_notifications;
9f95a23c 765 m_lock.unlock();
7c673cae
FG
766 for (Listeners::iterator it = m_listeners.begin();
767 it != m_listeners.end(); ++it) {
768 (*it)->handle_update(this);
769 }
9f95a23c 770 m_lock.lock();
7c673cae 771 if (--m_update_notifications == 0) {
9f95a23c 772 m_update_cond.notify_all();
7c673cae
FG
773 }
774 } else {
775 lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
776 r = -ENOENT;
777 }
778 }
779
94b18763 780 Contexts refresh_ctxs;
11fdf7f2 781 ceph_assert(m_refreshes_in_progress > 0);
94b18763
FG
782 --m_refreshes_in_progress;
783 if (m_refreshes_in_progress == 0) {
784 std::swap(refresh_ctxs, m_refresh_ctxs);
785 }
9f95a23c 786 m_lock.unlock();
94b18763
FG
787
788 for (auto ctx : refresh_ctxs) {
789 ctx->complete(r);
7c673cae
FG
790 }
791}
792
793void JournalMetadata::cancel_commit_task() {
794 ldout(m_cct, 20) << __func__ << dendl;
795
9f95a23c
TL
796 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
797 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
798 ceph_assert(m_commit_position_ctx != nullptr);
799 ceph_assert(m_commit_position_task_ctx != nullptr);
7c673cae
FG
800 m_timer->cancel_event(m_commit_position_task_ctx);
801 m_commit_position_task_ctx = NULL;
802}
803
804void JournalMetadata::schedule_commit_task() {
805 ldout(m_cct, 20) << __func__ << dendl;
806
9f95a23c
TL
807 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
808 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2 809 ceph_assert(m_commit_position_ctx != nullptr);
94b18763 810 if (m_commit_position_task_ctx == nullptr) {
3efd9988
FG
811 m_commit_position_task_ctx =
812 m_timer->add_event_after(m_settings.commit_interval,
813 new C_CommitPositionTask(this));
7c673cae
FG
814 }
815}
816
817void JournalMetadata::handle_commit_position_task() {
9f95a23c
TL
818 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
819 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
820 ldout(m_cct, 20) << __func__ << ": "
821 << "client_id=" << m_client_id << ", "
822 << "commit_position=" << m_commit_position << dendl;
823
94b18763
FG
824 m_commit_position_task_ctx = nullptr;
825 Context* commit_position_ctx = nullptr;
826 std::swap(commit_position_ctx, m_commit_position_ctx);
7c673cae 827
94b18763
FG
828 m_async_op_tracker.start_op();
829 ++m_flush_commits_in_progress;
7c673cae 830
9f95a23c 831 Context* ctx = new LambdaContext([this, commit_position_ctx](int r) {
94b18763 832 Contexts flush_commit_position_ctxs;
9f95a23c 833 m_lock.lock();
11fdf7f2 834 ceph_assert(m_flush_commits_in_progress > 0);
94b18763
FG
835 --m_flush_commits_in_progress;
836 if (m_flush_commits_in_progress == 0) {
837 std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
838 }
9f95a23c 839 m_lock.unlock();
7c673cae 840
94b18763
FG
841 commit_position_ctx->complete(0);
842 for (auto ctx : flush_commit_position_ctxs) {
843 ctx->complete(0);
844 }
845 m_async_op_tracker.finish_op();
846 });
847 ctx = new C_NotifyUpdate(this, ctx);
9f95a23c 848 ctx = new LambdaContext([this, ctx](int r) {
94b18763
FG
849 // manually kick of a refresh in case the notification is missed
850 // and ignore the next notification that we are about to send
9f95a23c 851 m_lock.lock();
94b18763 852 ++m_ignore_watch_notifies;
9f95a23c 853 m_lock.unlock();
94b18763
FG
854
855 refresh(ctx);
856 });
9f95a23c 857 ctx = new LambdaContext([this, ctx](int r) {
94b18763
FG
858 schedule_laggy_clients_disconnect(ctx);
859 });
860
861 librados::ObjectWriteOperation op;
862 client::client_commit(&op, m_client_id, m_commit_position);
863
9f95a23c 864 auto comp = librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
7c673cae 865 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 866 ceph_assert(r == 0);
7c673cae 867 comp->release();
7c673cae
FG
868}
869
870void JournalMetadata::schedule_watch_reset() {
9f95a23c 871 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
7c673cae
FG
872 m_timer->add_event_after(1, new C_WatchReset(this));
873}
874
875void JournalMetadata::handle_watch_reset() {
9f95a23c 876 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
7c673cae
FG
877 if (!m_initialized) {
878 return;
879 }
880
881 int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
882 if (r < 0) {
883 if (r == -ENOENT) {
884 ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
885 } else if (r == -EBLACKLISTED) {
886 ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl;
887 } else {
888 lderr(m_cct) << __func__ << ": failed to watch journal: "
889 << cpp_strerror(r) << dendl;
890 }
891 schedule_watch_reset();
892 } else {
893 ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
894 refresh(NULL);
895 }
896}
897
898void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
899 ldout(m_cct, 10) << "journal header updated" << dendl;
900
901 bufferlist bl;
902 m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
903
94b18763 904 {
9f95a23c 905 std::lock_guard locker{m_lock};
94b18763
FG
906 if (m_ignore_watch_notifies > 0) {
907 --m_ignore_watch_notifies;
908 return;
909 }
910 }
911
7c673cae
FG
912 refresh(NULL);
913}
914
915void JournalMetadata::handle_watch_error(int err) {
916 if (err == -ENOTCONN) {
917 ldout(m_cct, 5) << "journal watch error: header removed" << dendl;
918 } else if (err == -EBLACKLISTED) {
919 lderr(m_cct) << "journal watch error: client blacklisted" << dendl;
920 } else {
921 lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
922 }
923
9f95a23c 924 std::scoped_lock locker{*m_timer_lock, m_lock};
7c673cae
FG
925
926 // release old watch on error
927 if (m_watch_handle != 0) {
928 m_ioctx.unwatch2(m_watch_handle);
929 m_watch_handle = 0;
930 }
931
932 if (m_initialized && err != -ENOENT) {
933 schedule_watch_reset();
934 }
935}
936
937uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
938 uint64_t tag_tid,
939 uint64_t entry_tid) {
9f95a23c 940 std::lock_guard locker{m_lock};
7c673cae
FG
941 uint64_t commit_tid = ++m_commit_tid;
942 m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
943 entry_tid);
944
945 ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
946 << "object_num=" << object_num << ", "
947 << "tag_tid=" << tag_tid << ", "
948 << "entry_tid=" << entry_tid << "]"
949 << dendl;
950 return commit_tid;
951}
952
953void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
954 uint64_t object_num) {
9f95a23c 955 std::lock_guard locker{m_lock};
7c673cae
FG
956
957 auto it = m_pending_commit_tids.find(commit_tid);
11fdf7f2
TL
958 ceph_assert(it != m_pending_commit_tids.end());
959 ceph_assert(it->second.object_num < object_num);
7c673cae
FG
960
961 ldout(m_cct, 20) << __func__ << ": "
962 << "commit_tid=" << commit_tid << ", "
963 << "old_object_num=" << it->second.object_num << ", "
964 << "new_object_num=" << object_num << dendl;
965 it->second.object_num = object_num;
966}
967
968void JournalMetadata::get_commit_entry(uint64_t commit_tid,
969 uint64_t *object_num,
970 uint64_t *tag_tid, uint64_t *entry_tid) {
9f95a23c 971 std::lock_guard locker{m_lock};
7c673cae
FG
972
973 auto it = m_pending_commit_tids.find(commit_tid);
11fdf7f2 974 ceph_assert(it != m_pending_commit_tids.end());
7c673cae
FG
975
976 *object_num = it->second.object_num;
977 *tag_tid = it->second.tag_tid;
978 *entry_tid = it->second.entry_tid;
979}
980
981void JournalMetadata::committed(uint64_t commit_tid,
982 const CreateContext &create_context) {
983 ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
984
985 ObjectSetPosition commit_position;
986 Context *stale_ctx = nullptr;
987 {
9f95a23c 988 std::scoped_lock locker{*m_timer_lock, m_lock};
11fdf7f2 989 ceph_assert(commit_tid > m_commit_position_tid);
7c673cae
FG
990
991 if (!m_commit_position.object_positions.empty()) {
992 // in-flight commit position update
993 commit_position = m_commit_position;
994 } else {
995 // safe commit position
996 commit_position = m_client.commit_position;
997 }
998
999 CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
11fdf7f2 1000 ceph_assert(it != m_pending_commit_tids.end());
7c673cae
FG
1001
1002 CommitEntry &commit_entry = it->second;
1003 commit_entry.committed = true;
1004
1005 bool update_commit_position = false;
1006 while (!m_pending_commit_tids.empty()) {
1007 CommitTids::iterator it = m_pending_commit_tids.begin();
1008 CommitEntry &commit_entry = it->second;
1009 if (!commit_entry.committed) {
1010 break;
1011 }
1012
1013 commit_position.object_positions.emplace_front(
1014 commit_entry.object_num, commit_entry.tag_tid,
1015 commit_entry.entry_tid);
1016 m_pending_commit_tids.erase(it);
1017 update_commit_position = true;
1018 }
1019
1020 if (!update_commit_position) {
1021 return;
1022 }
1023
1024 // prune the position to have one position per splay offset
1025 std::set<uint8_t> in_use_splay_offsets;
1026 ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
1027 while (ob_it != commit_position.object_positions.end()) {
1028 uint8_t splay_offset = ob_it->object_number % m_splay_width;
1029 if (!in_use_splay_offsets.insert(splay_offset).second) {
1030 ob_it = commit_position.object_positions.erase(ob_it);
1031 } else {
1032 ++ob_it;
1033 }
1034 }
1035
1036 stale_ctx = m_commit_position_ctx;
1037 m_commit_position_ctx = create_context();
1038 m_commit_position = commit_position;
1039 m_commit_position_tid = commit_tid;
1040
1041 ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
1042 << "on_safe=" << m_commit_position_ctx << dendl;
1043 schedule_commit_task();
1044 }
1045
1046
1047 if (stale_ctx != nullptr) {
1048 ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
1049 << dendl;
1050 stale_ctx->complete(-ESTALE);
1051 }
1052}
1053
1054void JournalMetadata::notify_update() {
1055 ldout(m_cct, 10) << "notifying journal header update" << dendl;
1056
1057 bufferlist bl;
1058 m_ioctx.notify2(m_oid, bl, 5000, NULL);
1059}
1060
1061void JournalMetadata::async_notify_update(Context *on_safe) {
1062 ldout(m_cct, 10) << "async notifying journal header update" << dendl;
1063
1064 C_AioNotify *ctx = new C_AioNotify(this, on_safe);
1065 librados::AioCompletion *comp =
9f95a23c 1066 librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
7c673cae
FG
1067
1068 bufferlist bl;
1069 int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
11fdf7f2 1070 ceph_assert(r == 0);
7c673cae
FG
1071
1072 comp->release();
1073}
1074
1075void JournalMetadata::wait_for_ops() {
1076 C_SaferCond ctx;
1077 m_async_op_tracker.wait_for_ops(&ctx);
1078 ctx.wait();
1079}
1080
1081void JournalMetadata::handle_notified(int r) {
1082 ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
1083}
1084
94b18763 1085void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
7c673cae 1086 ldout(m_cct, 20) << __func__ << dendl;
7c673cae 1087 if (m_settings.max_concurrent_object_sets <= 0) {
94b18763
FG
1088 on_finish->complete(0);
1089 return;
7c673cae
FG
1090 }
1091
1092 Context *ctx = on_finish;
94b18763 1093 {
9f95a23c 1094 std::lock_guard locker{m_lock};
94b18763
FG
1095 for (auto &c : m_registered_clients) {
1096 if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
1097 c.id == m_client_id ||
1098 m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
1099 continue;
1100 }
1101 const std::string &client_id = c.id;
1102 uint64_t object_set = 0;
1103 if (!c.commit_position.object_positions.empty()) {
1104 auto &position = *(c.commit_position.object_positions.begin());
1105 object_set = position.object_number / m_splay_width;
1106 }
7c673cae 1107
94b18763
FG
1108 if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
1109 ldout(m_cct, 1) << __func__ << ": " << client_id
1110 << ": scheduling disconnect" << dendl;
7c673cae 1111
9f95a23c 1112 ctx = new LambdaContext([this, client_id, ctx](int r1) {
94b18763
FG
1113 ldout(m_cct, 10) << __func__ << ": " << client_id
1114 << ": flagging disconnected" << dendl;
7c673cae 1115
94b18763
FG
1116 librados::ObjectWriteOperation op;
1117 client::client_update_state(
1118 &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
7c673cae 1119
94b18763 1120 auto comp = librados::Rados::aio_create_completion(
9f95a23c 1121 ctx, utils::rados_ctx_callback);
94b18763 1122 int r = m_ioctx.aio_operate(m_oid, comp, &op);
11fdf7f2 1123 ceph_assert(r == 0);
94b18763
FG
1124 comp->release();
1125 });
1126 }
7c673cae
FG
1127 }
1128 }
1129
1130 if (ctx == on_finish) {
1131 ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
1132 }
94b18763 1133 ctx->complete(0);
7c673cae
FG
1134}
1135
1136std::ostream &operator<<(std::ostream &os,
1137 const JournalMetadata::RegisteredClients &clients) {
1138 os << "[";
1139 for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin();
1140 c != clients.end(); ++c) {
1141 os << (c == clients.begin() ? "" : ", " ) << *c;
1142 }
1143 os << "]";
1144 return os;
1145}
1146
1147std::ostream &operator<<(std::ostream &os,
1148 const JournalMetadata &jm) {
9f95a23c 1149 std::lock_guard locker{jm.m_lock};
7c673cae
FG
1150 os << "[oid=" << jm.m_oid << ", "
1151 << "initialized=" << jm.m_initialized << ", "
1152 << "order=" << (int)jm.m_order << ", "
1153 << "splay_width=" << (int)jm.m_splay_width << ", "
1154 << "pool_id=" << jm.m_pool_id << ", "
1155 << "minimum_set=" << jm.m_minimum_set << ", "
1156 << "active_set=" << jm.m_active_set << ", "
1157 << "client_id=" << jm.m_client_id << ", "
1158 << "commit_tid=" << jm.m_commit_tid << ", "
1159 << "commit_interval=" << jm.m_settings.commit_interval << ", "
1160 << "commit_position=" << jm.m_commit_position << ", "
1161 << "registered_clients=" << jm.m_registered_clients << "]";
1162 return os;
1163}
1164
1165} // namespace journal