// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return *_dout << pgb->get_parent()->gen_dbg_prefix();
}

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
  public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
  public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

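// Record perf counters for a completed replicated sub-operation: the
// overall count and latency, plus per-type (write/push/pull) byte and
// latency breakdowns.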
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      assert("unsupported subop" == 0);
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  coll_t coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

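// Dispatch all pushes and pulls batched up in this recovery handle, then
// free the handle; replies arrive asynchronously as push/pull messages.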
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  delete h;
}

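// Queue recovery for one object: if this OSD is itself missing the
// object, pull it from a peer; otherwise push it to the replicas that
// still need it.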
void ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
    return;
  } else {
    assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    assert(started > 0);
  }
}

void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

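// Backend message dispatch: recovery traffic (push/pull and replies) and
// replicated writes (repop and replies) are handled here; anything else
// is left for the generic PG code (returns false).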
bool ReplicatedBackend::handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_SUBOP: {
    const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
    if (m->ops.size() == 0) {
      assert(0);
    }
    break;
  }

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (map<ceph_tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
       i != in_progress_ops.end();
       in_progress_ops.erase(i++)) {
    if (i->second.on_commit)
      delete i->second.on_commit;
    if (i->second.on_applied)
      delete i->second.on_applied;
  }
  clear_recovery_state();
}

void ReplicatedBackend::on_flushed()
{
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
  int r;
  Context *c;
  AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
  void finish(ThreadPool::TPHandle&) override {
    c->complete(r);
    c = NULL;
  }
  ~AsyncReadCallback() override {
    delete c;
  }
};
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  // There is no fast read implementation for replication backend yet
  assert(!fast_read);

  int r = 0;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end() && r >= 0;
       ++i) {
    int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(),
                         i->first.get<1>(), *(i->second.first),
                         i->first.get<2>());
    if (i->second.second) {
      get_parent()->schedule_recovery_work(
        get_parent()->bless_gencontext(
          new AsyncReadCallback(_r, i->second.second)));
    }
    if (_r < 0)
      r = _r;
  }
  get_parent()->schedule_recovery_work(
    get_parent()->bless_gencontext(
      new AsyncReadCallback(r, on_complete)));
}

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};

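// Flatten a high-level PGTransaction into a raw ObjectStore::Transaction.
// Log entries are marked unrollbackable (the replicated backend cannot
// roll back), and any temp objects created or deleted along the way are
// reported via *added / *removed.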
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  bool legacy_log_entries,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  assert(t);
  assert(added);
  assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      ::encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          t->touch(coll, goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

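// Primary-side write path: build the local transaction, register an
// InProgressOp that waits on every acting/backfill shard, replicate the
// transaction via issue_op(), and queue it locally with on-applied and
// on-commit callbacks attached.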
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);

  assert(!in_progress_ops.count(tid));
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

void ReplicatedBackend::op_applied(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_applied");
    op->op->pg_trace.event("op applied");
  }

  op->waiting_for_applied.erase(get_parent()->whoami_shard());
  parent->op_applied(op->v);

  if (op->waiting_for_applied.empty()) {
    op->on_applied->complete(0);
    op->on_applied = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::op_commit(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

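// Handle a replica's MOSDRepOpReply on the primary: remove the replying
// shard from the applied/commit waiting sets and fire the completion
// callbacks once all shards have answered.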
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  if (in_progress_ops.count(rep_tid)) {
    map<ceph_tid_t, InProgressOp>::iterator iter =
      in_progress_ops.find(rep_tid);
    InProgressOp &ip_op = iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_commit_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      assert(ip_op.waiting_for_applied.count(from));
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_applied_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_applied_rec");
      }
    }
    ip_op.waiting_for_applied.erase(from);

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
}

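// Deep scrub: CRC the object data in osd_deep_scrub_stride chunks, then
// CRC the omap header and every key/value pair, recording the digests
// (or read_error on EIO) in the ScrubMap entry.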
void ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  uint32_t seed,
  ScrubMap::object &o,
  ThreadPool::TPHandle &handle)
{
  dout(10) << __func__ << " " << poid << " seed "
           << std::hex << seed << std::dec << dendl;
  bufferhash h(seed), oh(seed);
  bufferlist bl, hdrbl;
  int r;
  __u64 pos = 0;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  while (true) {
    handle.reset_tp_timeout();
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags, true);
    if (r <= 0)
      break;

    h << bl;
    pos += bl.length();
    bl.clear();
  }
  if (r == -EIO) {
    dout(25) << __func__ << " " << poid << " got "
             << r << " on read, read_error" << dendl;
    o.read_error = true;
    return;
  }
  o.digest = h.digest();
  o.digest_present = true;

  bl.clear();
  r = store->omap_get_header(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    &hdrbl, true);
  // NOTE: bobtail to giant, we would crc the head as (len, head).
  // that changes at the same time we start using a non-zero seed.
  if (r == 0 && hdrbl.length()) {
    dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
             << dendl;
    if (seed == 0) {
      // legacy
      bufferlist bl;
      ::encode(hdrbl, bl);
      oh << bl;
    } else {
      oh << hdrbl;
    }
  } else if (r == -EIO) {
    dout(25) << __func__ << " " << poid << " got "
             << r << " on omap header read, read_error" << dendl;
    o.read_error = true;
    return;
  }

  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  assert(iter);
  uint64_t keys_scanned = 0;
  for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
       iter->next(false)) {
    if (cct->_conf->osd_scan_list_ping_tp_interval &&
        (keys_scanned % cct->_conf->osd_scan_list_ping_tp_interval == 0)) {
      handle.reset_tp_timeout();
    }
    ++keys_scanned;

    dout(25) << "CRC key " << iter->key() << " value:\n";
    iter->value().hexdump(*_dout);
    *_dout << dendl;

    ::encode(iter->key(), bl);
    ::encode(iter->value(), bl);
    oh << bl;
    bl.clear();
  }

  if (iter->status() < 0) {
    dout(25) << __func__ << " " << poid
             << " on omap scan, db status error" << dendl;
    o.read_error = true;
    return;
  }

  // Store final calculated CRC32 of omap header & key/values
  o.omap_digest = oh.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;
}

void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      if (!bc->start_pushes(i.hoid, obc, h)) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};

void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);

  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

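// Build the MOSDRepOp sent to one replica for a client write.  If the
// object is beyond the replica's last_backfill we ship an empty
// transaction so it still records the log entries and pg stats.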
Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    dout(10) << "issue_repop shipping empty opt to osd." << peer
             << ", object " << soid
             << " beyond MAX(last_backfill_started "
             << ", pinfo.last_backfill "
             << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    ::encode(t, wr->get_data());
  } else {
    ::encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (op->op)
    op->op->pg_trace.event("issue replication ops");

  if (parent->get_actingbackfill_shards().size() > 1) {
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
         parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    wr = generate_subop(
      soid,
      at_version,
      tid,
      reqid,
      pg_trim_to,
      pg_roll_forward_to,
      new_temp_oid,
      discard_temp_oid,
      log_entries,
      hset_hist,
      op_t,
      peer,
      pinfo);
    if (op->op)
      wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}

// sub op modify
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
           << " " << m->logbl.length()
           << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
  ::decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
               << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  ::decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_roll_forward_to,
    update_snaps,
    rm->localt);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
}

void ReplicatedBackend::repop_applied(RepModifyRef rm)
{
  rm->op->mark_event("sub_op_applied");
  rm->applied = true;
  rm->op->pg_trace.event("sub_op_applied");

  dout(10) << __func__ << " on " << rm << " op "
           << *rm->op->get_req() << dendl;
  const Message *m = rm->op->get_req();
  const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
  eversion_t version = req->version;

  // send ack to acker only if we haven't sent a commit already
  if (!rm->committed) {
    Message *ack = new MOSDRepOpReply(
      req, parent->whoami_shard(),
      0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
    ack->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match commit priority!
    ack->trace = rm->op->pg_trace;
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

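// Work out which extents of a clone can be recovered locally by cloning
// ranges from the neighbouring clones (previous or next in the snapset)
// instead of copying them over the wire.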
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

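// Set up a pull for an object this OSD is missing: pick a random source
// among the shards known to hold it, use clone overlap to shrink the
// copy subset, and queue a PullOp on the recovery handle.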
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t> &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  assert(q != missing_loc.end());
  assert(!q->second.empty());

  // pick a pullee
  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
  random_shuffle(shuffle.begin(), shuffle.end());
  vector<pg_shard_t>::iterator p = shuffle.begin();
  assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    assert(get_parent()->get_log().get_log().objects.count(soid) &&
           (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
            pg_log_entry_t::LOST_REVERT) &&
           (get_parent()->get_log().get_log().objects.find(
              soid)->second->reverting_to ==
            v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    assert(!get_parent()->get_local_missing().is_missing(
             soid.get_head()) ||
           !get_parent()->get_local_missing().is_missing(
             soid.get_snapdir()));
    assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
void ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/precede poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }
    hobject_t snapdir = head;
    snapdir.snap = CEPH_SNAPDIR;
    if (get_parent()->get_local_missing().is_missing(snapdir)) {
      dout(15) << "push_to_replica missing snapdir " << snapdir
               << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on replica's clones?
    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

void ReplicatedBackend::prep_push(ObjectContextRef obc,
                                  const hobject_t& soid, pg_shard_t peer,
                                  PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  prep_push(obc, soid, peer,
            obc->obs.oi.version, data_subset, clone_subsets,
            pop, cache_dont_need, ObcLockManager());
}

void ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  assert(r == 0);
  pi.recovery_progress = new_progress;
}

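// Apply one chunk of incoming push data.  The first chunk (re)creates
// the target, using a temp object unless the push completes in one
// round; the final chunk renames the temp object into place and clones
// in any locally available ranges.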
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
                                                        recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
               << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
                      oi.expected_object_size,
                      oi.expected_write_size,
                      oi.alloc_hint_flags);
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
             p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
               << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
                                coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}

void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
         recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
         q != p->second.end();
         ++q) {
      dout(15) << " clone_range " << p->first << " "
               << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
                     q.get_start(), q.get_len(), q.get_start());
    }
  }
}

ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  assert(ssc);
  get_parent()->release_locks(manager);  // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}

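// Handle one PushOp of a pull response on the pulling OSD.  Returns
// true if another PullOp must be sent to fetch the rest of the object;
// completed objects are queued on to_continue for follow-on pushes.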
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
           << pop.recovery_info
           << pop.after_progress
           << " data.size() is " << data.length()
           << " data_included: " << data_included
           << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_push(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  assert((data_included.empty() && data.length() == 0) ||
         (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // attrs only reference the origin bufferlist (decoded from the
    // MOSDPGPush message), which is much larger than the attrs needed
    // for recovery.  If the obc caches them (get_obc may cache the
    // attrs), the whole origin bufferlist cannot be freed until the obc
    // is evicted from the obc cache.  So rebuild the bufferlists before
    // caching them.
1754 auto attrset = pop.attrset;
1755 for (auto& a : attrset) {
1756 a.second.rebuild();
1757 }
1758 pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
1759 pi.recovery_info.oi = pi.obc->obs.oi;
1760 pi.recovery_info = recalc_subsets(
1761 pi.recovery_info,
1762 pi.obc->ssc,
1763 pi.lock_manager);
1764 }
1765
1766
1767 interval_set<uint64_t> usable_intervals;
1768 bufferlist usable_data;
1769 trim_pushed_data(pi.recovery_info.copy_subset,
1770 data_included,
1771 data,
1772 &usable_intervals,
1773 &usable_data);
1774 data_included = usable_intervals;
1775 data.claim(usable_data);
1776
1777
1778 pi.recovery_progress = pop.after_progress;
1779
1780 dout(10) << "new recovery_info " << pi.recovery_info
1781 << ", new progress " << pi.recovery_progress
1782 << dendl;
1783
1784 bool complete = pi.is_complete();
1785
1786 submit_push_data(pi.recovery_info, first,
1787 complete, pi.cache_dont_need,
1788 data_included, data,
1789 pop.omap_header,
1790 pop.attrset,
1791 pop.omap_entries,
1792 t);
1793
1794 pi.stat.num_keys_recovered += pop.omap_entries.size();
1795 pi.stat.num_bytes_recovered += data.length();
1796
1797 if (complete) {
1798 pi.stat.num_objects_recovered++;
1799 clear_pull_from(piter);
1800 to_continue->push_back({hoid, pi.stat});
1801 get_parent()->on_local_recover(
1802 hoid, pi.recovery_info, pi.obc, t);
1803 return false;
1804 } else {
1805 response->soid = pop.soid;
1806 response->recovery_info = pi.recovery_info;
1807 response->recovery_progress = pi.recovery_progress;
1808 return true;
1809 }
1810 }
1811
1812 void ReplicatedBackend::handle_push(
1813 pg_shard_t from, const PushOp &pop, PushReplyOp *response,
1814 ObjectStore::Transaction *t)
1815 {
1816 dout(10) << "handle_push "
1817 << pop.recovery_info
1818 << pop.after_progress
1819 << dendl;
1820 bufferlist data;
1821 data = pop.data;
1822 bool first = pop.before_progress.first;
1823 bool complete = pop.after_progress.data_complete &&
1824 pop.after_progress.omap_complete;
1825
1826 response->soid = pop.recovery_info.soid;
1827 submit_push_data(pop.recovery_info,
1828 first,
1829 complete,
1830 true, // must be replicate
1831 pop.data_included,
1832 data,
1833 pop.omap_header,
1834 pop.attrset,
1835 pop.omap_entries,
1836 t);
1837
1838 if (complete)
1839 get_parent()->on_local_recover(
1840 pop.recovery_info.soid,
1841 pop.recovery_info,
1842 ObjectContextRef(), // ok, is replica
1843 t);
1844 }
1845
1846 void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1847 {
1848 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1849 i != pushes.end();
1850 ++i) {
1851 ConnectionRef con = get_parent()->get_con_osd_cluster(
1852 i->first.osd,
1853 get_osdmap()->get_epoch());
1854 if (!con)
1855 continue;
1856 vector<PushOp>::iterator j = i->second.begin();
1857 while (j != i->second.end()) {
1858 uint64_t cost = 0;
1859 uint64_t pushes = 0;
1860 MOSDPGPush *msg = new MOSDPGPush();
1861 msg->from = get_parent()->whoami_shard();
1862 msg->pgid = get_parent()->primary_spg_t();
1863 msg->map_epoch = get_osdmap()->get_epoch();
1864 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1865 msg->set_priority(prio);
1866 for (;
1867 (j != i->second.end() &&
1868 cost < cct->_conf->osd_max_push_cost &&
1869 pushes < cct->_conf->osd_max_push_objects) ;
1870 ++j) {
1871 dout(20) << __func__ << ": sending push " << *j
1872 << " to osd." << i->first << dendl;
1873 cost += j->cost(cct);
1874 pushes += 1;
1875 msg->pushes.push_back(*j);
1876 }
1877 msg->set_cost(cost);
1878 get_parent()->send_message_osd_cluster(msg, con);
1879 }
1880 }
1881 }
1882
1883 void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1884 {
1885 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1886 i != pulls.end();
1887 ++i) {
1888 ConnectionRef con = get_parent()->get_con_osd_cluster(
1889 i->first.osd,
1890 get_osdmap()->get_epoch());
1891 if (!con)
1892 continue;
1893 dout(20) << __func__ << ": sending pulls " << i->second
1894 << " to osd." << i->first << dendl;
1895 MOSDPGPull *msg = new MOSDPGPull();
1896 msg->from = get_parent()->whoami_shard();
1897 msg->set_priority(prio);
1898 msg->pgid = get_parent()->primary_spg_t();
1899 msg->map_epoch = get_osdmap()->get_epoch();
1900 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1901 msg->set_pulls(&i->second);
1902 msg->compute_cost(cct);
1903 get_parent()->send_message_osd_cluster(msg, con);
1904 }
1905 }
1906
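// Build the next PushOp chunk for an object. The first chunk carries the
// omap header and xattrs (and validates the local object_info version);
// subsequent chunks fill in up to osd_recovery_max_chunk bytes of omap
// entries and object data, advancing *out_progress so the caller can
// resume where this chunk left off.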
1907 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1908 const ObjectRecoveryProgress &progress,
1909 ObjectRecoveryProgress *out_progress,
1910 PushOp *out_op,
1911 object_stat_sum_t *stat,
1912 bool cache_dont_need)
1913 {
1914 ObjectRecoveryProgress _new_progress;
1915 if (!out_progress)
1916 out_progress = &_new_progress;
1917 ObjectRecoveryProgress &new_progress = *out_progress;
1918 new_progress = progress;
1919
1920 dout(7) << "send_push_op " << recovery_info.soid
1921 << " v " << recovery_info.version
1922 << " size " << recovery_info.size
1923 << " recovery_info: " << recovery_info
1924 << dendl;
1925
1926 if (progress.first) {
1927 int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
1928 if (r < 0) {
1929 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1930 return r;
1931 }
1932 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
1933 if (r < 0) {
1934 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1935 return r;
1936 }
1937
1938 // Sanity check: verify the local copy matches the version we were asked to push
1939 bufferlist bv = out_op->attrset[OI_ATTR];
1940 object_info_t oi(bv);
1941
1942 if (oi.version != recovery_info.version) {
1943 get_parent()->clog_error() << get_info().pgid << " push "
1944 << recovery_info.soid << " v "
1945 << recovery_info.version
1946 << " failed because local copy is "
1947 << oi.version;
1948 return -EINVAL;
1949 }
1950
1951 new_progress.first = false;
1952 }
1953
1954 uint64_t available = cct->_conf->osd_recovery_max_chunk;
1955 if (!progress.omap_complete) {
1956 ObjectMap::ObjectMapIterator iter =
1957 store->get_omap_iterator(coll,
1958 ghobject_t(recovery_info.soid));
1959 assert(iter);
1960 for (iter->lower_bound(progress.omap_recovered_to);
1961 iter->valid();
1962 iter->next(false)) {
1963 if (!out_op->omap_entries.empty() &&
1964 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
1965 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
1966 available <= iter->key().size() + iter->value().length()))
1967 break;
1968 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
1969
1970 if ((iter->key().size() + iter->value().length()) <= available)
1971 available -= (iter->key().size() + iter->value().length());
1972 else
1973 available = 0;
1974 }
1975 if (!iter->valid())
1976 new_progress.omap_complete = true;
1977 else
1978 new_progress.omap_recovered_to = iter->key();
1979 }
1980
1981 if (available > 0) {
1982 if (!recovery_info.copy_subset.empty()) {
1983 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
1984 map<uint64_t, uint64_t> m;
1985 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
1986 copy_subset.range_end(), m);
1987 if (r >= 0) {
1988 interval_set<uint64_t> fiemap_included(m);
1989 copy_subset.intersection_of(fiemap_included);
1990 } else {
1991 // intersection of copy_subset and empty interval_set would be empty anyway
1992 copy_subset.clear();
1993 }
1994
1995 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
1996 available);
1997 if (out_op->data_included.empty()) // zero filled section, skip to end!
1998 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
1999 else
2000 new_progress.data_recovered_to = out_op->data_included.range_end();
2001 }
2002 } else {
2003 out_op->data_included.clear();
2004 }
2005
2006 for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
2007 p != out_op->data_included.end();
2008 ++p) {
2009 bufferlist bit;
2010 store->read(ch, ghobject_t(recovery_info.soid),
2011 p.get_start(), p.get_len(), bit,
2012 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
2013 if (p.get_len() != bit.length()) {
2014 dout(10) << " extent " << p.get_start() << "~" << p.get_len()
2015 << " is actually " << p.get_start() << "~" << bit.length()
2016 << dendl;
2017 interval_set<uint64_t>::iterator save = p++;
2018 if (bit.length() == 0)
2019 out_op->data_included.erase(save); // remove this empty interval
2020 else
2021 save.set_len(bit.length());
2022 // Remove any other intervals present
2023 while (p != out_op->data_included.end()) {
2024 interval_set<uint64_t>::iterator to_erase = p++;
2025 out_op->data_included.erase(to_erase);
2026 }
2027 new_progress.data_complete = true;
2028 out_op->data.claim_append(bit);
2029 break;
2030 }
2031 out_op->data.claim_append(bit);
2032 }
2033
2034 if (new_progress.is_complete(recovery_info)) {
2035 new_progress.data_complete = true;
2036 if (stat)
2037 stat->num_objects_recovered++;
2038 }
2039
2040 if (stat) {
2041 stat->num_keys_recovered += out_op->omap_entries.size();
2042 stat->num_bytes_recovered += out_op->data.length();
2043 }
2044
2045 get_parent()->get_logger()->inc(l_osd_push);
2046 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2047
2048 // send
2049 out_op->version = recovery_info.version;
2050 out_op->soid = recovery_info.soid;
2051 out_op->recovery_info = recovery_info;
2052 out_op->after_progress = new_progress;
2053 out_op->before_progress = progress;
2054 return 0;
2055 }
2056
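// Fill in a blank PushOp (zero versions); used to tell a pulling peer
// that the requested object cannot be pushed, e.g. after a local stat or
// read failure.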
2057 void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2058 {
2059 op->recovery_info.version = eversion_t();
2060 op->version = eversion_t();
2061 op->soid = soid;
2062 }
2063
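// Handle a push ack from a replica: if the object is not yet fully pushed
// to that peer, build the next chunk into *reply and return true; once the
// peer (and eventually every peer) has the object, tear down the
// per-object push state and return false.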
2064 bool ReplicatedBackend::handle_push_reply(
2065 pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2066 {
2067 const hobject_t &soid = op.soid;
2068 if (pushing.count(soid) == 0) {
2069 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2070 << ", or anybody else"
2071 << dendl;
2072 return false;
2073 } else if (pushing[soid].count(peer) == 0) {
2074 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2075 << dendl;
2076 return false;
2077 } else {
2078 PushInfo *pi = &pushing[soid][peer];
2079
2080 if (!pi->recovery_progress.data_complete) {
2081 dout(10) << " pushing more from, "
2082 << pi->recovery_progress.data_recovered_to
2083 << " of " << pi->recovery_info.copy_subset << dendl;
2084 ObjectRecoveryProgress new_progress;
2085 int r = build_push_op(
2086 pi->recovery_info,
2087 pi->recovery_progress, &new_progress, reply,
2088 &(pi->stat));
2089 assert(r == 0);
2090 pi->recovery_progress = new_progress;
2091 return true;
2092 } else {
2093 // done!
2094 get_parent()->on_peer_recover(
2095 peer, soid, pi->recovery_info);
2096
2097 get_parent()->release_locks(pi->lock_manager);
2098 object_stat_sum_t stat = pi->stat;
2099 pushing[soid].erase(peer);
2100 pi = NULL;
2101
2102 if (pushing[soid].empty()) {
2103 get_parent()->on_global_recover(soid, stat);
2104 pushing.erase(soid);
2105 } else {
2106 dout(10) << "pushed " << soid << ", still waiting for push ack from "
2107 << pushing[soid].size() << " others" << dendl;
2108 }
2109 return false;
2110 }
2111 }
2112 }
2113
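// Primary-side handler for a pull request: stat the object locally, and on
// the first chunk fix up size and copy_subset if the puller did not know
// the object size (recovery_info.size == -1), then build the PushOp reply;
// on any error, reply with a blank PushOp instead.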
2114 void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2115 {
2116 const hobject_t &soid = op.soid;
2117 struct stat st;
2118 int r = store->stat(ch, ghobject_t(soid), &st);
2119 if (r != 0) {
2120 get_parent()->clog_error() << get_info().pgid << " "
2121 << peer << " tried to pull " << soid
2122 << " but got " << cpp_strerror(-r);
2123 prep_push_op_blank(soid, reply);
2124 } else {
2125 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2126 ObjectRecoveryProgress &progress = op.recovery_progress;
2127 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2128 // Adjust size and copy_subset
2129 recovery_info.size = st.st_size;
2130 recovery_info.copy_subset.clear();
2131 if (st.st_size)
2132 recovery_info.copy_subset.insert(0, st.st_size);
2133 assert(recovery_info.clone_subset.empty());
2134 }
2135
2136 r = build_push_op(recovery_info, progress, 0, reply);
2137 if (r < 0)
2138 prep_push_op_blank(soid, reply);
2139 }
2140 }
2141
2142 /**
2143 * trim received data to remove what we don't want
2144 *
2145 * @param copy_subset intervals we want
2146 * @param intervals_received intervals we got
2147 * @param data_received data we got
2148 * @param intervals_usable intervals we want to keep
2149 * @param data_usable matching data we want to keep
2150 */
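// Example with illustrative values: given copy_subset = {[0, 8)} and
// intervals_received = {[0, 16)}, intervals_usable ends up as {[0, 8)} and
// data_usable keeps only the first 8 bytes of data_received.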
2151 void ReplicatedBackend::trim_pushed_data(
2152 const interval_set<uint64_t> &copy_subset,
2153 const interval_set<uint64_t> &intervals_received,
2154 bufferlist data_received,
2155 interval_set<uint64_t> *intervals_usable,
2156 bufferlist *data_usable)
2157 {
2158 if (intervals_received.subset_of(copy_subset)) {
2159 *intervals_usable = intervals_received;
2160 *data_usable = data_received;
2161 return;
2162 }
2163
2164 intervals_usable->intersection_of(copy_subset,
2165 intervals_received);
2166
2167 uint64_t off = 0;
2168 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2169 p != intervals_received.end();
2170 ++p) {
2171 interval_set<uint64_t> x;
2172 x.insert(p.get_start(), p.get_len());
2173 x.intersection_of(copy_subset);
2174 for (interval_set<uint64_t>::const_iterator q = x.begin();
2175 q != x.end();
2176 ++q) {
2177 bufferlist sub;
2178 uint64_t data_off = off + (q.get_start() - p.get_start());
2179 sub.substr_of(data_received, data_off, q.get_len());
2180 data_usable->claim_append(sub);
2181 }
2182 off += p.get_len();
2183 }
2184 }
2185
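// Report a push failure from 'from' to the parent PG and drop our pull
// state for the object.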
2186 void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
2187 {
2188 list<pg_shard_t> fl = { from };
2189 get_parent()->failed_push(fl, soid);
2190
2191 clear_pull(pulling.find(soid));
2192 }
2193
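// Unlink the object from the per-peer pull index, erasing the peer's entry
// entirely once no pulls from it remain.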
2194 void ReplicatedBackend::clear_pull_from(
2195 map<hobject_t, PullInfo>::iterator piter)
2196 {
2197 auto from = piter->second.from;
2198 pull_from_peer[from].erase(piter->second.soid);
2199 if (pull_from_peer[from].empty())
2200 pull_from_peer.erase(from);
2201 }
2202
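// Tear down pull state for one object: optionally unlink it from the
// per-peer index, release any locks held for its recovery, and erase it
// from the pulling map.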
2203 void ReplicatedBackend::clear_pull(
2204 map<hobject_t, PullInfo>::iterator piter,
2205 bool clear_pull_from_peer)
2206 {
2207 if (clear_pull_from_peer) {
2208 clear_pull_from(piter);
2209 }
2210 get_parent()->release_locks(piter->second.lock_manager);
2211 pulling.erase(piter);
2212 }
2213
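// Queue a PushOp for every acting/backfill shard (other than ourselves)
// that is missing soid; returns the number of pushes started.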
2214 int ReplicatedBackend::start_pushes(
2215 const hobject_t &soid,
2216 ObjectContextRef obc,
2217 RPGHandle *h)
2218 {
2219 int pushes = 0;
2220 // who needs it?
2221 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2222 for (set<pg_shard_t>::iterator i =
2223 get_parent()->get_actingbackfill_shards().begin();
2224 i != get_parent()->get_actingbackfill_shards().end();
2225 ++i) {
2226 if (*i == get_parent()->whoami_shard()) continue;
2227 pg_shard_t peer = *i;
2228 map<pg_shard_t, pg_missing_t>::const_iterator j =
2229 get_parent()->get_shard_missing().find(peer);
2230 assert(j != get_parent()->get_shard_missing().end());
2231 if (j->second.is_missing(soid)) {
2232 ++pushes;
2233 h->pushes[peer].push_back(PushOp());
2234 prep_push_to_replica(obc, soid, peer,
2235 &(h->pushes[peer].back()), h->cache_dont_need);
2236 }
2237 }
2238 return pushes;
2239 }