]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalMetadata.h" | |
5 | #include "journal/Utils.h" | |
6 | #include "common/errno.h" | |
7 | #include "common/Timer.h" | |
8 | #include "cls/journal/cls_journal_client.h" | |
9 | #include <functional> | |
10 | #include <set> | |
11 | ||
12 | #define dout_subsys ceph_subsys_journaler | |
13 | #undef dout_prefix | |
14 | #define dout_prefix *_dout << "JournalMetadata: " << this << " " | |
15 | ||
16 | namespace journal { | |
17 | ||
18 | using namespace cls::journal; | |
19 | ||
20 | namespace { | |
21 | ||
// Async state machine: read the registered client record for `client_id`
// from the journal header object and decode it into *client.  Holds an
// AsyncOpTracker op for its whole lifetime so shut down can wait for it.
// Self-deleting: complete() invokes finish() and destroys this object.
struct C_GetClient : public Context {
  CephContext *cct;
  librados::IoCtx &ioctx;
  const std::string &oid;           // journal header object (caller-owned ref)
  AsyncOpTracker &async_op_tracker;
  std::string client_id;
  cls::journal::Client *client;     // out: decoded client record
  Context *on_finish;               // fired with the final return code

  bufferlist out_bl;                // raw read response

  C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
              AsyncOpTracker &async_op_tracker, const std::string &client_id,
              cls::journal::Client *client, Context *on_finish)
    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
      client_id(client_id), client(client), on_finish(on_finish) {
    async_op_tracker.start_op();
  }
  ~C_GetClient() override {
    async_op_tracker.finish_op();
  }

  virtual void send() {
    send_get_client();
  }

  // Issue the cls get_client read against the journal header object.
  void send_get_client() {
    ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;

    librados::ObjectReadOperation op;
    client::get_client_start(&op, client_id);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_GetClient, &C_GetClient::handle_get_client>);

    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
    assert(r == 0);
    comp->release();
  }

  // Decode the response on success, then complete (deletes this).
  void handle_get_client(int r) {
    ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;

    if (r == 0) {
      bufferlist::iterator it = out_bl.begin();
      r = client::get_client_finish(&it, client);
    }
    complete(r);
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};
77 | ||
// Async state machine: allocate a new journal tag.
//   1. read the next available tag tid from the header;
//   2. create the tag with that tid (retrying from step 1 on -ESTALE,
//      i.e. another client won the race for the tid);
//   3. read the created tag back into *tag.
// Holds an AsyncOpTracker op for its whole lifetime; self-deleting via
// complete().
struct C_AllocateTag : public Context {
  CephContext *cct;
  librados::IoCtx &ioctx;
  const std::string &oid;           // journal header object (caller-owned ref)
  AsyncOpTracker &async_op_tracker;
  uint64_t tag_class;
  Tag *tag;                         // out: tid filled in step 1, rest in step 3
  Context *on_finish;

  bufferlist out_bl;                // reused response buffer (cleared per read)

  C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
                const std::string &oid, AsyncOpTracker &async_op_tracker,
                uint64_t tag_class, const bufferlist &data, Tag *tag,
                Context *on_finish)
    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
      tag_class(tag_class), tag(tag), on_finish(on_finish) {
    async_op_tracker.start_op();
    // stash the user-supplied payload in the result tag; used by tag_create
    tag->data = data;
  }
  ~C_AllocateTag() override {
    async_op_tracker.finish_op();
  }

  void send() {
    send_get_next_tag_tid();
  }

  // Step 1: fetch the next unallocated tag tid.
  void send_get_next_tag_tid() {
    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;

    librados::ObjectReadOperation op;
    client::get_next_tag_tid_start(&op);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);

    out_bl.clear();  // may be a retry after -ESTALE
    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
    assert(r == 0);
    comp->release();
  }

  void handle_get_next_tag_tid(int r) {
    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;

    if (r == 0) {
      bufferlist::iterator iter = out_bl.begin();
      r = client::get_next_tag_tid_finish(&iter, &tag->tid);
    }
    if (r < 0) {
      complete(r);
      return;
    }
    send_tag_create();
  }

  // Step 2: create the tag under the tid obtained in step 1.
  void send_tag_create() {
    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;

    librados::ObjectWriteOperation op;
    client::tag_create(&op, tag->tid, tag_class, tag->data);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_AllocateTag, &C_AllocateTag::handle_tag_create>);

    int r = ioctx.aio_operate(oid, comp, &op);
    assert(r == 0);
    comp->release();
  }

  void handle_tag_create(int r) {
    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;

    if (r == -ESTALE) {
      // lost the allocation race -- restart with a fresh tid
      send_get_next_tag_tid();
      return;
    } else if (r < 0) {
      complete(r);
      return;
    }

    send_get_tag();
  }

  // Step 3: read the committed tag back so *tag reflects on-disk state.
  void send_get_tag() {
    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;

    librados::ObjectReadOperation op;
    client::get_tag_start(&op, tag->tid);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_AllocateTag, &C_AllocateTag::handle_get_tag>);

    out_bl.clear();
    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
    assert(r == 0);
    comp->release();
  }

  void handle_get_tag(int r) {
    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;

    if (r == 0) {
      bufferlist::iterator iter = out_bl.begin();

      cls::journal::Tag journal_tag;
      r = client::get_tag_finish(&iter, &journal_tag);
      if (r == 0) {
        *tag = journal_tag;
      }
    }
    complete(r);
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};
200 | ||
// Async state machine: read a single tag (by tid) from the journal header
// object and decode it into *tag.  Tracker-held and self-deleting, like the
// other C_* helpers in this file.
struct C_GetTag : public Context {
  CephContext *cct;
  librados::IoCtx &ioctx;
  const std::string &oid;           // journal header object (caller-owned ref)
  AsyncOpTracker &async_op_tracker;
  uint64_t tag_tid;
  JournalMetadata::Tag *tag;        // out: decoded tag
  Context *on_finish;

