]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_log_backing.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_log_backing.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "cls/log/cls_log_client.h"
5#include "cls/version/cls_version_client.h"
6
7#include "rgw_log_backing.h"
8#include "rgw_tools.h"
9#include "cls_fifo_legacy.h"
10
11namespace cb = ceph::buffer;
12
13static constexpr auto dout_subsys = ceph_subsys_rgw;
14
/// Result of probing one shard object: absent, cls_log (omap) backed,
/// FIFO backed, or in a state we could not make sense of.
enum class shard_check { dne, omap, fifo, corrupt };

/// Stream a human-readable name for a shard_check value. Values outside
/// the enumeration are printed numerically.
inline std::ostream& operator <<(std::ostream& m, const shard_check& t) {
  const char* label = nullptr;
  switch (t) {
  case shard_check::dne:
    label = "shard_check::dne";
    break;
  case shard_check::omap:
    label = "shard_check::omap";
    break;
  case shard_check::fifo:
    label = "shard_check::fifo";
    break;
  case shard_check::corrupt:
    label = "shard_check::corrupt";
    break;
  }

  if (label != nullptr) {
    return m << label;
  }
  return m << "shard_check::UNKNOWN=" << static_cast<uint32_t>(t);
}
30
31namespace {
32/// Return the shard type, and a bool to see whether it has entries.
33std::pair<shard_check, bool>
34probe_shard(librados::IoCtx& ioctx, const std::string& oid,
35 bool& fifo_unsupported, optional_yield y)
36{
37 auto cct = static_cast<CephContext*>(ioctx.cct());
38 bool omap = false;
39 {
40 librados::ObjectReadOperation op;
41 cls_log_header header;
42 cls_log_info(op, &header);
43 auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y);
44 if (r == -ENOENT) {
45 return { shard_check::dne, {} };
46 }
47
48 if (r < 0) {
49 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
50 << " error probing for omap: r=" << r
51 << ", oid=" << oid << dendl;
52 return { shard_check::corrupt, {} };
53 }
54 if (header != cls_log_header{})
55 omap = true;
56 }
57 if (!fifo_unsupported) {
58 std::unique_ptr<rgw::cls::fifo::FIFO> fifo;
59 auto r = rgw::cls::fifo::FIFO::open(ioctx, oid,
60 &fifo, y,
61 std::nullopt, true);
62 if (r < 0 && !(r == -ENOENT || r == -ENODATA || r == -EPERM)) {
63 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
64 << " error probing for fifo: r=" << r
65 << ", oid=" << oid << dendl;
66 return { shard_check::corrupt, {} };
67 }
68 if (fifo && omap) {
69 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
70 << " fifo and omap found: oid=" << oid << dendl;
71 return { shard_check::corrupt, {} };
72 }
73 if (fifo) {
74 bool more = false;
75 std::vector<rgw::cls::fifo::list_entry> entries;
76 r = fifo->list(1, nullopt, &entries, &more, y);
77 if (r < 0) {
78 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
79 << ": unable to list entries: r=" << r
80 << ", oid=" << oid << dendl;
81 return { shard_check::corrupt, {} };
82 }
83 return { shard_check::fifo, !entries.empty() };
84 }
85 if (r == -EPERM) {
86 // Returned by OSD id CLS module not loaded.
87 fifo_unsupported = true;
88 }
89 }
90 if (omap) {
91 std::list<cls_log_entry> entries;
92 std::string out_marker;
93 bool truncated = false;
94 librados::ObjectReadOperation op;
95 cls_log_list(op, {}, {}, {}, 1, entries,
96 &out_marker, &truncated);
97 auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y);
98 if (r < 0) {
99 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
100 << ": failed to list: r=" << r << ", oid=" << oid << dendl;
101 return { shard_check::corrupt, {} };
102 }
103 return { shard_check::omap, !entries.empty() };
104 }
105
106 // An object exists, but has never had FIFO or cls_log entries written
107 // to it. Likely just the marker Omap.
108 return { shard_check::dne, {} };
109}
110
111tl::expected<log_type, bs::error_code>
112handle_dne(librados::IoCtx& ioctx,
113 log_type def,
114 std::string oid,
115 bool fifo_unsupported,
116 optional_yield y)
117{
118 auto cct = static_cast<CephContext*>(ioctx.cct());
119 if (def == log_type::fifo) {
120 if (fifo_unsupported) {
121 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
122 << " WARNING: FIFO set as default but not supported by OSD. "
123 << "Falling back to OMAP." << dendl;
124 return log_type::omap;
125 }
126 std::unique_ptr<rgw::cls::fifo::FIFO> fifo;
127 auto r = rgw::cls::fifo::FIFO::create(ioctx, oid,
128 &fifo, y,
129 std::nullopt);
130 if (r < 0) {
131 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
132 << " error creating FIFO: r=" << r
133 << ", oid=" << oid << dendl;
134 return tl::unexpected(bs::error_code(-r, bs::system_category()));
135 }
136 }
137 return def;
138}
139}
140
141tl::expected<log_type, bs::error_code>
142log_backing_type(librados::IoCtx& ioctx,
143 log_type def,
144 int shards,
145 const fu2::unique_function<std::string(int) const>& get_oid,
146 optional_yield y)
147{
148 auto cct = static_cast<CephContext*>(ioctx.cct());
149 auto check = shard_check::dne;
150 bool fifo_unsupported = false;
151 for (int i = 0; i < shards; ++i) {
152 auto [c, e] = probe_shard(ioctx, get_oid(i), fifo_unsupported, y);
153 if (c == shard_check::corrupt)
154 return tl::unexpected(bs::error_code(EIO, bs::system_category()));
155 if (c == shard_check::dne) continue;
156 if (check == shard_check::dne) {
157 check = c;
158 continue;
159 }
160
161 if (check != c) {
162 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
163 << " clashing types: check=" << check
164 << ", c=" << c << dendl;
165 return tl::unexpected(bs::error_code(EIO, bs::system_category()));
166 }
167 }
168 if (check == shard_check::corrupt) {
169 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
170 << " should be unreachable!" << dendl;
171 return tl::unexpected(bs::error_code(EIO, bs::system_category()));
172 }
173
174 if (check == shard_check::dne)
175 return handle_dne(ioctx,
176 def,
177 get_oid(0),
178 fifo_unsupported,
179 y);
180
181 return (check == shard_check::fifo ? log_type::fifo : log_type::omap);
182}
183
184bs::error_code log_remove(librados::IoCtx& ioctx,
185 int shards,
186 const fu2::unique_function<std::string(int) const>& get_oid,
187 bool leave_zero,
188 optional_yield y)
189{
190 bs::error_code ec;
191 auto cct = static_cast<CephContext*>(ioctx.cct());
192 for (int i = 0; i < shards; ++i) {
193 auto oid = get_oid(i);
194 rados::cls::fifo::info info;
195 uint32_t part_header_size = 0, part_entry_overhead = 0;
196
197 auto r = rgw::cls::fifo::get_meta(ioctx, oid, nullopt, &info,
198 &part_header_size, &part_entry_overhead,
199 0, y, true);
200 if (r == -ENOENT) continue;
201 if (r == 0 && info.head_part_num > -1) {
202 for (auto j = info.tail_part_num; j <= info.head_part_num; ++j) {
203 librados::ObjectWriteOperation op;
204 op.remove();
205 auto part_oid = info.part_oid(j);
206 auto subr = rgw_rados_operate(ioctx, part_oid, &op, null_yield);
207 if (subr < 0 && subr != -ENOENT) {
208 if (!ec)
209 ec = bs::error_code(-subr, bs::system_category());
210 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
211 << ": failed removing FIFO part: part_oid=" << part_oid
212 << ", subr=" << subr << dendl;
213 }
214 }
215 }
216 if (r < 0 && r != -ENODATA) {
217 if (!ec)
218 ec = bs::error_code(-r, bs::system_category());
219 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
220 << ": failed checking FIFO part: oid=" << oid
221 << ", r=" << r << dendl;
222 }
223 librados::ObjectWriteOperation op;
224 if (i == 0 && leave_zero) {
225 // Leave shard 0 in existence, but remove contents and
226 // omap. cls_lock stores things in the xattrs. And sync needs to
227 // rendezvous with locks on generation 0 shard 0.
228 op.omap_set_header({});
229 op.omap_clear();
230 op.truncate(0);
231 } else {
232 op.remove();
233 }
234 r = rgw_rados_operate(ioctx, oid, &op, null_yield);
235 if (r < 0 && r != -ENOENT) {
236 if (!ec)
237 ec = bs::error_code(-r, bs::system_category());
238 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
239 << ": failed removing shard: oid=" << oid
240 << ", r=" << r << dendl;
241 }
242 }
243 return ec;
244}
245
246logback_generations::~logback_generations() {
247 if (watchcookie > 0) {
248 auto cct = static_cast<CephContext*>(ioctx.cct());
249 auto r = ioctx.unwatch2(watchcookie);
250 if (r < 0) {
251 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
252 << ": failed unwatching oid=" << oid
253 << ", r=" << r << dendl;
254 }
255 }
256}
257
258bs::error_code logback_generations::setup(log_type def,
259 optional_yield y) noexcept
260{
261 try {
262 auto cct = static_cast<CephContext*>(ioctx.cct());
263 // First, read.
264 auto res = read(y);
265 if (!res && res.error() != bs::errc::no_such_file_or_directory) {
266 return res.error();
267 }
268 if (res) {
269 std::unique_lock lock(m);
270 std::tie(entries_, version) = std::move(*res);
271 } else {
272 // Are we the first? Then create generation 0 and the generations
273 // metadata.
274 librados::ObjectWriteOperation op;
275 auto type = log_backing_type(ioctx, def, shards,
276 [this](int shard) {
277 return this->get_oid(0, shard);
278 }, y);
279 if (!type)
280 return type.error();
281
282 logback_generation l;
283 l.type = *type;
284
285 std::unique_lock lock(m);
286 version.ver = 1;
287 static constexpr auto TAG_LEN = 24;
288 version.tag.clear();
289 append_rand_alpha(cct, version.tag, version.tag, TAG_LEN);
290 op.create(true);
291 cls_version_set(op, version);
292 cb::list bl;
293 entries_.emplace(0, std::move(l));
294 encode(entries_, bl);
295 lock.unlock();
296
297 op.write_full(bl);
298 auto r = rgw_rados_operate(ioctx, oid, &op, y);
299 if (r < 0 && r != -EEXIST) {
300 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
301 << ": failed writing oid=" << oid
302 << ", r=" << r << dendl;
303 bs::system_error(-r, bs::system_category());
304 }
305 // Did someone race us? Then re-read.
306 if (r != 0) {
307 res = read(y);
308 if (!res)
309 return res.error();
310 if (res->first.empty())
311 return bs::error_code(EIO, bs::system_category());
312 auto l = res->first.begin()->second;
313 // In the unlikely event that someone raced us, created
314 // generation zero, incremented, then erased generation zero,
315 // don't leave generation zero lying around.
316 if (l.gen_id != 0) {
317 auto ec = log_remove(ioctx, shards,
318 [this](int shard) {
319 return this->get_oid(0, shard);
320 }, true, y);
321 if (ec) return ec;
322 }
323 std::unique_lock lock(m);
324 std::tie(entries_, version) = std::move(*res);
325 }
326 }
327 // Pass all non-empty generations to the handler
328 std::unique_lock lock(m);
329 auto i = lowest_nomempty(entries_);
330 entries_t e;
331 std::copy(i, entries_.cend(),
332 std::inserter(e, e.end()));
333 m.unlock();
334 auto ec = watch();
335 if (ec) {
336 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
337 << ": failed to re-establish watch, unsafe to continue: oid="
338 << oid << ", ec=" << ec.message() << dendl;
339 }
340 return handle_init(std::move(e));
341 } catch (const std::bad_alloc&) {
342 return bs::error_code(ENOMEM, bs::system_category());
343 }
344}
345
346bs::error_code logback_generations::update(optional_yield y) noexcept
347{
348 try {
349 auto cct = static_cast<CephContext*>(ioctx.cct());
350 auto res = read(y);
351 if (!res) {
352 return res.error();
353 }
354
355 std::unique_lock l(m);
356 auto& [es, v] = *res;
357 if (v == version) {
358 // Nothing to do!
359 return {};
360 }
361
362 // Check consistency and prepare update
363 if (es.empty()) {
364 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
365 << ": INCONSISTENCY! Read empty update." << dendl;
366 return bs::error_code(EFAULT, bs::system_category());
367 }
368 auto cur_lowest = lowest_nomempty(entries_);
369 // Straight up can't happen
370 assert(cur_lowest != entries_.cend());
371 auto new_lowest = lowest_nomempty(es);
372 if (new_lowest == es.cend()) {
373 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
374 << ": INCONSISTENCY! Read update with no active head." << dendl;
375 return bs::error_code(EFAULT, bs::system_category());
376 }
377 if (new_lowest->first < cur_lowest->first) {
378 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
379 << ": INCONSISTENCY! Tail moved wrong way." << dendl;
380 return bs::error_code(EFAULT, bs::system_category());
381 }
382
383 std::optional<uint64_t> highest_empty;
384 if (new_lowest->first > cur_lowest->first && new_lowest != es.begin()) {
385 --new_lowest;
386 highest_empty = new_lowest->first;
387 }
388
389 entries_t new_entries;
390
391 if ((es.end() - 1)->first < (entries_.end() - 1)->first) {
392 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
393 << ": INCONSISTENCY! Head moved wrong way." << dendl;
394 return bs::error_code(EFAULT, bs::system_category());
395 }
396
397 if ((es.end() - 1)->first > (entries_.end() - 1)->first) {
398 auto ei = es.lower_bound((entries_.end() - 1)->first + 1);
399 std::copy(ei, es.end(), std::inserter(new_entries, new_entries.end()));
400 }
401
402 // Everything checks out!
403
404 version = v;
405 entries_ = es;
406 l.unlock();
407
408 if (highest_empty) {
409 auto ec = handle_empty_to(*highest_empty);
410 if (ec) return ec;
411 }
412
413 if (!new_entries.empty()) {
414 auto ec = handle_new_gens(std::move(new_entries));
415 if (ec) return ec;
416 }
417 } catch (const std::bad_alloc&) {
418 return bs::error_code(ENOMEM, bs::system_category());
419 }
420 return {};
421}
422
423auto logback_generations::read(optional_yield y) noexcept ->
424 tl::expected<std::pair<entries_t, obj_version>, bs::error_code>
425{
426 try {
427 auto cct = static_cast<CephContext*>(ioctx.cct());
428 librados::ObjectReadOperation op;
429 std::unique_lock l(m);
430 cls_version_check(op, version, VER_COND_GE);
431 l.unlock();
432 obj_version v2;
433 cls_version_read(op, &v2);
434 cb::list bl;
435 op.read(0, 0, &bl, nullptr);
436 auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y);
437 if (r < 0) {
438 if (r == -ENOENT) {
439 ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
440 << ": oid=" << oid
441 << " not found" << dendl;
442 } else {
443 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
444 << ": failed reading oid=" << oid
445 << ", r=" << r << dendl;
446 }
447 return tl::unexpected(bs::error_code(-r, bs::system_category()));
448 }
449 auto bi = bl.cbegin();
450 entries_t e;
451 try {
452 decode(e, bi);
453 } catch (const cb::error& err) {
454 return tl::unexpected(err.code());
455 }
456 return std::pair{ std::move(e), std::move(v2) };
457 } catch (const std::bad_alloc&) {
458 return tl::unexpected(bs::error_code(ENOMEM, bs::system_category()));
459 }
460}
461
462bs::error_code logback_generations::write(entries_t&& e,
463 std::unique_lock<std::mutex>&& l_,
464 optional_yield y) noexcept
465{
466 auto l = std::move(l_);
467 ceph_assert(l.mutex() == &m &&
468 l.owns_lock());
469 try {
470 auto cct = static_cast<CephContext*>(ioctx.cct());
471 librados::ObjectWriteOperation op;
472 cls_version_check(op, version, VER_COND_GE);
473 cb::list bl;
474 encode(e, bl);
475 op.write_full(bl);
476 cls_version_inc(op);
477 auto r = rgw_rados_operate(ioctx, oid, &op, y);
478 if (r == 0) {
479 entries_ = std::move(e);
480 version.inc();
481 return {};
482 }
483 l.unlock();
484 if (r < 0 && r != -ECANCELED) {
485 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
486 << ": failed reading oid=" << oid
487 << ", r=" << r << dendl;
488 return { -r, bs::system_category() };
489 }
490 if (r == -ECANCELED) {
491 auto ec = update(y);
492 if (ec) {
493 return ec;
494 } else {
495 return { ECANCELED, bs::system_category() };
496 }
497 }
498 } catch (const std::bad_alloc&) {
499 return { ENOMEM, bs::system_category() };
500 }
501 return {};
502}
503
504
505bs::error_code logback_generations::watch() noexcept {
506 try {
507 auto cct = static_cast<CephContext*>(ioctx.cct());
508 auto r = ioctx.watch2(oid, &watchcookie, this);
509 if (r < 0) {
510 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
511 << ": failed to set watch oid=" << oid
512 << ", r=" << r << dendl;
513 return { -r, bs::system_category() };
514 }
515 } catch (const std::bad_alloc&) {
516 return bs::error_code(ENOMEM, bs::system_category());
517 }
518 return {};
519}
520
521bs::error_code logback_generations::new_backing(log_type type,
522 optional_yield y) noexcept {
523 auto cct = static_cast<CephContext*>(ioctx.cct());
524 static constexpr auto max_tries = 10;
525 try {
526 auto ec = update(y);
527 if (ec) return ec;
528 auto tries = 0;
529 entries_t new_entries;
530 do {
531 std::unique_lock l(m);
532 auto last = entries_.end() - 1;
533 if (last->second.type == type) {
534 // Nothing to be done
535 return {};
536 }
537 auto newgenid = last->first + 1;
538 logback_generation newgen;
539 newgen.gen_id = newgenid;
540 newgen.type = type;
541 new_entries.emplace(newgenid, newgen);
542 auto es = entries_;
543 es.emplace(newgenid, std::move(newgen));
544 ec = write(std::move(es), std::move(l), y);
545 ++tries;
546 } while (ec == bs::errc::operation_canceled &&
547 tries < max_tries);
548 if (tries >= max_tries) {
549 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
550 << ": exhausted retry attempts." << dendl;
551 return ec;
552 }
553
554 if (ec) {
555 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
556 << ": write failed with ec=" << ec.message() << dendl;
557 return ec;
558 }
559
560 cb::list bl, rbl;
561
562 auto r = rgw_rados_notify(ioctx, oid, bl, 10'000, &rbl, y);
563 if (r < 0) {
564 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
565 << ": notify failed with r=" << r << dendl;
566 return { -r, bs::system_category() };
567 }
568 ec = handle_new_gens(new_entries);
569 } catch (const std::bad_alloc&) {
570 return bs::error_code(ENOMEM, bs::system_category());
571 }
572 return {};
573}
574
575bs::error_code logback_generations::empty_to(uint64_t gen_id,
576 optional_yield y) noexcept {
577 auto cct = static_cast<CephContext*>(ioctx.cct());
578 static constexpr auto max_tries = 10;
579 try {
580 auto ec = update(y);
581 if (ec) return ec;
582 auto tries = 0;
583 uint64_t newtail = 0;
584 do {
585 std::unique_lock l(m);
586 {
587 auto last = entries_.end() - 1;
588 if (gen_id >= last->first) {
589 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
590 << ": Attempt to trim beyond the possible." << dendl;
591 return bs::error_code(EINVAL, bs::system_category());
592 }
593 }
594 auto es = entries_;
595 auto ei = es.upper_bound(gen_id);
596 if (ei == es.begin()) {
597 // Nothing to be done.
598 return {};
599 }
600 for (auto i = es.begin(); i < ei; ++i) {
601 newtail = i->first;
602 i->second.pruned = ceph::real_clock::now();
603 }
604 ec = write(std::move(es), std::move(l), y);
605 ++tries;
606 } while (ec == bs::errc::operation_canceled &&
607 tries < max_tries);
608 if (tries >= max_tries) {
609 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
610 << ": exhausted retry attempts." << dendl;
611 return ec;
612 }
613
614 if (ec) {
615 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
616 << ": write failed with ec=" << ec.message() << dendl;
617 return ec;
618 }
619
620 cb::list bl, rbl;
621
622 auto r = rgw_rados_notify(ioctx, oid, bl, 10'000, &rbl, y);
623 if (r < 0) {
624 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
625 << ": notify failed with r=" << r << dendl;
626 return { -r, bs::system_category() };
627 }
628 ec = handle_empty_to(newtail);
629 } catch (const std::bad_alloc&) {
630 return bs::error_code(ENOMEM, bs::system_category());
631 }
632 return {};
633}
634
635bs::error_code logback_generations::remove_empty(optional_yield y) noexcept {
636 auto cct = static_cast<CephContext*>(ioctx.cct());
637 static constexpr auto max_tries = 10;
638 try {
639 auto ec = update(y);
640 if (ec) return ec;
641 auto tries = 0;
642 entries_t new_entries;
643 std::unique_lock l(m);
644 ceph_assert(!entries_.empty());
645 {
646 auto i = lowest_nomempty(entries_);
647 if (i == entries_.begin()) {
648 return {};
649 }
650 }
651 entries_t es;
652 auto now = ceph::real_clock::now();
653 l.unlock();
654 do {
655 std::copy_if(entries_.cbegin(), entries_.cend(),
656 std::inserter(es, es.end()),
657 [now](const auto& e) {
658 if (!e.second.pruned)
659 return false;
660
661 auto pruned = *e.second.pruned;
662 return (now - pruned) >= 1h;
663 });
664 auto es2 = entries_;
665 for (const auto& [gen_id, e] : es) {
666 ceph_assert(e.pruned);
667 auto ec = log_remove(ioctx, shards,
668 [this, gen_id](int shard) {
669 return this->get_oid(gen_id, shard);
670 }, (gen_id == 0), y);
671 if (ec) {
672 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
673 << ": Error pruning: gen_id=" << gen_id
674 << " ec=" << ec.message() << dendl;
675 }
676 if (auto i = es2.find(gen_id); i != es2.end()) {
677 es2.erase(i);
678 }
679 }
680 l.lock();
681 es.clear();
682 ec = write(std::move(es2), std::move(l), y);
683 ++tries;
684 } while (ec == bs::errc::operation_canceled &&
685 tries < max_tries);
686 if (tries >= max_tries) {
687 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
688 << ": exhausted retry attempts." << dendl;
689 return ec;
690 }
691
692 if (ec) {
693 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
694 << ": write failed with ec=" << ec.message() << dendl;
695 return ec;
696 }
697 } catch (const std::bad_alloc&) {
698 return bs::error_code(ENOMEM, bs::system_category());
699 }
700 return {};
701}
702
703void logback_generations::handle_notify(uint64_t notify_id,
704 uint64_t cookie,
705 uint64_t notifier_id,
706 bufferlist& bl)
707{
708 auto cct = static_cast<CephContext*>(ioctx.cct());
709 if (notifier_id != my_id) {
710 auto ec = update(null_yield);
711 if (ec) {
712 lderr(cct)
713 << __PRETTY_FUNCTION__ << ":" << __LINE__
714 << ": update failed, no one to report to and no safe way to continue."
715 << dendl;
716 abort();
717 }
718 }
719 cb::list rbl;
720 ioctx.notify_ack(oid, notify_id, watchcookie, rbl);
721}
722
723void logback_generations::handle_error(uint64_t cookie, int err) {
724 auto cct = static_cast<CephContext*>(ioctx.cct());
725 auto r = ioctx.unwatch2(watchcookie);
726 if (r < 0) {
727 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
728 << ": failed to set unwatch oid=" << oid
729 << ", r=" << r << dendl;
730 }
731
732 auto ec = watch();
733 if (ec) {
734 lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
735 << ": failed to re-establish watch, unsafe to continue: oid="
736 << oid << ", ec=" << ec.message() << dendl;
737 }
738}