// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"
#include "include/random.h"
#include "include/util.h"
#include "OSD.h"
#include "osd_tracer.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}

using std::less;
using std::list;
using std::make_pair;
using std::map;
using std::ostringstream;
using std::set;
using std::pair;
using std::string;
using std::unique_ptr;
using std::vector;

using ceph::bufferhash;
using ceph::bufferlist;
using ceph::decode;
using ceph::encode;

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(MessageRef(reply, false), conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
  uint64_t cost;
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c,
    uint64_t cost) : pg(pg), c(c), cost(cost) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release(), cost);
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

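// Update per-subop perf counters (op count, latency, and inbound
// bytes for writes and pushes) when a replicated subop completes.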
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t latency = ceph_clock_now();
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      ceph_abort_msg("unsupported subop");
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

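// Flush a RecoveryHandle: send every push, pull, and delete that
// recover_object() queued on it, then dispose of the handle.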
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}

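// Queue recovery for one object on the handle: if this OSD is itself
// missing the object, prepare a pull from a peer; otherwise start
// pushes to the peers that still need it.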
int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    ceph_assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    ceph_assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}

void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << *op->get_req() << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

bool ReplicatedBackend::_handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << *op->get_req() << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (auto& op : in_progress_ops) {
    delete op.second->on_commit;
    op.second->on_commit = nullptr;
  }
  in_progress_ops.clear();
  clear_recovery_state();
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

int ReplicatedBackend::objects_readv_sync(
  const hobject_t &hoid,
  map<uint64_t, uint64_t>&& m,
  uint32_t op_flags,
  bufferlist *bl)
{
  interval_set<uint64_t> im(std::move(m));
  auto r = store->readv(ch, ghobject_t(hoid), im, *bl, op_flags);
  if (r >= 0) {
    m = std::move(im).detach();
  }
  return r;
}

void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  ceph_abort_msg("async read is not used by replica pool");
}

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ceph::ref_t<ReplicatedBackend::InProgressOp> op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
    : pg(pg), op(std::move(op)) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

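// Lower a PGTransaction into a raw ObjectStore::Transaction for this
// collection. Log entries are marked unrollbackable (replicated pools
// keep no rollback info), temp objects created or deleted along the
// way are reported via *added/*removed, and each object-level
// operation (create/clone/rename, truncate, attr, omap, and data
// updates) is translated into the corresponding ObjectStore call.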
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed,
  const ceph_release_t require_osd_release = ceph_release_t::unknown)
{
  ceph_assert(t);
  ceph_assert(added);
  ceph_assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          if (require_osd_release >= ceph_release_t::octopus) {
            t->create(coll, goid);
          } else {
            t->touch(coll, goid);
          }
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          ceph_assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist, less<>> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        case UpdateType::RemoveRange:
          t->omap_rmkeyrange(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer,
              op.fadvise_flags);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            ceph_assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

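// Primary-side write path: apply stats, lower the PGTransaction into
// an ObjectStore::Transaction, register the op in in_progress_ops with
// all acting/recovery/backfill shards waiting_for_commit, replicate it
// via issue_op(), log it locally, and queue the local transaction with
// an on-commit callback that funnels into op_commit().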
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &min_last_complete_ondisk,
  vector<pg_log_entry_t>&& _log_entries,
  std::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed,
    get_osdmap()->require_osd_release);
  ceph_assert(added.size() <= 1);
  ceph_assert(removed.size() <= 1);

  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      ceph::make_ref<InProgressOp>(
        tid, on_all_commit,
        orig_op, at_version)
      )
    );
  ceph_assert(insert_res.second);
  InProgressOp &op = *insert_res.first->second;

  op.waiting_for_commit.insert(
    parent->get_acting_recovery_backfill_shards().begin(),
    parent->get_acting_recovery_backfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    min_last_complete_ondisk,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    std::move(log_entries),
    hset_history,
    trim_to,
    at_version,
    min_last_complete_ondisk,
    true,
    op_t);

  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
  if (at_version != eversion_t()) {
    parent->op_applied(at_version);
  }
}

void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
{
  if (op->on_commit == nullptr) {
    // aborted
    return;
  }

  FUNCTRACE(cct);
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = nullptr;
    in_progress_ops.erase(op->tid);
  }
}

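// Primary-side handling of a replica's MOSDRepOpReply: drop the sender
// from the op's waiting_for_commit set, record its last_complete
// ondisk, and fire on_commit once every shard has acknowledged.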
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  auto r = op->get_req<MOSDRepOpReply>();
  ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  auto iter = in_progress_ops.find(rep_tid);
  if (iter != in_progress_ops.end()) {
    InProgressOp &ip_op = *iter->second;
    const MOSDOp *m = nullptr;
    if (ip_op.op)
      m = ip_op.op->get_req<MOSDOp>();

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      ceph_assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ip_op.op->mark_event("sub_op_commit_rec");
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      // legacy peer; ignore
    }

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = nullptr;
      in_progress_ops.erase(iter);
    }
  }
}

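// Deep-scrub one object incrementally. ScrubMapBuilder holds the
// resume position: object data is hashed osd_deep_scrub_stride bytes
// at a time and omap osd_deep_scrub_keys keys at a time, returning
// -EINPROGRESS whenever more work remains so the caller can
// reschedule; 0 means this object is done (possibly with read_error
// set instead of digests).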
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
                           CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
                           CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  ceph_assert(poid == pos.ls[pos.pos]);
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    const uint64_t stride = cct->_conf->osd_deep_scrub_stride;

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    if (static_cast<uint64_t>(r) == stride) {
      dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
               << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << " " << poid << " done with data, digest 0x"
             << std::hex << o.digest << std::dec << dendl;
  }

  // omap header
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      bool encoded = false;
      dout(25) << "CRC header " << cleanbin(hdrbl, encoded, true) << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // omap
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  ceph_assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);
  } else {
    iter->seek_to_first();
  }
  int max = g_conf()->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    encode(iter->key(), bl);
    encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << " " << poid
               << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  if (pos.omap_keys > cct->_conf->
        osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
        osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
             << " large omap object detected. Object has " << pos.omap_keys
             << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;

  // Sum up omap usage
  if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
    dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
             << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
    map.has_omap_keys = true;
    o.object_omap_bytes = pos.omap_bytes;
    o.object_omap_keys = pos.omap_keys;
  }

  // done!
  return 0;
}

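// Replica side of recovery: apply a batch of PushOps from the primary
// in a single transaction and answer with an MOSDPGPushReply once that
// transaction completes.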
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPush>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t, m->is_repair);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(
    ReplicatedBackend *bc,
    int priority,
    list<ReplicatedBackend::pull_complete_info> &&to_continue)
    : bc(bc), to_continue(std::move(to_continue)), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      ceph_assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
        bc->pushing[i.hoid].clear();
        bc->get_parent()->on_failed_pull(
          { bc->get_parent()->whoami_shard() },
          i.hoid, obc->obs.oi.version);
      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }

  /// Estimate total data reads required to perform pushes
  uint64_t estimate_push_costs() const {
    uint64_t cost = 0;
    for (const auto &i: to_continue) {
      cost += i.stat.num_bytes_recovered;
    }
    return cost;
  }
};

void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPush>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority(),
        std::move(to_continue));
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_unlocked_gencontext(c),
        std::max<uint64_t>(1, c->estimate_push_costs())));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(std::move(replies));
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  for (auto& i : m->take_pulls()) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGPushReply>();
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t min_last_complete_ondisk,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const bufferlist &log_entries,
  std::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    ObjectStore::Transaction t;
    encode(t, wr->get_data());
  } else {
    encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  wr->logbl = log_entries;

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;

  if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
    wr->min_last_complete_ondisk = min_last_complete_ondisk;
  } else {
    /* Some replicas need this field to be at_version.  New replicas
     * will ignore it */
    wr->set_rollback_to(at_version);
  }

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t min_last_complete_ondisk,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  std::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (parent->get_acting_recovery_backfill_shards().size() > 1) {
    if (op->op) {
      op->op->pg_trace.event("issue replication ops");
      ostringstream ss;
      set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
      replicas.erase(parent->whoami_shard());
      ss << "waiting for subops from " << replicas;
      op->op->mark_sub_op_sent(ss.str());
    }

    // avoid doing the same work in generate_subop
    bufferlist logs;
    encode(log_entries, logs);

    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
      if (shard == parent->whoami_shard()) continue;
      const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;

      Message *wr;
      wr = generate_subop(
        soid,
        at_version,
        tid,
        reqid,
        pg_trim_to,
        min_last_complete_ondisk,
        new_temp_oid,
        discard_temp_oid,
        logs,
        hset_hist,
        op_t,
        shard,
        pinfo);
      if (op->op && op->op->pg_trace)
        wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
      get_parent()->send_message_osd_cluster(
        shard.osd, wr, get_osdmap_epoch());
    }
  }
}

// sub op modify
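// Replica-side write path: decode the transaction and log entries
// shipped in the primary's MOSDRepOp, track any temp objects, append
// the log locally, and queue both transactions; C_OSD_RepModifyCommit
// sends the ack once the writes are durable.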
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  auto m = op->get_req<MOSDRepOp>();
  int msg_type = m->get_type();
  ceph_assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
           << " " << m->logbl.length()
           << dendl;

  // sanity checks
  ceph_assert(m->map_epoch >= get_info().history.same_interval_since);

  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap_epoch();

  ceph_assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
  decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
               << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }

  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
    for (auto &&e: log) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }

  parent->update_stats(m->pg_stats);
  parent->log_operation(
    std::move(log),
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->version, /* Replicated PGs don't have rollback info */
    m->min_last_complete_ondisk,
    update_snaps,
    rm->localt,
    async);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  auto m = rm->op->get_req<MOSDRepOp>();
  ceph_assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  ceph_assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  assert(HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS));
  const auto it = missing.get_items().find(head);
  assert(it != missing.get_items().end());
  data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset << dendl;

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  hobject_t c = head;
  if (size)
    prev.insert(0, size);

  for (int j = snapset.clones.size() - 1; j >= 0; j--) {
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      cloning = prev;
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  cloning.intersection_of(data_subset);
  if (cloning.empty()) {
    dout(10) << "skipping clone, nothing needs to clone" << dendl;
    return;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
    return;
  }

  // what's left for us to push?
  clone_subsets[c] = cloning;
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

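// Compute which extents of a clone can be cloned locally from an
// adjacent clone (next older or next newer) instead of being pushed
// over the wire: data_subset is what must still travel, clone_subsets
// maps donor clones to reusable extents, and read locks on each donor
// are held via the ObcLockManager.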
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i = 0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j = i - 1; j >= 0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j = i + 1; j < snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

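// Build a PullOp for an object this OSD is missing: pick a source
// shard that has it, restrict the copy to the dirty regions recorded
// in the missing set, reuse clone overlap where possible, and record
// the in-flight pull in pulling/pull_from_peer.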
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  const auto missing_iter = get_parent()->get_local_missing().get_items().find(soid);
  ceph_assert(missing_iter != get_parent()->get_local_missing().get_items().end());
  eversion_t _v = missing_iter->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee
  auto p = q->second.end();
  if (cct->_conf->osd_debug_feed_pullee >= 0) {
    for (auto it = q->second.begin(); it != q->second.end(); it++) {
      if (it->osd == cct->_conf->osd_debug_feed_pullee) {
        p = it;
        break;
      }
    }
  }
  if (p == q->second.end()) {
    // probably because the user fed a wrong pullee
    p = q->second.begin();
    std::advance(p,
                 ceph::util::generate_random_number<int>(0,
                                                         q->second.size() - 1));
  }
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
                (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
                 pg_log_entry_t::LOST_REVERT) &&
                (get_parent()->get_log().get_log().objects.find(
                  soid)->second->reverting_to ==
                 v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    assert(HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS));
    recovery_info.copy_subset.intersection_of(missing_iter->second.clean_regions.get_dirty_regions());
    recovery_info.size = ((uint64_t)-1);
    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty();
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  ceph_assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base the push off of clones that succeed/precede poid;
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    ceph_assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

int ReplicatedBackend::prep_push(ObjectContextRef obc,
                                 const hobject_t& soid, pg_shard_t peer,
                                 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
                   obc->obs.oi.version, data_subset, clone_subsets,
                   pop, cache_dont_need, ObcLockManager());
}

int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  const auto pmissing_iter = get_parent()->get_shard_missing().find(peer);
  const auto missing_iter = pmissing_iter->second.get_items().find(soid);
  assert(missing_iter != pmissing_iter->second.get_items().end());
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
  pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty();
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}

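// Stage pushed data into the object store. A multi-message push lands
// in a temp recovery object first (created on the first message and
// renamed over the real object once complete); a single-message push
// writes the target directly. Zeros are punched for dirty regions the
// source reported as holes, then data extents, omap entries, and
// attrs are applied.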
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool clear_omap,
  bool cache_dont_need,
  interval_set<uint64_t> &data_zeros,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist, less<>> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
                                                        recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
               << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    if (!complete) {
      t->remove(coll, ghobject_t(target_oid));
      t->touch(coll, ghobject_t(target_oid));
      object_info_t oi(attrs.at(OI_ATTR));
      t->set_alloc_hint(coll, ghobject_t(target_oid),
                        oi.expected_object_size,
                        oi.expected_write_size,
                        oi.alloc_hint_flags);
    } else {
      if (!recovery_info.object_exist) {
        t->remove(coll, ghobject_t(target_oid));
        t->touch(coll, ghobject_t(target_oid));
        object_info_t oi(attrs.at(OI_ATTR));
        t->set_alloc_hint(coll, ghobject_t(target_oid),
                          oi.expected_object_size,
                          oi.expected_write_size,
                          oi.alloc_hint_flags);
      }
      // remove xattrs and update them later if we overwrite the original object
      t->rmattrs(coll, ghobject_t(target_oid));
      // if the omap needs updating, clear the previous contents first
      if (clear_omap)
        t->omap_clear(coll, ghobject_t(target_oid));
    }

    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    struct stat st;
    int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
    if (get_parent()->pg_is_remote_backfilling()) {
      uint64_t size = 0;
      if (r == 0)
        size = st.st_size;
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
        get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
        get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
        dout(10) << __func__ << " " << recovery_info.soid
                 << " backfill size " << recovery_info.oi.size
                 << " previous size " << size
                 << " net size " << recovery_info.oi.size - size
                 << dendl;
      }
    }
    if (!complete) {
      // clone overlap content from the local object
      if (recovery_info.object_exist) {
        assert(r == 0);
        uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
        interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
        if (local_size) {
          local_intervals_included.insert(0, local_size);
          local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
          local_intervals_included.subtract(local_intervals_excluded);
        }
        for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
             q != local_intervals_included.end();
             ++q) {
          dout(15) << " clone_range " << recovery_info.soid << " "
                   << q.get_start() << "~" << q.get_len() << dendl;
          t->clone_range(coll, ghobject_t(recovery_info.soid), ghobject_t(target_oid),
                         q.get_start(), q.get_len(), q.get_start());
        }
      }
    }
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
  if (data_zeros.size() > 0) {
    data_zeros.intersection_of(recovery_info.copy_subset);
    assert(intervals_included.subset_of(data_zeros));
    data_zeros.subtract(intervals_included);

    dout(20) << __func__ << " recovering object " << recovery_info.soid
             << " copy_subset: " << recovery_info.copy_subset
             << " intervals_included: " << intervals_included
             << " data_zeros: " << data_zeros << dendl;

    for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
      t->zero(coll, ghobject_t(target_oid), p.get_start(), p.get_len());
  }
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
             p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
               << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
                                coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}

void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
         recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
         q != p->second.end();
         ++q) {
      dout(15) << " clone_range " << p->first << " "
               << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
                     q.get_start(), q.get_len(), q.get_start());
    }
  }
}

ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  ceph_assert(ssc);
  get_parent()->release_locks(manager);  // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}

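// Primary-side handling of one PushOp answering one of our pulls: trim
// the payload to the regions still needed, stage it via
// submit_push_data(), and either request the next chunk (returns true,
// filling *response) or, when the object is complete, add it to
// to_continue so the post-commit callback can push it to the remaining
// peers.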
1789 bool ReplicatedBackend::handle_pull_response(
1790 pg_shard_t from, const PushOp &pop, PullOp *response,
1791 list<pull_complete_info> *to_continue,
1792 ObjectStore::Transaction *t)
1793 {
1794 interval_set<uint64_t> data_included = pop.data_included;
1795 bufferlist data;
1796 data = pop.data;
1797 dout(10) << "handle_pull_response "
1798 << pop.recovery_info
1799 << pop.after_progress
1800 << " data.size() is " << data.length()
1801 << " data_included: " << data_included
1802 << dendl;
1803 if (pop.version == eversion_t()) {
1804 // replica doesn't have it!
1805 _failed_pull(from, pop.soid);
1806 return false;
1807 }
1808
1809 const hobject_t &hoid = pop.soid;
1810 ceph_assert((data_included.empty() && data.length() == 0) ||
1811 (!data_included.empty() && data.length() > 0));
1812
1813 auto piter = pulling.find(hoid);
1814 if (piter == pulling.end()) {
1815 return false;
1816 }
1817
1818 PullInfo &pi = piter->second;
1819 if (pi.recovery_info.size == (uint64_t(-1))) {
1820 pi.recovery_info.size = pop.recovery_info.size;
1821 pi.recovery_info.copy_subset.intersection_of(
1822 pop.recovery_info.copy_subset);
1823 }
1824 // If primary doesn't have object info and didn't know version
1825 if (pi.recovery_info.version == eversion_t()) {
1826 pi.recovery_info.version = pop.version;
1827 }
1828
1829 bool first = pi.recovery_progress.first;
1830 if (first) {
1831 // The attrs still reference the original bufferlist (decoded from
1832 // the MOSDPGPush message), which is much larger than the attrs
1833 // themselves. If the obc caches them (get_obc may cache the attrs),
1834 // the whole original bufferlist cannot be freed until the obc is
1835 // evicted from the obc cache. So rebuild the bufferlists before
1836 // caching them.
1837 auto attrset = pop.attrset;
1838 for (auto& a : attrset) {
1839 a.second.rebuild();
1840 }
1841 pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
1842 if (attrset.find(SS_ATTR) != attrset.end()) {
1843 bufferlist ssbv = attrset.at(SS_ATTR);
1844 SnapSet ss(ssbv);
1845 ceph_assert(!pi.obc->ssc->exists || ss.seq == pi.obc->ssc->snapset.seq);
1846 }
1847 pi.recovery_info.oi = pi.obc->obs.oi;
1848 pi.recovery_info = recalc_subsets(
1849 pi.recovery_info,
1850 pi.obc->ssc,
1851 pi.lock_manager);
1852 }
1853
1854
1855 interval_set<uint64_t> usable_intervals;
1856 bufferlist usable_data;
1857 trim_pushed_data(pi.recovery_info.copy_subset,
1858 data_included,
1859 data,
1860 &usable_intervals,
1861 &usable_data);
1862 data_included = usable_intervals;
1863 data = std::move(usable_data);
1864
1865
1866 pi.recovery_progress = pop.after_progress;
1867
1868 dout(10) << "new recovery_info " << pi.recovery_info
1869 << ", new progress " << pi.recovery_progress
1870 << dendl;
1871 interval_set<uint64_t> data_zeros;
1872 uint64_t z_offset = pop.before_progress.data_recovered_to;
1873 uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
1874 if (z_length)
1875 data_zeros.insert(z_offset, z_length);
1876 bool complete = pi.is_complete();
1877 bool clear_omap = !pop.before_progress.omap_complete;
1878
1879 submit_push_data(pi.recovery_info,
1880 first,
1881 complete,
1882 clear_omap,
1883 pi.cache_dont_need,
1884 data_zeros,
1885 data_included,
1886 data,
1887 pop.omap_header,
1888 pop.attrset,
1889 pop.omap_entries,
1890 t);
1891
1892 pi.stat.num_keys_recovered += pop.omap_entries.size();
1893 pi.stat.num_bytes_recovered += data.length();
1894 get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());
1895
1896 if (complete) {
1897 pi.stat.num_objects_recovered++;
1898 // XXX: This could overcount if regular recovery is needed right after a repair
1899 if (get_parent()->pg_is_repair()) {
1900 pi.stat.num_objects_repaired++;
1901 get_parent()->inc_osd_stat_repaired();
1902 }
1903 clear_pull_from(piter);
1904 to_continue->push_back({hoid, pi.stat});
1905 get_parent()->on_local_recover(
1906 hoid, pi.recovery_info, pi.obc, false, t);
1907 return false;
1908 } else {
1909 response->soid = pop.soid;
1910 response->recovery_info = pi.recovery_info;
1911 response->recovery_progress = pi.recovery_progress;
1912 return true;
1913 }
1914 }
1915
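/**
 * Apply a push sent by the primary to this replica: queue the data,
 * omap and attr updates in the transaction, and once the final chunk
 * has been applied, notify the parent of the local recovery.
 */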
1916 void ReplicatedBackend::handle_push(
1917 pg_shard_t from, const PushOp &pop, PushReplyOp *response,
1918 ObjectStore::Transaction *t, bool is_repair)
1919 {
1920 dout(10) << "handle_push "
1921 << pop.recovery_info
1922 << pop.after_progress
1923 << dendl;
1924 bufferlist data;
1925 data = pop.data;
1926 bool first = pop.before_progress.first;
1927 bool complete = pop.after_progress.data_complete &&
1928 pop.after_progress.omap_complete;
1929 bool clear_omap = !pop.before_progress.omap_complete;
1930 interval_set<uint64_t> data_zeros;
1931 uint64_t z_offset = pop.before_progress.data_recovered_to;
1932 uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
1933 if (z_length)
1934 data_zeros.insert(z_offset, z_length);
1935 response->soid = pop.recovery_info.soid;
1936
1937 submit_push_data(pop.recovery_info,
1938 first,
1939 complete,
1940 clear_omap,
1941 true, // cache_dont_need: this side must be a replica
1942 data_zeros,
1943 pop.data_included,
1944 data,
1945 pop.omap_header,
1946 pop.attrset,
1947 pop.omap_entries,
1948 t);
1949
1950 if (complete) {
1951 if (is_repair) {
1952 get_parent()->inc_osd_stat_repaired();
1953 dout(20) << __func__ << " repair complete" << dendl;
1954 }
1955 get_parent()->on_local_recover(
1956 pop.recovery_info.soid,
1957 pop.recovery_info,
1958 ObjectContextRef(), // ok, is replica
1959 false,
1960 t);
1961 }
1962 }
1963
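/**
 * Flush queued PushOps to each peer (skipping peers we no longer have
 * a connection to). Ops are batched into MOSDPGPush messages, starting
 * a new message once the accumulated cost reaches osd_max_push_cost or
 * the op count reaches osd_max_push_objects.
 */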
1964 void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1965 {
1966 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1967 i != pushes.end();
1968 ++i) {
1969 ConnectionRef con = get_parent()->get_con_osd_cluster(
1970 i->first.osd,
1971 get_osdmap_epoch());
1972 if (!con)
1973 continue;
1974 vector<PushOp>::iterator j = i->second.begin();
1975 while (j != i->second.end()) {
1976 uint64_t cost = 0;
1977 uint64_t push_count = 0;
1978 MOSDPGPush *msg = new MOSDPGPush();
1979 msg->from = get_parent()->whoami_shard();
1980 msg->pgid = get_parent()->primary_spg_t();
1981 msg->map_epoch = get_osdmap_epoch();
1982 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1983 msg->set_priority(prio);
1984 msg->is_repair = get_parent()->pg_is_repair();
1985 for (;
1986 (j != i->second.end() &&
1987 cost < cct->_conf->osd_max_push_cost &&
1988 push_count < cct->_conf->osd_max_push_objects);
1989 ++j) {
1990 dout(20) << __func__ << ": sending push " << *j
1991 << " to osd." << i->first << dendl;
1992 cost += j->cost(cct);
1993 push_count += 1;
1994 msg->pushes.push_back(*j);
1995 }
1996 msg->set_cost(cost);
1997 get_parent()->send_message_osd_cluster(msg, con);
1998 }
1999 }
2000 }
2001
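/**
 * Send the queued PullOps to each peer as a single MOSDPGPull message
 * (skipping peers we no longer have a connection to).
 */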
2002 void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
2003 {
2004 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
2005 i != pulls.end();
2006 ++i) {
2007 ConnectionRef con = get_parent()->get_con_osd_cluster(
2008 i->first.osd,
2009 get_osdmap_epoch());
2010 if (!con)
2011 continue;
2012 dout(20) << __func__ << ": sending pulls " << i->second
2013 << " to osd." << i->first << dendl;
2014 MOSDPGPull *msg = new MOSDPGPull();
2015 msg->from = get_parent()->whoami_shard();
2016 msg->set_priority(prio);
2017 msg->pgid = get_parent()->primary_spg_t();
2018 msg->map_epoch = get_osdmap_epoch();
2019 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
2020 msg->set_pulls(std::move(i->second));
2021 msg->compute_cost(cct);
2022 get_parent()->send_message_osd_cluster(msg, con);
2023 }
2024 }
2025
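/**
 * Build one chunk of a push. On the first chunk this also reads the
 * omap header and attrs and validates the local object_info_t version
 * against the requested one. Omap entries and object data are then
 * added, up to roughly osd_recovery_max_chunk bytes per chunk.
 *
 * Returns 0 on success or a negative error code (e.g. -EIO on a
 * full-object digest mismatch).
 */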
2026 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
2027 const ObjectRecoveryProgress &progress,
2028 ObjectRecoveryProgress *out_progress,
2029 PushOp *out_op,
2030 object_stat_sum_t *stat,
2031 bool cache_dont_need)
2032 {
2033 ObjectRecoveryProgress _new_progress;
2034 if (!out_progress)
2035 out_progress = &_new_progress;
2036 ObjectRecoveryProgress &new_progress = *out_progress;
2037 new_progress = progress;
2038
2039 dout(7) << __func__ << " " << recovery_info.soid
2040 << " v " << recovery_info.version
2041 << " size " << recovery_info.size
2042 << " recovery_info: " << recovery_info
2043 << dendl;
2044
2045 eversion_t v = recovery_info.version;
2046 object_info_t oi;
2047 if (progress.first) {
2048 int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
2049 if (r < 0) {
2050 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
2051 return r;
2052 }
2053 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
2054 if (r < 0) {
2055 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
2056 return r;
2057 }
2058
2059 // Decode object_info_t; used below to validate the version and the full-object digest
2060 try {
2061 oi.decode(out_op->attrset[OI_ATTR]);
2062 } catch (...) {
2063 dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
2064 return -EINVAL;
2065 }
2066
2067 // If requestor didn't know the version, use ours
2068 if (v == eversion_t()) {
2069 v = oi.version;
2070 } else if (oi.version != v) {
2071 get_parent()->clog_error() << get_info().pgid << " push "
2072 << recovery_info.soid << " v "
2073 << recovery_info.version
2074 << " failed because local copy is "
2075 << oi.version;
2076 return -EINVAL;
2077 }
2078
2079 new_progress.first = false;
2080 }
2081 // Once we provide the version subsequent requests will have it, so
2082 // at this point it must be known.
2083 ceph_assert(v != eversion_t());
2084
2085 uint64_t available = cct->_conf->osd_recovery_max_chunk;
2086 if (!progress.omap_complete) {
2087 ObjectMap::ObjectMapIterator iter =
2088 store->get_omap_iterator(ch,
2089 ghobject_t(recovery_info.soid));
2090 ceph_assert(iter);
2091 for (iter->lower_bound(progress.omap_recovered_to);
2092 iter->valid();
2093 iter->next()) {
2094 if (!out_op->omap_entries.empty() &&
2095 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
2096 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
2097 available <= iter->key().size() + iter->value().length()))
2098 break;
2099 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
2100
2101 if ((iter->key().size() + iter->value().length()) <= available)
2102 available -= (iter->key().size() + iter->value().length());
2103 else
2104 available = 0;
2105 }
2106 if (!iter->valid())
2107 new_progress.omap_complete = true;
2108 else
2109 new_progress.omap_recovered_to = iter->key();
2110 }
2111
2112 if (available > 0) {
2113 if (!recovery_info.copy_subset.empty()) {
2114 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
2115 map<uint64_t, uint64_t> m;
2116 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
2117 copy_subset.range_end(), m);
2118 if (r >= 0) {
2119 interval_set<uint64_t> fiemap_included(std::move(m));
2120 copy_subset.intersection_of(fiemap_included);
2121 } else {
2122 // intersection of copy_subset and empty interval_set would be empty anyway
2123 copy_subset.clear();
2124 }
2125
2126 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
2127 available);
2128 // zero filled section, skip to end!
2129 if (out_op->data_included.empty() ||
2130 out_op->data_included.range_end() == copy_subset.range_end())
2131 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
2132 else
2133 new_progress.data_recovered_to = out_op->data_included.range_end();
2134 }
2135 } else {
2136 out_op->data_included.clear();
2137 }
2138
2139 auto origin_size = out_op->data_included.size();
2140 bufferlist bit;
2141 int r = store->readv(ch, ghobject_t(recovery_info.soid),
2142 out_op->data_included, bit,
2143 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED : 0);
2144 if (cct->_conf->osd_debug_random_push_read_error &&
2145 (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
2146 dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
2147 r = -EIO;
2148 }
2149 if (r < 0) {
2150 return r;
2151 }
2152 if (out_op->data_included.size() != origin_size) {
2153 dout(10) << __func__ << " some extents were pruned "
2154 << out_op->data_included.size() << "/" << origin_size
2155 << dendl;
2156 new_progress.data_complete = true;
2157 }
2158 out_op->data.claim_append(bit);
2159 if (progress.first && !out_op->data_included.empty() &&
2160 out_op->data_included.begin().get_start() == 0 &&
2161 out_op->data.length() == oi.size && oi.is_data_digest()) {
2162 uint32_t crc = out_op->data.crc32c(-1);
2163 if (oi.data_digest != crc) {
2164 dout(0) << __func__ << " " << coll << std::hex
2165 << " full-object read crc 0x" << crc
2166 << " != expected 0x" << oi.data_digest
2167 << std::dec << " on " << recovery_info.soid << dendl;
2168 return -EIO;
2169 }
2170 }
2171
2172 if (new_progress.is_complete(recovery_info)) {
2173 new_progress.data_complete = true;
2174 if (stat) {
2175 stat->num_objects_recovered++;
2176 if (get_parent()->pg_is_repair())
2177 stat->num_objects_repaired++;
2178 }
2179 } else if (progress.first && progress.omap_complete) {
2180 // The first request claimed the omap was already complete, but recovery did not finish in a single chunk; clear omap_complete so the omap is still recovered in a later chunk
2181 new_progress.omap_complete = false;
2182 }
2183
2184 if (stat) {
2185 stat->num_keys_recovered += out_op->omap_entries.size();
2186 stat->num_bytes_recovered += out_op->data.length();
2187 get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
2188 }
2189
2190 get_parent()->get_logger()->inc(l_osd_push);
2191 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2192
2193 // send
2194 out_op->version = v;
2195 out_op->soid = recovery_info.soid;
2196 out_op->recovery_info = recovery_info;
2197 out_op->after_progress = new_progress;
2198 out_op->before_progress = progress;
2199 return 0;
2200 }
2201
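/**
 * Prepare a "blank" PushOp whose zero version tells the other side
 * that we do not have (or could not read) the requested object.
 */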
2202 void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2203 {
2204 op->recovery_info.version = eversion_t();
2205 op->version = eversion_t();
2206 op->soid = soid;
2207 }
2208
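/**
 * Process a push ack from a peer. If the peer still needs more data,
 * build the next chunk into *reply and return true; otherwise finish
 * up (on_peer_recover, and on_global_recover once every peer has
 * acked) and return false.
 */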
2209 bool ReplicatedBackend::handle_push_reply(
2210 pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2211 {
2212 const hobject_t &soid = op.soid;
2213 if (pushing.count(soid) == 0) {
2214 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2215 << ", or anybody else"
2216 << dendl;
2217 return false;
2218 } else if (pushing[soid].count(peer) == 0) {
2219 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2220 << dendl;
2221 return false;
2222 } else {
2223 PushInfo *pi = &pushing[soid][peer];
2224 bool error = pushing[soid].begin()->second.recovery_progress.error;
2225
2226 if (!pi->recovery_progress.data_complete && !error) {
2227 dout(10) << " pushing more, from "
2228 << pi->recovery_progress.data_recovered_to
2229 << " of " << pi->recovery_info.copy_subset << dendl;
2230 ObjectRecoveryProgress new_progress;
2231 int r = build_push_op(
2232 pi->recovery_info,
2233 pi->recovery_progress, &new_progress, reply,
2234 &(pi->stat));
2235 // Handle the case of a read error right after we wrote, which is
2236 // hopefully extremely rare.
2237 if (r < 0) {
2238 dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
2239
2240 error = true;
2241 goto done;
2242 }
2243 pi->recovery_progress = new_progress;
2244 return true;
2245 } else {
2246 // done!
2247 done:
2248 if (!error)
2249 get_parent()->on_peer_recover(peer, soid, pi->recovery_info);
2250
2251 get_parent()->release_locks(pi->lock_manager);
2252 object_stat_sum_t stat = pi->stat;
2253 eversion_t v = pi->recovery_info.version;
2254 pushing[soid].erase(peer);
2255 pi = NULL;
2256
2257 if (pushing[soid].empty()) {
2258 if (!error)
2259 get_parent()->on_global_recover(soid, stat, false);
2260 else
2261 get_parent()->on_failed_pull(
2262 std::set<pg_shard_t>{ get_parent()->whoami_shard() },
2263 soid,
2264 v);
2265 pushing.erase(soid);
2266 } else {
2267 // This looks weird, but we erased the current peer and need to remember
2268 // the error on any other one, while getting more acks.
2269 if (error)
2270 pushing[soid].begin()->second.recovery_progress.error = true;
2271 dout(10) << "pushed " << soid << ", still waiting for push ack from "
2272 << pushing[soid].size() << " others" << dendl;
2273 }
2274 return false;
2275 }
2276 }
2277 }
2278
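/**
 * Handle a pull request from the primary: fill in the object's actual
 * size on the first request, then build the PushOp reply. A stat or
 * read error results in a blank PushOp, signalling that this OSD
 * cannot supply the object.
 */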
2279 void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2280 {
2281 const hobject_t &soid = op.soid;
2282 struct stat st;
2283 int r = store->stat(ch, ghobject_t(soid), &st);
2284 if (r != 0) {
2285 get_parent()->clog_error() << get_info().pgid << " "
2286 << peer << " tried to pull " << soid
2287 << " but got " << cpp_strerror(-r);
2288 prep_push_op_blank(soid, reply);
2289 } else {
2290 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2291 ObjectRecoveryProgress &progress = op.recovery_progress;
2292 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2293 // Adjust size and copy_subset
2294 recovery_info.size = st.st_size;
2295 if (st.st_size) {
2296 interval_set<uint64_t> object_range;
2297 object_range.insert(0, st.st_size);
2298 recovery_info.copy_subset.intersection_of(object_range);
2299 } else {
2300 recovery_info.copy_subset.clear();
2301 }
2302 ceph_assert(recovery_info.clone_subset.empty());
2303 }
2304
2305 r = build_push_op(recovery_info, progress, nullptr, reply);
2306 if (r < 0)
2307 prep_push_op_blank(soid, reply);
2308 }
2309 }
2310
2311 /**
2312 * trim received data to remove what we don't want
2313 *
2314 * @param copy_subset intervals we want
2315 * @param intervals_received intervals we got
2316 * @param data_received data we got
2317 * @param intervals_usable intervals we want to keep
2318 * @param data_usable matching data we want to keep
2319 */
2320 void ReplicatedBackend::trim_pushed_data(
2321 const interval_set<uint64_t> &copy_subset,
2322 const interval_set<uint64_t> &intervals_received,
2323 bufferlist data_received,
2324 interval_set<uint64_t> *intervals_usable,
2325 bufferlist *data_usable)
2326 {
2327 if (intervals_received.subset_of(copy_subset)) {
2328 *intervals_usable = intervals_received;
2329 *data_usable = data_received;
2330 return;
2331 }
2332
2333 intervals_usable->intersection_of(copy_subset,
2334 intervals_received);
2335
2336 uint64_t off = 0;
2337 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2338 p != intervals_received.end();
2339 ++p) {
2340 interval_set<uint64_t> x;
2341 x.insert(p.get_start(), p.get_len());
2342 x.intersection_of(copy_subset);
2343 for (interval_set<uint64_t>::const_iterator q = x.begin();
2344 q != x.end();
2345 ++q) {
2346 bufferlist sub;
2347 uint64_t data_off = off + (q.get_start() - p.get_start());
2348 sub.substr_of(data_received, data_off, q.get_len());
2349 data_usable->claim_append(sub);
2350 }
2351 off += p.get_len();
2352 }
2353 }
2354
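/**
 * Abort an in-progress pull: report the failed source to the parent
 * and drop all local pull state for the object.
 */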
2355 void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
2356 {
2357 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
2358 auto it = pulling.find(soid);
2359 ceph_assert(it != pulling.end());
2360 get_parent()->on_failed_pull(
2361 { from },
2362 soid,
2363 it->second.recovery_info.version);
2364
2365 clear_pull(it);
2366 }
2367
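/**
 * Remove the object from the pull_from_peer bookkeeping, dropping the
 * peer's entry entirely once no pulls from it remain.
 */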
2368 void ReplicatedBackend::clear_pull_from(
2369 map<hobject_t, PullInfo>::iterator piter)
2370 {
2371 auto from = piter->second.from;
2372 pull_from_peer[from].erase(piter->second.soid);
2373 if (pull_from_peer[from].empty())
2374 pull_from_peer.erase(from);
2375 }
2376
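/**
 * Drop all state for an in-progress pull, releasing any object locks
 * held for it; optionally also clears the pull_from_peer entry.
 */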
2377 void ReplicatedBackend::clear_pull(
2378 map<hobject_t, PullInfo>::iterator piter,
2379 bool clear_pull_from_peer)
2380 {
2381 if (clear_pull_from_peer) {
2382 clear_pull_from(piter);
2383 }
2384 get_parent()->release_locks(piter->second.lock_manager);
2385 pulling.erase(piter);
2386 }
2387
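/**
 * Queue a push of soid to every acting/backfill shard whose missing
 * set contains it. Returns the number of pushes queued, or a negative
 * error code after backing out all pushes queued for the object.
 */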
2388 int ReplicatedBackend::start_pushes(
2389 const hobject_t &soid,
2390 ObjectContextRef obc,
2391 RPGHandle *h)
2392 {
2393 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2394
2395 dout(20) << __func__ << " soid " << soid << dendl;
2396 // who needs it?
2397 ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
2398 for (set<pg_shard_t>::iterator i =
2399 get_parent()->get_acting_recovery_backfill_shards().begin();
2400 i != get_parent()->get_acting_recovery_backfill_shards().end();
2401 ++i) {
2402 if (*i == get_parent()->whoami_shard()) continue;
2403 pg_shard_t peer = *i;
2404 map<pg_shard_t, pg_missing_t>::const_iterator j =
2405 get_parent()->get_shard_missing().find(peer);
2406 ceph_assert(j != get_parent()->get_shard_missing().end());
2407 if (j->second.is_missing(soid)) {
2408 shards.push_back(j);
2409 }
2410 }
2411
2412 // If more than one peer needs the object, the data will be read more than once, so ignore any request to skip caching
2413 bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2414
2415 for (auto j : shards) {
2416 pg_shard_t peer = j->first;
2417 h->pushes[peer].push_back(PushOp());
2418 int r = prep_push_to_replica(obc, soid, peer,
2419 &(h->pushes[peer].back()), cache);
2420 if (r < 0) {
2421 // Back out all failed reads
2422 for (auto k : shards) {
2423 pg_shard_t p = k->first;
2424 dout(10) << __func__ << " clean up peer " << p << dendl;
2425 h->pushes[p].pop_back();
2426 if (p == peer) break;
2427 }
2428 return r;
2429 }
2430 }
2431 return shards.size();
2432 }