  bufferlist out_bl;

  C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
           AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
           JournalMetadata::Tag *tag, Context *on_finish)
    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
      tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
    async_op_tracker.start_op();
  }
  ~C_GetTag() override {
    async_op_tracker.finish_op();
  }

  void send() {
    send_get_tag();
  }

  // Issue the cls get_tag read.
  void send_get_tag() {
    librados::ObjectReadOperation op;
    client::get_tag_start(&op, tag_tid);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_GetTag, &C_GetTag::handle_get_tag>);

    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
    assert(r == 0);
    comp->release();
  }

  // Decode on success, then complete (deletes this).
  void handle_get_tag(int r) {
    if (r == 0) {
      bufferlist::iterator iter = out_bl.begin();
      r = client::get_tag_finish(&iter, tag);
    }
    complete(r);
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};
252 | ||
// Async state machine: list tags after `start_after_tag_tid`, optionally
// filtered by tag class, appending results to *tags.  Pages through the
// listing MAX_RETURN entries at a time until a short page is returned.
struct C_GetTags : public Context {
  CephContext *cct;
  librados::IoCtx &ioctx;
  const std::string &oid;           // journal header object (caller-owned ref)
  const std::string &client_id;
  AsyncOpTracker &async_op_tracker;
  uint64_t start_after_tag_tid;     // advanced to the last tid seen per page
  boost::optional<uint64_t> tag_class;
  JournalMetadata::Tags *tags;      // out: appended in tid order
  Context *on_finish;

  const uint64_t MAX_RETURN = 64;   // page size for tag_list
  bufferlist out_bl;

  C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
            const std::string &client_id, AsyncOpTracker &async_op_tracker,
            uint64_t start_after_tag_tid,
            const boost::optional<uint64_t> &tag_class,
            JournalMetadata::Tags *tags, Context *on_finish)
    : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
      async_op_tracker(async_op_tracker),
      start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
      tags(tags), on_finish(on_finish) {
    async_op_tracker.start_op();
  }
  ~C_GetTags() override {
    async_op_tracker.finish_op();
  }

  void send() {
    send_tag_list();
  }

  // Fetch the next page of tags.
  void send_tag_list() {
    librados::ObjectReadOperation op;
    client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
                           tag_class);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_GetTags, &C_GetTags::handle_tag_list>);

    out_bl.clear();  // buffer is reused across pages
    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
    assert(r == 0);
    comp->release();
  }

  // Append the page to *tags; request another page while full pages keep
  // coming back, otherwise complete (deletes this).
  void handle_tag_list(int r) {
    if (r == 0) {
      std::set<cls::journal::Tag> journal_tags;
      bufferlist::iterator iter = out_bl.begin();
      r = client::tag_list_finish(&iter, &journal_tags);
      if (r == 0) {
        for (auto &journal_tag : journal_tags) {
          tags->push_back(journal_tag);
          start_after_tag_tid = journal_tag.tid;
        }

        if (journal_tags.size() == MAX_RETURN) {
          // full page -- there may be more tags
          send_tag_list();
          return;
        }
      }
    }
    complete(r);
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};
325 | ||
// Completes an optional pending commit-position context before chaining to
// the caller's on_finish, propagating the same return code to both.
struct C_FlushCommitPosition : public Context {
  Context *commit_position_ctx;   // may be nullptr if nothing was pending
  Context *on_finish;

  C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
    : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
  }
  void finish(int r) override {
    if (commit_position_ctx != nullptr) {
      commit_position_ctx->complete(r);
    }
    on_finish->complete(r);
  }
};
340 | ||
// Async check that `tag_tid` is still the most recent tag for this client:
// lists up to two tags starting at tag_tid and fails with -ESTALE if a
// newer tag tid exists.  Tracker-held and self-deleting.
struct C_AssertActiveTag : public Context {
  CephContext *cct;
  librados::IoCtx &ioctx;
  const std::string &oid;           // journal header object (caller-owned ref)
  AsyncOpTracker &async_op_tracker;
  std::string client_id;
  uint64_t tag_tid;
  Context *on_finish;

  bufferlist out_bl;

