1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "common/errno.h"
15#include "ReplicatedBackend.h"
16#include "messages/MOSDOp.h"
17#include "messages/MOSDSubOp.h"
18#include "messages/MOSDRepOp.h"
19#include "messages/MOSDSubOpReply.h"
20#include "messages/MOSDRepOpReply.h"
21#include "messages/MOSDPGPush.h"
22#include "messages/MOSDPGPull.h"
23#include "messages/MOSDPGPushReply.h"
24#include "common/EventTrace.h"
25
26#define dout_context cct
27#define dout_subsys ceph_subsys_osd
28#define DOUT_PREFIX_ARGS this
29#undef dout_prefix
30#define dout_prefix _prefix(_dout, this)
31static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
32 return *_dout << pgb->get_parent()->gen_dbg_prefix();
33}
34
35namespace {
36class PG_SendMessageOnConn: public Context {
37 PGBackend::Listener *pg;
38 Message *reply;
39 ConnectionRef conn;
40 public:
41 PG_SendMessageOnConn(
42 PGBackend::Listener *pg,
43 Message *reply,
44 ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
45 void finish(int) override {
46 pg->send_message_osd_cluster(reply, conn.get());
47 }
48};
49
50class PG_RecoveryQueueAsync : public Context {
51 PGBackend::Listener *pg;
52 unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
53 public:
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener *pg,
56 GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
57 void finish(int) override {
58 pg->schedule_recovery_work(c.release());
59 }
60};
61}
62
63struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
64 ReplicatedBackend *pg;
65 RepModifyRef rm;
66 C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
67 : pg(pg), rm(r) {}
68 void finish(int r) override {
69 pg->repop_applied(rm);
70 }
71};
72
73struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
74 ReplicatedBackend *pg;
75 RepModifyRef rm;
76 C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
77 : pg(pg), rm(r) {}
78 void finish(int r) override {
79 pg->repop_commit(rm);
80 }
81};
82
83static void log_subop_stats(
84 PerfCounters *logger,
85 OpRequestRef op, int subop)
86{
87 utime_t now = ceph_clock_now();
88 utime_t latency = now;
89 latency -= op->get_req()->get_recv_stamp();
90
91
92 logger->inc(l_osd_sop);
93 logger->tinc(l_osd_sop_lat, latency);
94 logger->inc(subop);
95
96 if (subop != l_osd_sop_pull) {
97 uint64_t inb = op->get_req()->get_data().length();
98 logger->inc(l_osd_sop_inb, inb);
99 if (subop == l_osd_sop_w) {
100 logger->inc(l_osd_sop_w_inb, inb);
101 logger->tinc(l_osd_sop_w_lat, latency);
102 } else if (subop == l_osd_sop_push) {
103 logger->inc(l_osd_sop_push_inb, inb);
104 logger->tinc(l_osd_sop_push_lat, latency);
105 } else
106 assert("no support subop" == 0);
107 } else {
108 logger->tinc(l_osd_sop_pull_lat, latency);
109 }
110}
111
112ReplicatedBackend::ReplicatedBackend(
113 PGBackend::Listener *pg,
114 coll_t coll,
115 ObjectStore::CollectionHandle &c,
116 ObjectStore *store,
117 CephContext *cct) :
118 PGBackend(cct, pg, store, coll, c) {}
119
120void ReplicatedBackend::run_recovery_op(
121 PGBackend::RecoveryHandle *_h,
122 int priority)
123{
124 RPGHandle *h = static_cast<RPGHandle *>(_h);
125 send_pushes(priority, h->pushes);
126 send_pulls(priority, h->pulls);
127 delete h;
128}
129
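// recover_object: if the object is missing locally we prepare a pull
// from a peer that has it; otherwise we hold a usable copy and start
// pushes to the peers that are missing it.  Work is only accumulated
// in the RPGHandle here; run_recovery_op() actually sends it.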
130int ReplicatedBackend::recover_object(
131 const hobject_t &hoid,
132 eversion_t v,
133 ObjectContextRef head,
134 ObjectContextRef obc,
135 RecoveryHandle *_h
136 )
137{
138 dout(10) << __func__ << ": " << hoid << dendl;
139 RPGHandle *h = static_cast<RPGHandle *>(_h);
140 if (get_parent()->get_local_missing().is_missing(hoid)) {
141 assert(!obc);
142 // pull
143 prepare_pull(
144 v,
145 hoid,
146 head,
147 h);
148 } else {
149 assert(obc);
150 int started = start_pushes(
151 hoid,
152 obc,
153 h);
154 if (started < 0) {
155 pushing[hoid].clear();
156 return started;
157 }
158  }
159  return 0;
160}
161
162void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
163{
164 for(map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
165 i != pull_from_peer.end();
166 ) {
167 if (osdmap->is_down(i->first.osd)) {
168 dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
169 << ", osdmap has it marked down" << dendl;
170 for (set<hobject_t>::iterator j = i->second.begin();
171 j != i->second.end();
172 ++j) {
173 get_parent()->cancel_pull(*j);
174 clear_pull(pulling.find(*j), false);
175 }
176 pull_from_peer.erase(i++);
177 } else {
178 ++i;
179 }
180 }
181}
182
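// A pull only requires reading local object data, which is presumably
// why MSG_OSD_PG_PULL is the one message accepted while inactive.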
183bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
184{
185 dout(10) << __func__ << ": " << op << dendl;
186 switch (op->get_req()->get_type()) {
187 case MSG_OSD_PG_PULL:
188 return true;
189 default:
190 return false;
191 }
192}
193
194bool ReplicatedBackend::handle_message(
195 OpRequestRef op
196 )
197{
198 dout(10) << __func__ << ": " << op << dendl;
199 switch (op->get_req()->get_type()) {
200 case MSG_OSD_PG_PUSH:
201 do_push(op);
202 return true;
203
204 case MSG_OSD_PG_PULL:
205 do_pull(op);
206 return true;
207
208 case MSG_OSD_PG_PUSH_REPLY:
209 do_push_reply(op);
210 return true;
211
212 case MSG_OSD_SUBOP: {
213 const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
214 if (m->ops.size() == 0) {
215 assert(0);
216 }
217 break;
218 }
219
220 case MSG_OSD_REPOP: {
221 do_repop(op);
222 return true;
223 }
224
225 case MSG_OSD_REPOPREPLY: {
226 do_repop_reply(op);
227 return true;
228 }
229
230 default:
231 break;
232 }
233 return false;
234}
235
236void ReplicatedBackend::clear_recovery_state()
237{
238 // clear pushing/pulling maps
239 for (auto &&i: pushing) {
240 for (auto &&j: i.second) {
241 get_parent()->release_locks(j.second.lock_manager);
242 }
243 }
244 pushing.clear();
245
246 for (auto &&i: pulling) {
247 get_parent()->release_locks(i.second.lock_manager);
248 }
249 pulling.clear();
250 pull_from_peer.clear();
251}
252
253void ReplicatedBackend::on_change()
254{
255 dout(10) << __func__ << dendl;
256 for (map<ceph_tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
257 i != in_progress_ops.end();
258 in_progress_ops.erase(i++)) {
259 if (i->second.on_commit)
260 delete i->second.on_commit;
261 if (i->second.on_applied)
262 delete i->second.on_applied;
263 }
264 clear_recovery_state();
265}
266
267void ReplicatedBackend::on_flushed()
268{
269}
270
271int ReplicatedBackend::objects_read_sync(
272 const hobject_t &hoid,
273 uint64_t off,
274 uint64_t len,
275 uint32_t op_flags,
276 bufferlist *bl)
277{
278 return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
279}
280
281struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
282 int r;
283 Context *c;
284 AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
285 void finish(ThreadPool::TPHandle&) override {
286 c->complete(r);
287 c = NULL;
288 }
289 ~AsyncReadCallback() override {
290 delete c;
291 }
292};
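// Issue each requested extent as a synchronous store read, but deliver
// the per-extent and final completions through the recovery work queue
// so the caller sees an asynchronous interface.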
293void ReplicatedBackend::objects_read_async(
294 const hobject_t &hoid,
295 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
296 pair<bufferlist*, Context*> > > &to_read,
297 Context *on_complete,
298 bool fast_read)
299{
300 // There is no fast read implementation for replication backend yet
301 assert(!fast_read);
302
303 int r = 0;
304 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
305 pair<bufferlist*, Context*> > >::const_iterator i =
306 to_read.begin();
307 i != to_read.end() && r >= 0;
308 ++i) {
309 int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(),
310 i->first.get<1>(), *(i->second.first),
311 i->first.get<2>());
312 if (i->second.second) {
313 get_parent()->schedule_recovery_work(
314 get_parent()->bless_gencontext(
315 new AsyncReadCallback(_r, i->second.second)));
316 }
317 if (_r < 0)
318 r = _r;
319 }
320 get_parent()->schedule_recovery_work(
321 get_parent()->bless_gencontext(
322 new AsyncReadCallback(r, on_complete)));
323}
324
325class C_OSD_OnOpCommit : public Context {
326 ReplicatedBackend *pg;
327 ReplicatedBackend::InProgressOp *op;
328public:
329 C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
330 : pg(pg), op(op) {}
331 void finish(int) override {
332 pg->op_commit(op);
333 }
334};
335
336class C_OSD_OnOpApplied : public Context {
337 ReplicatedBackend *pg;
338 ReplicatedBackend::InProgressOp *op;
339public:
340 C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
341 : pg(pg), op(op) {}
342 void finish(int) override {
343 pg->op_applied(op);
344 }
345};
346
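// Flatten a PGTransaction into a raw ObjectStore::Transaction.  Every
// log entry is marked unrollbackable (the replicated backend does not
// use the rollback machinery), and temp objects created or deleted
// along the way are reported via *added and *removed.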
347void generate_transaction(
348 PGTransactionUPtr &pgt,
349 const coll_t &coll,
350 bool legacy_log_entries,
351 vector<pg_log_entry_t> &log_entries,
352 ObjectStore::Transaction *t,
353 set<hobject_t> *added,
354 set<hobject_t> *removed)
355{
356 assert(t);
357 assert(added);
358 assert(removed);
359
360 for (auto &&le: log_entries) {
361 le.mark_unrollbackable();
362 auto oiter = pgt->op_map.find(le.soid);
363 if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
364 bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
365 ::encode(oiter->second.updated_snaps->second, bl);
366 le.snaps.swap(bl);
367 le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
368 }
369 }
370
371 pgt->safe_create_traverse(
372 [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
373 const hobject_t &oid = obj_op.first;
374 const ghobject_t goid =
375 ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
376 const PGTransaction::ObjectOperation &op = obj_op.second;
377
378 if (oid.is_temp()) {
379 if (op.is_fresh_object()) {
380 added->insert(oid);
381 } else if (op.is_delete()) {
382 removed->insert(oid);
383 }
384 }
385
386 if (op.delete_first) {
387 t->remove(coll, goid);
388 }
389
390 match(
391 op.init_type,
392 [&](const PGTransaction::ObjectOperation::Init::None &) {
393 },
394 [&](const PGTransaction::ObjectOperation::Init::Create &op) {
395 t->touch(coll, goid);
396 },
397 [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
398 t->clone(
399 coll,
400 ghobject_t(
401 op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
402 goid);
403 },
404 [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
405 assert(op.source.is_temp());
406 t->collection_move_rename(
407 coll,
408 ghobject_t(
409 op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
410 coll,
411 goid);
412 });
413
414 if (op.truncate) {
415 t->truncate(coll, goid, op.truncate->first);
416 if (op.truncate->first != op.truncate->second)
417 t->truncate(coll, goid, op.truncate->second);
418 }
419
420 if (!op.attr_updates.empty()) {
421 map<string, bufferlist> attrs;
422 for (auto &&p: op.attr_updates) {
423 if (p.second)
424 attrs[p.first] = *(p.second);
425 else
426 t->rmattr(coll, goid, p.first);
427 }
428 t->setattrs(coll, goid, attrs);
429 }
430
431 if (op.clear_omap)
432 t->omap_clear(coll, goid);
433 if (op.omap_header)
434 t->omap_setheader(coll, goid, *(op.omap_header));
435
436 for (auto &&up: op.omap_updates) {
437 using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
438 switch (up.first) {
439 case UpdateType::Remove:
440 t->omap_rmkeys(coll, goid, up.second);
441 break;
442 case UpdateType::Insert:
443 t->omap_setkeys(coll, goid, up.second);
444 break;
445 }
446 }
447
448 // updated_snaps doesn't matter since we marked unrollbackable
449
450 if (op.alloc_hint) {
451 auto &hint = *(op.alloc_hint);
452 t->set_alloc_hint(
453 coll,
454 goid,
455 hint.expected_object_size,
456 hint.expected_write_size,
457 hint.flags);
458 }
459
460 for (auto &&extent: op.buffer_updates) {
461 using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
462 match(
463 extent.get_val(),
464 [&](const BufferUpdate::Write &op) {
465 t->write(
466 coll,
467 goid,
468 extent.get_off(),
469 extent.get_len(),
470 op.buffer);
471 },
472 [&](const BufferUpdate::Zero &op) {
473 t->zero(
474 coll,
475 goid,
476 extent.get_off(),
477 extent.get_len());
478 },
479 [&](const BufferUpdate::CloneRange &op) {
480 assert(op.len == extent.get_len());
481 t->clone_range(
482 coll,
483 ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
484 goid,
485 op.offset,
486 extent.get_len(),
487 extent.get_off());
488 });
489 }
490 });
491}
492
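// Primary write path: flatten the transaction, record an InProgressOp
// that waits for an applied ack and a commit ack from every
// acting/backfill shard (ourselves included), replicate via issue_op(),
// and queue the local transaction with callbacks that feed
// op_applied()/op_commit() below.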
493void ReplicatedBackend::submit_transaction(
494 const hobject_t &soid,
495 const object_stat_sum_t &delta_stats,
496 const eversion_t &at_version,
497 PGTransactionUPtr &&_t,
498 const eversion_t &trim_to,
499 const eversion_t &roll_forward_to,
500 const vector<pg_log_entry_t> &_log_entries,
501 boost::optional<pg_hit_set_history_t> &hset_history,
502 Context *on_local_applied_sync,
503 Context *on_all_acked,
504 Context *on_all_commit,
505 ceph_tid_t tid,
506 osd_reqid_t reqid,
507 OpRequestRef orig_op)
508{
509 parent->apply_stats(
510 soid,
511 delta_stats);
512
513 vector<pg_log_entry_t> log_entries(_log_entries);
514 ObjectStore::Transaction op_t;
515 PGTransactionUPtr t(std::move(_t));
516 set<hobject_t> added, removed;
517 generate_transaction(
518 t,
519 coll,
520    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
521 log_entries,
522 &op_t,
523 &added,
524 &removed);
525 assert(added.size() <= 1);
526 assert(removed.size() <= 1);
527
528 assert(!in_progress_ops.count(tid));
529 InProgressOp &op = in_progress_ops.insert(
530 make_pair(
531 tid,
532 InProgressOp(
533 tid, on_all_commit, on_all_acked,
534 orig_op, at_version)
535 )
536 ).first->second;
537
538 op.waiting_for_applied.insert(
539 parent->get_actingbackfill_shards().begin(),
540 parent->get_actingbackfill_shards().end());
541 op.waiting_for_commit.insert(
542 parent->get_actingbackfill_shards().begin(),
543 parent->get_actingbackfill_shards().end());
544
545 issue_op(
546 soid,
547 at_version,
548 tid,
549 reqid,
550 trim_to,
551 at_version,
552 added.size() ? *(added.begin()) : hobject_t(),
553 removed.size() ? *(removed.begin()) : hobject_t(),
554 log_entries,
555 hset_history,
556 &op,
557 op_t);
558
559 add_temp_objs(added);
560 clear_temp_objs(removed);
561
562 parent->log_operation(
563 log_entries,
564 hset_history,
565 trim_to,
566 at_version,
567 true,
568 op_t);
569
570 op_t.register_on_applied_sync(on_local_applied_sync);
571 op_t.register_on_applied(
572 parent->bless_context(
573 new C_OSD_OnOpApplied(this, &op)));
574 op_t.register_on_commit(
575 parent->bless_context(
576 new C_OSD_OnOpCommit(this, &op)));
577
578 vector<ObjectStore::Transaction> tls;
579 tls.push_back(std::move(op_t));
580
581 parent->queue_transactions(tls, op.op);
582}
583
584void ReplicatedBackend::op_applied(
585 InProgressOp *op)
586{
587 FUNCTRACE();
588 OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
589 dout(10) << __func__ << ": " << op->tid << dendl;
590 if (op->op) {
591 op->op->mark_event("op_applied");
592 op->op->pg_trace.event("op applied");
593 }
594
595 op->waiting_for_applied.erase(get_parent()->whoami_shard());
596 parent->op_applied(op->v);
597
598 if (op->waiting_for_applied.empty()) {
599 op->on_applied->complete(0);
600 op->on_applied = 0;
601 }
602 if (op->done()) {
603 assert(!op->on_commit && !op->on_applied);
604 in_progress_ops.erase(op->tid);
605 }
606}
607
608void ReplicatedBackend::op_commit(
609 InProgressOp *op)
610{
611 FUNCTRACE();
612 OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
613 dout(10) << __func__ << ": " << op->tid << dendl;
614 if (op->op) {
615 op->op->mark_event("op_commit");
616 op->op->pg_trace.event("op commit");
617 }
618
619 op->waiting_for_commit.erase(get_parent()->whoami_shard());
620
621 if (op->waiting_for_commit.empty()) {
622 op->on_commit->complete(0);
623 op->on_commit = 0;
624 }
625 if (op->done()) {
626 assert(!op->on_commit && !op->on_applied);
627 in_progress_ops.erase(op->tid);
628 }
629}
630
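// Handle a replica's ack for an in-flight write.  An ONDISK ack clears
// the sender from waiting_for_commit; any ack clears it from
// waiting_for_applied.  When either set drains, the corresponding
// completion fires.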
631void ReplicatedBackend::do_repop_reply(OpRequestRef op)
632{
633 static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
634 const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
635 assert(r->get_header().type == MSG_OSD_REPOPREPLY);
636
637 op->mark_started();
638
639 // must be replication.
640 ceph_tid_t rep_tid = r->get_tid();
641 pg_shard_t from = r->from;
642
643 if (in_progress_ops.count(rep_tid)) {
644 map<ceph_tid_t, InProgressOp>::iterator iter =
645 in_progress_ops.find(rep_tid);
646 InProgressOp &ip_op = iter->second;
647 const MOSDOp *m = NULL;
648 if (ip_op.op)
649 m = static_cast<const MOSDOp *>(ip_op.op->get_req());
650
651 if (m)
652 dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
653 << " ack_type " << (int)r->ack_type
654 << " from " << from
655 << dendl;
656 else
657 dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
658 << " ack_type " << (int)r->ack_type
659 << " from " << from
660 << dendl;
661
662 // oh, good.
663
664 if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
665 assert(ip_op.waiting_for_commit.count(from));
666 ip_op.waiting_for_commit.erase(from);
667 if (ip_op.op) {
668 ostringstream ss;
669 ss << "sub_op_commit_rec from " << from;
670 ip_op.op->mark_event_string(ss.str());
671 ip_op.op->pg_trace.event("sub_op_commit_rec");
672 }
673 } else {
674 assert(ip_op.waiting_for_applied.count(from));
675 if (ip_op.op) {
676 ostringstream ss;
677 ss << "sub_op_applied_rec from " << from;
678 ip_op.op->mark_event_string(ss.str());
679 ip_op.op->pg_trace.event("sub_op_applied_rec");
680 }
681 }
682 ip_op.waiting_for_applied.erase(from);
683
684 parent->update_peer_last_complete_ondisk(
685 from,
686 r->get_last_complete_ondisk());
687
688 if (ip_op.waiting_for_applied.empty() &&
689 ip_op.on_applied) {
690 ip_op.on_applied->complete(0);
691 ip_op.on_applied = 0;
692 }
693 if (ip_op.waiting_for_commit.empty() &&
694 ip_op.on_commit) {
695 ip_op.on_commit->complete(0);
696 ip_op.on_commit= 0;
697 }
698 if (ip_op.done()) {
699 assert(!ip_op.on_commit && !ip_op.on_applied);
700 in_progress_ops.erase(iter);
701 }
702 }
703}
704
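// Deep scrub: stream the object data through a CRC in
// osd_deep_scrub_stride sized chunks, then fold the omap header and
// every omap key/value into a second CRC.  EIO is recorded as a read
// error on the ScrubMap entry instead of being fatal.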
705void ReplicatedBackend::be_deep_scrub(
706 const hobject_t &poid,
707 uint32_t seed,
708 ScrubMap::object &o,
709 ThreadPool::TPHandle &handle)
710{
711 dout(10) << __func__ << " " << poid << " seed "
712 << std::hex << seed << std::dec << dendl;
713 bufferhash h(seed), oh(seed);
714 bufferlist bl, hdrbl;
715 int r;
716 __u64 pos = 0;
717
718 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
719
720 while (true) {
721 handle.reset_tp_timeout();
722 r = store->read(
723 ch,
724 ghobject_t(
725 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
726 pos,
727 cct->_conf->osd_deep_scrub_stride, bl,
728      fadvise_flags);
729 if (r <= 0)
730 break;
731
732 h << bl;
733 pos += bl.length();
734 bl.clear();
735 }
736 if (r == -EIO) {
737 dout(25) << __func__ << " " << poid << " got "
738 << r << " on read, read_error" << dendl;
739 o.read_error = true;
740 return;
741 }
742 o.digest = h.digest();
743 o.digest_present = true;
744
745 bl.clear();
746 r = store->omap_get_header(
747 coll,
748 ghobject_t(
749 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
750 &hdrbl, true);
751 // NOTE: bobtail to giant, we would crc the head as (len, head).
752 // that changes at the same time we start using a non-zero seed.
753 if (r == 0 && hdrbl.length()) {
754 dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
755 << dendl;
756 if (seed == 0) {
757 // legacy
758 bufferlist bl;
759 ::encode(hdrbl, bl);
760 oh << bl;
761 } else {
762 oh << hdrbl;
763 }
764 } else if (r == -EIO) {
765 dout(25) << __func__ << " " << poid << " got "
766 << r << " on omap header read, read_error" << dendl;
767 o.read_error = true;
768 return;
769 }
770
771 ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
772 coll,
773 ghobject_t(
774 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
775 assert(iter);
776 for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
777 iter->next(false)) {
778    handle.reset_tp_timeout();
779
780 dout(25) << "CRC key " << iter->key() << " value:\n";
781 iter->value().hexdump(*_dout);
782 *_dout << dendl;
783
784 ::encode(iter->key(), bl);
785 ::encode(iter->value(), bl);
786 oh << bl;
787 bl.clear();
788 }
789
790 if (iter->status() < 0) {
791 dout(25) << __func__ << " " << poid
792 << " on omap scan, db status error" << dendl;
793 o.read_error = true;
794 return;
795 }
796
797 //Store final calculated CRC32 of omap header & key/values
798 o.omap_digest = oh.digest();
799 o.omap_digest_present = true;
800 dout(20) << __func__ << " " << poid << " omap_digest "
801 << std::hex << o.omap_digest << std::dec << dendl;
802}
803
804void ReplicatedBackend::_do_push(OpRequestRef op)
805{
806 const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
807 assert(m->get_type() == MSG_OSD_PG_PUSH);
808 pg_shard_t from = m->from;
809
810 op->mark_started();
811
812 vector<PushReplyOp> replies;
813 ObjectStore::Transaction t;
814 ostringstream ss;
815 if (get_parent()->check_failsafe_full(ss)) {
816 dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
817 ceph_abort();
818 }
819 for (vector<PushOp>::const_iterator i = m->pushes.begin();
820 i != m->pushes.end();
821 ++i) {
822 replies.push_back(PushReplyOp());
823 handle_push(from, *i, &(replies.back()), &t);
824 }
825
826 MOSDPGPushReply *reply = new MOSDPGPushReply;
827 reply->from = get_parent()->whoami_shard();
828 reply->set_priority(m->get_priority());
829 reply->pgid = get_info().pgid;
830 reply->map_epoch = m->map_epoch;
831 reply->min_epoch = m->min_epoch;
832 reply->replies.swap(replies);
833 reply->compute_cost(cct);
834
835 t.register_on_complete(
836 new PG_SendMessageOnConn(
837 get_parent(), reply, m->get_connection()));
838
839 get_parent()->queue_transaction(std::move(t));
840}
841
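// On the primary, a completed pull may still need to fan out to other
// replicas: this continuation opens a fresh recovery op and starts
// those pushes, declaring global recovery only when none are needed.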
842struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
843 ReplicatedBackend *bc;
844 list<ReplicatedBackend::pull_complete_info> to_continue;
845 int priority;
846 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
847 : bc(bc), priority(priority) {}
848
849 void finish(ThreadPool::TPHandle &handle) override {
850 ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
851 for (auto &&i: to_continue) {
852 auto j = bc->pulling.find(i.hoid);
853 assert(j != bc->pulling.end());
854 ObjectContextRef obc = j->second.obc;
855 bc->clear_pull(j, false /* already did it */);
856 int started = bc->start_pushes(i.hoid, obc, h);
857 if (started < 0) {
858 bc->pushing[i.hoid].clear();
859 bc->get_parent()->primary_failed(i.hoid);
860 bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
861 } else if (!started) {
862 bc->get_parent()->on_global_recover(
863 i.hoid, i.stat);
864 }
865 handle.reset_tp_timeout();
866 }
867 bc->run_recovery_op(h, priority);
868 }
869};
870
871void ReplicatedBackend::_do_pull_response(OpRequestRef op)
872{
873 const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
874 assert(m->get_type() == MSG_OSD_PG_PUSH);
875 pg_shard_t from = m->from;
876
877 op->mark_started();
878
879 vector<PullOp> replies(1);
880
881 ostringstream ss;
882 if (get_parent()->check_failsafe_full(ss)) {
883 dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl;
884 ceph_abort();
885 }
886
887 ObjectStore::Transaction t;
888 list<pull_complete_info> to_continue;
889 for (vector<PushOp>::const_iterator i = m->pushes.begin();
890 i != m->pushes.end();
891 ++i) {
892 bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
893 if (more)
894 replies.push_back(PullOp());
895 }
896 if (!to_continue.empty()) {
897 C_ReplicatedBackend_OnPullComplete *c =
898 new C_ReplicatedBackend_OnPullComplete(
899 this,
900 m->get_priority());
901 c->to_continue.swap(to_continue);
902 t.register_on_complete(
903 new PG_RecoveryQueueAsync(
904 get_parent(),
905 get_parent()->bless_gencontext(c)));
906 }
907 replies.erase(replies.end() - 1);
908
909 if (replies.size()) {
910 MOSDPGPull *reply = new MOSDPGPull;
911 reply->from = parent->whoami_shard();
912 reply->set_priority(m->get_priority());
913 reply->pgid = get_info().pgid;
914 reply->map_epoch = m->map_epoch;
915 reply->min_epoch = m->min_epoch;
916 reply->set_pulls(&replies);
917 reply->compute_cost(cct);
918
919 t.register_on_complete(
920 new PG_SendMessageOnConn(
921 get_parent(), reply, m->get_connection()));
922 }
923
924 get_parent()->queue_transaction(std::move(t));
925}
926
927void ReplicatedBackend::do_pull(OpRequestRef op)
928{
929 MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
930 assert(m->get_type() == MSG_OSD_PG_PULL);
931 pg_shard_t from = m->from;
932
933 map<pg_shard_t, vector<PushOp> > replies;
934 vector<PullOp> pulls;
935 m->take_pulls(&pulls);
936 for (auto& i : pulls) {
937 replies[from].push_back(PushOp());
938 handle_pull(from, i, &(replies[from].back()));
939 }
940 send_pushes(m->get_priority(), replies);
941}
942
943void ReplicatedBackend::do_push_reply(OpRequestRef op)
944{
945 const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
946 assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
947 pg_shard_t from = m->from;
948
949 vector<PushOp> replies(1);
950 for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
951 i != m->replies.end();
952 ++i) {
953 bool more = handle_push_reply(from, *i, &(replies.back()));
954 if (more)
955 replies.push_back(PushOp());
956 }
957 replies.erase(replies.end() - 1);
958
959 map<pg_shard_t, vector<PushOp> > _replies;
960 _replies[from].swap(replies);
961 send_pushes(m->get_priority(), _replies);
962}
963
964Message * ReplicatedBackend::generate_subop(
965 const hobject_t &soid,
966 const eversion_t &at_version,
967 ceph_tid_t tid,
968 osd_reqid_t reqid,
969 eversion_t pg_trim_to,
970 eversion_t pg_roll_forward_to,
971 hobject_t new_temp_oid,
972 hobject_t discard_temp_oid,
973 const vector<pg_log_entry_t> &log_entries,
974 boost::optional<pg_hit_set_history_t> &hset_hist,
975 ObjectStore::Transaction &op_t,
976 pg_shard_t peer,
977 const pg_info_t &pinfo)
978{
979 int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
980 // forward the write/update/whatever
981 MOSDRepOp *wr = new MOSDRepOp(
982 reqid, parent->whoami_shard(),
983 spg_t(get_info().pgid.pgid, peer.shard),
984 soid, acks_wanted,
985 get_osdmap()->get_epoch(),
986 parent->get_last_peering_reset_epoch(),
987 tid, at_version);
988
989 // ship resulting transaction, log entries, and pg_stats
990 if (!parent->should_send_op(peer, soid)) {
991 dout(10) << "issue_repop shipping empty opt to osd." << peer
992 <<", object " << soid
993 << " beyond MAX(last_backfill_started "
994 << ", pinfo.last_backfill "
995 << pinfo.last_backfill << ")" << dendl;
996 ObjectStore::Transaction t;
997 ::encode(t, wr->get_data());
998 } else {
999 ::encode(op_t, wr->get_data());
1000 wr->get_header().data_off = op_t.get_data_alignment();
1001 }
1002
1003 ::encode(log_entries, wr->logbl);
1004
1005 if (pinfo.is_incomplete())
1006 wr->pg_stats = pinfo.stats; // reflects backfill progress
1007 else
1008 wr->pg_stats = get_info().stats;
1009
1010 wr->pg_trim_to = pg_trim_to;
1011 wr->pg_roll_forward_to = pg_roll_forward_to;
1012
1013 wr->new_temp_oid = new_temp_oid;
1014 wr->discard_temp_oid = discard_temp_oid;
1015 wr->updated_hit_set_history = hset_hist;
1016 return wr;
1017}
1018
1019void ReplicatedBackend::issue_op(
1020 const hobject_t &soid,
1021 const eversion_t &at_version,
1022 ceph_tid_t tid,
1023 osd_reqid_t reqid,
1024 eversion_t pg_trim_to,
1025 eversion_t pg_roll_forward_to,
1026 hobject_t new_temp_oid,
1027 hobject_t discard_temp_oid,
1028 const vector<pg_log_entry_t> &log_entries,
1029 boost::optional<pg_hit_set_history_t> &hset_hist,
1030 InProgressOp *op,
1031 ObjectStore::Transaction &op_t)
1032{
1033 if (op->op)
1034 op->op->pg_trace.event("issue replication ops");
1035
1036 if (parent->get_actingbackfill_shards().size() > 1) {
1037 ostringstream ss;
1038 set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
1039 replicas.erase(parent->whoami_shard());
1040 ss << "waiting for subops from " << replicas;
1041 if (op->op)
1042 op->op->mark_sub_op_sent(ss.str());
1043 }
1044 for (set<pg_shard_t>::const_iterator i =
1045 parent->get_actingbackfill_shards().begin();
1046 i != parent->get_actingbackfill_shards().end();
1047 ++i) {
1048 if (*i == parent->whoami_shard()) continue;
1049 pg_shard_t peer = *i;
1050 const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
1051
1052 Message *wr;
1053 wr = generate_subop(
1054 soid,
1055 at_version,
1056 tid,
1057 reqid,
1058 pg_trim_to,
1059 pg_roll_forward_to,
1060 new_temp_oid,
1061 discard_temp_oid,
1062 log_entries,
1063 hset_hist,
1064 op_t,
1065 peer,
1066 pinfo);
1067 if (op->op)
1068 wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
1069 get_parent()->send_message_osd_cluster(
1070 peer.osd, wr, get_osdmap()->get_epoch());
1071 }
1072}
1073
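// Replica side of the write path: decode the shipped transaction and
// log entries, track any temp object the primary announced, and queue
// the transaction with commit/apply callbacks that send MOSDRepOpReply
// acks back to the primary.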
1074// sub op modify
1075void ReplicatedBackend::do_repop(OpRequestRef op)
1076{
1077 static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
1078 const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
1079 int msg_type = m->get_type();
1080 assert(MSG_OSD_REPOP == msg_type);
1081
1082 const hobject_t& soid = m->poid;
1083
1084 dout(10) << __func__ << " " << soid
1085 << " v " << m->version
1086	   << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
1087 << " " << m->logbl.length()
1088 << dendl;
1089
1090 // sanity checks
1091 assert(m->map_epoch >= get_info().history.same_interval_since);
1092
1093 // we better not be missing this.
1094 assert(!parent->get_log().get_missing().is_missing(soid));
1095
1096 int ackerosd = m->get_source().num();
1097
1098 op->mark_started();
1099
1100 RepModifyRef rm(std::make_shared<RepModify>());
1101 rm->op = op;
1102 rm->ackerosd = ackerosd;
1103 rm->last_complete = get_info().last_complete;
1104 rm->epoch_started = get_osdmap()->get_epoch();
1105
1106 assert(m->logbl.length());
1107 // shipped transaction and log entries
1108 vector<pg_log_entry_t> log;
1109
1110 bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
1111 ::decode(rm->opt, p);
1112
1113 if (m->new_temp_oid != hobject_t()) {
1114 dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
1115 add_temp_obj(m->new_temp_oid);
1116 }
1117 if (m->discard_temp_oid != hobject_t()) {
1118 dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
1119 if (rm->opt.empty()) {
1120 dout(10) << __func__ << ": removing object " << m->discard_temp_oid
1121 << " since we won't get the transaction" << dendl;
1122 rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
1123 }
1124 clear_temp_obj(m->discard_temp_oid);
1125 }
1126
1127 p = const_cast<bufferlist&>(m->logbl).begin();
1128 ::decode(log, p);
1129 rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1130
1131 bool update_snaps = false;
1132 if (!rm->opt.empty()) {
1133 // If the opt is non-empty, we infer we are before
1134 // last_backfill (according to the primary, not our
1135 // not-quite-accurate value), and should update the
1136 // collections now. Otherwise, we do it later on push.
1137 update_snaps = true;
1138 }
1139 parent->update_stats(m->pg_stats);
1140 parent->log_operation(
1141 log,
1142 m->updated_hit_set_history,
1143 m->pg_trim_to,
1144 m->pg_roll_forward_to,
1145 update_snaps,
1146 rm->localt);
1147
1148 rm->opt.register_on_commit(
1149 parent->bless_context(
1150 new C_OSD_RepModifyCommit(this, rm)));
1151 rm->localt.register_on_applied(
1152 parent->bless_context(
1153 new C_OSD_RepModifyApply(this, rm)));
1154 vector<ObjectStore::Transaction> tls;
1155 tls.reserve(2);
1156 tls.push_back(std::move(rm->localt));
1157 tls.push_back(std::move(rm->opt));
1158 parent->queue_transactions(tls, op);
1159 // op is cleaned up by oncommit/onapply when both are executed
1160}
1161
1162void ReplicatedBackend::repop_applied(RepModifyRef rm)
1163{
1164 rm->op->mark_event("sub_op_applied");
1165 rm->applied = true;
1166  rm->op->pg_trace.event("sub_op_applied");
1167
1168 dout(10) << __func__ << " on " << rm << " op "
1169 << *rm->op->get_req() << dendl;
1170 const Message *m = rm->op->get_req();
1171 const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
1172 eversion_t version = req->version;
1173
1174 // send ack to acker only if we haven't sent a commit already
1175 if (!rm->committed) {
1176 Message *ack = new MOSDRepOpReply(
1177 req, parent->whoami_shard(),
1178 0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
1179 ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
1180 ack->trace = rm->op->pg_trace;
1181 get_parent()->send_message_osd_cluster(
1182 rm->ackerosd, ack, get_osdmap()->get_epoch());
1183 }
1184
1185 parent->op_applied(version);
1186}
1187
1188void ReplicatedBackend::repop_commit(RepModifyRef rm)
1189{
1190 rm->op->mark_commit_sent();
1191  rm->op->pg_trace.event("sub_op_commit");
1192 rm->committed = true;
1193
1194 // send commit.
1195 const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
1196 assert(m->get_type() == MSG_OSD_REPOP);
1197 dout(10) << __func__ << " on op " << *m
1198 << ", sending commit to osd." << rm->ackerosd
1199 << dendl;
1200 assert(get_osdmap()->is_up(rm->ackerosd));
1201
1202 get_parent()->update_last_complete_ondisk(rm->last_complete);
1203
1204 MOSDRepOpReply *reply = new MOSDRepOpReply(
1205 m,
1206 get_parent()->whoami_shard(),
1207 0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
1208 reply->set_last_complete_ondisk(rm->last_complete);
1209 reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
1210 reply->trace = rm->op->pg_trace;
1211 get_parent()->send_message_osd_cluster(
1212 rm->ackerosd, reply, get_osdmap()->get_epoch());
1213
1214 log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
1215}
1216
1217
1218// ===========================================================
1219
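// Recovery tries not to ship bytes the target can reconstruct itself:
// extents shared with an adjacent clone (per snapset.clone_overlap) go
// into clone_subsets and are later rebuilt locally with clone_range,
// so only the remainder (data_subset) is pushed over the wire.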
1220void ReplicatedBackend::calc_head_subsets(
1221 ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
1222 const pg_missing_t& missing,
1223 const hobject_t &last_backfill,
1224 interval_set<uint64_t>& data_subset,
1225 map<hobject_t, interval_set<uint64_t>>& clone_subsets,
1226 ObcLockManager &manager)
1227{
1228 dout(10) << "calc_head_subsets " << head
1229 << " clone_overlap " << snapset.clone_overlap << dendl;
1230
1231 uint64_t size = obc->obs.oi.size;
1232 if (size)
1233 data_subset.insert(0, size);
1234
1235 if (get_parent()->get_pool().allow_incomplete_clones()) {
1236 dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
1237 return;
1238 }
1239
1240 if (!cct->_conf->osd_recover_clone_overlap) {
1241 dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
1242 return;
1243 }
1244
1245
1246 interval_set<uint64_t> cloning;
1247 interval_set<uint64_t> prev;
1248 if (size)
1249 prev.insert(0, size);
1250
1251 for (int j=snapset.clones.size()-1; j>=0; j--) {
1252 hobject_t c = head;
1253 c.snap = snapset.clones[j];
1254 prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
1255 if (!missing.is_missing(c) &&
1256 c < last_backfill &&
1257 get_parent()->try_lock_for_read(c, manager)) {
1258 dout(10) << "calc_head_subsets " << head << " has prev " << c
1259 << " overlap " << prev << dendl;
1260 clone_subsets[c] = prev;
1261 cloning.union_of(prev);
1262 break;
1263 }
1264 dout(10) << "calc_head_subsets " << head << " does not have prev " << c
1265 << " overlap " << prev << dendl;
1266 }
1267
1268
1269 if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
1270 dout(10) << "skipping clone, too many holes" << dendl;
1271 get_parent()->release_locks(manager);
1272 clone_subsets.clear();
1273 cloning.clear();
1274 }
1275
1276 // what's left for us to push?
1277 data_subset.subtract(cloning);
1278
1279 dout(10) << "calc_head_subsets " << head
1280 << " data_subset " << data_subset
1281 << " clone_subsets " << clone_subsets << dendl;
1282}
1283
1284void ReplicatedBackend::calc_clone_subsets(
1285 SnapSet& snapset, const hobject_t& soid,
1286 const pg_missing_t& missing,
1287 const hobject_t &last_backfill,
1288 interval_set<uint64_t>& data_subset,
1289 map<hobject_t, interval_set<uint64_t>>& clone_subsets,
1290 ObcLockManager &manager)
1291{
1292 dout(10) << "calc_clone_subsets " << soid
1293 << " clone_overlap " << snapset.clone_overlap << dendl;
1294
1295 uint64_t size = snapset.clone_size[soid.snap];
1296 if (size)
1297 data_subset.insert(0, size);
1298
1299 if (get_parent()->get_pool().allow_incomplete_clones()) {
1300 dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
1301 return;
1302 }
1303
1304 if (!cct->_conf->osd_recover_clone_overlap) {
1305 dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
1306 return;
1307 }
1308
1309 unsigned i;
1310 for (i=0; i < snapset.clones.size(); i++)
1311 if (snapset.clones[i] == soid.snap)
1312 break;
1313
1314 // any overlap with next older clone?
1315 interval_set<uint64_t> cloning;
1316 interval_set<uint64_t> prev;
1317 if (size)
1318 prev.insert(0, size);
1319 for (int j=i-1; j>=0; j--) {
1320 hobject_t c = soid;
1321 c.snap = snapset.clones[j];
1322 prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
1323 if (!missing.is_missing(c) &&
1324 c < last_backfill &&
1325 get_parent()->try_lock_for_read(c, manager)) {
1326 dout(10) << "calc_clone_subsets " << soid << " has prev " << c
1327 << " overlap " << prev << dendl;
1328 clone_subsets[c] = prev;
1329 cloning.union_of(prev);
1330 break;
1331 }
1332 dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
1333 << " overlap " << prev << dendl;
1334 }
1335
1336 // overlap with next newest?
1337 interval_set<uint64_t> next;
1338 if (size)
1339 next.insert(0, size);
1340 for (unsigned j=i+1; j<snapset.clones.size(); j++) {
1341 hobject_t c = soid;
1342 c.snap = snapset.clones[j];
1343 next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
1344 if (!missing.is_missing(c) &&
1345 c < last_backfill &&
1346 get_parent()->try_lock_for_read(c, manager)) {
1347 dout(10) << "calc_clone_subsets " << soid << " has next " << c
1348 << " overlap " << next << dendl;
1349 clone_subsets[c] = next;
1350 cloning.union_of(next);
1351 break;
1352 }
1353 dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
1354 << " overlap " << next << dendl;
1355 }
1356
1357 if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
1358 dout(10) << "skipping clone, too many holes" << dendl;
1359 get_parent()->release_locks(manager);
1360 clone_subsets.clear();
1361 cloning.clear();
1362 }
1363
1364
1365 // what's left for us to push?
1366 data_subset.subtract(cloning);
1367
1368 dout(10) << "calc_clone_subsets " << soid
1369 << " data_subset " << data_subset
1370 << " clone_subsets " << clone_subsets << dendl;
1371}
1372
1373void ReplicatedBackend::prepare_pull(
1374 eversion_t v,
1375 const hobject_t& soid,
1376 ObjectContextRef headctx,
1377 RPGHandle *h)
1378{
1379 assert(get_parent()->get_local_missing().get_items().count(soid));
1380 eversion_t _v = get_parent()->get_local_missing().get_items().find(
1381 soid)->second.need;
1382 assert(_v == v);
1383 const map<hobject_t, set<pg_shard_t>> &missing_loc(
1384 get_parent()->get_missing_loc_shards());
1385 const map<pg_shard_t, pg_missing_t > &peer_missing(
1386 get_parent()->get_shard_missing());
1387 map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
1388 assert(q != missing_loc.end());
1389 assert(!q->second.empty());
1390
1391 // pick a pullee
1392 vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
1393 random_shuffle(shuffle.begin(), shuffle.end());
1394 vector<pg_shard_t>::iterator p = shuffle.begin();
1395 assert(get_osdmap()->is_up(p->osd));
1396 pg_shard_t fromshard = *p;
1397
1398 dout(7) << "pull " << soid
1399 << " v " << v
1400 << " on osds " << q->second
1401 << " from osd." << fromshard
1402 << dendl;
1403
1404 assert(peer_missing.count(fromshard));
1405 const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
1406 if (pmissing.is_missing(soid, v)) {
1407 assert(pmissing.get_items().find(soid)->second.have != v);
1408 dout(10) << "pulling soid " << soid << " from osd " << fromshard
1409 << " at version " << pmissing.get_items().find(soid)->second.have
1410 << " rather than at version " << v << dendl;
1411 v = pmissing.get_items().find(soid)->second.have;
1412 assert(get_parent()->get_log().get_log().objects.count(soid) &&
1413 (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
1414 pg_log_entry_t::LOST_REVERT) &&
1415 (get_parent()->get_log().get_log().objects.find(
1416 soid)->second->reverting_to ==
1417 v));
1418 }
1419
1420 ObjectRecoveryInfo recovery_info;
1421 ObcLockManager lock_manager;
1422
1423 if (soid.is_snap()) {
1424 assert(!get_parent()->get_local_missing().is_missing(
1425 soid.get_head()) ||
1426 !get_parent()->get_local_missing().is_missing(
1427 soid.get_snapdir()));
1428 assert(headctx);
1429 // check snapset
1430 SnapSetContext *ssc = headctx->ssc;
1431 assert(ssc);
1432 dout(10) << " snapset " << ssc->snapset << dendl;
1433 recovery_info.ss = ssc->snapset;
1434 calc_clone_subsets(
1435 ssc->snapset, soid, get_parent()->get_local_missing(),
1436 get_info().last_backfill,
1437 recovery_info.copy_subset,
1438 recovery_info.clone_subset,
1439 lock_manager);
1440 // FIXME: this may overestimate if we are pulling multiple clones in parallel...
1441 dout(10) << " pulling " << recovery_info << dendl;
1442
1443 assert(ssc->snapset.clone_size.count(soid.snap));
1444 recovery_info.size = ssc->snapset.clone_size[soid.snap];
1445 } else {
1446 // pulling head or unversioned object.
1447 // always pull the whole thing.
1448 recovery_info.copy_subset.insert(0, (uint64_t)-1);
1449 recovery_info.size = ((uint64_t)-1);
1450 }
1451
1452 h->pulls[fromshard].push_back(PullOp());
1453 PullOp &op = h->pulls[fromshard].back();
1454 op.soid = soid;
1455
1456 op.recovery_info = recovery_info;
1457 op.recovery_info.soid = soid;
1458 op.recovery_info.version = v;
1459 op.recovery_progress.data_complete = false;
1460 op.recovery_progress.omap_complete = false;
1461 op.recovery_progress.data_recovered_to = 0;
1462 op.recovery_progress.first = true;
1463
1464 assert(!pulling.count(soid));
1465 pull_from_peer[fromshard].insert(soid);
1466 PullInfo &pi = pulling[soid];
1467 pi.from = fromshard;
1468 pi.soid = soid;
1469 pi.head_ctx = headctx;
1470 pi.recovery_info = op.recovery_info;
1471 pi.recovery_progress = op.recovery_progress;
1472 pi.cache_dont_need = h->cache_dont_need;
1473 pi.lock_manager = std::move(lock_manager);
1474}
1475
1476/*
1477 * intelligently push an object to a replica. make use of existing
1478 * clones/heads and dup data ranges where possible.
1479 */
224ce89b 1480int ReplicatedBackend::prep_push_to_replica(
7c673cae
FG
1481 ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
1482 PushOp *pop, bool cache_dont_need)
1483{
1484 const object_info_t& oi = obc->obs.oi;
1485 uint64_t size = obc->obs.oi.size;
1486
1487 dout(10) << __func__ << ": " << soid << " v" << oi.version
1488 << " size " << size << " to osd." << peer << dendl;
1489
1490 map<hobject_t, interval_set<uint64_t>> clone_subsets;
1491 interval_set<uint64_t> data_subset;
1492
1493 ObcLockManager lock_manager;
1494 // are we doing a clone on the replica?
1495 if (soid.snap && soid.snap < CEPH_NOSNAP) {
1496 hobject_t head = soid;
1497 head.snap = CEPH_NOSNAP;
1498
1499    // try to base the push off of clones that precede/succeed soid;
1500 // we need the head (and current SnapSet) locally to do that.
1501 if (get_parent()->get_local_missing().is_missing(head)) {
1502 dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
1503 return prep_push(obc, soid, peer, pop, cache_dont_need);
1504 }
1505 hobject_t snapdir = head;
1506 snapdir.snap = CEPH_SNAPDIR;
1507 if (get_parent()->get_local_missing().is_missing(snapdir)) {
1508 dout(15) << "push_to_replica missing snapdir " << snapdir
1509 << ", pushing raw clone" << dendl;
1510 return prep_push(obc, soid, peer, pop, cache_dont_need);
1511 }
1512
1513 SnapSetContext *ssc = obc->ssc;
1514 assert(ssc);
1515 dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
1516 pop->recovery_info.ss = ssc->snapset;
1517 map<pg_shard_t, pg_missing_t>::const_iterator pm =
1518 get_parent()->get_shard_missing().find(peer);
1519 assert(pm != get_parent()->get_shard_missing().end());
1520 map<pg_shard_t, pg_info_t>::const_iterator pi =
1521 get_parent()->get_shard_info().find(peer);
1522 assert(pi != get_parent()->get_shard_info().end());
1523 calc_clone_subsets(
1524 ssc->snapset, soid,
1525 pm->second,
1526 pi->second.last_backfill,
1527 data_subset, clone_subsets,
1528 lock_manager);
1529 } else if (soid.snap == CEPH_NOSNAP) {
1530 // pushing head or unversioned object.
1531    // base this partially on the replica's clones?
1532 SnapSetContext *ssc = obc->ssc;
1533 assert(ssc);
1534 dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
1535 calc_head_subsets(
1536 obc,
1537 ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
1538 get_parent()->get_shard_info().find(peer)->second.last_backfill,
1539 data_subset, clone_subsets,
1540 lock_manager);
1541 }
1542
1543  return prep_push(
1544 obc,
1545 soid,
1546 peer,
1547 oi.version,
1548 data_subset,
1549 clone_subsets,
1550 pop,
1551 cache_dont_need,
1552 std::move(lock_manager));
1553}
1554
1555int ReplicatedBackend::prep_push(ObjectContextRef obc,
1556 const hobject_t& soid, pg_shard_t peer,
1557 PushOp *pop, bool cache_dont_need)
1558{
1559 interval_set<uint64_t> data_subset;
1560 if (obc->obs.oi.size)
1561 data_subset.insert(0, obc->obs.oi.size);
1562 map<hobject_t, interval_set<uint64_t>> clone_subsets;
1563
1564  return prep_push(obc, soid, peer,
1565 obc->obs.oi.version, data_subset, clone_subsets,
1566 pop, cache_dont_need, ObcLockManager());
1567}
1568
1569int ReplicatedBackend::prep_push(
1570 ObjectContextRef obc,
1571 const hobject_t& soid, pg_shard_t peer,
1572 eversion_t version,
1573 interval_set<uint64_t> &data_subset,
1574 map<hobject_t, interval_set<uint64_t>>& clone_subsets,
1575 PushOp *pop,
1576 bool cache_dont_need,
1577 ObcLockManager &&lock_manager)
1578{
1579 get_parent()->begin_peer_recover(peer, soid);
1580 // take note.
1581 PushInfo &pi = pushing[soid][peer];
1582 pi.obc = obc;
1583 pi.recovery_info.size = obc->obs.oi.size;
1584 pi.recovery_info.copy_subset = data_subset;
1585 pi.recovery_info.clone_subset = clone_subsets;
1586 pi.recovery_info.soid = soid;
1587 pi.recovery_info.oi = obc->obs.oi;
1588 pi.recovery_info.ss = pop->recovery_info.ss;
1589 pi.recovery_info.version = version;
1590 pi.lock_manager = std::move(lock_manager);
1591
1592 ObjectRecoveryProgress new_progress;
1593 int r = build_push_op(pi.recovery_info,
1594 pi.recovery_progress,
1595 &new_progress,
1596 pop,
1597 &(pi.stat), cache_dont_need);
1598  if (r < 0)
1599    return r;
1600  pi.recovery_progress = new_progress;
1601  return 0;
1602}
1603
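// An object can arrive over several pushes.  Unless the first chunk is
// also the last, data is staged in a temp recovery object; on the
// final chunk the temp object is renamed over the real target and
// submit_push_complete() performs any clone_range reconstruction.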
1604void ReplicatedBackend::submit_push_data(
1605 const ObjectRecoveryInfo &recovery_info,
1606 bool first,
1607 bool complete,
1608 bool cache_dont_need,
1609 const interval_set<uint64_t> &intervals_included,
1610 bufferlist data_included,
1611 bufferlist omap_header,
1612 const map<string, bufferlist> &attrs,
1613 const map<string, bufferlist> &omap_entries,
1614 ObjectStore::Transaction *t)
1615{
1616 hobject_t target_oid;
1617 if (first && complete) {
1618 target_oid = recovery_info.soid;
1619 } else {
1620 target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
1621 recovery_info.version);
1622 if (first) {
1623 dout(10) << __func__ << ": Adding oid "
1624 << target_oid << " in the temp collection" << dendl;
1625 add_temp_obj(target_oid);
1626 }
1627 }
1628
1629 if (first) {
1630 t->remove(coll, ghobject_t(target_oid));
1631 t->touch(coll, ghobject_t(target_oid));
1632 t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
1633 if (omap_header.length())
1634 t->omap_setheader(coll, ghobject_t(target_oid), omap_header);
1635
1636 bufferlist bv = attrs.at(OI_ATTR);
1637 object_info_t oi(bv);
1638 t->set_alloc_hint(coll, ghobject_t(target_oid),
1639 oi.expected_object_size,
1640 oi.expected_write_size,
1641 oi.alloc_hint_flags);
1642 }
1643 uint64_t off = 0;
1644 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
1645 if (cache_dont_need)
1646 fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
1647 for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
1648 p != intervals_included.end();
1649 ++p) {
1650 bufferlist bit;
1651 bit.substr_of(data_included, off, p.get_len());
1652 t->write(coll, ghobject_t(target_oid),
1653 p.get_start(), p.get_len(), bit, fadvise_flags);
1654 off += p.get_len();
1655 }
1656
1657 if (!omap_entries.empty())
1658 t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
1659 if (!attrs.empty())
1660 t->setattrs(coll, ghobject_t(target_oid), attrs);
1661
1662 if (complete) {
1663 if (!first) {
1664 dout(10) << __func__ << ": Removing oid "
1665 << target_oid << " from the temp collection" << dendl;
1666 clear_temp_obj(target_oid);
1667 t->remove(coll, ghobject_t(recovery_info.soid));
1668 t->collection_move_rename(coll, ghobject_t(target_oid),
1669 coll, ghobject_t(recovery_info.soid));
1670 }
1671
1672 submit_push_complete(recovery_info, t);
1673 }
1674}
1675
1676void ReplicatedBackend::submit_push_complete(
1677 const ObjectRecoveryInfo &recovery_info,
1678 ObjectStore::Transaction *t)
1679{
1680 for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
1681 recovery_info.clone_subset.begin();
1682 p != recovery_info.clone_subset.end();
1683 ++p) {
1684 for (interval_set<uint64_t>::const_iterator q = p->second.begin();
1685 q != p->second.end();
1686 ++q) {
1687 dout(15) << " clone_range " << p->first << " "
1688 << q.get_start() << "~" << q.get_len() << dendl;
1689 t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
1690 q.get_start(), q.get_len(), q.get_start());
1691 }
1692 }
1693}
1694
1695ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
1696 const ObjectRecoveryInfo& recovery_info,
1697 SnapSetContext *ssc,
1698 ObcLockManager &manager)
1699{
1700 if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
1701 return recovery_info;
1702 ObjectRecoveryInfo new_info = recovery_info;
1703 new_info.copy_subset.clear();
1704 new_info.clone_subset.clear();
1705 assert(ssc);
1706 get_parent()->release_locks(manager); // might already have locks
1707 calc_clone_subsets(
1708 ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
1709 get_info().last_backfill,
1710 new_info.copy_subset, new_info.clone_subset,
1711 manager);
1712 return new_info;
1713}
1714
1715bool ReplicatedBackend::handle_pull_response(
1716 pg_shard_t from, const PushOp &pop, PullOp *response,
1717 list<pull_complete_info> *to_continue,
1718 ObjectStore::Transaction *t)
1719{
1720 interval_set<uint64_t> data_included = pop.data_included;
1721 bufferlist data;
1722 data = pop.data;
1723 dout(10) << "handle_pull_response "
1724 << pop.recovery_info
1725 << pop.after_progress
1726 << " data.size() is " << data.length()
1727 << " data_included: " << data_included
1728 << dendl;
1729 if (pop.version == eversion_t()) {
1730 // replica doesn't have it!
1731    _failed_pull(from, pop.soid);
1732 return false;
1733 }
1734
1735 const hobject_t &hoid = pop.soid;
1736 assert((data_included.empty() && data.length() == 0) ||
1737 (!data_included.empty() && data.length() > 0));
1738
1739 auto piter = pulling.find(hoid);
1740 if (piter == pulling.end()) {
1741 return false;
1742 }
1743
1744 PullInfo &pi = piter->second;
1745 if (pi.recovery_info.size == (uint64_t(-1))) {
1746 pi.recovery_info.size = pop.recovery_info.size;
1747 pi.recovery_info.copy_subset.intersection_of(
1748 pop.recovery_info.copy_subset);
1749 }
1750 // If primary doesn't have object info and didn't know version
1751 if (pi.recovery_info.version == eversion_t()) {
1752 pi.recovery_info.version = pop.version;
1753 }
1754
1755 bool first = pi.recovery_progress.first;
1756 if (first) {
1757    // The attrs still reference the original bufferlist (decoded from
1758    // the MOSDPGPush message), which is much larger than the attrs
1759    // themselves. If the obc caches them (get_obc may cache the attrs),
1760    // the whole original bufferlist cannot be freed until the obc is
1761    // evicted from the obc cache. So rebuild the bufferlists before
1762    // caching them.
1763 auto attrset = pop.attrset;
1764 for (auto& a : attrset) {
1765 a.second.rebuild();
1766 }
1767 pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
1768 pi.recovery_info.oi = pi.obc->obs.oi;
1769 pi.recovery_info = recalc_subsets(
1770 pi.recovery_info,
1771 pi.obc->ssc,
1772 pi.lock_manager);
1773 }
1774
1775
1776 interval_set<uint64_t> usable_intervals;
1777 bufferlist usable_data;
1778 trim_pushed_data(pi.recovery_info.copy_subset,
1779 data_included,
1780 data,
1781 &usable_intervals,
1782 &usable_data);
1783 data_included = usable_intervals;
1784 data.claim(usable_data);
1785
1786
1787 pi.recovery_progress = pop.after_progress;
1788
1789 dout(10) << "new recovery_info " << pi.recovery_info
1790 << ", new progress " << pi.recovery_progress
1791 << dendl;
1792
1793 bool complete = pi.is_complete();
1794
1795 submit_push_data(pi.recovery_info, first,
1796 complete, pi.cache_dont_need,
1797 data_included, data,
1798 pop.omap_header,
1799 pop.attrset,
1800 pop.omap_entries,
1801 t);
1802
1803 pi.stat.num_keys_recovered += pop.omap_entries.size();
1804 pi.stat.num_bytes_recovered += data.length();
1805
1806 if (complete) {
1807 pi.stat.num_objects_recovered++;
1808 clear_pull_from(piter);
1809 to_continue->push_back({hoid, pi.stat});
1810 get_parent()->on_local_recover(
1811 hoid, pi.recovery_info, pi.obc, t);
1812 return false;
1813 } else {
1814 response->soid = pop.soid;
1815 response->recovery_info = pi.recovery_info;
1816 response->recovery_progress = pi.recovery_progress;
1817 return true;
1818 }
1819}
1820
1821void ReplicatedBackend::handle_push(
1822 pg_shard_t from, const PushOp &pop, PushReplyOp *response,
1823 ObjectStore::Transaction *t)
1824{
1825 dout(10) << "handle_push "
1826 << pop.recovery_info
1827 << pop.after_progress
1828 << dendl;
1829 bufferlist data;
1830 data = pop.data;
1831 bool first = pop.before_progress.first;
1832 bool complete = pop.after_progress.data_complete &&
1833 pop.after_progress.omap_complete;
1834
1835 response->soid = pop.recovery_info.soid;
1836 submit_push_data(pop.recovery_info,
1837 first,
1838 complete,
1839		   true, // must be a replica
1840 pop.data_included,
1841 data,
1842 pop.omap_header,
1843 pop.attrset,
1844 pop.omap_entries,
1845 t);
1846
1847 if (complete)
1848 get_parent()->on_local_recover(
1849 pop.recovery_info.soid,
1850 pop.recovery_info,
1851 ObjectContextRef(), // ok, is replica
1852 t);
1853}
1854
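// Batch the queued PushOps per target OSD into MOSDPGPush messages,
// capping each message by osd_max_push_cost and osd_max_push_objects.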
1855void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1856{
1857 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1858 i != pushes.end();
1859 ++i) {
1860 ConnectionRef con = get_parent()->get_con_osd_cluster(
1861 i->first.osd,
1862 get_osdmap()->get_epoch());
1863 if (!con)
1864 continue;
1865 vector<PushOp>::iterator j = i->second.begin();
1866 while (j != i->second.end()) {
1867 uint64_t cost = 0;
1868 uint64_t pushes = 0;
1869 MOSDPGPush *msg = new MOSDPGPush();
1870 msg->from = get_parent()->whoami_shard();
1871 msg->pgid = get_parent()->primary_spg_t();
1872 msg->map_epoch = get_osdmap()->get_epoch();
1873 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1874 msg->set_priority(prio);
1875 for (;
1876 (j != i->second.end() &&
1877 cost < cct->_conf->osd_max_push_cost &&
 1878 push_count < cct->_conf->osd_max_push_objects);
1879 ++j) {
1880 dout(20) << __func__ << ": sending push " << *j
1881 << " to osd." << i->first << dendl;
1882 cost += j->cost(cct);
 1883 push_count += 1;
1884 msg->pushes.push_back(*j);
1885 }
1886 msg->set_cost(cost);
1887 get_parent()->send_message_osd_cluster(msg, con);
1888 }
1889 }
1890}
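// Batching sketch (hypothetical cap values, not taken from this file): with
// osd_max_push_cost = 8 MB and osd_max_push_objects = 10, a queue of 25
// small PushOps for one peer leaves the loop above as three MOSDPGPush
// messages of at most 10 ops each, while a single 8 MB op fills a message
// by itself.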
1891
1892void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1893{
1894 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1895 i != pulls.end();
1896 ++i) {
1897 ConnectionRef con = get_parent()->get_con_osd_cluster(
1898 i->first.osd,
1899 get_osdmap()->get_epoch());
1900 if (!con)
1901 continue;
1902 dout(20) << __func__ << ": sending pulls " << i->second
1903 << " to osd." << i->first << dendl;
1904 MOSDPGPull *msg = new MOSDPGPull();
1905 msg->from = parent->whoami_shard();
1906 msg->set_priority(prio);
1907 msg->pgid = get_parent()->primary_spg_t();
1908 msg->map_epoch = get_osdmap()->get_epoch();
1909 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1910 msg->set_pulls(&i->second);
1911 msg->compute_cost(cct);
1912 get_parent()->send_message_osd_cluster(msg, con);
1913 }
1914}
1915
1916int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1917 const ObjectRecoveryProgress &progress,
1918 ObjectRecoveryProgress *out_progress,
1919 PushOp *out_op,
1920 object_stat_sum_t *stat,
1921 bool cache_dont_need)
1922{
1923 ObjectRecoveryProgress _new_progress;
1924 if (!out_progress)
1925 out_progress = &_new_progress;
1926 ObjectRecoveryProgress &new_progress = *out_progress;
1927 new_progress = progress;
1928
 1929 dout(7) << __func__ << " " << recovery_info.soid
1930 << " v " << recovery_info.version
1931 << " size " << recovery_info.size
1932 << " recovery_info: " << recovery_info
1933 << dendl;
1934
 1935 eversion_t v = recovery_info.version;
1936 if (progress.first) {
1937 int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
 1938 if (r < 0) {
1939 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1940 return r;
1941 }
1942 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
 1943 if (r < 0) {
1944 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1945 return r;
1946 }
1947
1948 // Debug
1949 bufferlist bv = out_op->attrset[OI_ATTR];
1950 object_info_t oi;
1951 try {
1952 bufferlist::iterator bliter = bv.begin();
1953 ::decode(oi, bliter);
1954 } catch (...) {
1955 dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
1956 return -EINVAL;
1957 }
 1958
1959 // If requestor didn't know the version, use ours
1960 if (v == eversion_t()) {
1961 v = oi.version;
1962 } else if (oi.version != v) {
1963 get_parent()->clog_error() << get_info().pgid << " push "
1964 << recovery_info.soid << " v "
1965 << recovery_info.version
1966 << " failed because local copy is "
1967 << oi.version;
1968 return -EINVAL;
1969 }
1970
1971 new_progress.first = false;
1972 }
1973 // Once we provide the version subsequent requests will have it, so
1974 // at this point it must be known.
1975 assert(v != eversion_t());
1976
1977 uint64_t available = cct->_conf->osd_recovery_max_chunk;
1978 if (!progress.omap_complete) {
1979 ObjectMap::ObjectMapIterator iter =
1980 store->get_omap_iterator(coll,
1981 ghobject_t(recovery_info.soid));
1982 assert(iter);
1983 for (iter->lower_bound(progress.omap_recovered_to);
1984 iter->valid();
1985 iter->next(false)) {
1986 if (!out_op->omap_entries.empty() &&
1987 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
1988 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
1989 available <= iter->key().size() + iter->value().length()))
1990 break;
1991 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
1992
1993 if ((iter->key().size() + iter->value().length()) <= available)
1994 available -= (iter->key().size() + iter->value().length());
1995 else
1996 available = 0;
1997 }
1998 if (!iter->valid())
1999 new_progress.omap_complete = true;
2000 else
2001 new_progress.omap_recovered_to = iter->key();
2002 }
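 // Budget sketch (hypothetical numbers): with osd_recovery_max_chunk = 8 MB,
 // inserting a 1 KB key/value pair leaves 'available' at 8 MB - 1 KB; the
 // loop above breaks once the next pair no longer fits (or the per-chunk
 // entry cap is reached), and omap_recovered_to records the key to resume
 // from in the next PushOp.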
2003
2004 if (available > 0) {
2005 if (!recovery_info.copy_subset.empty()) {
2006 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
2007 map<uint64_t, uint64_t> m;
2008 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
2009 copy_subset.range_end(), m);
2010 if (r >= 0) {
2011 interval_set<uint64_t> fiemap_included(m);
2012 copy_subset.intersection_of(fiemap_included);
2013 } else {
2014 // intersection of copy_subset and empty interval_set would be empty anyway
2015 copy_subset.clear();
2016 }
2017
2018 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
2019 available);
2020 if (out_op->data_included.empty()) // zero filled section, skip to end!
2021 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
2022 else
2023 new_progress.data_recovered_to = out_op->data_included.range_end();
2024 }
2025 } else {
2026 out_op->data_included.clear();
2027 }
2028
2029 for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
2030 p != out_op->data_included.end();
2031 ++p) {
2032 bufferlist bit;
 2033 int r = store->read(ch, ghobject_t(recovery_info.soid),
2034 p.get_start(), p.get_len(), bit,
 2035 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED : 0);
2036 if (cct->_conf->osd_debug_random_push_read_error &&
2037 (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
2038 dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
2039 r = -EIO;
2040 }
2041 if (r < 0) {
2042 return r;
2043 }
2044 if (p.get_len() != bit.length()) {
2045 dout(10) << " extent " << p.get_start() << "~" << p.get_len()
2046 << " is actually " << p.get_start() << "~" << bit.length()
2047 << dendl;
2048 interval_set<uint64_t>::iterator save = p++;
2049 if (bit.length() == 0)
2050 out_op->data_included.erase(save); //Remove this empty interval
2051 else
2052 save.set_len(bit.length());
2053 // Remove any other intervals present
2054 while (p != out_op->data_included.end()) {
2055 interval_set<uint64_t>::iterator save = p++;
2056 out_op->data_included.erase(save);
2057 }
2058 new_progress.data_complete = true;
2059 out_op->data.claim_append(bit);
2060 break;
2061 }
2062 out_op->data.claim_append(bit);
2063 }
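 // Short-read sketch (illustrative numbers): if an extent asked for
 // 0x400000~0x100000 but read() returned only 512 KB, the object was
 // truncated underneath us; the interval shrinks to the bytes actually
 // read, any remaining intervals are dropped, and data_complete is set so
 // this push finishes against the now-smaller object.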
2064
2065 if (new_progress.is_complete(recovery_info)) {
2066 new_progress.data_complete = true;
2067 if (stat)
2068 stat->num_objects_recovered++;
2069 }
2070
2071 if (stat) {
2072 stat->num_keys_recovered += out_op->omap_entries.size();
2073 stat->num_bytes_recovered += out_op->data.length();
2074 }
2075
2076 get_parent()->get_logger()->inc(l_osd_push);
2077 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2078
2079 // send
 2080 out_op->version = v;
2081 out_op->soid = recovery_info.soid;
2082 out_op->recovery_info = recovery_info;
2083 out_op->after_progress = new_progress;
2084 out_op->before_progress = progress;
2085 return 0;
2086}
2087
2088void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2089{
2090 op->recovery_info.version = eversion_t();
2091 op->version = eversion_t();
2092 op->soid = soid;
2093}
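// A blank PushOp (version == eversion_t()) is the sender's way of signalling
// that the local read failed; on the pulling side a zero version is what
// routes the op into the _failed_pull() path instead of submit_push_data().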
2094
2095bool ReplicatedBackend::handle_push_reply(
2096 pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2097{
2098 const hobject_t &soid = op.soid;
2099 if (pushing.count(soid) == 0) {
2100 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2101 << ", or anybody else"
2102 << dendl;
2103 return false;
2104 } else if (pushing[soid].count(peer) == 0) {
2105 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2106 << dendl;
2107 return false;
2108 } else {
2109 PushInfo *pi = &pushing[soid][peer];
 2110 bool error = pushing[soid].begin()->second.recovery_progress.error;
 2111
 2112 if (!pi->recovery_progress.data_complete && !error) {
 2113 dout(10) << " pushing more from "
2114 << pi->recovery_progress.data_recovered_to
2115 << " of " << pi->recovery_info.copy_subset << dendl;
2116 ObjectRecoveryProgress new_progress;
2117 int r = build_push_op(
2118 pi->recovery_info,
2119 pi->recovery_progress, &new_progress, reply,
2120 &(pi->stat));
 2121 // Handle the case of a read error right after we wrote, which is
 2122 // hopefully extremely rare.
2123 if (r < 0) {
2124 dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
2125
2126 error = true;
2127 goto done;
2128 }
2129 pi->recovery_progress = new_progress;
2130 return true;
2131 } else {
2132 // done!
2133done:
2134 if (!error)
 2135 get_parent()->on_peer_recover(peer, soid, pi->recovery_info);
2136
2137 get_parent()->release_locks(pi->lock_manager);
2138 object_stat_sum_t stat = pi->stat;
 2139 eversion_t v = pi->recovery_info.version;
2140 pushing[soid].erase(peer);
2141 pi = NULL;
2142
2143 if (pushing[soid].empty()) {
2144 if (!error)
2145 get_parent()->on_global_recover(soid, stat);
2146 else
2147 get_parent()->on_primary_error(soid, v);
2148
2149 pushing.erase(soid);
2150 } else {
2151 // This looks weird, but we erased the current peer and need to remember
2152 // the error on any other one, while getting more acks.
2153 if (error)
2154 pushing[soid].begin()->second.recovery_progress.error = true;
2155 dout(10) << "pushed " << soid << ", still waiting for push ack from "
2156 << pushing[soid].size() << " others" << dendl;
2157 }
2158 return false;
2159 }
2160 }
2161}
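// Ack-accounting sketch (illustrative): with pushes to peers {A, B, C} for
// one soid and a read error while building A's next chunk, A's entry is
// erased but error = true is re-stashed on the first surviving entry, so
// when the last ack drains pushing[soid] the object is routed to
// on_primary_error() rather than on_global_recover().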
2162
2163void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2164{
2165 const hobject_t &soid = op.soid;
2166 struct stat st;
2167 int r = store->stat(ch, ghobject_t(soid), &st);
2168 if (r != 0) {
2169 get_parent()->clog_error() << get_info().pgid << " "
2170 << peer << " tried to pull " << soid
2171 << " but got " << cpp_strerror(-r);
2172 prep_push_op_blank(soid, reply);
2173 } else {
2174 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2175 ObjectRecoveryProgress &progress = op.recovery_progress;
2176 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2177 // Adjust size and copy_subset
2178 recovery_info.size = st.st_size;
2179 recovery_info.copy_subset.clear();
2180 if (st.st_size)
2181 recovery_info.copy_subset.insert(0, st.st_size);
2182 assert(recovery_info.clone_subset.empty());
2183 }
2184
2185 r = build_push_op(recovery_info, progress, 0, reply);
2186 if (r < 0)
2187 prep_push_op_blank(soid, reply);
2188 }
2189}
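// First-request sketch: a puller that does not yet know the object size
// sends recovery_info.size == (uint64_t)-1 with an empty copy_subset; the
// branch above then stats the local object and substitutes
// copy_subset = [0, st_size) before building the push.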
2190
2191/**
2192 * trim received data to remove what we don't want
2193 *
2194 * @param copy_subset intervals we want
 2195 * @param intervals_received intervals we got
 2196 * @param data_received data we got
2197 * @param intervals_usable intervals we want to keep
2198 * @param data_usable matching data we want to keep
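 *
 * Worked example (illustrative values, not from the original comment):
 * with copy_subset = {0~8, 14~2}, intervals_received = {0~4, 12~8} and
 * 12 bytes of data_received, the fast path does not apply, so
 * intervals_usable = {0~4, 14~2}; for the 14~2 piece the source offset is
 * off + (14 - 12) = 4 + 2 = 6, i.e. data_usable gets bytes [0,4) and
 * [6,8) of data_received.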
2199 */
2200void ReplicatedBackend::trim_pushed_data(
2201 const interval_set<uint64_t> &copy_subset,
2202 const interval_set<uint64_t> &intervals_received,
2203 bufferlist data_received,
2204 interval_set<uint64_t> *intervals_usable,
2205 bufferlist *data_usable)
2206{
2207 if (intervals_received.subset_of(copy_subset)) {
2208 *intervals_usable = intervals_received;
2209 *data_usable = data_received;
2210 return;
2211 }
2212
2213 intervals_usable->intersection_of(copy_subset,
2214 intervals_received);
2215
2216 uint64_t off = 0;
2217 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2218 p != intervals_received.end();
2219 ++p) {
2220 interval_set<uint64_t> x;
2221 x.insert(p.get_start(), p.get_len());
2222 x.intersection_of(copy_subset);
2223 for (interval_set<uint64_t>::const_iterator q = x.begin();
2224 q != x.end();
2225 ++q) {
2226 bufferlist sub;
2227 uint64_t data_off = off + (q.get_start() - p.get_start());
2228 sub.substr_of(data_received, data_off, q.get_len());
2229 data_usable->claim_append(sub);
2230 }
2231 off += p.get_len();
2232 }
2233}
2234
2235void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
2236{
 2237 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
2238 list<pg_shard_t> fl = { from };
2239 get_parent()->failed_push(fl, soid);
2240
2241 clear_pull(pulling.find(soid));
2242}
2243
2244void ReplicatedBackend::clear_pull_from(
2245 map<hobject_t, PullInfo>::iterator piter)
2246{
2247 auto from = piter->second.from;
2248 pull_from_peer[from].erase(piter->second.soid);
2249 if (pull_from_peer[from].empty())
2250 pull_from_peer.erase(from);
2251}
2252
2253void ReplicatedBackend::clear_pull(
2254 map<hobject_t, PullInfo>::iterator piter,
2255 bool clear_pull_from_peer)
2256{
2257 if (clear_pull_from_peer) {
2258 clear_pull_from(piter);
2259 }
2260 get_parent()->release_locks(piter->second.lock_manager);
2261 pulling.erase(piter);
2262}
2263
2264int ReplicatedBackend::start_pushes(
2265 const hobject_t &soid,
2266 ObjectContextRef obc,
2267 RPGHandle *h)
2268{
2269 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2270
2271 dout(20) << __func__ << " soid " << soid << dendl;
2272 // who needs it?
2273 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2274 for (set<pg_shard_t>::iterator i =
2275 get_parent()->get_actingbackfill_shards().begin();
2276 i != get_parent()->get_actingbackfill_shards().end();
2277 ++i) {
2278 if (*i == get_parent()->whoami_shard()) continue;
2279 pg_shard_t peer = *i;
2280 map<pg_shard_t, pg_missing_t>::const_iterator j =
2281 get_parent()->get_shard_missing().find(peer);
2282 assert(j != get_parent()->get_shard_missing().end());
2283 if (j->second.is_missing(soid)) {
2284 shards.push_back(j);
2285 }
2286 }
2287
2288 // If more than 1 read will occur ignore possible request to not cache
2289 bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2290
2291 for (auto j : shards) {
2292 pg_shard_t peer = j->first;
2293 h->pushes[peer].push_back(PushOp());
2294 int r = prep_push_to_replica(obc, soid, peer,
2295 &(h->pushes[peer].back()), cache);
2296 if (r < 0) {
2297 // Back out all failed reads
2298 for (auto k : shards) {
2299 pg_shard_t p = k->first;
2300 dout(10) << __func__ << " clean up peer " << p << dendl;
2301 h->pushes[p].pop_back();
2302 if (p == peer) break;
2303 }
2304 return r;
2305 }
2306 }
 2307 return shards.size();
2308}
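// Back-out sketch for the cleanup loop above (illustrative): if
// shards = {A, B, C} and the read while building B's PushOp fails, the
// loop pops the op just queued for A and then for B before breaking,
// leaving h->pushes exactly as it was on entry; C never had an op queued.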