// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"
#include "include/random.h"
#include "include/util.h"
#include "OSD.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

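// Record per-subop latency and throughput stats for a completed
// replication subop (write, push, or pull) in the OSD's perf counters.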
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      ceph_abort_msg("unsupported subop");
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

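// Dispatch all pushes, pulls, and deletes accumulated in a recovery
// handle, then dispose of the handle.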
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}

int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    ceph_assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    ceph_assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}

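// Cancel any in-flight pulls from peers that the new osdmap marks down,
// since those sources can no longer service recovery reads.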
void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

bool ReplicatedBackend::_handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (auto& op : in_progress_ops) {
    delete op.second->on_commit;
    op.second->on_commit = nullptr;
  }
  in_progress_ops.clear();
  clear_recovery_state();
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  ceph_abort_msg("async read is not used by replica pool");
}

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOpRef op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

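// Flatten a PGTransaction into a raw ObjectStore::Transaction, marking
// each log entry unrollbackable and reporting which temp objects the
// transaction creates (*added) and deletes (*removed).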
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  ceph_assert(t);
  ceph_assert(added);
  ceph_assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          t->touch(coll, goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          ceph_assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            ceph_assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

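// Primary-side write path: convert the PGTransaction, register the op as
// in progress, replicate it to the acting set via issue_op(), log it
// locally, and queue the local transaction with an on-commit callback.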
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed);
  ceph_assert(added.size() <= 1);
  ceph_assert(removed.size() <= 1);

  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      new InProgressOp(
        tid, on_all_commit,
        orig_op, at_version)
      )
    );
  ceph_assert(insert_res.second);
  InProgressOp &op = *insert_res.first->second;

  op.waiting_for_commit.insert(
    parent->get_acting_recovery_backfill_shards().begin(),
    parent->get_acting_recovery_backfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
  if (at_version != eversion_t()) {
    parent->op_applied(at_version);
  }
}

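// Local commit callback: clear our own shard from the waiting set and,
// once every shard has committed, complete the client-visible context.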
void ReplicatedBackend::op_commit(
  InProgressOpRef& op)
{
  if (op->on_commit == nullptr) {
    // aborted
    return;
  }

  FUNCTRACE(cct);
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  auto iter = in_progress_ops.find(rep_tid);
  if (iter != in_progress_ops.end()) {
    InProgressOp &ip_op = *iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      ceph_assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ip_op.op->mark_event("sub_op_commit_rec");
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      // legacy peer; ignore
    }

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
      in_progress_ops.erase(iter);
    }
  }
}

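// Deep-scrub one object incrementally: hash its data in
// osd_deep_scrub_stride-sized chunks, then its omap header and keys.
// Returns -EINPROGRESS while more work remains at the saved position.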
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
                           CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
                           CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  ceph_assert(poid == pos.ls[pos.pos]);
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    if (r == cct->_conf->osd_deep_scrub_stride) {
      dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
               << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << " " << poid << " done with data, digest 0x"
             << std::hex << o.digest << std::dec << dendl;
  }

  // omap header
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      bool encoded = false;
      dout(25) << "CRC header " << cleanbin(hdrbl, encoded, true) << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // omap
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  ceph_assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);
  } else {
    iter->seek_to_first();
  }
  int max = g_conf()->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    encode(iter->key(), bl);
    encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << " " << poid
               << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  if (pos.omap_keys > cct->_conf->
        osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
        osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
             << " large omap object detected. Object has " << pos.omap_keys
             << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;

  // Sum up omap usage
  if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
    dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
             << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
    map.has_omap_keys = true;
    o.object_omap_bytes = pos.omap_bytes;
    o.object_omap_keys = pos.omap_keys;
  }

  // done!
  return 0;
}

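// Replica side of recovery: apply a batch of incoming pushes in one
// transaction and send an MOSDPGPushReply once it completes.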
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t, m->is_repair);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      ceph_assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
        bc->pushing[i.hoid].clear();
        bc->get_parent()->primary_failed(i.hoid);
        bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};

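// Primary side of recovery: consume push data answering our earlier
// pulls; objects that finished locally are queued (via the context
// above) to be pushed on to the remaining replicas.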
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_unlocked_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

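// Build the MOSDRepOp that carries one transaction (plus log entries and
// pg stats) to a single peer shard; peers that should_send_op() excludes
// get an empty transaction so they still receive the log entries.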
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const bufferlist &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    ObjectStore::Transaction t;
    encode(t, wr->get_data());
  } else {
    encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  wr->logbl = log_entries;

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

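// Send the replicated op to every shard in the acting/recovery/backfill
// set except ourselves, encoding the log entries once up front.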
void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (parent->get_acting_recovery_backfill_shards().size() > 1) {
    if (op->op) {
      op->op->pg_trace.event("issue replication ops");
      ostringstream ss;
      set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
      replicas.erase(parent->whoami_shard());
      ss << "waiting for subops from " << replicas;
      op->op->mark_sub_op_sent(ss.str());
    }

    // avoid doing the same work in generate_subop
    bufferlist logs;
    encode(log_entries, logs);

    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
      if (shard == parent->whoami_shard()) continue;
      const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;

      Message *wr;
      wr = generate_subop(
        soid,
        at_version,
        tid,
        reqid,
        pg_trim_to,
        pg_roll_forward_to,
        new_temp_oid,
        discard_temp_oid,
        logs,
        hset_hist,
        op_t,
        shard,
        pinfo);
      if (op->op && op->op->pg_trace)
        wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
      get_parent()->send_message_osd_cluster(
        shard.osd, wr, get_osdmap_epoch());
    }
  }
}

// sub op modify
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  ceph_assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
           << " " << m->logbl.length()
           << dendl;

  // sanity checks
  ceph_assert(m->map_epoch >= get_info().history.same_interval_since);

  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap_epoch();

  ceph_assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
  decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
               << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }

  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
    for (auto &&e: log) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }

  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_roll_forward_to,
    update_snaps,
    rm->localt,
    async);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  ceph_assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

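// Work out which byte ranges of a head object can be cloned locally from
// an existing clone on the target (clone_subsets) and which must actually
// be pushed (data_subset), based on SnapSet clone overlap.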
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j = snapset.clones.size() - 1; j >= 0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i = 0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j = i - 1; j >= 0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j = i + 1; j < snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

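// Queue a pull for a missing object: pick a source shard that has the
// needed version, compute which ranges can be cloned locally, and record
// the pull state in pulling/pull_from_peer.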
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  ceph_assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee
  auto p = q->second.end();
  if (cct->_conf->osd_debug_feed_pullee >= 0) {
    for (auto it = q->second.begin(); it != q->second.end(); it++) {
      if (it->osd == cct->_conf->osd_debug_feed_pullee) {
        p = it;
        break;
      }
    }
  }
  if (p == q->second.end()) {
    // probably because the user fed a wrong pullee
    p = q->second.begin();
    std::advance(p,
                 util::generate_random_number<int>(0,
                                                   q->second.size() - 1));
  }
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
                (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
                 pg_log_entry_t::LOST_REVERT) &&
                (get_parent()->get_log().get_log().objects.find(
                  soid)->second->reverting_to ==
                 v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  ceph_assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/precede poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    ceph_assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

int ReplicatedBackend::prep_push(ObjectContextRef obc,
                                 const hobject_t& soid, pg_shard_t peer,
                                 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
                   obc->obs.oi.version, data_subset, clone_subsets,
                   pop, cache_dont_need, ObcLockManager());
}

int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}

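// Apply one chunk of pushed object data to the local store.  The first
// chunk (unless it is also the last) lands in a temp object, which is
// renamed into place and completed once the final chunk arrives.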
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
                                                        recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
               << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
                      oi.expected_object_size,
                      oi.expected_write_size,
                      oi.alloc_hint_flags);
    if (get_parent()->pg_is_remote_backfilling()) {
      struct stat st;
      uint64_t size = 0;
      int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
      if (r == 0) {
        size = st.st_size;
      }
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
        get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
        get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
        dout(10) << __func__ << " " << recovery_info.soid
                 << " backfill size " << recovery_info.oi.size
                 << " previous size " << size
                 << " net size " << recovery_info.oi.size - size
                 << dendl;
      }
    }
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
             p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
               << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
                                coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}

void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
         recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
         q != p->second.end();
         ++q) {
      dout(15) << " clone_range " << p->first << " "
               << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
                     q.get_start(), q.get_len(), q.get_start());
    }
  }
}

ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  ceph_assert(ssc);
  get_parent()->release_locks(manager); // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}

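// Primary-side handler for one PushOp answering our pull: trim the data
// to the ranges we still need, apply it, and either finish the object or
// fill *response with the next pull request.  Returns true if more
// chunks are needed.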
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
           << pop.recovery_info
           << pop.after_progress
           << " data.size() is " << data.length()
           << " data_included: " << data_included
           << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  ceph_assert((data_included.empty() && data.length() == 0) ||
              (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // The attrs reference only the original bufferlist (decoded from the
    // MOSDPGPush message), which is much larger than the attrs needed for
    // recovery.  If the obc caches them (get_obc may cache the attrs), the
    // whole original bufferlist cannot be freed until the obc is evicted
    // from the obc cache.  So rebuild the bufferlists before caching them.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }

  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
                   data_included,
                   data,
                   &usable_intervals,
                   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);

  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
           << ", new progress " << pi.recovery_progress
           << dendl;

  bool complete = pi.is_complete();

  submit_push_data(pi.recovery_info, first,
                   complete, pi.cache_dont_need,
                   data_included, data,
                   pop.omap_header,
                   pop.attrset,
                   pop.omap_entries,
                   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();
  get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());

  if (complete) {
    pi.stat.num_objects_recovered++;
    // XXX: This could overcount if regular recovery is needed right after a repair
    if (get_parent()->pg_is_repair()) {
      pi.stat.num_objects_repaired++;
      get_parent()->inc_osd_stat_repaired();
    }
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}

void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t, bool is_repair)
{
  dout(10) << "handle_push "
           << pop.recovery_info
           << pop.after_progress
           << dendl;
  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
    pop.after_progress.omap_complete;

  response->soid = pop.recovery_info.soid;
  submit_push_data(pop.recovery_info,
                   first,
                   complete,
                   true, // must be a replicated pool
                   pop.data_included,
                   data,
                   pop.omap_header,
                   pop.attrset,
                   pop.omap_entries,
                   t);

  if (complete) {
    if (is_repair) {
      get_parent()->inc_osd_stat_repaired();
      dout(20) << __func__ << " repair complete" << dendl;
    }
    get_parent()->on_local_recover(
      pop.recovery_info.soid,
      pop.recovery_info,
      ObjectContextRef(), // ok, is replica
      false,
      t);
  }
}

void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
       i != pushes.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap_epoch());
    if (!con)
      continue;
    vector<PushOp>::iterator j = i->second.begin();
    while (j != i->second.end()) {
      uint64_t cost = 0;
      uint64_t pushes = 0;
      MOSDPGPush *msg = new MOSDPGPush();
      msg->from = get_parent()->whoami_shard();
      msg->pgid = get_parent()->primary_spg_t();
      msg->map_epoch = get_osdmap_epoch();
      msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
      msg->set_priority(prio);
      msg->is_repair = get_parent()->pg_is_repair();
      for (;
           (j != i->second.end() &&
            cost < cct->_conf->osd_max_push_cost &&
            pushes < cct->_conf->osd_max_push_objects) ;
           ++j) {
        dout(20) << __func__ << ": sending push " << *j
                 << " to osd." << i->first << dendl;
        cost += j->cost(cct);
        pushes += 1;
        msg->pushes.push_back(*j);
      }
      msg->set_cost(cost);
      get_parent()->send_message_osd_cluster(msg, con);
    }
  }
}

void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
{
  for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
       i != pulls.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap_epoch());
    if (!con)
      continue;
    dout(20) << __func__ << ": sending pulls " << i->second
             << " to osd." << i->first << dendl;
    MOSDPGPull *msg = new MOSDPGPull();
    msg->from = parent->whoami_shard();
    msg->set_priority(prio);
    msg->pgid = get_parent()->primary_spg_t();
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->set_pulls(&i->second);
    msg->compute_cost(cct);
    get_parent()->send_message_osd_cluster(msg, con);
  }
}

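// Assemble the next PushOp chunk for an object: on the first call read
// the omap header and attrs, then fill up to osd_recovery_max_chunk
// bytes of omap entries and fiemap-backed data from the saved progress.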
1861 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1862 const ObjectRecoveryProgress &progress,
1863 ObjectRecoveryProgress *out_progress,
1864 PushOp *out_op,
1865 object_stat_sum_t *stat,
1866 bool cache_dont_need)
1867 {
1868 ObjectRecoveryProgress _new_progress;
1869 if (!out_progress)
1870 out_progress = &_new_progress;
1871 ObjectRecoveryProgress &new_progress = *out_progress;
1872 new_progress = progress;
1873
1874 dout(7) << __func__ << " " << recovery_info.soid
1875 << " v " << recovery_info.version
1876 << " size " << recovery_info.size
1877 << " recovery_info: " << recovery_info
1878 << dendl;
1879
1880 eversion_t v = recovery_info.version;
1881 object_info_t oi;
1882 if (progress.first) {
1883 int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
1884 if (r < 0) {
1885 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1886 return r;
1887 }
1888 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
1889 if (r < 0) {
1890 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1891 return r;
1892 }
1893
1894 // Decode object_info_t so we can sanity-check the requested version below
1895 bufferlist bv = out_op->attrset[OI_ATTR];
1896 try {
1897 auto bliter = bv.cbegin();
1898 decode(oi, bliter);
1899 } catch (...) {
1900 dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
1901 return -EINVAL;
1902 }
1903
1904 // If requestor didn't know the version, use ours
1905 if (v == eversion_t()) {
1906 v = oi.version;
1907 } else if (oi.version != v) {
1908 get_parent()->clog_error() << get_info().pgid << " push "
1909 << recovery_info.soid << " v "
1910 << recovery_info.version
1911 << " failed because local copy is "
1912 << oi.version;
1913 return -EINVAL;
1914 }
1915
1916 new_progress.first = false;
1917 }
1918 // Once we provide the version subsequent requests will have it, so
1919 // at this point it must be known.
1920 ceph_assert(v != eversion_t());
1921
1922 uint64_t available = cct->_conf->osd_recovery_max_chunk;
1923 if (!progress.omap_complete) {
1924 ObjectMap::ObjectMapIterator iter =
1925 store->get_omap_iterator(ch,
1926 ghobject_t(recovery_info.soid));
1927 ceph_assert(iter);
1928 for (iter->lower_bound(progress.omap_recovered_to);
1929 iter->valid();
1930 iter->next()) {
1931 if (!out_op->omap_entries.empty() &&
1932 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
1933 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
1934 available <= iter->key().size() + iter->value().length()))
1935 break;
1936 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
1937
1938 if ((iter->key().size() + iter->value().length()) <= available)
1939 available -= (iter->key().size() + iter->value().length());
1940 else
1941 available = 0;
1942 }
1943 if (!iter->valid())
1944 new_progress.omap_complete = true;
1945 else
1946 new_progress.omap_recovered_to = iter->key();
1947 }
1948
1949 if (available > 0) {
1950 if (!recovery_info.copy_subset.empty()) {
1951 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
1952 map<uint64_t, uint64_t> m;
1953 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
1954 copy_subset.range_end(), m);
1955 if (r >= 0) {
1956 interval_set<uint64_t> fiemap_included(m);
1957 copy_subset.intersection_of(fiemap_included);
1958 } else {
1959 // intersection of copy_subset and empty interval_set would be empty anyway
1960 copy_subset.clear();
1961 }
1962
1963 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
1964 available);
1965 if (out_op->data_included.empty()) // zero filled section, skip to end!
1966 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
1967 else
1968 new_progress.data_recovered_to = out_op->data_included.range_end();
1969 }
1970 } else {
1971 out_op->data_included.clear();
1972 }
1973
1974 for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
1975 p != out_op->data_included.end();
1976 ++p) {
1977 bufferlist bit;
1978 int r = store->read(ch, ghobject_t(recovery_info.soid),
1979 p.get_start(), p.get_len(), bit,
1980 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
1981 if (cct->_conf->osd_debug_random_push_read_error &&
1982 (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
1983 dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
1984 r = -EIO;
1985 }
1986 if (r < 0) {
1987 return r;
1988 }
1989 if (p.get_len() != bit.length()) {
1990 dout(10) << " extent " << p.get_start() << "~" << p.get_len()
1991 << " is actually " << p.get_start() << "~" << bit.length()
1992 << dendl;
1993 interval_set<uint64_t>::iterator save = p++;
1994 if (bit.length() == 0)
1995 out_op->data_included.erase(save); // remove this empty interval
1996 else
1997 save.set_len(bit.length());
1998 // Remove any other intervals present
1999 while (p != out_op->data_included.end()) {
2000 interval_set<uint64_t>::iterator save = p++;
2001 out_op->data_included.erase(save);
2002 }
2003 new_progress.data_complete = true;
2004 out_op->data.claim_append(bit);
2005 break;
2006 }
2007 out_op->data.claim_append(bit);
2008 }
2009 if (progress.first && !out_op->data_included.empty() &&
2010 out_op->data_included.begin().get_start() == 0 &&
2011 out_op->data.length() == oi.size && oi.is_data_digest()) {
2012 uint32_t crc = out_op->data.crc32c(-1);
2013 if (oi.data_digest != crc) {
2014 dout(0) << __func__ << " " << coll << std::hex
2015 << " full-object read crc 0x" << crc
2016 << " != expected 0x" << oi.data_digest
2017 << std::dec << " on " << recovery_info.soid << dendl;
2018 return -EIO;
2019 }
2020 }
2021
2022 if (new_progress.is_complete(recovery_info)) {
2023 new_progress.data_complete = true;
2024 if (stat) {
2025 stat->num_objects_recovered++;
2026 if (get_parent()->pg_is_repair())
2027 stat->num_objects_repaired++;
2028 }
2029 }
2030
2031 if (stat) {
2032 stat->num_keys_recovered += out_op->omap_entries.size();
2033 stat->num_bytes_recovered += out_op->data.length();
2034 get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
2035 }
2036
2037 get_parent()->get_logger()->inc(l_osd_push);
2038 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2039
2040 // fill in the outgoing op
2041 out_op->version = v;
2042 out_op->soid = recovery_info.soid;
2043 out_op->recovery_info = recovery_info;
2044 out_op->after_progress = new_progress;
2045 out_op->before_progress = progress;
2046 return 0;
2047 }
2048
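// Build a reply PushOp with a zero version. The pulling side treats a
// zero-version push as "source could not read the object" and fails the
// pull, so this doubles as the error reply in handle_pull() below.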
2049 void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2050 {
2051 op->recovery_info.version = eversion_t();
2052 op->version = eversion_t();
2053 op->soid = soid;
2054 }
2055
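// Primary-side handler for a replica's push ack. Returns true (with
// *reply filled in) when another chunk should be sent to that peer,
// false when this peer is finished. Once the last peer acks, the object
// is either globally recovered or reported as a primary error.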
2056 bool ReplicatedBackend::handle_push_reply(
2057 pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2058 {
2059 const hobject_t &soid = op.soid;
2060 if (pushing.count(soid) == 0) {
2061 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2062 << ", or anybody else"
2063 << dendl;
2064 return false;
2065 } else if (pushing[soid].count(peer) == 0) {
2066 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2067 << dendl;
2068 return false;
2069 } else {
2070 PushInfo *pi = &pushing[soid][peer];
2071 bool error = pushing[soid].begin()->second.recovery_progress.error; // an earlier error is stashed on the first entry (see below)
2072
2073 if (!pi->recovery_progress.data_complete && !error) {
2074 dout(10) << " pushing more from "
2075 << pi->recovery_progress.data_recovered_to
2076 << " of " << pi->recovery_info.copy_subset << dendl;
2077 ObjectRecoveryProgress new_progress;
2078 int r = build_push_op(
2079 pi->recovery_info,
2080 pi->recovery_progress, &new_progress, reply,
2081 &(pi->stat));
2082 // Handle the case of a read error right after we wrote, which is
2083 // hopefully extremely rare.
2084 if (r < 0) {
2085 dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
2086
2087 error = true;
2088 goto done;
2089 }
2090 pi->recovery_progress = new_progress;
2091 return true;
2092 } else {
2093 // done!
2094 done:
2095 if (!error)
2096 get_parent()->on_peer_recover(peer, soid, pi->recovery_info);
2097
2098 get_parent()->release_locks(pi->lock_manager);
2099 object_stat_sum_t stat = pi->stat;
2100 eversion_t v = pi->recovery_info.version;
2101 pushing[soid].erase(peer);
2102 pi = nullptr;
2103
2104 if (pushing[soid].empty()) {
2105 if (!error)
2106 get_parent()->on_global_recover(soid, stat, false);
2107 else
2108 get_parent()->on_primary_error(soid, v);
2109 pushing.erase(soid);
2110 } else {
2111 // This looks weird, but we erased the current peer and need to remember
2112 // the error on any other one, while getting more acks.
2113 if (error)
2114 pushing[soid].begin()->second.recovery_progress.error = true;
2115 dout(10) << "pushed " << soid << ", still waiting for push ack from "
2116 << pushing[soid].size() << " others" << dendl;
2117 }
2118 return false;
2119 }
2120 }
2121 }
2122
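// Handler for a pull request addressed to us: stat the local copy, on
// the first request fill in the real size and copy_subset (a puller
// that does not know the size sends (uint64_t)-1), and answer with a
// push op. Any local failure yields a blank (zero version) push so the
// puller can fail over cleanly.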
2123 void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2124 {
2125 const hobject_t &soid = op.soid;
2126 struct stat st;
2127 int r = store->stat(ch, ghobject_t(soid), &st);
2128 if (r != 0) {
2129 get_parent()->clog_error() << get_info().pgid << " "
2130 << peer << " tried to pull " << soid
2131 << " but got " << cpp_strerror(-r);
2132 prep_push_op_blank(soid, reply);
2133 } else {
2134 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2135 ObjectRecoveryProgress &progress = op.recovery_progress;
2136 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2137 // Adjust size and copy_subset
2138 recovery_info.size = st.st_size;
2139 recovery_info.copy_subset.clear();
2140 if (st.st_size)
2141 recovery_info.copy_subset.insert(0, st.st_size);
2142 ceph_assert(recovery_info.clone_subset.empty());
2143 }
2144
2145 r = build_push_op(recovery_info, progress, nullptr, reply);
2146 if (r < 0)
2147 prep_push_op_blank(soid, reply);
2148 }
2149 }
2150
2151 /**
2152 * trim received data to remove what we don't want
2153 *
2154 * @param copy_subset intervals we want
2155 * @param intervals_received intervals we got
2156 * @param data_received data we got
2157 * @param intervals_usable intervals we want to keep
2158 * @param data_usable matching data we want to keep
2159 */
2160 void ReplicatedBackend::trim_pushed_data(
2161 const interval_set<uint64_t> &copy_subset,
2162 const interval_set<uint64_t> &intervals_received,
2163 bufferlist data_received,
2164 interval_set<uint64_t> *intervals_usable,
2165 bufferlist *data_usable)
2166 {
2167 if (intervals_received.subset_of(copy_subset)) {
2168 *intervals_usable = intervals_received;
2169 *data_usable = data_received;
2170 return;
2171 }
2172
2173 intervals_usable->intersection_of(copy_subset,
2174 intervals_received);
2175
2176 uint64_t off = 0;
2177 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2178 p != intervals_received.end();
2179 ++p) {
2180 interval_set<uint64_t> x;
2181 x.insert(p.get_start(), p.get_len());
2182 x.intersection_of(copy_subset);
2183 for (interval_set<uint64_t>::const_iterator q = x.begin();
2184 q != x.end();
2185 ++q) {
2186 bufferlist sub;
2187 uint64_t data_off = off + (q.get_start() - p.get_start());
2188 sub.substr_of(data_received, data_off, q.get_len());
2189 data_usable->claim_append(sub);
2190 }
2191 off += p.get_len();
2192 }
2193 }
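// Example with hypothetical values: copy_subset = {0~4096},
// intervals_received = {0~8192} with 8192 bytes of data. The
// intersection keeps {0~4096}, and data_usable gets only the first
// 4096 bytes; anything the source sent beyond the requested ranges is
// dropped.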
2194
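// Report a failed pull to the parent (which re-queues recovery for the
// object from another source, if any) and tear down the pull state.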
2195 void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
2196 {
2197 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
2198 list<pg_shard_t> fl = { from };
2199 auto it = pulling.find(soid);
2200 ceph_assert(it != pulling.end());
2201 get_parent()->failed_push(fl, soid, it->second.recovery_info.version);
2202
2203 clear_pull(it);
2204 }
2205
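// Drop this pull from the peer -> objects reverse index, erasing the
// peer's entry entirely once its set is empty.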
2206 void ReplicatedBackend::clear_pull_from(
2207 map<hobject_t, PullInfo>::iterator piter)
2208 {
2209 auto from = piter->second.from;
2210 pull_from_peer[from].erase(piter->second.soid);
2211 if (pull_from_peer[from].empty())
2212 pull_from_peer.erase(from);
2213 }
2214
2215 void ReplicatedBackend::clear_pull(
2216 map<hobject_t, PullInfo>::iterator piter,
2217 bool clear_pull_from_peer)
2218 {
2219 if (clear_pull_from_peer) {
2220 clear_pull_from(piter);
2221 }
2222 get_parent()->release_locks(piter->second.lock_manager);
2223 pulling.erase(piter);
2224 }
2225
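// Prep a push of soid to every acting/backfill shard whose missing set
// still lists it. Returns the number of shards a push was queued for,
// or a negative errno after popping the partially built PushOps when a
// local read fails.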
2226 int ReplicatedBackend::start_pushes(
2227 const hobject_t &soid,
2228 ObjectContextRef obc,
2229 RPGHandle *h)
2230 {
2231 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2232
2233 dout(20) << __func__ << " soid " << soid << dendl;
2234 // who needs it?
2235 ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
2236 for (set<pg_shard_t>::iterator i =
2237 get_parent()->get_acting_recovery_backfill_shards().begin();
2238 i != get_parent()->get_acting_recovery_backfill_shards().end();
2239 ++i) {
2240 if (*i == get_parent()->whoami_shard()) continue;
2241 pg_shard_t peer = *i;
2242 map<pg_shard_t, pg_missing_t>::const_iterator j =
2243 get_parent()->get_shard_missing().find(peer);
2244 ceph_assert(j != get_parent()->get_shard_missing().end());
2245 if (j->second.is_missing(soid)) {
2246 shards.push_back(j);
2247 }
2248 }
2249
2250 // If more than one shard will read this object, ignore any request to bypass the cache
2251 bool cache_dont_need = shards.size() == 1 ? h->cache_dont_need : false;
2252
2253 for (auto j : shards) {
2254 pg_shard_t peer = j->first;
2255 h->pushes[peer].push_back(PushOp());
2256 int r = prep_push_to_replica(obc, soid, peer,
2257 &(h->pushes[peer].back()), cache_dont_need);
2258 if (r < 0) {
2259 // Back out all failed reads
2260 for (auto k : shards) {
2261 pg_shard_t p = k->first;
2262 dout(10) << __func__ << " clean up peer " << p << dendl;
2263 h->pushes[p].pop_back();
2264 if (p == peer) break;
2265 }
2266 return r;
2267 }
2268 }
2269 return shards.size();
2270 }