  C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
                    const std::string &oid, AsyncOpTracker &async_op_tracker,
                    const std::string &client_id, uint64_t tag_tid,
                    Context *on_finish)
    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
      client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
    async_op_tracker.start_op();
  }
  ~C_AssertActiveTag() override {
    async_op_tracker.finish_op();
  }

  void send() {
    ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;

    librados::ObjectReadOperation op;
    client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);

    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      this, nullptr, &utils::rados_state_callback<
        C_AssertActiveTag, &C_AssertActiveTag::handle_send>);

    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
    assert(r == 0);
    comp->release();
  }

  void handle_send(int r) {
    ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;

    std::set<cls::journal::Tag> tags;
    if (r == 0) {
      bufferlist::iterator it = out_bl.begin();
      r = client::tag_list_finish(&it, &tags);
    }

    // NOTE: since 0 is treated as an uninitialized list filter, we need to
    // load two entries and look at the last tid
    if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
      r = -ESTALE;
    }
    complete(r);
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};
400 | ||
401 | } // anonymous namespace | |
402 | ||
// Construct journal metadata state bound to a single journal header object
// (`oid`) and a single registered client id.  The IoCtx is duplicated so
// this object owns its own reference independent of the caller's.
JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
                                 Mutex *timer_lock, librados::IoCtx &ioctx,
                                 const std::string &oid,
                                 const std::string &client_id,
                                 const Settings &settings)
    : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
      m_client_id(client_id), m_settings(settings), m_order(0),
      m_splay_width(0), m_pool_id(-1), m_initialized(false),
      m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
      m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
      m_watch_handle(0), m_minimum_set(0), m_active_set(0),
      m_update_notifications(0), m_commit_position_ctx(NULL),
      m_commit_position_task_ctx(NULL) {
  m_ioctx.dup(ioctx);
  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
419 | ||
// shut_down() must have completed (it clears m_initialized) before the
// object is destroyed.
JournalMetadata::~JournalMetadata() {
  Mutex::Locker locker(m_lock);
  assert(!m_initialized);
}
424 | ||
// Initialize: establish a watch on the journal header, then load the
// immutable metadata (order/splay width/pool id), then refresh mutable
// state via C_ImmutableMetadata, finally invoking on_finish on the work
// queue.  The chain below is built in reverse order of execution.
void JournalMetadata::init(Context *on_finish) {
  {
    Mutex::Locker locker(m_lock);
    assert(!m_initialized);
    m_initialized = true;
  }

  // chain the init sequence (reverse order)
  on_finish = utils::create_async_context_callback(
    this, on_finish);
  on_finish = new C_ImmutableMetadata(this, on_finish);
  on_finish = new FunctionContext([this, on_finish](int r) {
      if (r < 0) {
        // watch failed: clear the (possibly partially set) handle and
        // propagate the error without fetching any metadata
        lderr(m_cct) << __func__ << ": failed to watch journal"
                     << cpp_strerror(r) << dendl;
        Mutex::Locker locker(m_lock);
        m_watch_handle = 0;
        on_finish->complete(r);
        return;
      }

      get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
    });

  librados::AioCompletion *comp = librados::Rados::aio_create_completion(
    on_finish, nullptr, utils::rados_ctx_callback);
  int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
  assert(r == 0);
  comp->release();
}
455 | ||
// Shut down: unwatch the header (if watching), flush the commit position,
// flush watch callbacks, wait for all tracked async ops, then complete
// on_finish on the work queue.  The chain is built in reverse order of
// execution.
void JournalMetadata::shut_down(Context *on_finish) {

  ldout(m_cct, 20) << __func__ << dendl;

  uint64_t watch_handle = 0;
  {
    Mutex::Locker locker(m_lock);
    m_initialized = false;
    // take ownership of the watch handle so no other path unwatches it
    std::swap(watch_handle, m_watch_handle);
  }

  // chain the shut down sequence (reverse order)
  on_finish = utils::create_async_context_callback(
    this, on_finish);
  on_finish = new FunctionContext([this, on_finish](int r) {
      ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
      m_async_op_tracker.wait_for_ops(on_finish);
    });
  on_finish = new FunctionContext([this, on_finish](int r) {
      ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
      // NOTE(review): `rados` is a stack object destroyed when this lambda
      // returns while the flush is still in flight -- presumably safe
      // because the cluster reference is shared with m_ioctx; confirm.
      librados::Rados rados(m_ioctx);
      librados::AioCompletion *comp = librados::Rados::aio_create_completion(
        on_finish, nullptr, utils::rados_ctx_callback);
      r = rados.aio_watch_flush(comp);
      assert(r == 0);
      comp->release();
    });
  on_finish = new FunctionContext([this, on_finish](int r) {
      flush_commit_position(on_finish);
    });
  if (watch_handle != 0) {
    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
      on_finish, nullptr, utils::rados_ctx_callback);
    int r = m_ioctx.aio_unwatch(watch_handle, comp);
    assert(r == 0);
    comp->release();
  } else {
    // never watching -- start the chain immediately
    on_finish->complete(0);
  }
}
496 | ||
// Fetch the journal's immutable creation-time parameters (object order,
// splay width, data pool id) from the header object; thin forwarder to the
// cls client helper.
void JournalMetadata::get_immutable_metadata(uint8_t *order,
                                             uint8_t *splay_width,
                                             int64_t *pool_id,
                                             Context *on_finish) {
  client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
                                 on_finish);
}
504 | ||
// Fetch the journal's mutable state (minimum/active object sets and the
// registered client list); thin forwarder to the cls client helper.
void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
                                           uint64_t *active_set,
                                           RegisteredClients *clients,
                                           Context *on_finish) {
  client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients,
                               on_finish);
}
512 | ||
513 | void JournalMetadata::register_client(const bufferlist &data, | |
514 | Context *on_finish) { | |
515 | ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; | |
516 | librados::ObjectWriteOperation op; | |
517 | client::client_register(&op, m_client_id, data); | |
518 | ||
519 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
520 | ||
521 | librados::AioCompletion *comp = | |
522 | librados::Rados::aio_create_completion(ctx, NULL, | |
523 | utils::rados_ctx_callback); | |
524 | int r = m_ioctx.aio_operate(m_oid, comp, &op); | |
525 | assert(r == 0); | |
526 | comp->release(); | |
527 | } | |
528 | ||
529 | void JournalMetadata::update_client(const bufferlist &data, | |
530 | Context *on_finish) { | |
531 | ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; | |
532 | librados::ObjectWriteOperation op; | |
533 | client::client_update_data(&op, m_client_id, data); | |
534 | ||
535 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
536 | ||
537 | librados::AioCompletion *comp = | |
538 | librados::Rados::aio_create_completion(ctx, NULL, | |
539 | utils::rados_ctx_callback); | |
540 | int r = m_ioctx.aio_operate(m_oid, comp, &op); | |
541 | assert(r == 0); | |
542 | comp->release(); | |
543 | } | |
544 | ||
545 | void JournalMetadata::unregister_client(Context *on_finish) { | |
546 | assert(!m_client_id.empty()); | |
547 | ||
548 | ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; | |
549 | librados::ObjectWriteOperation op; | |
550 | client::client_unregister(&op, m_client_id); | |
551 | ||
552 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
553 | ||
554 | librados::AioCompletion *comp = | |
555 | librados::Rados::aio_create_completion(ctx, NULL, | |
556 | utils::rados_ctx_callback); | |
557 | int r = m_ioctx.aio_operate(m_oid, comp, &op); | |
558 | assert(r == 0); | |
559 | comp->release(); | |
560 | } | |
561 | ||
562 | void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data, | |
563 | Tag *tag, Context *on_finish) { | |
564 | on_finish = new C_NotifyUpdate(this, on_finish); | |
565 | C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid, | |
566 | m_async_op_tracker, tag_class, | |
567 | data, tag, on_finish); | |
568 | ctx->send(); | |
569 | } | |
570 | ||
571 | void JournalMetadata::get_client(const std::string &client_id, | |
572 | cls::journal::Client *client, | |
573 | Context *on_finish) { | |
574 | C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker, | |
575 | client_id, client, on_finish); | |
576 | ctx->send(); | |
577 | } | |
578 | ||
579 | void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { | |
580 | C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker, | |
581 | tag_tid, tag, on_finish); | |
582 | ctx->send(); | |
583 | } | |
584 | ||
585 | void JournalMetadata::get_tags(uint64_t start_after_tag_tid, | |
586 | const boost::optional<uint64_t> &tag_class, | |
587 | Tags *tags, Context *on_finish) { | |
588 | C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id, | |
589 | m_async_op_tracker, start_after_tag_tid, | |
590 | tag_class, tags, on_finish); | |
591 | ctx->send(); | |
592 | } | |
593 | ||
// Register a metadata-update listener.  Blocks until no update
// notification is in flight so the listener list is never mutated while
// handle_refresh_complete is iterating it.
void JournalMetadata::add_listener(JournalMetadataListener *listener) {
  Mutex::Locker locker(m_lock);
  while (m_update_notifications > 0) {
    m_update_cond.Wait(m_lock);
  }
  m_listeners.push_back(listener);
}
601 | ||
// Unregister a metadata-update listener.  Blocks until no update
// notification is in flight, guaranteeing the listener will not be invoked
// after this call returns.
void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
  Mutex::Locker locker(m_lock);
  while (m_update_notifications > 0) {
    m_update_cond.Wait(m_lock);
  }
  m_listeners.remove(listener);
}
609 | ||
// Advance the journal's minimum (trim) object set.  No-op if the requested
// set does not advance the current value.  The write is fire-and-forget
// (C_NotifyUpdate with no chained context) and the cached value is updated
// optimistically before the write completes.
void JournalMetadata::set_minimum_set(uint64_t object_set) {
  Mutex::Locker locker(m_lock);

  ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
                   << ", new=" << object_set << dendl;
  if (m_minimum_set >= object_set) {
    return;
  }

  librados::ObjectWriteOperation op;
  client::set_minimum_set(&op, object_set);

  C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
  librados::AioCompletion *comp =
    librados::Rados::aio_create_completion(ctx, NULL,
                                           utils::rados_ctx_callback);
  int r = m_ioctx.aio_operate(m_oid, comp, &op);
  assert(r == 0);
  comp->release();

  m_minimum_set = object_set;
}
632 | ||
633 | int JournalMetadata::set_active_set(uint64_t object_set) { | |
634 | C_SaferCond ctx; | |
635 | set_active_set(object_set, &ctx); | |
636 | return ctx.wait(); | |
637 | } | |
638 | ||
// Advance the journal's active object set.  If the requested set does not
// advance the current value, on_finish completes immediately (success) on
// the work queue.  The cached value is updated optimistically before the
// write completes; watchers are notified via C_NotifyUpdate.
void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
  Mutex::Locker locker(m_lock);

  ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
                   << ", new=" << object_set << dendl;
  if (m_active_set >= object_set) {
    m_work_queue->queue(on_finish, 0);
    return;
  }

  librados::ObjectWriteOperation op;
  client::set_active_set(&op, object_set);

  C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
  librados::AioCompletion *comp =
    librados::Rados::aio_create_completion(ctx, NULL,
                                           utils::rados_ctx_callback);
  int r = m_ioctx.aio_operate(m_oid, comp, &op);
  assert(r == 0);
  comp->release();

  m_active_set = object_set;
}
662 | ||
663 | void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { | |
664 | Mutex::Locker locker(m_lock); | |
665 | ||
666 | C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid, | |
667 | m_async_op_tracker, | |
668 | m_client_id, tag_tid, | |
669 | on_finish); | |
670 | ctx->send(); | |
671 | } | |
672 | ||
673 | void JournalMetadata::flush_commit_position() { | |
674 | ldout(m_cct, 20) << __func__ << dendl; | |
675 | ||
676 | C_SaferCond ctx; | |
677 | flush_commit_position(&ctx); | |
678 | ctx.wait(); | |
679 | } | |
680 | ||
// Flush any pending commit position update.  Takes the timer lock before
// m_lock (required ordering for cancel_commit_task).  If nothing is
// pending and no flush is in flight, on_safe completes immediately on the
// work queue; otherwise it is queued until the in-flight/newly-started
// flush finishes.
void JournalMetadata::flush_commit_position(Context *on_safe) {
  ldout(m_cct, 20) << __func__ << dendl;

  Mutex::Locker timer_locker(*m_timer_lock);
  Mutex::Locker locker(m_lock);
  if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
    // nothing to flush
    if (on_safe != nullptr) {
      m_work_queue->queue(on_safe, 0);
    }
    return;
  }

  if (on_safe != nullptr) {
    m_flush_commit_position_ctxs.push_back(on_safe);
  }
  if (m_commit_position_ctx == nullptr) {
    // a flush is already in progress; it will drain the queued contexts
    return;
  }

  // cancel the delayed timer task and commit immediately
  cancel_commit_task();
  handle_commit_position_task();
}
704 | ||
705 | void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { | |
706 | Mutex::Locker locker(m_lock); | |
707 | uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid]; | |
708 | if (allocated_entry_tid <= entry_tid) { | |
709 | allocated_entry_tid = entry_tid + 1; | |
710 | } | |
711 | } | |
712 | ||
713 | bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid, | |
714 | uint64_t *entry_tid) const { | |
715 | Mutex::Locker locker(m_lock); | |
716 | ||
717 | AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid); | |
718 | if (it == m_allocated_entry_tids.end()) { | |
719 | return false; | |
720 | } | |
721 | ||
722 | assert(it->second > 0); | |
723 | *entry_tid = it->second - 1; | |
724 | return true; | |
725 | } | |
726 | ||
727 | void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) { | |
728 | if (r < 0) { | |
729 | lderr(m_cct) << "failed to initialize immutable metadata: " | |
730 | << cpp_strerror(r) << dendl; | |
731 | on_init->complete(r); | |
732 | return; | |
733 | } | |
734 | ||
735 | ldout(m_cct, 10) << "initialized immutable metadata" << dendl; | |
736 | refresh(on_init); | |
737 | } | |
738 | ||
// Kick off an async reload of the mutable metadata.  Multiple refreshes
// may overlap; completion contexts are queued and drained only when the
// last in-flight refresh finishes (see handle_refresh_complete).
// on_complete may be nullptr for a fire-and-forget refresh.
void JournalMetadata::refresh(Context *on_complete) {
  ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (on_complete != nullptr) {
      m_refresh_ctxs.push_back(on_complete);
    }
    ++m_refreshes_in_progress;
  }

  auto refresh = new C_Refresh(this);
  get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
                       &refresh->registered_clients, refresh);
}
754 | ||
// Completion handler for a mutable-metadata refresh.  On success, merges
// the fetched state into the cached copies (sets only ever advance) and
// dispatches listener callbacks; fails with -ENOENT if our own client id
// is no longer registered.  Queued refresh contexts are completed only
// when the final overlapping refresh finishes.
void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
  ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;

  m_lock.Lock();
  if (r == 0) {
    Client client(m_client_id, bufferlist());
    RegisteredClients::iterator it = refresh->registered_clients.find(client);
    if (it != refresh->registered_clients.end()) {
      if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
        ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
                        << dendl;
      }
      m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
      m_active_set = MAX(m_active_set, refresh->active_set);
      m_registered_clients = refresh->registered_clients;
      m_client = *it;

      // drop m_lock while invoking listeners; m_update_notifications keeps
      // add_listener/remove_listener blocked so the list stays stable
      ++m_update_notifications;
      m_lock.Unlock();
      for (Listeners::iterator it = m_listeners.begin();
           it != m_listeners.end(); ++it) {
        (*it)->handle_update(this);
      }
      m_lock.Lock();
      if (--m_update_notifications == 0) {
        // wake any add/remove_listener waiting for notifications to drain
        m_update_cond.Signal();
      }
    } else {
      lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
      r = -ENOENT;
    }
  }

  Contexts refresh_ctxs;
  assert(m_refreshes_in_progress > 0);
  --m_refreshes_in_progress;
  if (m_refreshes_in_progress == 0) {
    // last overlapping refresh: drain all queued completion contexts
    std::swap(refresh_ctxs, m_refresh_ctxs);
  }
  m_lock.Unlock();

  for (auto ctx : refresh_ctxs) {
    ctx->complete(r);
  }
}
800 | ||
// Cancel the pending delayed commit-position timer task.  Caller must hold
// both the timer lock and m_lock, and a commit must actually be pending.
void JournalMetadata::cancel_commit_task() {
  ldout(m_cct, 20) << __func__ << dendl;

  assert(m_timer_lock->is_locked());
  assert(m_lock.is_locked());
  assert(m_commit_position_ctx != nullptr);
  assert(m_commit_position_task_ctx != nullptr);
  m_timer->cancel_event(m_commit_position_task_ctx);
  m_commit_position_task_ctx = NULL;
}
811 | ||
// Schedule the delayed commit-position flush after the configured commit
// interval, unless one is already scheduled.  Caller must hold both the
// timer lock and m_lock with a commit pending.
void JournalMetadata::schedule_commit_task() {
  ldout(m_cct, 20) << __func__ << dendl;

  assert(m_timer_lock->is_locked());
  assert(m_lock.is_locked());
  assert(m_commit_position_ctx != nullptr);
  if (m_commit_position_task_ctx == nullptr) {
    m_commit_position_task_ctx =
      m_timer->add_event_after(m_settings.commit_interval,
                               new C_CommitPositionTask(this));
  }
}
824 | ||
// Timer/flush callback: persist m_commit_position to the journal header.
// Builds a completion chain (in reverse execution order):
//   disconnect laggy clients -> bump m_ignore_watch_notifies and refresh
//   -> notify watchers (C_NotifyUpdate) -> complete the captured
//   commit-position ctx plus any queued flush waiters.
// Caller must hold both the timer lock and m_lock.
void JournalMetadata::handle_commit_position_task() {
  assert(m_timer_lock->is_locked());
  assert(m_lock.is_locked());
  ldout(m_cct, 20) << __func__ << ": "
                   << "client_id=" << m_client_id << ", "
                   << "commit_position=" << m_commit_position << dendl;

  m_commit_position_task_ctx = nullptr;
  // take ownership of the pending commit context
  // NOTE(review): the final lambda below calls commit_position_ctx->
  // complete(0) without a null check -- presumably this task only runs
  // while a commit is pending (see cancel_commit_task asserts); confirm.
  Context* commit_position_ctx = nullptr;
  std::swap(commit_position_ctx, m_commit_position_ctx);

  m_async_op_tracker.start_op();
  ++m_flush_commits_in_progress;

  // final step: complete the commit ctx and drain queued flush waiters
  Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
      Contexts flush_commit_position_ctxs;
      m_lock.Lock();
      assert(m_flush_commits_in_progress > 0);
      --m_flush_commits_in_progress;
      if (m_flush_commits_in_progress == 0) {
        std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
      }
      m_lock.Unlock();

      commit_position_ctx->complete(0);
      for (auto ctx : flush_commit_position_ctxs) {
        ctx->complete(0);
      }
      m_async_op_tracker.finish_op();
    });
  ctx = new C_NotifyUpdate(this, ctx);
  ctx = new FunctionContext([this, ctx](int r) {
      // manually kick of a refresh in case the notification is missed
      // and ignore the next notification that we are about to send
      m_lock.Lock();
      ++m_ignore_watch_notifies;
      m_lock.Unlock();

      refresh(ctx);
    });
  ctx = new FunctionContext([this, ctx](int r) {
      schedule_laggy_clients_disconnect(ctx);
    });

  librados::ObjectWriteOperation op;
  client::client_commit(&op, m_client_id, m_commit_position);

  auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
                                                     utils::rados_ctx_callback);
  int r = m_ioctx.aio_operate(m_oid, comp, &op);
  assert(r == 0);
  comp->release();
}
878 | ||
// Retry re-establishing the header watch after a 1-second delay.  Caller
// must hold the timer lock.
void JournalMetadata::schedule_watch_reset() {
  assert(m_timer_lock->is_locked());
  m_timer->add_event_after(1, new C_WatchReset(this));
}
883 | ||
// Timer callback: attempt to re-establish the header watch after an error.
// On failure, logs (quietly for expected ENOENT/EBLACKLISTED cases) and
// reschedules another attempt; on success, triggers a metadata refresh to
// catch any updates missed while unwatched.  No-op once shut down.
void JournalMetadata::handle_watch_reset() {
  assert(m_timer_lock->is_locked());
  if (!m_initialized) {
    return;
  }

  int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
  if (r < 0) {
    if (r == -ENOENT) {
      ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
    } else if (r == -EBLACKLISTED) {
      ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl;
    } else {
      lderr(m_cct) << __func__ << ": failed to watch journal: "
                   << cpp_strerror(r) << dendl;
    }
    schedule_watch_reset();
  } else {
    ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
    refresh(NULL);
  }
}
906 | ||
907 | void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { | |
908 | ldout(m_cct, 10) << "journal header updated" << dendl; | |
909 | ||
910 | bufferlist bl; | |
911 | m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); | |
912 | ||
913 | { | |
914 | Mutex::Locker locker(m_lock); | |
915 | if (m_ignore_watch_notifies > 0) { | |
916 | --m_ignore_watch_notifies; | |
917 | return; | |
918 | } | |
919 | } | |
920 | ||
921 | refresh(NULL); | |
922 | } | |
923 | ||
924 | void JournalMetadata::handle_watch_error(int err) { | |
925 | if (err == -ENOTCONN) { | |
926 | ldout(m_cct, 5) << "journal watch error: header removed" << dendl; | |
927 | } else if (err == -EBLACKLISTED) { | |
928 | lderr(m_cct) << "journal watch error: client blacklisted" << dendl; | |
929 | } else { | |
930 | lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; | |
931 | } | |
932 | ||
933 | Mutex::Locker timer_locker(*m_timer_lock); | |
934 | Mutex::Locker locker(m_lock); | |
935 | ||
936 | // release old watch on error | |
937 | if (m_watch_handle != 0) { | |
938 | m_ioctx.unwatch2(m_watch_handle); | |
939 | m_watch_handle = 0; | |
940 | } | |
941 | ||
942 | if (m_initialized && err != -ENOENT) { | |
943 | schedule_watch_reset(); | |
944 | } | |
945 | } | |
946 | ||
947 | uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, | |
948 | uint64_t tag_tid, | |
949 | uint64_t entry_tid) { | |
950 | Mutex::Locker locker(m_lock); | |
951 | uint64_t commit_tid = ++m_commit_tid; | |
952 | m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid, | |
953 | entry_tid); | |
954 | ||
955 | ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " [" | |
956 | << "object_num=" << object_num << ", " | |
957 | << "tag_tid=" << tag_tid << ", " | |
958 | << "entry_tid=" << entry_tid << "]" | |
959 | << dendl; | |
960 | return commit_tid; | |
961 | } | |
962 | ||
963 | void JournalMetadata::overflow_commit_tid(uint64_t commit_tid, | |
964 | uint64_t object_num) { | |
965 | Mutex::Locker locker(m_lock); | |
966 | ||
967 | auto it = m_pending_commit_tids.find(commit_tid); | |
968 | assert(it != m_pending_commit_tids.end()); | |
969 | assert(it->second.object_num < object_num); | |
970 | ||
971 | ldout(m_cct, 20) << __func__ << ": " | |
972 | << "commit_tid=" << commit_tid << ", " | |
973 | << "old_object_num=" << it->second.object_num << ", " | |
974 | << "new_object_num=" << object_num << dendl; | |
975 | it->second.object_num = object_num; | |
976 | } | |
977 | ||
978 | void JournalMetadata::get_commit_entry(uint64_t commit_tid, | |
979 | uint64_t *object_num, | |
980 | uint64_t *tag_tid, uint64_t *entry_tid) { | |
981 | Mutex::Locker locker(m_lock); | |
982 | ||
983 | auto it = m_pending_commit_tids.find(commit_tid); | |
984 | assert(it != m_pending_commit_tids.end()); | |
985 | ||
986 | *object_num = it->second.object_num; | |
987 | *tag_tid = it->second.tag_tid; | |
988 | *entry_tid = it->second.entry_tid; | |
989 | } | |
990 | ||
// Mark the given commit tid as committed and, if this completes a contiguous
// run of pending tids, compute and schedule a new commit position update.
// create_context builds the on-safe context for the new position; any
// previously-pending (now superseded) context is completed with -ESTALE.
void JournalMetadata::committed(uint64_t commit_tid,
                                const CreateContext &create_context) {
  ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;

  ObjectSetPosition commit_position;
  Context *stale_ctx = nullptr;
  {
    Mutex::Locker timer_locker(*m_timer_lock);
    Mutex::Locker locker(m_lock);
    // tids are allocated monotonically, so a newly-committed tid must be
    // ahead of the last position we scheduled
    assert(commit_tid > m_commit_position_tid);

    if (!m_commit_position.object_positions.empty()) {
      // in-flight commit position update
      commit_position = m_commit_position;
    } else {
      // safe commit position
      commit_position = m_client.commit_position;
    }

    CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
    assert(it != m_pending_commit_tids.end());

    CommitEntry &commit_entry = it->second;
    commit_entry.committed = true;

    // drain the pending map in tid order, stopping at the first tid that is
    // not yet committed -- the commit position can only advance in-order
    bool update_commit_position = false;
    while (!m_pending_commit_tids.empty()) {
      CommitTids::iterator it = m_pending_commit_tids.begin();
      CommitEntry &commit_entry = it->second;
      if (!commit_entry.committed) {
        break;
      }

      // newest position goes to the front of the list
      commit_position.object_positions.emplace_front(
        commit_entry.object_num, commit_entry.tag_tid,
        commit_entry.entry_tid);
      m_pending_commit_tids.erase(it);
      update_commit_position = true;
    }

    if (!update_commit_position) {
      // an out-of-order commit -- position will advance when earlier tids
      // are committed
      return;
    }

    // prune the position to have one position per splay offset
    std::set<uint8_t> in_use_splay_offsets;
    ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
    while (ob_it != commit_position.object_positions.end()) {
      uint8_t splay_offset = ob_it->object_number % m_splay_width;
      if (!in_use_splay_offsets.insert(splay_offset).second) {
        // older duplicate for this splay offset -- drop it
        ob_it = commit_position.object_positions.erase(ob_it);
      } else {
        ++ob_it;
      }
    }

    // replace any not-yet-flushed on-safe context with a fresh one for the
    // new position; the old one is completed with -ESTALE below
    stale_ctx = m_commit_position_ctx;
    m_commit_position_ctx = create_context();
    m_commit_position = commit_position;
    m_commit_position_tid = commit_tid;

    ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
                     << "on_safe=" << m_commit_position_ctx << dendl;
    schedule_commit_task();
  }


  // complete the superseded context outside the locks to avoid re-entrancy
  if (stale_ctx != nullptr) {
    ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
                     << dendl;
    stale_ctx->complete(-ESTALE);
  }
}
1064 | ||
1065 | void JournalMetadata::notify_update() { | |
1066 | ldout(m_cct, 10) << "notifying journal header update" << dendl; | |
1067 | ||
1068 | bufferlist bl; | |
1069 | m_ioctx.notify2(m_oid, bl, 5000, NULL); | |
1070 | } | |
1071 | ||
1072 | void JournalMetadata::async_notify_update(Context *on_safe) { | |
1073 | ldout(m_cct, 10) << "async notifying journal header update" << dendl; | |
1074 | ||
1075 | C_AioNotify *ctx = new C_AioNotify(this, on_safe); | |
1076 | librados::AioCompletion *comp = | |
1077 | librados::Rados::aio_create_completion(ctx, NULL, | |
1078 | utils::rados_ctx_callback); | |
1079 | ||
1080 | bufferlist bl; | |
1081 | int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL); | |
1082 | assert(r == 0); | |
1083 | ||
1084 | comp->release(); | |
1085 | } | |
1086 | ||
1087 | void JournalMetadata::wait_for_ops() { | |
1088 | C_SaferCond ctx; | |
1089 | m_async_op_tracker.wait_for_ops(&ctx); | |
1090 | ctx.wait(); | |
1091 | } | |
1092 | ||
// Completion callback for async_notify_update(); the notification is
// best-effort, so the result is only logged.
void JournalMetadata::handle_notified(int r) {
  ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}
