// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return *_dout << pgb->get_parent()->gen_dbg_prefix();
}

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
  public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
  public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      assert("unsupported subop" == 0);
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  coll_t coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}

int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}

void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

bool ReplicatedBackend::_handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_SUBOP: {
    const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
    if (m->ops.size() == 0) {
      assert(0);
    }
    break;
  }

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (map<ceph_tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
       i != in_progress_ops.end();
       in_progress_ops.erase(i++)) {
    if (i->second.on_commit)
      delete i->second.on_commit;
    if (i->second.on_applied)
      delete i->second.on_applied;
  }
  clear_recovery_state();
}

void ReplicatedBackend::on_flushed()
{
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
  int r;
  Context *c;
  AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
  void finish(ThreadPool::TPHandle&) override {
    c->complete(r);
    c = NULL;
  }
  ~AsyncReadCallback() override {
    delete c;
  }
};
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  // There is no fast read implementation for replication backend yet
  assert(!fast_read);

  int r = 0;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end() && r >= 0;
       ++i) {
    int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(),
                         i->first.get<1>(), *(i->second.first),
                         i->first.get<2>());
    if (i->second.second) {
      get_parent()->schedule_recovery_work(
        get_parent()->bless_gencontext(
          new AsyncReadCallback(_r, i->second.second)));
    }
    if (_r < 0)
      r = _r;
  }
  get_parent()->schedule_recovery_work(
    get_parent()->bless_gencontext(
      new AsyncReadCallback(r, on_complete)));
}

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};

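// Lower a PGTransaction into a raw ObjectStore::Transaction for this
// backend. Replicated pools cannot roll operations back, so each log
// entry is marked unrollbackable and updated snaps are encoded directly
// into the entry. Temp objects created or deleted along the way are
// reported through *added and *removed.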
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  bool legacy_log_entries,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  assert(t);
  assert(added);
  assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      ::encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          t->touch(coll, goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

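// Primary-side write path: flatten the PGTransaction, record the op in
// in_progress_ops with the full acting/backfill set pending, replicate
// it to every other shard via issue_op(), and queue the local
// transaction. The registered callbacks fire op_applied()/op_commit()
// when the local store finishes.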
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);

  assert(!in_progress_ops.count(tid));
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

void ReplicatedBackend::op_applied(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_applied");
    op->op->pg_trace.event("op applied");
  }

  op->waiting_for_applied.erase(get_parent()->whoami_shard());
  parent->op_applied(op->v);

  if (op->waiting_for_applied.empty()) {
    op->on_applied->complete(0);
    op->on_applied = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::op_commit(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

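// Handle an MOSDRepOpReply from a replica. An ONDISK ack clears the
// shard from waiting_for_commit; any reply clears waiting_for_applied.
// When either set drains, the corresponding completion fires, and the
// op is retired once both have fired.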
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  if (in_progress_ops.count(rep_tid)) {
    map<ceph_tid_t, InProgressOp>::iterator iter =
      in_progress_ops.find(rep_tid);
    InProgressOp &ip_op = iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_commit_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      assert(ip_op.waiting_for_applied.count(from));
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_applied_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_applied_rec");
      }
    }
    ip_op.waiting_for_applied.erase(from);

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
}

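// Deep scrub: read the object in osd_deep_scrub_stride sized chunks and
// CRC the data, then CRC the omap header and every key/value pair. Any
// EIO or omap iterator error marks the object with read_error instead
// of recording digests.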
void ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  uint32_t seed,
  ScrubMap::object &o,
  ThreadPool::TPHandle &handle)
{
  dout(10) << __func__ << " " << poid << " seed "
           << std::hex << seed << std::dec << dendl;
  bufferhash h(seed), oh(seed);
  bufferlist bl, hdrbl;
  int r;
  __u64 pos = 0;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  while (true) {
    handle.reset_tp_timeout();
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags);
    if (r <= 0)
      break;

    h << bl;
    pos += bl.length();
    bl.clear();
  }
  if (r == -EIO) {
    dout(25) << __func__ << " " << poid << " got "
             << r << " on read, read_error" << dendl;
    o.read_error = true;
    return;
  }
  o.digest = h.digest();
  o.digest_present = true;

  bl.clear();
  r = store->omap_get_header(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    &hdrbl, true);
  // NOTE: bobtail to giant, we would crc the head as (len, head).
  // that changes at the same time we start using a non-zero seed.
  if (r == 0 && hdrbl.length()) {
    dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
             << dendl;
    if (seed == 0) {
      // legacy
      bufferlist bl;
      ::encode(hdrbl, bl);
      oh << bl;
    } else {
      oh << hdrbl;
    }
  } else if (r == -EIO) {
    dout(25) << __func__ << " " << poid << " got "
             << r << " on omap header read, read_error" << dendl;
    o.read_error = true;
    return;
  }

  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  assert(iter);
  for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
       iter->next(false)) {
    handle.reset_tp_timeout();

    dout(25) << "CRC key " << iter->key() << " value:\n";
    iter->value().hexdump(*_dout);
    *_dout << dendl;

    ::encode(iter->key(), bl);
    ::encode(iter->value(), bl);
    oh << bl;
    bl.clear();
  }

  if (iter->status() < 0) {
    dout(25) << __func__ << " " << poid
             << " on omap scan, db status error" << dendl;
    o.read_error = true;
    return;
  }

  // Store final calculated CRC32 of omap header & key/values
  o.omap_digest = oh.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;
}

void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
        bc->pushing[i.hoid].clear();
        bc->get_parent()->primary_failed(i.hoid);
        bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};

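// On the primary: consume push data sent in response to our pulls.
// Completed objects are queued on a C_ReplicatedBackend_OnPullComplete
// so pushes to the remaining replicas start once the local transaction
// completes; partially recovered objects generate follow-up PullOps.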
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);

  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    dout(10) << "issue_repop shipping empty opt to osd." << peer
             << ", object " << soid
             << " beyond MAX(last_backfill_started "
             << ", pinfo.last_backfill "
             << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    ::encode(t, wr->get_data());
  } else {
    ::encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (op->op)
    op->op->pg_trace.event("issue replication ops");

  if (parent->get_actingbackfill_shards().size() > 1) {
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
         parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    wr = generate_subop(
      soid,
      at_version,
      tid,
      reqid,
      pg_trim_to,
      pg_roll_forward_to,
      new_temp_oid,
      discard_temp_oid,
      log_entries,
      hset_hist,
      op_t,
      peer,
      pinfo);
    if (op->op)
      wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}

// sub op modify
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
1088 << " " << m->logbl.length()
1089 << dendl;
1090
1091 // sanity checks
1092 assert(m->map_epoch >= get_info().history.same_interval_since);
1093
1094 // we better not be missing this.
1095 assert(!parent->get_log().get_missing().is_missing(soid));
1096
1097 int ackerosd = m->get_source().num();
1098
1099 op->mark_started();
1100
1101 RepModifyRef rm(std::make_shared<RepModify>());
1102 rm->op = op;
1103 rm->ackerosd = ackerosd;
1104 rm->last_complete = get_info().last_complete;
1105 rm->epoch_started = get_osdmap()->get_epoch();
1106
1107 assert(m->logbl.length());
1108 // shipped transaction and log entries
1109 vector<pg_log_entry_t> log;
1110
1111 bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
1112 ::decode(rm->opt, p);
1113
1114 if (m->new_temp_oid != hobject_t()) {
1115 dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
1116 add_temp_obj(m->new_temp_oid);
1117 }
1118 if (m->discard_temp_oid != hobject_t()) {
1119 dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
1120 if (rm->opt.empty()) {
1121 dout(10) << __func__ << ": removing object " << m->discard_temp_oid
1122 << " since we won't get the transaction" << dendl;
1123 rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
1124 }
1125 clear_temp_obj(m->discard_temp_oid);
1126 }
1127
1128 p = const_cast<bufferlist&>(m->logbl).begin();
1129 ::decode(log, p);
1130 rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1131
1132 bool update_snaps = false;
1133 if (!rm->opt.empty()) {
1134 // If the opt is non-empty, we infer we are before
1135 // last_backfill (according to the primary, not our
1136 // not-quite-accurate value), and should update the
1137 // collections now. Otherwise, we do it later on push.
1138 update_snaps = true;
1139 }
1140 parent->update_stats(m->pg_stats);
1141 parent->log_operation(
1142 log,
1143 m->updated_hit_set_history,
1144 m->pg_trim_to,
1145 m->pg_roll_forward_to,
1146 update_snaps,
1147 rm->localt);
1148
1149 rm->opt.register_on_commit(
1150 parent->bless_context(
1151 new C_OSD_RepModifyCommit(this, rm)));
1152 rm->localt.register_on_applied(
1153 parent->bless_context(
1154 new C_OSD_RepModifyApply(this, rm)));
1155 vector<ObjectStore::Transaction> tls;
1156 tls.reserve(2);
1157 tls.push_back(std::move(rm->localt));
1158 tls.push_back(std::move(rm->opt));
1159 parent->queue_transactions(tls, op);
1160 // op is cleaned up by oncommit/onapply when both are executed
1161 }
1162
1163 void ReplicatedBackend::repop_applied(RepModifyRef rm)
1164 {
1165 rm->op->mark_event("sub_op_applied");
1166 rm->applied = true;
  rm->op->pg_trace.event("sub_op_applied");

  dout(10) << __func__ << " on " << rm << " op "
           << *rm->op->get_req() << dendl;
  const Message *m = rm->op->get_req();
  const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
  eversion_t version = req->version;

  // send ack to acker only if we haven't sent a commit already
  if (!rm->committed) {
    Message *ack = new MOSDRepOpReply(
      req, parent->whoami_shard(),
      0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
    ack->trace = rm->op->pg_trace;
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

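// calc_head_subsets/calc_clone_subsets work out which byte ranges of an
// object can be cloned locally from an adjacent snapshot (using the
// SnapSet's clone_overlap) instead of being shipped over the wire; only
// the remainder is left in data_subset for the push or pull.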
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

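// Set up a pull for an object we are missing locally: pick a random
// shard that has it, special-case LOST_REVERT entries (we may only be
// able to pull an older version the source has), and for snap objects
// compute the clone subsets so overlapping extents can be cloned
// locally on completion.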
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  assert(q != missing_loc.end());
  assert(!q->second.empty());

  // pick a pullee
  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
  random_shuffle(shuffle.begin(), shuffle.end());
  vector<pg_shard_t>::iterator p = shuffle.begin();
  assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    assert(get_parent()->get_log().get_log().objects.count(soid) &&
           (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
            pg_log_entry_t::LOST_REVERT) &&
           (get_parent()->get_log().get_log().objects.find(
             soid)->second->reverting_to ==
            v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    assert(!get_parent()->get_local_missing().is_missing(
             soid.get_head()) ||
           !get_parent()->get_local_missing().is_missing(
             soid.get_snapdir()));
    assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica. make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/precede poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }
    hobject_t snapdir = head;
    snapdir.snap = CEPH_SNAPDIR;
    if (get_parent()->get_local_missing().is_missing(snapdir)) {
      dout(15) << "push_to_replica missing snapdir " << snapdir
               << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

int ReplicatedBackend::prep_push(ObjectContextRef obc,
                                 const hobject_t& soid, pg_shard_t peer,
                                 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
                   obc->obs.oi.version, data_subset, clone_subsets,
                   pop, cache_dont_need, ObcLockManager());
}

int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}

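// Apply one chunk of push data. Unless the push is both first and
// complete, data is staged in a temp recovery object; on the final
// chunk the temp object is renamed over the target and
// submit_push_complete() clones any locally available overlap ranges.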
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
                                                        recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
               << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
                      oi.expected_object_size,
                      oi.expected_write_size,
                      oi.alloc_hint_flags);
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
             p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
               << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
                                coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}

void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
         recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
         q != p->second.end();
         ++q) {
      dout(15) << " clone_range " << p->first << " "
               << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
                     q.get_start(), q.get_len(), q.get_start());
    }
  }
}

ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  assert(ssc);
  get_parent()->release_locks(manager); // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}

bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
           << pop.recovery_info
           << pop.after_progress
           << " data.size() is " << data.length()
           << " data_included: " << data_included
           << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  assert((data_included.empty() && data.length() == 0) ||
         (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // The attrs only reference the origin bufferlist (decoded from the
    // MOSDPGPush message), which is much larger than the attrs needed
    // in recovery. If the obc caches it (get_obc may cache the attr),
    // the whole origin bufferlist could not be freed until the obc is
    // evicted from the obc cache. So rebuild the bufferlists before
    // caching.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }

  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
                   data_included,
                   data,
                   &usable_intervals,
                   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);

  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
           << ", new progress " << pi.recovery_progress
           << dendl;

  bool complete = pi.is_complete();

  submit_push_data(pi.recovery_info, first,
                   complete, pi.cache_dont_need,
                   data_included, data,
                   pop.omap_header,
                   pop.attrset,
                   pop.omap_entries,
                   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();

  if (complete) {
    pi.stat.num_objects_recovered++;
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}

void ReplicatedBackend::handle_push(
  pg_shard_t from, const PushOp &pop, PushReplyOp *response,
  ObjectStore::Transaction *t)
{
  dout(10) << "handle_push "
           << pop.recovery_info
           << pop.after_progress
           << dendl;
  bufferlist data;
  data = pop.data;
  bool first = pop.before_progress.first;
  bool complete = pop.after_progress.data_complete &&
    pop.after_progress.omap_complete;

  response->soid = pop.recovery_info.soid;
  submit_push_data(pop.recovery_info,
                   first,
                   complete,
                   true, // must be a replica
                   pop.data_included,
                   data,
                   pop.omap_header,
                   pop.attrset,
                   pop.omap_entries,
                   t);

  if (complete)
    get_parent()->on_local_recover(
      pop.recovery_info.soid,
      pop.recovery_info,
      ObjectContextRef(), // ok, is replica
      false,
      t);
}

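// Batch outgoing pushes per target OSD. Each MOSDPGPush carries pushes
// until either osd_max_push_cost or osd_max_push_objects is reached,
// then a new message is started on the same connection.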
1857 void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1858 {
1859 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1860 i != pushes.end();
1861 ++i) {
1862 ConnectionRef con = get_parent()->get_con_osd_cluster(
1863 i->first.osd,
1864 get_osdmap()->get_epoch());
1865 if (!con)
1866 continue;
1867 vector<PushOp>::iterator j = i->second.begin();
1868 while (j != i->second.end()) {
1869 uint64_t cost = 0;
1870 uint64_t pushes = 0;
1871 MOSDPGPush *msg = new MOSDPGPush();
1872 msg->from = get_parent()->whoami_shard();
1873 msg->pgid = get_parent()->primary_spg_t();
1874 msg->map_epoch = get_osdmap()->get_epoch();
1875 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1876 msg->set_priority(prio);
1877 for (;
1878 (j != i->second.end() &&
1879 cost < cct->_conf->osd_max_push_cost &&
1880 pushes < cct->_conf->osd_max_push_objects) ;
1881 ++j) {
1882 dout(20) << __func__ << ": sending push " << *j
1883 << " to osd." << i->first << dendl;
1884 cost += j->cost(cct);
1885 pushes += 1;
1886 msg->pushes.push_back(*j);
1887 }
1888 msg->set_cost(cost);
1889 get_parent()->send_message_osd_cluster(msg, con);
1890 }
1891 }
1892 }
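// A sketch of the batching above (config values are illustrative): with
// osd_max_push_cost ~ 8 MiB and osd_max_push_objects = 10, a backlog of
// 25 small pushes to one shard goes out as three MOSDPGPush messages
// carrying 10, 10 and 5 ops, each tagged with its summed cost.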
1893
1894 void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1895 {
1896 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1897 i != pulls.end();
1898 ++i) {
1899 ConnectionRef con = get_parent()->get_con_osd_cluster(
1900 i->first.osd,
1901 get_osdmap()->get_epoch());
1902 if (!con)
1903 continue;
1904 dout(20) << __func__ << ": sending pulls " << i->second
1905 << " to osd." << i->first << dendl;
1906 MOSDPGPull *msg = new MOSDPGPull();
1907 msg->from = get_parent()->whoami_shard();
1908 msg->set_priority(prio);
1909 msg->pgid = get_parent()->primary_spg_t();
1910 msg->map_epoch = get_osdmap()->get_epoch();
1911 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1912 msg->set_pulls(&i->second);
1913 msg->compute_cost(cct);
1914 get_parent()->send_message_osd_cluster(msg, con);
1915 }
1916 }
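// PullOps are small descriptors, so all pulls for a peer travel in one
// MOSDPGPull; compute_cost() prices the data we expect back rather than
// the message itself.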
1917
1918 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1919 const ObjectRecoveryProgress &progress,
1920 ObjectRecoveryProgress *out_progress,
1921 PushOp *out_op,
1922 object_stat_sum_t *stat,
1923 bool cache_dont_need)
1924 {
1925 ObjectRecoveryProgress _new_progress;
1926 if (!out_progress)
1927 out_progress = &_new_progress;
1928 ObjectRecoveryProgress &new_progress = *out_progress;
1929 new_progress = progress;
1930
1931 dout(7) << __func__ << " " << recovery_info.soid
1932 << " v " << recovery_info.version
1933 << " size " << recovery_info.size
1934 << " recovery_info: " << recovery_info
1935 << dendl;
1936
1937 eversion_t v = recovery_info.version;
1938 if (progress.first) {
1939 int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
1940 if (r < 0) {
1941 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1942 return r;
1943 }
1944 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
1945 if (r < 0) {
1946 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1947 return r;
1948 }
1949
1950 // Decode object_info_t to validate and, if needed, fill in the version
1951 bufferlist bv = out_op->attrset[OI_ATTR];
1952 object_info_t oi;
1953 try {
1954 bufferlist::iterator bliter = bv.begin();
1955 ::decode(oi, bliter);
1956 } catch (...) {
1957 dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
1958 return -EINVAL;
1959 }
1960
1961 // If requestor didn't know the version, use ours
1962 if (v == eversion_t()) {
1963 v = oi.version;
1964 } else if (oi.version != v) {
1965 get_parent()->clog_error() << get_info().pgid << " push "
1966 << recovery_info.soid << " v "
1967 << recovery_info.version
1968 << " failed because local copy is "
1969 << oi.version;
1970 return -EINVAL;
1971 }
1972
1973 new_progress.first = false;
1974 }
1975 // Once we provide the version subsequent requests will have it, so
1976 // at this point it must be known.
1977 assert(v != eversion_t());
1978
1979 uint64_t available = cct->_conf->osd_recovery_max_chunk;
1980 if (!progress.omap_complete) {
1981 ObjectMap::ObjectMapIterator iter =
1982 store->get_omap_iterator(coll,
1983 ghobject_t(recovery_info.soid));
1984 assert(iter);
1985 for (iter->lower_bound(progress.omap_recovered_to);
1986 iter->valid();
1987 iter->next(false)) {
1988 if (!out_op->omap_entries.empty() &&
1989 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
1990 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
1991 available <= iter->key().size() + iter->value().length()))
1992 break;
1993 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
1994
1995 if ((iter->key().size() + iter->value().length()) <= available)
1996 available -= (iter->key().size() + iter->value().length());
1997 else
1998 available = 0;
1999 }
2000 if (!iter->valid())
2001 new_progress.omap_complete = true;
2002 else
2003 new_progress.omap_recovered_to = iter->key();
2004 }
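// In short: omap entries stream out in key order, capped both by the
// remaining byte budget ('available') and by
// osd_recovery_max_omap_entries_per_chunk; omap_recovered_to records
// where the next chunk must resume.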
2005
2006 if (available > 0) {
2007 if (!recovery_info.copy_subset.empty()) {
2008 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
2009 map<uint64_t, uint64_t> m;
2010 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
2011 copy_subset.range_end(), m);
2012 if (r >= 0) {
2013 interval_set<uint64_t> fiemap_included(m);
2014 copy_subset.intersection_of(fiemap_included);
2015 } else {
2016 // intersection of copy_subset and empty interval_set would be empty anyway
2017 copy_subset.clear();
2018 }
2019
2020 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
2021 available);
2022 if (out_op->data_included.empty()) // zero filled section, skip to end!
2023 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
2024 else
2025 new_progress.data_recovered_to = out_op->data_included.range_end();
2026 }
2027 } else {
2028 out_op->data_included.clear();
2029 }
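// span_of() selects at most 'available' bytes of copy_subset starting
// at data_recovered_to. For example, with copy_subset {[0,12M)} and an
// 8M budget, the first chunk covers [0,8M) and the next push resumes
// at 8M.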
2030
2031 for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
2032 p != out_op->data_included.end();
2033 ++p) {
2034 bufferlist bit;
2035 int r = store->read(ch, ghobject_t(recovery_info.soid),
2036 p.get_start(), p.get_len(), bit,
2037 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
2038 if (cct->_conf->osd_debug_random_push_read_error &&
2039 (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
2040 dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
2041 r = -EIO;
2042 }
2043 if (r < 0) {
2044 return r;
2045 }
2046 if (p.get_len() != bit.length()) {
2047 dout(10) << " extent " << p.get_start() << "~" << p.get_len()
2048 << " is actually " << p.get_start() << "~" << bit.length()
2049 << dendl;
2050 interval_set<uint64_t>::iterator save = p++;
2051 if (bit.length() == 0)
2052 out_op->data_included.erase(save); //Remove this empty interval
2053 else
2054 save.set_len(bit.length());
2055 // Remove any other intervals present
2056 while (p != out_op->data_included.end()) {
2057 interval_set<uint64_t>::iterator save = p++;
2058 out_op->data_included.erase(save);
2059 }
2060 new_progress.data_complete = true;
2061 out_op->data.claim_append(bit);
2062 break;
2063 }
2064 out_op->data.claim_append(bit);
2065 }
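// A short read means we ran off the (possibly shrunken) end of the
// object: the current interval is truncated to what was actually read,
// any later intervals are dropped, and the push is marked data_complete
// so the puller does not wait for bytes that no longer exist.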
2066
2067 if (new_progress.is_complete(recovery_info)) {
2068 new_progress.data_complete = true;
2069 if (stat)
2070 stat->num_objects_recovered++;
2071 }
2072
2073 if (stat) {
2074 stat->num_keys_recovered += out_op->omap_entries.size();
2075 stat->num_bytes_recovered += out_op->data.length();
2076 }
2077
2078 get_parent()->get_logger()->inc(l_osd_push);
2079 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2080
2081 // send
2082 out_op->version = v;
2083 out_op->soid = recovery_info.soid;
2084 out_op->recovery_info = recovery_info;
2085 out_op->after_progress = new_progress;
2086 out_op->before_progress = progress;
2087 return 0;
2088 }
2089
2090 void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2091 {
2092 op->recovery_info.version = eversion_t();
2093 op->version = eversion_t();
2094 op->soid = soid;
2095 }
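// A blank PushOp (version left at eversion_t()) tells the puller that
// this peer could not serve the object, so it is treated as a failed
// pull rather than as data.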
2096
2097 bool ReplicatedBackend::handle_push_reply(
2098 pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2099 {
2100 const hobject_t &soid = op.soid;
2101 if (pushing.count(soid) == 0) {
2102 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2103 << ", or anybody else"
2104 << dendl;
2105 return false;
2106 } else if (pushing[soid].count(peer) == 0) {
2107 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2108 << dendl;
2109 return false;
2110 } else {
2111 PushInfo *pi = &pushing[soid][peer];
2112 bool error = pushing[soid].begin()->second.recovery_progress.error;
2113
2114 if (!pi->recovery_progress.data_complete && !error) {
2115 dout(10) << " pushing more, from "
2116 << pi->recovery_progress.data_recovered_to
2117 << " of " << pi->recovery_info.copy_subset << dendl;
2118 ObjectRecoveryProgress new_progress;
2119 int r = build_push_op(
2120 pi->recovery_info,
2121 pi->recovery_progress, &new_progress, reply,
2122 &(pi->stat));
2123 // Handle the case of a read error right after we wrote, which is
2124 // hopefully extremely rare.
2125 if (r < 0) {
2126 dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
2127
2128 error = true;
2129 goto done;
2130 }
2131 pi->recovery_progress = new_progress;
2132 return true;
2133 } else {
2134 // done!
2135 done:
2136 if (!error)
2137 get_parent()->on_peer_recover(peer, soid, pi->recovery_info);
2138
2139 get_parent()->release_locks(pi->lock_manager);
2140 object_stat_sum_t stat = pi->stat;
2141 eversion_t v = pi->recovery_info.version;
2142 pushing[soid].erase(peer);
2143 pi = NULL;
2144
2145 if (pushing[soid].empty()) {
2146 if (!error)
2147 get_parent()->on_global_recover(soid, stat, false);
2148 else
2149 get_parent()->on_primary_error(soid, v);
2150 pushing.erase(soid);
2151 } else {
2152 // This looks weird, but we erased the current peer and need to remember
2153 // the error on any other one, while getting more acks.
2154 if (error)
2155 pushing[soid].begin()->second.recovery_progress.error = true;
2156 dout(10) << "pushed " << soid << ", still waiting for push ack from "
2157 << pushing[soid].size() << " others" << dendl;
2158 }
2159 return false;
2160 }
2161 }
2162 }
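// handle_push_reply drives the per-peer push state machine: each ack
// either triggers the next chunk via build_push_op() or retires the
// peer; the last ack for an object fires on_global_recover(), or
// on_primary_error() if any chunk failed.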
2163
2164 void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2165 {
2166 const hobject_t &soid = op.soid;
2167 struct stat st;
2168 int r = store->stat(ch, ghobject_t(soid), &st);
2169 if (r != 0) {
2170 get_parent()->clog_error() << get_info().pgid << " "
2171 << peer << " tried to pull " << soid
2172 << " but got " << cpp_strerror(-r);
2173 prep_push_op_blank(soid, reply);
2174 } else {
2175 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2176 ObjectRecoveryProgress &progress = op.recovery_progress;
2177 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2178 // Adjust size and copy_subset
2179 recovery_info.size = st.st_size;
2180 recovery_info.copy_subset.clear();
2181 if (st.st_size)
2182 recovery_info.copy_subset.insert(0, st.st_size);
2183 assert(recovery_info.clone_subset.empty());
2184 }
2185
2186 r = build_push_op(recovery_info, progress, 0, reply);
2187 if (r < 0)
2188 prep_push_op_blank(soid, reply);
2189 }
2190 }
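// recovery_info.size == -1 means the puller did not know the object's
// size; the serving side stats its local copy and widens copy_subset to
// the full [0, st_size) range before building the reply.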
2191
2192 /**
2193 * trim received data to remove what we don't want
2194 *
2195 * @param copy_subset intervals we want
2196 * @param intervals_received intervals we got
2197 * @param data_received data we got
2198 * @param intervals_usable intervals we want to keep
2199 * @param data_usable matching data we want to keep
2200 */
2201 void ReplicatedBackend::trim_pushed_data(
2202 const interval_set<uint64_t> &copy_subset,
2203 const interval_set<uint64_t> &intervals_received,
2204 bufferlist data_received,
2205 interval_set<uint64_t> *intervals_usable,
2206 bufferlist *data_usable)
2207 {
2208 if (intervals_received.subset_of(copy_subset)) {
2209 *intervals_usable = intervals_received;
2210 *data_usable = data_received;
2211 return;
2212 }
2213
2214 intervals_usable->intersection_of(copy_subset,
2215 intervals_received);
2216
2217 uint64_t off = 0;
2218 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2219 p != intervals_received.end();
2220 ++p) {
2221 interval_set<uint64_t> x;
2222 x.insert(p.get_start(), p.get_len());
2223 x.intersection_of(copy_subset);
2224 for (interval_set<uint64_t>::const_iterator q = x.begin();
2225 q != x.end();
2226 ++q) {
2227 bufferlist sub;
2228 uint64_t data_off = off + (q.get_start() - p.get_start());
2229 sub.substr_of(data_received, data_off, q.get_len());
2230 data_usable->claim_append(sub);
2231 }
2232 off += p.get_len();
2233 }
2234 }
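// Worked example: copy_subset = {[0,8)} and intervals_received =
// {[4,12)} with 8 bytes of data. The intersection is {[4,8)}, so
// intervals_usable = {[4,8)} and data_usable receives bytes 0..3 of
// data_received ('off' and 'data_off' above map interval offsets to
// offsets within the received buffer).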
2235
2236 void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
2237 {
2238 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
2239 list<pg_shard_t> fl = { from };
2240 get_parent()->failed_push(fl, soid);
2241
2242 clear_pull(pulling.find(soid));
2243 }
2244
2245 void ReplicatedBackend::clear_pull_from(
2246 map<hobject_t, PullInfo>::iterator piter)
2247 {
2248 auto from = piter->second.from;
2249 pull_from_peer[from].erase(piter->second.soid);
2250 if (pull_from_peer[from].empty())
2251 pull_from_peer.erase(from);
2252 }
2253
2254 void ReplicatedBackend::clear_pull(
2255 map<hobject_t, PullInfo>::iterator piter,
2256 bool clear_pull_from_peer)
2257 {
2258 if (clear_pull_from_peer) {
2259 clear_pull_from(piter);
2260 }
2261 get_parent()->release_locks(piter->second.lock_manager);
2262 pulling.erase(piter);
2263 }
2264
2265 int ReplicatedBackend::start_pushes(
2266 const hobject_t &soid,
2267 ObjectContextRef obc,
2268 RPGHandle *h)
2269 {
2270 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2271
2272 dout(20) << __func__ << " soid " << soid << dendl;
2273 // who needs it?
2274 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2275 for (set<pg_shard_t>::iterator i =
2276 get_parent()->get_actingbackfill_shards().begin();
2277 i != get_parent()->get_actingbackfill_shards().end();
2278 ++i) {
2279 if (*i == get_parent()->whoami_shard()) continue;
2280 pg_shard_t peer = *i;
2281 map<pg_shard_t, pg_missing_t>::const_iterator j =
2282 get_parent()->get_shard_missing().find(peer);
2283 assert(j != get_parent()->get_shard_missing().end());
2284 if (j->second.is_missing(soid)) {
2285 shards.push_back(j);
2286 }
2287 }
2288
2289 // If more than one read will occur, ignore any request not to cache
2290 bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2291
2292 for (auto j : shards) {
2293 pg_shard_t peer = j->first;
2294 h->pushes[peer].push_back(PushOp());
2295 int r = prep_push_to_replica(obc, soid, peer,
2296 &(h->pushes[peer].back()), cache);
2297 if (r < 0) {
2298 // Back out all failed reads
2299 for (auto k : shards) {
2300 pg_shard_t p = k->first;
2301 dout(10) << __func__ << " clean up peer " << p << dendl;
2302 h->pushes[p].pop_back();
2303 if (p == peer) break;
2304 }
2305 return r;
2306 }
2307 }
2308 return shards.size();
2309 }
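// If prep_push_to_replica() fails for any peer, every PushOp queued by
// this call is popped again (up to and including the failing peer), so
// a partial batch is never sent and the caller can requeue the object.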