]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalMetadata.h" | |
5 | #include "journal/Utils.h" | |
6 | #include "common/errno.h" | |
7 | #include "common/Timer.h" | |
8 | #include "cls/journal/cls_journal_client.h" | |
9 | #include <functional> | |
10 | #include <set> | |
11 | ||
12 | #define dout_subsys ceph_subsys_journaler | |
13 | #undef dout_prefix | |
14 | #define dout_prefix *_dout << "JournalMetadata: " << this << " " | |
15 | ||
16 | namespace journal { | |
17 | ||
18 | using namespace cls::journal; | |
19 | ||
20 | namespace { | |
21 | ||
22 | struct C_GetClient : public Context { | |
23 | CephContext *cct; | |
24 | librados::IoCtx &ioctx; | |
25 | const std::string &oid; | |
26 | AsyncOpTracker &async_op_tracker; | |
27 | std::string client_id; | |
28 | cls::journal::Client *client; | |
29 | Context *on_finish; | |
30 | ||
31 | bufferlist out_bl; | |
32 | ||
33 | C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, | |
34 | AsyncOpTracker &async_op_tracker, const std::string &client_id, | |
35 | cls::journal::Client *client, Context *on_finish) | |
36 | : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), | |
37 | client_id(client_id), client(client), on_finish(on_finish) { | |
38 | async_op_tracker.start_op(); | |
39 | } | |
40 | ~C_GetClient() override { | |
41 | async_op_tracker.finish_op(); | |
42 | } | |
43 | ||
44 | virtual void send() { | |
45 | send_get_client(); | |
46 | } | |
47 | ||
48 | void send_get_client() { | |
49 | ldout(cct, 20) << "C_GetClient: " << __func__ << dendl; | |
50 | ||
51 | librados::ObjectReadOperation op; | |
52 | client::get_client_start(&op, client_id); | |
53 | ||
54 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 55 | this, &utils::rados_state_callback< |
7c673cae FG |
56 | C_GetClient, &C_GetClient::handle_get_client>); |
57 | ||
58 | int r = ioctx.aio_operate(oid, comp, &op, &out_bl); | |
11fdf7f2 | 59 | ceph_assert(r == 0); |
7c673cae FG |
60 | comp->release(); |
61 | } | |
62 | ||
63 | void handle_get_client(int r) { | |
64 | ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl; | |
65 | ||
66 | if (r == 0) { | |
11fdf7f2 | 67 | auto it = out_bl.cbegin(); |
7c673cae FG |
68 | r = client::get_client_finish(&it, client); |
69 | } | |
70 | complete(r); | |
71 | } | |
72 | ||
73 | void finish(int r) override { | |
74 | on_finish->complete(r); | |
75 | } | |
76 | }; | |
77 | ||
78 | struct C_AllocateTag : public Context { | |
79 | CephContext *cct; | |
80 | librados::IoCtx &ioctx; | |
81 | const std::string &oid; | |
82 | AsyncOpTracker &async_op_tracker; | |
83 | uint64_t tag_class; | |
84 | Tag *tag; | |
85 | Context *on_finish; | |
86 | ||
87 | bufferlist out_bl; | |
88 | ||
89 | C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx, | |
90 | const std::string &oid, AsyncOpTracker &async_op_tracker, | |
91 | uint64_t tag_class, const bufferlist &data, Tag *tag, | |
92 | Context *on_finish) | |
93 | : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), | |
94 | tag_class(tag_class), tag(tag), on_finish(on_finish) { | |
95 | async_op_tracker.start_op(); | |
96 | tag->data = data; | |
97 | } | |
98 | ~C_AllocateTag() override { | |
99 | async_op_tracker.finish_op(); | |
100 | } | |
101 | ||
102 | void send() { | |
103 | send_get_next_tag_tid(); | |
104 | } | |
105 | ||
106 | void send_get_next_tag_tid() { | |
107 | ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; | |
108 | ||
109 | librados::ObjectReadOperation op; | |
110 | client::get_next_tag_tid_start(&op); | |
111 | ||
112 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 113 | this, &utils::rados_state_callback< |
7c673cae FG |
114 | C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>); |
115 | ||
116 | out_bl.clear(); | |
117 | int r = ioctx.aio_operate(oid, comp, &op, &out_bl); | |
11fdf7f2 | 118 | ceph_assert(r == 0); |
7c673cae FG |
119 | comp->release(); |
120 | } | |
121 | ||
122 | void handle_get_next_tag_tid(int r) { | |
123 | ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; | |
124 | ||
125 | if (r == 0) { | |
11fdf7f2 | 126 | auto iter = out_bl.cbegin(); |
7c673cae FG |
127 | r = client::get_next_tag_tid_finish(&iter, &tag->tid); |
128 | } | |
129 | if (r < 0) { | |
130 | complete(r); | |
131 | return; | |
132 | } | |
133 | send_tag_create(); | |
134 | } | |
135 | ||
136 | void send_tag_create() { | |
137 | ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; | |
138 | ||
139 | librados::ObjectWriteOperation op; | |
140 | client::tag_create(&op, tag->tid, tag_class, tag->data); | |
141 | ||
142 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 143 | this, &utils::rados_state_callback< |
7c673cae FG |
144 | C_AllocateTag, &C_AllocateTag::handle_tag_create>); |
145 | ||
146 | int r = ioctx.aio_operate(oid, comp, &op); | |
11fdf7f2 | 147 | ceph_assert(r == 0); |
7c673cae FG |
148 | comp->release(); |
149 | } | |
150 | ||
151 | void handle_tag_create(int r) { | |
152 | ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; | |
153 | ||
154 | if (r == -ESTALE) { | |
155 | send_get_next_tag_tid(); | |
156 | return; | |
157 | } else if (r < 0) { | |
158 | complete(r); | |
159 | return; | |
160 | } | |
161 | ||
162 | send_get_tag(); | |
163 | } | |
164 | ||
165 | void send_get_tag() { | |
166 | ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; | |
167 | ||
168 | librados::ObjectReadOperation op; | |
169 | client::get_tag_start(&op, tag->tid); | |
170 | ||
171 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 172 | this, &utils::rados_state_callback< |
7c673cae FG |
173 | C_AllocateTag, &C_AllocateTag::handle_get_tag>); |
174 | ||
175 | out_bl.clear(); | |
176 | int r = ioctx.aio_operate(oid, comp, &op, &out_bl); | |
11fdf7f2 | 177 | ceph_assert(r == 0); |
7c673cae FG |
178 | comp->release(); |
179 | } | |
180 | ||
181 | void handle_get_tag(int r) { | |
182 | ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; | |
183 | ||
184 | if (r == 0) { | |
11fdf7f2 | 185 | auto iter = out_bl.cbegin(); |
7c673cae FG |
186 | |
187 | cls::journal::Tag journal_tag; | |
188 | r = client::get_tag_finish(&iter, &journal_tag); | |
189 | if (r == 0) { | |
190 | *tag = journal_tag; | |
191 | } | |
192 | } | |
193 | complete(r); | |
194 | } | |
195 | ||
196 | void finish(int r) override { | |
197 | on_finish->complete(r); | |
198 | } | |
199 | }; | |
200 | ||
201 | struct C_GetTag : public Context { | |
202 | CephContext *cct; | |
203 | librados::IoCtx &ioctx; | |
204 | const std::string &oid; | |
205 | AsyncOpTracker &async_op_tracker; | |
206 | uint64_t tag_tid; | |
207 | JournalMetadata::Tag *tag; | |
208 | Context *on_finish; | |
209 | ||
210 | bufferlist out_bl; | |
211 | ||
212 | C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, | |
213 | AsyncOpTracker &async_op_tracker, uint64_t tag_tid, | |
214 | JournalMetadata::Tag *tag, Context *on_finish) | |
215 | : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), | |
216 | tag_tid(tag_tid), tag(tag), on_finish(on_finish) { | |
217 | async_op_tracker.start_op(); | |
218 | } | |
219 | ~C_GetTag() override { | |
220 | async_op_tracker.finish_op(); | |
221 | } | |
222 | ||
223 | void send() { | |
224 | send_get_tag(); | |
225 | } | |
226 | ||
227 | void send_get_tag() { | |
228 | librados::ObjectReadOperation op; | |
229 | client::get_tag_start(&op, tag_tid); | |
230 | ||
231 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 232 | this, &utils::rados_state_callback< |
7c673cae FG |
233 | C_GetTag, &C_GetTag::handle_get_tag>); |
234 | ||
235 | int r = ioctx.aio_operate(oid, comp, &op, &out_bl); | |
11fdf7f2 | 236 | ceph_assert(r == 0); |
7c673cae FG |
237 | comp->release(); |
238 | } | |
239 | ||
240 | void handle_get_tag(int r) { | |
241 | if (r == 0) { | |
11fdf7f2 | 242 | auto iter = out_bl.cbegin(); |
7c673cae FG |
243 | r = client::get_tag_finish(&iter, tag); |
244 | } | |
245 | complete(r); | |
246 | } | |
247 | ||
248 | void finish(int r) override { | |
249 | on_finish->complete(r); | |
250 | } | |
251 | }; | |
252 | ||
253 | struct C_GetTags : public Context { | |
254 | CephContext *cct; | |
255 | librados::IoCtx &ioctx; | |
256 | const std::string &oid; | |
257 | const std::string &client_id; | |
258 | AsyncOpTracker &async_op_tracker; | |
259 | uint64_t start_after_tag_tid; | |
260 | boost::optional<uint64_t> tag_class; | |
261 | JournalMetadata::Tags *tags; | |
262 | Context *on_finish; | |
263 | ||
264 | const uint64_t MAX_RETURN = 64; | |
265 | bufferlist out_bl; | |
266 | ||
267 | C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, | |
268 | const std::string &client_id, AsyncOpTracker &async_op_tracker, | |
269 | uint64_t start_after_tag_tid, | |
270 | const boost::optional<uint64_t> &tag_class, | |
271 | JournalMetadata::Tags *tags, Context *on_finish) | |
272 | : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id), | |
273 | async_op_tracker(async_op_tracker), | |
274 | start_after_tag_tid(start_after_tag_tid), tag_class(tag_class), | |
275 | tags(tags), on_finish(on_finish) { | |
276 | async_op_tracker.start_op(); | |
277 | } | |
278 | ~C_GetTags() override { | |
279 | async_op_tracker.finish_op(); | |
280 | } | |
281 | ||
282 | void send() { | |
283 | send_tag_list(); | |
284 | } | |
285 | ||
286 | void send_tag_list() { | |
287 | librados::ObjectReadOperation op; | |
288 | client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id, | |
289 | tag_class); | |
290 | ||
291 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 292 | this, &utils::rados_state_callback< |
7c673cae FG |
293 | C_GetTags, &C_GetTags::handle_tag_list>); |
294 | ||
295 | out_bl.clear(); | |
296 | int r = ioctx.aio_operate(oid, comp, &op, &out_bl); | |
11fdf7f2 | 297 | ceph_assert(r == 0); |
7c673cae FG |
298 | comp->release(); |
299 | } | |
300 | ||
301 | void handle_tag_list(int r) { | |
302 | if (r == 0) { | |
303 | std::set<cls::journal::Tag> journal_tags; | |
11fdf7f2 | 304 | auto iter = out_bl.cbegin(); |
7c673cae FG |
305 | r = client::tag_list_finish(&iter, &journal_tags); |
306 | if (r == 0) { | |
307 | for (auto &journal_tag : journal_tags) { | |
308 | tags->push_back(journal_tag); | |
309 | start_after_tag_tid = journal_tag.tid; | |
310 | } | |
311 | ||
312 | if (journal_tags.size() == MAX_RETURN) { | |
313 | send_tag_list(); | |
314 | return; | |
315 | } | |
316 | } | |
317 | } | |
318 | complete(r); | |
319 | } | |
320 | ||
321 | void finish(int r) override { | |
322 | on_finish->complete(r); | |
323 | } | |
324 | }; | |
325 | ||
326 | struct C_FlushCommitPosition : public Context { | |
327 | Context *commit_position_ctx; | |
328 | Context *on_finish; | |
329 | ||
330 | C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish) | |
331 | : commit_position_ctx(commit_position_ctx), on_finish(on_finish) { | |
332 | } | |
333 | void finish(int r) override { | |
334 | if (commit_position_ctx != nullptr) { | |
335 | commit_position_ctx->complete(r); | |
336 | } | |
337 | on_finish->complete(r); | |
338 | } | |
339 | }; | |
340 | ||
341 | struct C_AssertActiveTag : public Context { | |
342 | CephContext *cct; | |
343 | librados::IoCtx &ioctx; | |
344 | const std::string &oid; | |
345 | AsyncOpTracker &async_op_tracker; | |
346 | std::string client_id; | |
347 | uint64_t tag_tid; | |
348 | Context *on_finish; | |
349 | ||
350 | bufferlist out_bl; | |
351 | ||
352 | C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx, | |
353 | const std::string &oid, AsyncOpTracker &async_op_tracker, | |
354 | const std::string &client_id, uint64_t tag_tid, | |
355 | Context *on_finish) | |
356 | : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), | |
357 | client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) { | |
358 | async_op_tracker.start_op(); | |
359 | } | |
360 | ~C_AssertActiveTag() override { | |
361 | async_op_tracker.finish_op(); | |
362 | } | |
363 | ||
364 | void send() { | |
365 | ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl; | |
366 | ||
367 | librados::ObjectReadOperation op; | |
368 | client::tag_list_start(&op, tag_tid, 2, client_id, boost::none); | |
369 | ||
370 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 371 | this, &utils::rados_state_callback< |
7c673cae FG |
372 | C_AssertActiveTag, &C_AssertActiveTag::handle_send>); |
373 | ||
374 | int r = ioctx.aio_operate(oid, comp, &op, &out_bl); | |
11fdf7f2 | 375 | ceph_assert(r == 0); |
7c673cae FG |
376 | comp->release(); |
377 | } | |
378 | ||
379 | void handle_send(int r) { | |
380 | ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl; | |
381 | ||
382 | std::set<cls::journal::Tag> tags; | |
383 | if (r == 0) { | |
11fdf7f2 | 384 | auto it = out_bl.cbegin(); |
7c673cae FG |
385 | r = client::tag_list_finish(&it, &tags); |
386 | } | |
387 | ||
388 | // NOTE: since 0 is treated as an uninitialized list filter, we need to | |
389 | // load to entries and look at the last tid | |
390 | if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) { | |
391 | r = -ESTALE; | |
392 | } | |
393 | complete(r); | |
394 | } | |
395 | ||
396 | void finish(int r) override { | |
397 | on_finish->complete(r); | |
398 | } | |
399 | }; | |
400 | ||
401 | } // anonymous namespace | |
402 | ||
403 | JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, | |
9f95a23c | 404 | ceph::mutex *timer_lock, librados::IoCtx &ioctx, |
7c673cae FG |
405 | const std::string &oid, |
406 | const std::string &client_id, | |
407 | const Settings &settings) | |
9f95a23c TL |
408 | : m_oid(oid), |
409 | m_client_id(client_id), m_settings(settings), | |
7c673cae | 410 | m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), |
9f95a23c TL |
411 | m_watch_ctx(this) |
412 | { | |
7c673cae FG |
413 | m_ioctx.dup(ioctx); |
414 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
415 | } | |
416 | ||
417 | JournalMetadata::~JournalMetadata() { | |
9f95a23c | 418 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 419 | ceph_assert(!m_initialized); |
7c673cae FG |
420 | } |
421 | ||
422 | void JournalMetadata::init(Context *on_finish) { | |
423 | { | |
9f95a23c | 424 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 425 | ceph_assert(!m_initialized); |
7c673cae FG |
426 | m_initialized = true; |
427 | } | |
428 | ||
429 | // chain the init sequence (reverse order) | |
430 | on_finish = utils::create_async_context_callback( | |
431 | this, on_finish); | |
432 | on_finish = new C_ImmutableMetadata(this, on_finish); | |
9f95a23c | 433 | on_finish = new LambdaContext([this, on_finish](int r) { |
7c673cae FG |
434 | if (r < 0) { |
435 | lderr(m_cct) << __func__ << ": failed to watch journal" | |
436 | << cpp_strerror(r) << dendl; | |
9f95a23c | 437 | std::lock_guard locker{m_lock}; |
7c673cae FG |
438 | m_watch_handle = 0; |
439 | on_finish->complete(r); | |
440 | return; | |
441 | } | |
442 | ||
443 | get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish); | |
444 | }); | |
445 | ||
446 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 447 | on_finish, utils::rados_ctx_callback); |
7c673cae | 448 | int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx); |
11fdf7f2 | 449 | ceph_assert(r == 0); |
7c673cae FG |
450 | comp->release(); |
451 | } | |
452 | ||
453 | void JournalMetadata::shut_down(Context *on_finish) { | |
454 | ||
455 | ldout(m_cct, 20) << __func__ << dendl; | |
456 | ||
457 | uint64_t watch_handle = 0; | |
458 | { | |
9f95a23c | 459 | std::lock_guard locker{m_lock}; |
7c673cae FG |
460 | m_initialized = false; |
461 | std::swap(watch_handle, m_watch_handle); | |
462 | } | |
463 | ||
464 | // chain the shut down sequence (reverse order) | |
465 | on_finish = utils::create_async_context_callback( | |
466 | this, on_finish); | |
9f95a23c | 467 | on_finish = new LambdaContext([this, on_finish](int r) { |
7c673cae FG |
468 | ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl; |
469 | m_async_op_tracker.wait_for_ops(on_finish); | |
470 | }); | |
9f95a23c | 471 | on_finish = new LambdaContext([this, on_finish](int r) { |
7c673cae FG |
472 | ldout(m_cct, 20) << "shut_down: flushing watch" << dendl; |
473 | librados::Rados rados(m_ioctx); | |
474 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 475 | on_finish, utils::rados_ctx_callback); |
7c673cae | 476 | r = rados.aio_watch_flush(comp); |
11fdf7f2 | 477 | ceph_assert(r == 0); |
7c673cae FG |
478 | comp->release(); |
479 | }); | |
9f95a23c | 480 | on_finish = new LambdaContext([this, on_finish](int r) { |
7c673cae FG |
481 | flush_commit_position(on_finish); |
482 | }); | |
483 | if (watch_handle != 0) { | |
484 | librados::AioCompletion *comp = librados::Rados::aio_create_completion( | |
9f95a23c | 485 | on_finish, utils::rados_ctx_callback); |
7c673cae | 486 | int r = m_ioctx.aio_unwatch(watch_handle, comp); |
11fdf7f2 | 487 | ceph_assert(r == 0); |
7c673cae FG |
488 | comp->release(); |
489 | } else { | |
490 | on_finish->complete(0); | |
491 | } | |
492 | } | |
493 | ||
494 | void JournalMetadata::get_immutable_metadata(uint8_t *order, | |
495 | uint8_t *splay_width, | |
496 | int64_t *pool_id, | |
497 | Context *on_finish) { | |
498 | client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id, | |
499 | on_finish); | |
500 | } | |
501 | ||
502 | void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set, | |
503 | uint64_t *active_set, | |
504 | RegisteredClients *clients, | |
505 | Context *on_finish) { | |
506 | client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients, | |
507 | on_finish); | |
508 | } | |
509 | ||
510 | void JournalMetadata::register_client(const bufferlist &data, | |
511 | Context *on_finish) { | |
512 | ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; | |
513 | librados::ObjectWriteOperation op; | |
514 | client::client_register(&op, m_client_id, data); | |
515 | ||
516 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
517 | ||
518 | librados::AioCompletion *comp = | |
9f95a23c | 519 | librados::Rados::aio_create_completion(ctx, |
7c673cae FG |
520 | utils::rados_ctx_callback); |
521 | int r = m_ioctx.aio_operate(m_oid, comp, &op); | |
11fdf7f2 | 522 | ceph_assert(r == 0); |
7c673cae FG |
523 | comp->release(); |
524 | } | |
525 | ||
526 | void JournalMetadata::update_client(const bufferlist &data, | |
527 | Context *on_finish) { | |
528 | ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; | |
529 | librados::ObjectWriteOperation op; | |
530 | client::client_update_data(&op, m_client_id, data); | |
531 | ||
532 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
533 | ||
534 | librados::AioCompletion *comp = | |
9f95a23c | 535 | librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); |
7c673cae | 536 | int r = m_ioctx.aio_operate(m_oid, comp, &op); |
11fdf7f2 | 537 | ceph_assert(r == 0); |
7c673cae FG |
538 | comp->release(); |
539 | } | |
540 | ||
541 | void JournalMetadata::unregister_client(Context *on_finish) { | |
11fdf7f2 | 542 | ceph_assert(!m_client_id.empty()); |
7c673cae FG |
543 | |
544 | ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; | |
545 | librados::ObjectWriteOperation op; | |
546 | client::client_unregister(&op, m_client_id); | |
547 | ||
548 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
549 | ||
550 | librados::AioCompletion *comp = | |
9f95a23c | 551 | librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); |
7c673cae | 552 | int r = m_ioctx.aio_operate(m_oid, comp, &op); |
11fdf7f2 | 553 | ceph_assert(r == 0); |
7c673cae FG |
554 | comp->release(); |
555 | } | |
556 | ||
557 | void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data, | |
558 | Tag *tag, Context *on_finish) { | |
559 | on_finish = new C_NotifyUpdate(this, on_finish); | |
560 | C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid, | |
561 | m_async_op_tracker, tag_class, | |
562 | data, tag, on_finish); | |
563 | ctx->send(); | |
564 | } | |
565 | ||
566 | void JournalMetadata::get_client(const std::string &client_id, | |
567 | cls::journal::Client *client, | |
568 | Context *on_finish) { | |
569 | C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker, | |
570 | client_id, client, on_finish); | |
571 | ctx->send(); | |
572 | } | |
573 | ||
574 | void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { | |
575 | C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker, | |
576 | tag_tid, tag, on_finish); | |
577 | ctx->send(); | |
578 | } | |
579 | ||
580 | void JournalMetadata::get_tags(uint64_t start_after_tag_tid, | |
581 | const boost::optional<uint64_t> &tag_class, | |
582 | Tags *tags, Context *on_finish) { | |
583 | C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id, | |
584 | m_async_op_tracker, start_after_tag_tid, | |
585 | tag_class, tags, on_finish); | |
586 | ctx->send(); | |
587 | } | |
588 | ||
589 | void JournalMetadata::add_listener(JournalMetadataListener *listener) { | |
9f95a23c TL |
590 | std::unique_lock locker{m_lock}; |
591 | m_update_cond.wait(locker, [this] { | |
592 | return m_update_notifications <= 0; | |
593 | }); | |
7c673cae FG |
594 | m_listeners.push_back(listener); |
595 | } | |
596 | ||
597 | void JournalMetadata::remove_listener(JournalMetadataListener *listener) { | |
9f95a23c TL |
598 | std::unique_lock locker{m_lock}; |
599 | m_update_cond.wait(locker, [this] { | |
600 | return m_update_notifications <= 0; | |
601 | }); | |
7c673cae FG |
602 | m_listeners.remove(listener); |
603 | } | |
604 | ||
605 | void JournalMetadata::set_minimum_set(uint64_t object_set) { | |
9f95a23c | 606 | std::lock_guard locker{m_lock}; |
7c673cae FG |
607 | |
608 | ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set | |
609 | << ", new=" << object_set << dendl; | |
610 | if (m_minimum_set >= object_set) { | |
611 | return; | |
612 | } | |
613 | ||
614 | librados::ObjectWriteOperation op; | |
615 | client::set_minimum_set(&op, object_set); | |
616 | ||
617 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this); | |
618 | librados::AioCompletion *comp = | |
9f95a23c | 619 | librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); |
7c673cae | 620 | int r = m_ioctx.aio_operate(m_oid, comp, &op); |
11fdf7f2 | 621 | ceph_assert(r == 0); |
7c673cae FG |
622 | comp->release(); |
623 | ||
624 | m_minimum_set = object_set; | |
625 | } | |
626 | ||
627 | int JournalMetadata::set_active_set(uint64_t object_set) { | |
628 | C_SaferCond ctx; | |
629 | set_active_set(object_set, &ctx); | |
630 | return ctx.wait(); | |
631 | } | |
632 | ||
633 | void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) { | |
9f95a23c | 634 | std::lock_guard locker{m_lock}; |
7c673cae FG |
635 | |
636 | ldout(m_cct, 20) << __func__ << ": current=" << m_active_set | |
637 | << ", new=" << object_set << dendl; | |
638 | if (m_active_set >= object_set) { | |
639 | m_work_queue->queue(on_finish, 0); | |
640 | return; | |
641 | } | |
642 | ||
643 | librados::ObjectWriteOperation op; | |
644 | client::set_active_set(&op, object_set); | |
645 | ||
646 | C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); | |
647 | librados::AioCompletion *comp = | |
9f95a23c | 648 | librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); |
7c673cae | 649 | int r = m_ioctx.aio_operate(m_oid, comp, &op); |
11fdf7f2 | 650 | ceph_assert(r == 0); |
7c673cae FG |
651 | comp->release(); |
652 | ||
653 | m_active_set = object_set; | |
654 | } | |
655 | ||
656 | void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { | |
9f95a23c | 657 | std::lock_guard locker{m_lock}; |
7c673cae FG |
658 | |
659 | C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid, | |
660 | m_async_op_tracker, | |
661 | m_client_id, tag_tid, | |
662 | on_finish); | |
663 | ctx->send(); | |
664 | } | |
665 | ||
666 | void JournalMetadata::flush_commit_position() { | |
667 | ldout(m_cct, 20) << __func__ << dendl; | |
668 | ||
94b18763 FG |
669 | C_SaferCond ctx; |
670 | flush_commit_position(&ctx); | |
671 | ctx.wait(); | |
7c673cae FG |
672 | } |
673 | ||
674 | void JournalMetadata::flush_commit_position(Context *on_safe) { | |
675 | ldout(m_cct, 20) << __func__ << dendl; | |
676 | ||
9f95a23c | 677 | std::scoped_lock locker{*m_timer_lock, m_lock}; |
94b18763 | 678 | if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) { |
7c673cae FG |
679 | // nothing to flush |
680 | if (on_safe != nullptr) { | |
681 | m_work_queue->queue(on_safe, 0); | |
682 | } | |
683 | return; | |
684 | } | |
685 | ||
686 | if (on_safe != nullptr) { | |
94b18763 FG |
687 | m_flush_commit_position_ctxs.push_back(on_safe); |
688 | } | |
689 | if (m_commit_position_ctx == nullptr) { | |
690 | return; | |
7c673cae | 691 | } |
94b18763 | 692 | |
7c673cae FG |
693 | cancel_commit_task(); |
694 | handle_commit_position_task(); | |
695 | } | |
696 | ||
697 | void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { | |
9f95a23c | 698 | std::lock_guard locker{m_lock}; |
7c673cae FG |
699 | uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid]; |
700 | if (allocated_entry_tid <= entry_tid) { | |
701 | allocated_entry_tid = entry_tid + 1; | |
702 | } | |
703 | } | |
704 | ||
705 | bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid, | |
706 | uint64_t *entry_tid) const { | |
9f95a23c | 707 | std::lock_guard locker{m_lock}; |
7c673cae FG |
708 | |
709 | AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid); | |
710 | if (it == m_allocated_entry_tids.end()) { | |
711 | return false; | |
712 | } | |
713 | ||
11fdf7f2 | 714 | ceph_assert(it->second > 0); |
7c673cae FG |
715 | *entry_tid = it->second - 1; |
716 | return true; | |
717 | } | |
718 | ||
719 | void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) { | |
720 | if (r < 0) { | |
721 | lderr(m_cct) << "failed to initialize immutable metadata: " | |
722 | << cpp_strerror(r) << dendl; | |
723 | on_init->complete(r); | |
724 | return; | |
725 | } | |
726 | ||
727 | ldout(m_cct, 10) << "initialized immutable metadata" << dendl; | |
728 | refresh(on_init); | |
729 | } | |
730 | ||
731 | void JournalMetadata::refresh(Context *on_complete) { | |
732 | ldout(m_cct, 10) << "refreshing mutable metadata" << dendl; | |
94b18763 FG |
733 | |
734 | { | |
9f95a23c | 735 | std::lock_guard locker{m_lock}; |
94b18763 FG |
736 | if (on_complete != nullptr) { |
737 | m_refresh_ctxs.push_back(on_complete); | |
738 | } | |
739 | ++m_refreshes_in_progress; | |
740 | } | |
741 | ||
742 | auto refresh = new C_Refresh(this); | |
7c673cae FG |
743 | get_mutable_metadata(&refresh->minimum_set, &refresh->active_set, |
744 | &refresh->registered_clients, refresh); | |
745 | } | |
746 | ||
747 | void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { | |
748 | ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl; | |
7c673cae | 749 | |
9f95a23c | 750 | m_lock.lock(); |
94b18763 | 751 | if (r == 0) { |
7c673cae FG |
752 | Client client(m_client_id, bufferlist()); |
753 | RegisteredClients::iterator it = refresh->registered_clients.find(client); | |
754 | if (it != refresh->registered_clients.end()) { | |
755 | if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) { | |
756 | ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id | |
757 | << dendl; | |
758 | } | |
11fdf7f2 TL |
759 | m_minimum_set = std::max(m_minimum_set, refresh->minimum_set); |
760 | m_active_set = std::max(m_active_set, refresh->active_set); | |
7c673cae FG |
761 | m_registered_clients = refresh->registered_clients; |
762 | m_client = *it; | |
763 | ||
764 | ++m_update_notifications; | |
9f95a23c | 765 | m_lock.unlock(); |
7c673cae FG |
766 | for (Listeners::iterator it = m_listeners.begin(); |
767 | it != m_listeners.end(); ++it) { | |
768 | (*it)->handle_update(this); | |
769 | } | |
9f95a23c | 770 | m_lock.lock(); |
7c673cae | 771 | if (--m_update_notifications == 0) { |
9f95a23c | 772 | m_update_cond.notify_all(); |
7c673cae FG |
773 | } |
774 | } else { | |
775 | lderr(m_cct) << "failed to locate client: " << m_client_id << dendl; | |
776 | r = -ENOENT; | |
777 | } | |
778 | } | |
779 | ||
94b18763 | 780 | Contexts refresh_ctxs; |
11fdf7f2 | 781 | ceph_assert(m_refreshes_in_progress > 0); |
94b18763 FG |
782 | --m_refreshes_in_progress; |
783 | if (m_refreshes_in_progress == 0) { | |
784 | std::swap(refresh_ctxs, m_refresh_ctxs); | |
785 | } | |
9f95a23c | 786 | m_lock.unlock(); |
94b18763 FG |
787 | |
788 | for (auto ctx : refresh_ctxs) { | |
789 | ctx->complete(r); | |
7c673cae FG |
790 | } |
791 | } | |
792 | ||
793 | void JournalMetadata::cancel_commit_task() { | |
794 | ldout(m_cct, 20) << __func__ << dendl; | |
795 | ||
9f95a23c TL |
796 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); |
797 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
11fdf7f2 TL |
798 | ceph_assert(m_commit_position_ctx != nullptr); |
799 | ceph_assert(m_commit_position_task_ctx != nullptr); | |
7c673cae FG |
800 | m_timer->cancel_event(m_commit_position_task_ctx); |
801 | m_commit_position_task_ctx = NULL; | |
802 | } | |
803 | ||
804 | void JournalMetadata::schedule_commit_task() { | |
805 | ldout(m_cct, 20) << __func__ << dendl; | |
806 | ||
9f95a23c TL |
807 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); |
808 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
11fdf7f2 | 809 | ceph_assert(m_commit_position_ctx != nullptr); |
94b18763 | 810 | if (m_commit_position_task_ctx == nullptr) { |
3efd9988 FG |
811 | m_commit_position_task_ctx = |
812 | m_timer->add_event_after(m_settings.commit_interval, | |
813 | new C_CommitPositionTask(this)); | |
7c673cae FG |
814 | } |
815 | } | |
816 | ||
817 | void JournalMetadata::handle_commit_position_task() { | |
9f95a23c TL |
818 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); |
819 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
7c673cae FG |
820 | ldout(m_cct, 20) << __func__ << ": " |
821 | << "client_id=" << m_client_id << ", " | |
822 | << "commit_position=" << m_commit_position << dendl; | |
823 | ||
94b18763 FG |
824 | m_commit_position_task_ctx = nullptr; |
825 | Context* commit_position_ctx = nullptr; | |
826 | std::swap(commit_position_ctx, m_commit_position_ctx); | |
7c673cae | 827 | |
94b18763 FG |
828 | m_async_op_tracker.start_op(); |
829 | ++m_flush_commits_in_progress; | |
7c673cae | 830 | |
9f95a23c | 831 | Context* ctx = new LambdaContext([this, commit_position_ctx](int r) { |
94b18763 | 832 | Contexts flush_commit_position_ctxs; |
9f95a23c | 833 | m_lock.lock(); |
11fdf7f2 | 834 | ceph_assert(m_flush_commits_in_progress > 0); |
94b18763 FG |
835 | --m_flush_commits_in_progress; |
836 | if (m_flush_commits_in_progress == 0) { | |
837 | std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs); | |
838 | } | |
9f95a23c | 839 | m_lock.unlock(); |
7c673cae | 840 | |
94b18763 FG |
841 | commit_position_ctx->complete(0); |
842 | for (auto ctx : flush_commit_position_ctxs) { | |
843 | ctx->complete(0); | |
844 | } | |
845 | m_async_op_tracker.finish_op(); | |
846 | }); | |
847 | ctx = new C_NotifyUpdate(this, ctx); | |
9f95a23c | 848 | ctx = new LambdaContext([this, ctx](int r) { |
94b18763 FG |
849 | // manually kick of a refresh in case the notification is missed |
850 | // and ignore the next notification that we are about to send | |
9f95a23c | 851 | m_lock.lock(); |
94b18763 | 852 | ++m_ignore_watch_notifies; |
9f95a23c | 853 | m_lock.unlock(); |
94b18763 FG |
854 | |
855 | refresh(ctx); | |
856 | }); | |
9f95a23c | 857 | ctx = new LambdaContext([this, ctx](int r) { |
94b18763 FG |
858 | schedule_laggy_clients_disconnect(ctx); |
859 | }); | |
860 | ||
861 | librados::ObjectWriteOperation op; | |
862 | client::client_commit(&op, m_client_id, m_commit_position); | |
863 | ||
9f95a23c | 864 | auto comp = librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); |
7c673cae | 865 | int r = m_ioctx.aio_operate(m_oid, comp, &op); |
11fdf7f2 | 866 | ceph_assert(r == 0); |
7c673cae | 867 | comp->release(); |
7c673cae FG |
868 | } |
869 | ||
870 | void JournalMetadata::schedule_watch_reset() { | |
9f95a23c | 871 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); |
7c673cae FG |
872 | m_timer->add_event_after(1, new C_WatchReset(this)); |
873 | } | |
874 | ||
875 | void JournalMetadata::handle_watch_reset() { | |
9f95a23c | 876 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); |
7c673cae FG |
877 | if (!m_initialized) { |
878 | return; | |
879 | } | |
880 | ||
881 | int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); | |
882 | if (r < 0) { | |
883 | if (r == -ENOENT) { | |
884 | ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl; | |
885 | } else if (r == -EBLACKLISTED) { | |
886 | ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl; | |
887 | } else { | |
888 | lderr(m_cct) << __func__ << ": failed to watch journal: " | |
889 | << cpp_strerror(r) << dendl; | |
890 | } | |
891 | schedule_watch_reset(); | |
892 | } else { | |
893 | ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl; | |
894 | refresh(NULL); | |
895 | } | |
896 | } | |
897 | ||
898 | void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { | |
899 | ldout(m_cct, 10) << "journal header updated" << dendl; | |
900 | ||
901 | bufferlist bl; | |
902 | m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); | |
903 | ||
94b18763 | 904 | { |
9f95a23c | 905 | std::lock_guard locker{m_lock}; |
94b18763 FG |
906 | if (m_ignore_watch_notifies > 0) { |
907 | --m_ignore_watch_notifies; | |
908 | return; | |
909 | } | |
910 | } | |
911 | ||
7c673cae FG |
912 | refresh(NULL); |
913 | } | |
914 | ||
915 | void JournalMetadata::handle_watch_error(int err) { | |
916 | if (err == -ENOTCONN) { | |
917 | ldout(m_cct, 5) << "journal watch error: header removed" << dendl; | |
918 | } else if (err == -EBLACKLISTED) { | |
919 | lderr(m_cct) << "journal watch error: client blacklisted" << dendl; | |
920 | } else { | |
921 | lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; | |
922 | } | |
923 | ||
9f95a23c | 924 | std::scoped_lock locker{*m_timer_lock, m_lock}; |
7c673cae FG |
925 | |
926 | // release old watch on error | |
927 | if (m_watch_handle != 0) { | |
928 | m_ioctx.unwatch2(m_watch_handle); | |
929 | m_watch_handle = 0; | |
930 | } | |
931 | ||
932 | if (m_initialized && err != -ENOENT) { | |
933 | schedule_watch_reset(); | |
934 | } | |
935 | } | |
936 | ||
937 | uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, | |
938 | uint64_t tag_tid, | |
939 | uint64_t entry_tid) { | |
9f95a23c | 940 | std::lock_guard locker{m_lock}; |
7c673cae FG |
941 | uint64_t commit_tid = ++m_commit_tid; |
942 | m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid, | |
943 | entry_tid); | |
944 | ||
945 | ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " [" | |
946 | << "object_num=" << object_num << ", " | |
947 | << "tag_tid=" << tag_tid << ", " | |
948 | << "entry_tid=" << entry_tid << "]" | |
949 | << dendl; | |
950 | return commit_tid; | |
951 | } | |
952 | ||
953 | void JournalMetadata::overflow_commit_tid(uint64_t commit_tid, | |
954 | uint64_t object_num) { | |
9f95a23c | 955 | std::lock_guard locker{m_lock}; |
7c673cae FG |
956 | |
957 | auto it = m_pending_commit_tids.find(commit_tid); | |
11fdf7f2 TL |
958 | ceph_assert(it != m_pending_commit_tids.end()); |
959 | ceph_assert(it->second.object_num < object_num); | |
7c673cae FG |
960 | |
961 | ldout(m_cct, 20) << __func__ << ": " | |
962 | << "commit_tid=" << commit_tid << ", " | |
963 | << "old_object_num=" << it->second.object_num << ", " | |
964 | << "new_object_num=" << object_num << dendl; | |
965 | it->second.object_num = object_num; | |
966 | } | |
967 | ||
968 | void JournalMetadata::get_commit_entry(uint64_t commit_tid, | |
969 | uint64_t *object_num, | |
970 | uint64_t *tag_tid, uint64_t *entry_tid) { | |
9f95a23c | 971 | std::lock_guard locker{m_lock}; |
7c673cae FG |
972 | |
973 | auto it = m_pending_commit_tids.find(commit_tid); | |
11fdf7f2 | 974 | ceph_assert(it != m_pending_commit_tids.end()); |
7c673cae FG |
975 | |
976 | *object_num = it->second.object_num; | |
977 | *tag_tid = it->second.tag_tid; | |
978 | *entry_tid = it->second.entry_tid; | |
979 | } | |
980 | ||
981 | void JournalMetadata::committed(uint64_t commit_tid, | |
982 | const CreateContext &create_context) { | |
983 | ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl; | |
984 | ||
985 | ObjectSetPosition commit_position; | |
986 | Context *stale_ctx = nullptr; | |
987 | { | |
9f95a23c | 988 | std::scoped_lock locker{*m_timer_lock, m_lock}; |
11fdf7f2 | 989 | ceph_assert(commit_tid > m_commit_position_tid); |
7c673cae FG |
990 | |
991 | if (!m_commit_position.object_positions.empty()) { | |
992 | // in-flight commit position update | |
993 | commit_position = m_commit_position; | |
994 | } else { | |
995 | // safe commit position | |
996 | commit_position = m_client.commit_position; | |
997 | } | |
998 | ||
999 | CommitTids::iterator it = m_pending_commit_tids.find(commit_tid); | |
11fdf7f2 | 1000 | ceph_assert(it != m_pending_commit_tids.end()); |
7c673cae FG |
1001 | |
1002 | CommitEntry &commit_entry = it->second; | |
1003 | commit_entry.committed = true; | |
1004 | ||
1005 | bool update_commit_position = false; | |
1006 | while (!m_pending_commit_tids.empty()) { | |
1007 | CommitTids::iterator it = m_pending_commit_tids.begin(); | |
1008 | CommitEntry &commit_entry = it->second; | |
1009 | if (!commit_entry.committed) { | |
1010 | break; | |
1011 | } | |
1012 | ||
1013 | commit_position.object_positions.emplace_front( | |
1014 | commit_entry.object_num, commit_entry.tag_tid, | |
1015 | commit_entry.entry_tid); | |
1016 | m_pending_commit_tids.erase(it); | |
1017 | update_commit_position = true; | |
1018 | } | |
1019 | ||
1020 | if (!update_commit_position) { | |
1021 | return; | |
1022 | } | |
1023 | ||
1024 | // prune the position to have one position per splay offset | |
1025 | std::set<uint8_t> in_use_splay_offsets; | |
1026 | ObjectPositions::iterator ob_it = commit_position.object_positions.begin(); | |
1027 | while (ob_it != commit_position.object_positions.end()) { | |
1028 | uint8_t splay_offset = ob_it->object_number % m_splay_width; | |
1029 | if (!in_use_splay_offsets.insert(splay_offset).second) { | |
1030 | ob_it = commit_position.object_positions.erase(ob_it); | |
1031 | } else { | |
1032 | ++ob_it; | |
1033 | } | |
1034 | } | |
1035 | ||
1036 | stale_ctx = m_commit_position_ctx; | |
1037 | m_commit_position_ctx = create_context(); | |
1038 | m_commit_position = commit_position; | |
1039 | m_commit_position_tid = commit_tid; | |
1040 | ||
1041 | ldout(m_cct, 20) << "updated commit position: " << commit_position << ", " | |
1042 | << "on_safe=" << m_commit_position_ctx << dendl; | |
1043 | schedule_commit_task(); | |
1044 | } | |
1045 | ||
1046 | ||
1047 | if (stale_ctx != nullptr) { | |
1048 | ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx | |
1049 | << dendl; | |
1050 | stale_ctx->complete(-ESTALE); | |
1051 | } | |
1052 | } | |
1053 | ||
1054 | void JournalMetadata::notify_update() { | |
1055 | ldout(m_cct, 10) << "notifying journal header update" << dendl; | |
1056 | ||
1057 | bufferlist bl; | |
1058 | m_ioctx.notify2(m_oid, bl, 5000, NULL); | |
1059 | } | |
1060 | ||
1061 | void JournalMetadata::async_notify_update(Context *on_safe) { | |
1062 | ldout(m_cct, 10) << "async notifying journal header update" << dendl; | |
1063 | ||
1064 | C_AioNotify *ctx = new C_AioNotify(this, on_safe); | |
1065 | librados::AioCompletion *comp = | |
9f95a23c | 1066 | librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); |
7c673cae FG |
1067 | |
1068 | bufferlist bl; | |
1069 | int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL); | |
11fdf7f2 | 1070 | ceph_assert(r == 0); |
7c673cae FG |
1071 | |
1072 | comp->release(); | |
1073 | } | |
1074 | ||
1075 | void JournalMetadata::wait_for_ops() { | |
1076 | C_SaferCond ctx; | |
1077 | m_async_op_tracker.wait_for_ops(&ctx); | |
1078 | ctx.wait(); | |
1079 | } | |
1080 | ||
1081 | void JournalMetadata::handle_notified(int r) { | |
1082 | ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl; | |
1083 | } | |
1084 | ||
94b18763 | 1085 | void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { |
7c673cae | 1086 | ldout(m_cct, 20) << __func__ << dendl; |
7c673cae | 1087 | if (m_settings.max_concurrent_object_sets <= 0) { |
94b18763 FG |
1088 | on_finish->complete(0); |
1089 | return; | |
7c673cae FG |
1090 | } |
1091 | ||
1092 | Context *ctx = on_finish; | |
94b18763 | 1093 | { |
9f95a23c | 1094 | std::lock_guard locker{m_lock}; |
94b18763 FG |
1095 | for (auto &c : m_registered_clients) { |
1096 | if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || | |
1097 | c.id == m_client_id || | |
1098 | m_settings.whitelisted_laggy_clients.count(c.id) > 0) { | |
1099 | continue; | |
1100 | } | |
1101 | const std::string &client_id = c.id; | |
1102 | uint64_t object_set = 0; | |
1103 | if (!c.commit_position.object_positions.empty()) { | |
1104 | auto &position = *(c.commit_position.object_positions.begin()); | |
1105 | object_set = position.object_number / m_splay_width; | |
1106 | } | |
7c673cae | 1107 | |
94b18763 FG |
1108 | if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { |
1109 | ldout(m_cct, 1) << __func__ << ": " << client_id | |
1110 | << ": scheduling disconnect" << dendl; | |
7c673cae | 1111 | |
9f95a23c | 1112 | ctx = new LambdaContext([this, client_id, ctx](int r1) { |
94b18763 FG |
1113 | ldout(m_cct, 10) << __func__ << ": " << client_id |
1114 | << ": flagging disconnected" << dendl; | |
7c673cae | 1115 | |
94b18763 FG |
1116 | librados::ObjectWriteOperation op; |
1117 | client::client_update_state( | |
1118 | &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED); | |
7c673cae | 1119 | |
94b18763 | 1120 | auto comp = librados::Rados::aio_create_completion( |
9f95a23c | 1121 | ctx, utils::rados_ctx_callback); |
94b18763 | 1122 | int r = m_ioctx.aio_operate(m_oid, comp, &op); |
11fdf7f2 | 1123 | ceph_assert(r == 0); |
94b18763 FG |
1124 | comp->release(); |
1125 | }); | |
1126 | } | |
7c673cae FG |
1127 | } |
1128 | } | |
1129 | ||
1130 | if (ctx == on_finish) { | |
1131 | ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl; | |
1132 | } | |
94b18763 | 1133 | ctx->complete(0); |
7c673cae FG |
1134 | } |
1135 | ||
1136 | std::ostream &operator<<(std::ostream &os, | |
1137 | const JournalMetadata::RegisteredClients &clients) { | |
1138 | os << "["; | |
1139 | for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin(); | |
1140 | c != clients.end(); ++c) { | |
1141 | os << (c == clients.begin() ? "" : ", " ) << *c; | |
1142 | } | |
1143 | os << "]"; | |
1144 | return os; | |
1145 | } | |
1146 | ||
1147 | std::ostream &operator<<(std::ostream &os, | |
1148 | const JournalMetadata &jm) { | |
9f95a23c | 1149 | std::lock_guard locker{jm.m_lock}; |
7c673cae FG |
1150 | os << "[oid=" << jm.m_oid << ", " |
1151 | << "initialized=" << jm.m_initialized << ", " | |
1152 | << "order=" << (int)jm.m_order << ", " | |
1153 | << "splay_width=" << (int)jm.m_splay_width << ", " | |
1154 | << "pool_id=" << jm.m_pool_id << ", " | |
1155 | << "minimum_set=" << jm.m_minimum_set << ", " | |
1156 | << "active_set=" << jm.m_active_set << ", " | |
1157 | << "client_id=" << jm.m_client_id << ", " | |
1158 | << "commit_tid=" << jm.m_commit_tid << ", " | |
1159 | << "commit_interval=" << jm.m_settings.commit_interval << ", " | |
1160 | << "commit_position=" << jm.m_commit_position << ", " | |
1161 | << "registered_clients=" << jm.m_registered_clients << "]"; | |
1162 | return os; | |
1163 | } | |
1164 | ||
1165 | } // namespace journal |