1096 | ||
// Flag registered clients whose commit position has fallen more than
// max_concurrent_object_sets behind the active set as DISCONNECTED, so their
// stale object sets can be trimmed.  Builds a reverse chain of
// FunctionContexts (one disconnect update per laggy client) terminating in
// on_finish, then kicks off the chain; on_finish always completes.
void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
  ldout(m_cct, 20) << __func__ << dendl;
  if (m_settings.max_concurrent_object_sets <= 0) {
    // feature disabled -- never auto-disconnect clients
    on_finish->complete(0);
    return;
  }

  Context *ctx = on_finish;
  {
    Mutex::Locker locker(m_lock);
    for (auto &c : m_registered_clients) {
      // skip clients already disconnected, ourselves, and whitelisted ids
      if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
          c.id == m_client_id ||
          m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
        continue;
      }
      const std::string &client_id = c.id;
      // object_set 0 if the client has no recorded commit position yet
      uint64_t object_set = 0;
      if (!c.commit_position.object_positions.empty()) {
        auto &position = *(c.commit_position.object_positions.begin());
        object_set = position.object_number / m_splay_width;
      }

      if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
        ldout(m_cct, 1) << __func__ << ": " << client_id
                        << ": scheduling disconnect" << dendl;

        // prepend a step to the chain; client_id is captured by value since
        // the lambda outlives this scope
        ctx = new FunctionContext([this, client_id, ctx](int r1) {
            ldout(m_cct, 10) << __func__ << ": " << client_id
                             << ": flagging disconnected" << dendl;

            librados::ObjectWriteOperation op;
            client::client_update_state(
              &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);

            // completion fires the next context in the chain
            auto comp = librados::Rados::aio_create_completion(
              ctx, nullptr, utils::rados_ctx_callback);
            int r = m_ioctx.aio_operate(m_oid, comp, &op);
            assert(r == 0);
            comp->release();
          });
      }
    }
  }

  if (ctx == on_finish) {
    ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
  }
  // start the chain (or directly complete on_finish if nothing was queued)
  ctx->complete(0);
}
1147 | ||
1148 | std::ostream &operator<<(std::ostream &os, | |
1149 | const JournalMetadata::RegisteredClients &clients) { | |
1150 | os << "["; | |
1151 | for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin(); | |
1152 | c != clients.end(); ++c) { | |
1153 | os << (c == clients.begin() ? "" : ", " ) << *c; | |
1154 | } | |
1155 | os << "]"; | |
1156 | return os; | |
1157 | } | |
1158 | ||
1159 | std::ostream &operator<<(std::ostream &os, | |
1160 | const JournalMetadata &jm) { | |
1161 | Mutex::Locker locker(jm.m_lock); | |
1162 | os << "[oid=" << jm.m_oid << ", " | |
1163 | << "initialized=" << jm.m_initialized << ", " | |
1164 | << "order=" << (int)jm.m_order << ", " | |
1165 | << "splay_width=" << (int)jm.m_splay_width << ", " | |
1166 | << "pool_id=" << jm.m_pool_id << ", " | |
1167 | << "minimum_set=" << jm.m_minimum_set << ", " | |
1168 | << "active_set=" << jm.m_active_set << ", " | |
1169 | << "client_id=" << jm.m_client_id << ", " | |
1170 | << "commit_tid=" << jm.m_commit_tid << ", " | |
1171 | << "commit_interval=" << jm.m_settings.commit_interval << ", " | |
1172 | << "commit_position=" << jm.m_commit_position << ", " | |
1173 | << "registered_clients=" << jm.m_registered_clients << "]"; | |
1174 | return os; | |
1175 | } | |
1176 | ||
1177 | } // namespace journal |