// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return *_dout << pgb->get_parent()->gen_dbg_prefix();
}

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

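// Record perf counters for a completed subop: the overall subop count and
// latency, plus per-type (write/push/pull) counts, input bytes, and latencies.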
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();


  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      assert("unsupported subop" == 0);
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  coll_t coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

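// Dispatch all pushes, pulls, and deletes accumulated in a recovery handle,
// then dispose of the handle.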
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}

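// Start recovery of a single object: pull it if this OSD is missing it
// locally, otherwise push it to the peers that need it.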
int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}

void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

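// Backend message dispatch: returns true if the message type was consumed
// here (PG push/pull/push-reply and repop/repop-reply traffic).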
bool ReplicatedBackend::_handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_SUBOP: {
    const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
    if (m->ops.size() == 0) {
      assert(0);
    }
    break;
  }

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (map<ceph_tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
       i != in_progress_ops.end();
       in_progress_ops.erase(i++)) {
    if (i->second.on_commit)
      delete i->second.on_commit;
    if (i->second.on_applied)
      delete i->second.on_applied;
  }
  clear_recovery_state();
}

void ReplicatedBackend::on_flushed()
{
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
  int r;
  Context *c;
  AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
  void finish(ThreadPool::TPHandle&) override {
    c->complete(r);
    c = NULL;
  }
  ~AsyncReadCallback() override {
    delete c;
  }
};
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  // There is no fast read implementation for replication backend yet
  assert(!fast_read);

  int r = 0;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end() && r >= 0;
       ++i) {
    int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(),
                         i->first.get<1>(), *(i->second.first),
                         i->first.get<2>());
    if (i->second.second) {
      get_parent()->schedule_recovery_work(
        get_parent()->bless_gencontext(
          new AsyncReadCallback(_r, i->second.second)));
    }
    if (_r < 0)
      r = _r;
  }
  get_parent()->schedule_recovery_work(
    get_parent()->bless_gencontext(
      new AsyncReadCallback(r, on_complete)));
}

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};

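// Lower a PGTransaction into an ObjectStore::Transaction, marking the log
// entries unrollbackable (replicated pools cannot roll back) and recording
// which temp objects the transaction creates or deletes.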
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  bool legacy_log_entries,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  assert(t);
  assert(added);
  assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      ::encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          t->touch(coll, goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

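// Primary-side write path: build the local transaction, register an
// InProgressOp that tracks applied/commit acks from all acting/backfill
// shards, replicate the op to the other shards, and queue the local
// transaction.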
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);

  assert(!in_progress_ops.count(tid));
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

void ReplicatedBackend::op_applied(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_applied");
    op->op->pg_trace.event("op applied");
  }

  op->waiting_for_applied.erase(get_parent()->whoami_shard());
  parent->op_applied(op->v);

  if (op->waiting_for_applied.empty()) {
    op->on_applied->complete(0);
    op->on_applied = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::op_commit(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

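// Handle a replica's reply for an in-progress op: clear the sender from the
// applied/commit waiting sets and fire the corresponding completion
// callbacks once all shards have responded.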
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  if (in_progress_ops.count(rep_tid)) {
    map<ceph_tid_t, InProgressOp>::iterator iter =
      in_progress_ops.find(rep_tid);
    InProgressOp &ip_op = iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_commit_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      assert(ip_op.waiting_for_applied.count(from));
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_applied_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_applied_rec");
      }
    }
    ip_op.waiting_for_applied.erase(from);

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
}

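// Deep scrub one object incrementally: hash the data in
// osd_deep_scrub_stride chunks, then the omap header and keys, returning
// -EINPROGRESS until both digests are complete.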
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
                           CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  assert(poid == pos.ls[pos.pos]);
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    if (r == cct->_conf->osd_deep_scrub_stride) {
      dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
               << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << " " << poid << " done with data, digest 0x"
             << std::hex << o.digest << std::dec << dendl;
  }

  // omap header
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      coll,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
               << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // omap
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);
  } else {
    iter->seek_to_first();
  }
  int max = g_conf->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    ::encode(iter->key(), bl);
    ::encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << " " << poid
               << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  if (pos.omap_keys > cct->_conf->
        osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
        osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
             << " large omap object detected. Object has " << pos.omap_keys
             << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;

  // done!
  return 0;
}

void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
        bc->pushing[i.hoid].clear();
        bc->get_parent()->primary_failed(i.hoid);
        bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};

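// Primary-side handling of MOSDPGPush replies to our pulls: apply the
// received data, queue follow-on pushes for objects that completed, and
// request the next chunk for objects that did not.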
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);

  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    dout(10) << "issue_repop shipping empty opt to osd." << peer
             << ", object " << soid
             << " beyond MAX(last_backfill_started "
             << ", pinfo.last_backfill "
             << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    ::encode(t, wr->get_data());
  } else {
    ::encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (op->op)
    op->op->pg_trace.event("issue replication ops");

  if (parent->get_actingbackfill_shards().size() > 1) {
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
         parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    wr = generate_subop(
      soid,
      at_version,
      tid,
      reqid,
      pg_trim_to,
      pg_roll_forward_to,
      new_temp_oid,
      discard_temp_oid,
      log_entries,
      hset_hist,
      op_t,
      peer,
      pinfo);
    if (op->op)
      wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}

// sub op modify
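// Replica-side handling of a replicated write: decode the shipped
// transaction and log entries, apply them locally, and reply with an ack
// when applied and a commit when durable (via C_OSD_RepModifyApply and
// C_OSD_RepModifyCommit).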
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
           << " " << m->logbl.length()
           << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
  ::decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
               << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  ::decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_roll_forward_to,
    update_snaps,
    rm->localt);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
}

void ReplicatedBackend::repop_applied(RepModifyRef rm)
{
  rm->op->mark_event("sub_op_applied");
  rm->applied = true;
  rm->op->pg_trace.event("sub_op_applied");

  dout(10) << __func__ << " on " << rm << " op "
           << *rm->op->get_req() << dendl;
  const Message *m = rm->op->get_req();
  const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
  eversion_t version = req->version;

  // send ack to acker only if we haven't sent a commit already
  if (!rm->committed) {
    Message *ack = new MOSDRepOpReply(
      req, parent->whoami_shard(),
      0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
    ack->trace = rm->op->pg_trace;
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }


  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }


  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }


  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

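// Queue a pull for an object this OSD is missing: pick a random up peer
// that has it, and for clones compute the ranges recoverable from local
// clone overlap so only the remainder is transferred over the wire.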
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  assert(q != missing_loc.end());
  assert(!q->second.empty());

  // pick a pullee
  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
  random_shuffle(shuffle.begin(), shuffle.end());
  vector<pg_shard_t>::iterator p = shuffle.begin();
  assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    assert(get_parent()->get_log().get_log().objects.count(soid) &&
           (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
            pg_log_entry_t::LOST_REVERT) &&
           (get_parent()->get_log().get_log().objects.find(
             soid)->second->reverting_to ==
            v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    assert(!get_parent()->get_local_missing().is_missing(
             soid.get_head()) ||
           !get_parent()->get_local_missing().is_missing(
             soid.get_snapdir()));
    assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base the push off of clones that succeed/precede poid;
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }
    hobject_t snapdir = head;
    snapdir.snap = CEPH_SNAPDIR;
    if (get_parent()->get_local_missing().is_missing(snapdir)) {
      dout(15) << "push_to_replica missing snapdir " << snapdir
               << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

int ReplicatedBackend::prep_push(ObjectContextRef obc,
                                 const hobject_t& soid, pg_shard_t peer,
                                 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
                   obc->obs.oi.version, data_subset, clone_subsets,
                   pop, cache_dont_need, ObcLockManager());
}

int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}

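// Stage received push data into a transaction.  Partial objects land in a
// temp recovery object first; on the final chunk the temp object is renamed
// into place and clone-overlap ranges are filled in from local clones.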
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
                                                        recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
               << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
                      oi.expected_object_size,
                      oi.expected_write_size,
                      oi.alloc_hint_flags);
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
             p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
               << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
                                coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}

void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
         recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
         q != p->second.end();
         ++q) {
      dout(15) << " clone_range " << p->first << " "
               << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
                     q.get_start(), q.get_len(), q.get_start());
    }
  }
}

ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  assert(ssc);
  get_parent()->release_locks(manager); // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}

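// Apply one chunk of pulled data.  Returns true if more chunks remain (and
// fills *response with the next request); on completion the object is
// added to to_continue for follow-on pushes to other peers.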
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
           << pop.recovery_info
           << pop.after_progress
           << " data.size() is " << data.length()
           << " data_included: " << data_included
           << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  assert((data_included.empty() && data.length() == 0) ||
         (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If the primary doesn't have the object info, it didn't know the version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // The attrs only reference the origin bufferlist (decoded from the
    // MOSDPGPush message), which is much larger than the attrs themselves.
    // If the obc caches them (get_obc may cache the attrs), the whole
    // origin bufferlist cannot be freed until the obc is evicted from the
    // obc cache.  So rebuild the bufferlists before caching them.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }


  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
                   data_included,
                   data,
                   &usable_intervals,
                   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);


  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
           << ", new progress " << pi.recovery_progress
           << dendl;

  bool complete = pi.is_complete();

  submit_push_data(pi.recovery_info, first,
                   complete, pi.cache_dont_need,
                   data_included, data,
                   pop.omap_header,
                   pop.attrset,
                   pop.omap_entries,
                   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();

  if (complete) {
    pi.stat.num_objects_recovered++;
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}

void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t)
{
  dout(10) << "handle_push "
           << pop.recovery_info
           << pop.after_progress
           << dendl;
  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
                  pop.after_progress.omap_complete;

  response->soid = pop.recovery_info.soid;
  submit_push_data(pop.recovery_info,
                   first,
                   complete,
1879 true, // must be replicate
1880 pop.data_included,
1881 data,
1882 pop.omap_header,
1883 pop.attrset,
1884 pop.omap_entries,
1885 t);
1886
1887 if (complete)
1888 get_parent()->on_local_recover(
1889 pop.recovery_info.soid,
1890 pop.recovery_info,
1891 ObjectContextRef(), // ok, is replica
1892 false,
1893 t);
1894 }
1895
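// Primary-side fan-out of queued PushOps, one connection per target
// OSD. Each MOSDPGPush is capped by osd_max_push_cost (an estimated
// message cost, see PushOp::cost()) and osd_max_push_objects, so a
// long push queue is split across several messages. Both limits are
// checked before an op is added, so (with positive limits) every
// message carries at least one push.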
void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
       i != pushes.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap()->get_epoch());
    if (!con)
      continue;
    vector<PushOp>::iterator j = i->second.begin();
    while (j != i->second.end()) {
      uint64_t cost = 0;
      uint64_t num_pushes = 0; // renamed from 'pushes' to avoid shadowing the parameter
      MOSDPGPush *msg = new MOSDPGPush();
      msg->from = get_parent()->whoami_shard();
      msg->pgid = get_parent()->primary_spg_t();
      msg->map_epoch = get_osdmap()->get_epoch();
      msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
      msg->set_priority(prio);
      for (;
           (j != i->second.end() &&
            cost < cct->_conf->osd_max_push_cost &&
            num_pushes < cct->_conf->osd_max_push_objects) ;
           ++j) {
        dout(20) << __func__ << ": sending push " << *j
                 << " to osd." << i->first << dendl;
        cost += j->cost(cct);
        num_pushes += 1;
        msg->pushes.push_back(*j);
      }
      msg->set_cost(cost);
      get_parent()->send_message_osd_cluster(msg, con);
    }
  }
}

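// Unlike pushes, all PullOps destined for one OSD fit in a single
// MOSDPGPull message; the message cost is computed from the requested
// ops rather than capped here.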
void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
{
  for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
       i != pulls.end();
       ++i) {
    ConnectionRef con = get_parent()->get_con_osd_cluster(
      i->first.osd,
      get_osdmap()->get_epoch());
    if (!con)
      continue;
    dout(20) << __func__ << ": sending pulls " << i->second
             << " to osd." << i->first << dendl;
    MOSDPGPull *msg = new MOSDPGPull();
    msg->from = parent->whoami_shard();
    msg->set_priority(prio);
    msg->pgid = get_parent()->primary_spg_t();
    msg->map_epoch = get_osdmap()->get_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->set_pulls(&i->second);
    msg->compute_cost(cct);
    get_parent()->send_message_osd_cluster(msg, con);
  }
}

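// Build one chunk of a push for an object. On the first chunk we also
// ship the omap header and xattrs and sanity-check the object_info_t.
// Each chunk carries at most osd_recovery_max_chunk bytes, filled with
// omap entries first (additionally capped by
// osd_recovery_max_omap_entries_per_chunk) and file data extents
// second, so a large object is transferred over several
// build_push_op/handle_push round trips tracked by
// ObjectRecoveryProgress.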
int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
                                     const ObjectRecoveryProgress &progress,
                                     ObjectRecoveryProgress *out_progress,
                                     PushOp *out_op,
                                     object_stat_sum_t *stat,
                                     bool cache_dont_need)
{
  ObjectRecoveryProgress _new_progress;
  if (!out_progress)
    out_progress = &_new_progress;
  ObjectRecoveryProgress &new_progress = *out_progress;
  new_progress = progress;

  dout(7) << __func__ << " " << recovery_info.soid
          << " v " << recovery_info.version
          << " size " << recovery_info.size
          << " recovery_info: " << recovery_info
          << dendl;

  eversion_t v = recovery_info.version;
  if (progress.first) {
    int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
    if (r < 0) {
      dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
      return r;
    }
    r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
    if (r < 0) {
      dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
      return r;
    }

    // Debug
    bufferlist bv = out_op->attrset[OI_ATTR];
    object_info_t oi;
    try {
      bufferlist::iterator bliter = bv.begin();
      ::decode(oi, bliter);
    } catch (...) {
      dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
      return -EINVAL;
    }

    // If the requestor didn't know the version, use ours
    if (v == eversion_t()) {
      v = oi.version;
    } else if (oi.version != v) {
      get_parent()->clog_error() << get_info().pgid << " push "
                                 << recovery_info.soid << " v "
                                 << recovery_info.version
                                 << " failed because local copy is "
                                 << oi.version;
      return -EINVAL;
    }

    new_progress.first = false;
  }
  // Once we provide the version, subsequent requests will have it, so
  // at this point it must be known.
  assert(v != eversion_t());

  uint64_t available = cct->_conf->osd_recovery_max_chunk;
  if (!progress.omap_complete) {
    ObjectMap::ObjectMapIterator iter =
      store->get_omap_iterator(coll,
                               ghobject_t(recovery_info.soid));
    assert(iter);
    for (iter->lower_bound(progress.omap_recovered_to);
         iter->valid();
         iter->next(false)) {
      if (!out_op->omap_entries.empty() &&
          ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
            out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
           available <= iter->key().size() + iter->value().length()))
        break;
      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));

      if ((iter->key().size() + iter->value().length()) <= available)
        available -= (iter->key().size() + iter->value().length());
      else
        available = 0;
    }
    if (!iter->valid())
      new_progress.omap_complete = true;
    else
      new_progress.omap_recovered_to = iter->key();
  }

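  // Whatever budget the omap entries left goes to file data: intersect
  // copy_subset with the object's allocated extents (fiemap) and take
  // up to 'available' bytes starting at progress.data_recovered_to.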
  if (available > 0) {
    if (!recovery_info.copy_subset.empty()) {
      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
      map<uint64_t, uint64_t> m;
      int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
                            copy_subset.range_end(), m);
      if (r >= 0) {
        interval_set<uint64_t> fiemap_included(m);
        copy_subset.intersection_of(fiemap_included);
      } else {
        // intersection of copy_subset and empty interval_set would be empty anyway
        copy_subset.clear();
      }

      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
                                    available);
      if (out_op->data_included.empty()) // zero-filled section, skip to end!
        new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
      else
        new_progress.data_recovered_to = out_op->data_included.range_end();
    }
  } else {
    out_op->data_included.clear();
  }

  for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
       p != out_op->data_included.end();
       ++p) {
    bufferlist bit;
    int r = store->read(ch, ghobject_t(recovery_info.soid),
                        p.get_start(), p.get_len(), bit,
                        cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED : 0);
    if (cct->_conf->osd_debug_random_push_read_error &&
        (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
      dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
      r = -EIO;
    }
    if (r < 0) {
      return r;
    }
    if (p.get_len() != bit.length()) {
      dout(10) << " extent " << p.get_start() << "~" << p.get_len()
               << " is actually " << p.get_start() << "~" << bit.length()
               << dendl;
      interval_set<uint64_t>::iterator save = p++;
      if (bit.length() == 0)
        out_op->data_included.erase(save); // remove this empty interval
      else
        save.set_len(bit.length());
      // Remove any other intervals present
      while (p != out_op->data_included.end()) {
        interval_set<uint64_t>::iterator save = p++;
        out_op->data_included.erase(save);
      }
      new_progress.data_complete = true;
      out_op->data.claim_append(bit);
      break;
    }
    out_op->data.claim_append(bit);
  }

  if (new_progress.is_complete(recovery_info)) {
    new_progress.data_complete = true;
    if (stat)
      stat->num_objects_recovered++;
  }

  if (stat) {
    stat->num_keys_recovered += out_op->omap_entries.size();
    stat->num_bytes_recovered += out_op->data.length();
  }

  get_parent()->get_logger()->inc(l_osd_push);
  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());

  // send
  out_op->version = v;
  out_op->soid = recovery_info.soid;
  out_op->recovery_info = recovery_info;
  out_op->after_progress = new_progress;
  out_op->before_progress = progress;
  return 0;
}

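// A blank push op (version == eversion_t()) tells the primary we do
// not have the object; handle_pull_response() treats it as a failed
// pull.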
void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
{
  op->recovery_info.version = eversion_t();
  op->version = eversion_t();
  op->soid = soid;
}

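// Primary-side handler for a replica's push ack. Either builds the
// next chunk into *reply (returns true, and the caller sends it) or
// finishes the push to this peer (returns false), declaring global
// recovery of the object once every peer has acked.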
bool ReplicatedBackend::handle_push_reply(
  pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  if (pushing.count(soid) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
             << ", or anybody else"
             << dendl;
    return false;
  } else if (pushing[soid].count(peer) == 0) {
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
             << dendl;
    return false;
  } else {
    PushInfo *pi = &pushing[soid][peer];
    bool error = pushing[soid].begin()->second.recovery_progress.error;

    if (!pi->recovery_progress.data_complete && !error) {
      dout(10) << " pushing more from "
               << pi->recovery_progress.data_recovered_to
               << " of " << pi->recovery_info.copy_subset << dendl;
      ObjectRecoveryProgress new_progress;
      int r = build_push_op(
        pi->recovery_info,
        pi->recovery_progress, &new_progress, reply,
        &(pi->stat));
      // Handle the case of a read error right after we wrote, which is
      // hopefully extremely rare.
      if (r < 0) {
        dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;

        error = true;
        goto done;
      }
      pi->recovery_progress = new_progress;
      return true;
    } else {
      // done!
    done:
      if (!error)
        get_parent()->on_peer_recover(peer, soid, pi->recovery_info);

      get_parent()->release_locks(pi->lock_manager);
      object_stat_sum_t stat = pi->stat;
      eversion_t v = pi->recovery_info.version;
      pushing[soid].erase(peer);
      pi = NULL;

      if (pushing[soid].empty()) {
        if (!error)
          get_parent()->on_global_recover(soid, stat, false);
        else
          get_parent()->on_primary_error(soid, v);
        pushing.erase(soid);
      } else {
        // This looks weird, but we erased the current peer and need to remember
        // the error on any other one, while getting more acks.
        if (error)
          pushing[soid].begin()->second.recovery_progress.error = true;
        dout(10) << "pushed " << soid << ", still waiting for push ack from "
                 << pushing[soid].size() << " others" << dendl;
      }
      return false;
    }
  }
}

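// Replica-side handler for a pull request: stat the object, fill in
// size and copy_subset on the first request if the primary didn't know
// them, and answer with a push op (blank if we can't serve it).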
void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  struct stat st;
  int r = store->stat(ch, ghobject_t(soid), &st);
  if (r != 0) {
    get_parent()->clog_error() << get_info().pgid << " "
                               << peer << " tried to pull " << soid
                               << " but got " << cpp_strerror(-r);
    prep_push_op_blank(soid, reply);
  } else {
    ObjectRecoveryInfo &recovery_info = op.recovery_info;
    ObjectRecoveryProgress &progress = op.recovery_progress;
    if (progress.first && recovery_info.size == ((uint64_t)-1)) {
      // Adjust size and copy_subset
      recovery_info.size = st.st_size;
      recovery_info.copy_subset.clear();
      if (st.st_size)
        recovery_info.copy_subset.insert(0, st.st_size);
      assert(recovery_info.clone_subset.empty());
    }

    r = build_push_op(recovery_info, progress, 0, reply);
    if (r < 0)
      prep_push_op_blank(soid, reply);
  }
}

/**
 * trim received data to remove what we don't want
 *
 * @param copy_subset intervals we want
 * @param intervals_received intervals we got
 * @param data_received data we got
 * @param intervals_usable intervals we want to keep
 * @param data_usable matching data we want to keep
 */
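// Worked example (hypothetical values): if
//   copy_subset        = {[0, 4096)}
//   intervals_received = {[0, 8192)}
// then intervals_usable = {[0, 4096)} and only the first 4096 bytes of
// data_received survive into data_usable. data_received holds the
// received intervals laid out back to back, which is why 'off' below
// advances by p.get_len() per interval.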
void ReplicatedBackend::trim_pushed_data(
  const interval_set<uint64_t> &copy_subset,
  const interval_set<uint64_t> &intervals_received,
  bufferlist data_received,
  interval_set<uint64_t> *intervals_usable,
  bufferlist *data_usable)
{
  if (intervals_received.subset_of(copy_subset)) {
    *intervals_usable = intervals_received;
    *data_usable = data_received;
    return;
  }

  intervals_usable->intersection_of(copy_subset,
                                    intervals_received);

  uint64_t off = 0;
  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
       p != intervals_received.end();
       ++p) {
    interval_set<uint64_t> x;
    x.insert(p.get_start(), p.get_len());
    x.intersection_of(copy_subset);
    for (interval_set<uint64_t>::const_iterator q = x.begin();
         q != x.end();
         ++q) {
      bufferlist sub;
      uint64_t data_off = off + (q.get_start() - p.get_start());
      sub.substr_of(data_received, data_off, q.get_len());
      data_usable->claim_append(sub);
    }
    off += p.get_len();
  }
}

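// Teardown helpers for an in-flight pull. _failed_pull() tells the PG
// that this peer could not serve the object (failed_push() is expected
// to record that so recovery can pick another source) and then drops
// the local pull state; clear_pull_from() removes only the
// pull_from_peer bookkeeping.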
void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
{
  dout(20) << __func__ << ": " << soid << " from " << from << dendl;
  list<pg_shard_t> fl = { from };
  get_parent()->failed_push(fl, soid);

  clear_pull(pulling.find(soid));
}

void ReplicatedBackend::clear_pull_from(
  map<hobject_t, PullInfo>::iterator piter)
{
  auto from = piter->second.from;
  pull_from_peer[from].erase(piter->second.soid);
  if (pull_from_peer[from].empty())
    pull_from_peer.erase(from);
}

void ReplicatedBackend::clear_pull(
  map<hobject_t, PullInfo>::iterator piter,
  bool clear_pull_from_peer)
{
  if (clear_pull_from_peer) {
    clear_pull_from(piter);
  }
  get_parent()->release_locks(piter->second.lock_manager);
  pulling.erase(piter);
}

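// Queue a push of 'soid' to every acting/backfill shard whose missing
// set includes it. Returns the number of shards pushed to, or a
// negative error after backing out every push queued so far in this
// call.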
int ReplicatedBackend::start_pushes(
  const hobject_t &soid,
  ObjectContextRef obc,
  RPGHandle *h)
{
  list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;

  dout(20) << __func__ << " soid " << soid << dendl;
  // who needs it?
  assert(get_parent()->get_actingbackfill_shards().size() > 0);
  for (set<pg_shard_t>::iterator i =
         get_parent()->get_actingbackfill_shards().begin();
       i != get_parent()->get_actingbackfill_shards().end();
       ++i) {
    if (*i == get_parent()->whoami_shard()) continue;
    pg_shard_t peer = *i;
    map<pg_shard_t, pg_missing_t>::const_iterator j =
      get_parent()->get_shard_missing().find(peer);
    assert(j != get_parent()->get_shard_missing().end());
    if (j->second.is_missing(soid)) {
      shards.push_back(j);
    }
  }

  // If the object will be read more than once, ignore any request to
  // bypass the cache; the later reads would otherwise miss.
  bool cache = shards.size() == 1 ? h->cache_dont_need : false;

  for (auto j : shards) {
    pg_shard_t peer = j->first;
    h->pushes[peer].push_back(PushOp());
    int r = prep_push_to_replica(obc, soid, peer,
                                 &(h->pushes[peer].back()), cache);
    if (r < 0) {
      // Back out all failed reads
      for (auto k : shards) {
        pg_shard_t p = k->first;
        dout(10) << __func__ << " clean up peer " << p << dendl;
        h->pushes[p].pop_back();
        if (p == peer) break;
      }
      return r;
    }
  }
  return shards.size();
}