// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
#include "common/EventTrace.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
  return *_dout << pgb->get_parent()->gen_dbg_prefix();
}

namespace {
class PG_SendMessageOnConn: public Context {
  PGBackend::Listener *pg;
  Message *reply;
  ConnectionRef conn;
public:
  PG_SendMessageOnConn(
    PGBackend::Listener *pg,
    Message *reply,
    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
  void finish(int) override {
    pg->send_message_osd_cluster(reply, conn.get());
  }
};

class PG_RecoveryQueueAsync : public Context {
  PGBackend::Listener *pg;
  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
public:
  PG_RecoveryQueueAsync(
    PGBackend::Listener *pg,
    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
  void finish(int) override {
    pg->schedule_recovery_work(c.release());
  }
};
}

struct ReplicatedBackend::C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_applied(rm);
  }
};

struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;
  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
    : pg(pg), rm(r) {}
  void finish(int r) override {
    pg->repop_commit(rm);
  }
};

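// Update perf counters for a completed replica-side (sub)op: overall
// count and latency, plus per-type bytes and latency for writes and
// pushes.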
static void log_subop_stats(
  PerfCounters *logger,
  OpRequestRef op, int subop)
{
  utime_t now = ceph_clock_now();
  utime_t latency = now;
  latency -= op->get_req()->get_recv_stamp();

  logger->inc(l_osd_sop);
  logger->tinc(l_osd_sop_lat, latency);
  logger->inc(subop);

  if (subop != l_osd_sop_pull) {
    uint64_t inb = op->get_req()->get_data().length();
    logger->inc(l_osd_sop_inb, inb);
    if (subop == l_osd_sop_w) {
      logger->inc(l_osd_sop_w_inb, inb);
      logger->tinc(l_osd_sop_w_lat, latency);
    } else if (subop == l_osd_sop_push) {
      logger->inc(l_osd_sop_push_inb, inb);
      logger->tinc(l_osd_sop_push_lat, latency);
    } else
      assert("unsupported subop" == 0);
  } else {
    logger->tinc(l_osd_sop_pull_lat, latency);
  }
}

ReplicatedBackend::ReplicatedBackend(
  PGBackend::Listener *pg,
  coll_t coll,
  ObjectStore::CollectionHandle &c,
  ObjectStore *store,
  CephContext *cct) :
  PGBackend(cct, pg, store, coll, c) {}

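// Flush a recovery handle: send all queued pushes, pulls, and deletes,
// then dispose of the handle.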
void ReplicatedBackend::run_recovery_op(
  PGBackend::RecoveryHandle *_h,
  int priority)
{
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  send_pushes(priority, h->pushes);
  send_pulls(priority, h->pulls);
  send_recovery_deletes(priority, h->deletes);
  delete h;
}

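// Start recovery of a single object.  If we are missing it locally we
// queue a pull; otherwise we are the primary pushing it to peers.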
int ReplicatedBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h
  )
{
  dout(10) << __func__ << ": " << hoid << dendl;
  RPGHandle *h = static_cast<RPGHandle *>(_h);
  if (get_parent()->get_local_missing().is_missing(hoid)) {
    assert(!obc);
    // pull
    prepare_pull(
      v,
      hoid,
      head,
      h);
  } else {
    assert(obc);
    int started = start_pushes(
      hoid,
      obc,
      h);
    if (started < 0) {
      pushing[hoid].clear();
      return started;
    }
  }
  return 0;
}

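// Cancel in-flight pulls from any source OSD the new map marks down.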
void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  for (map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
       i != pull_from_peer.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
               << ", osdmap has it marked down" << dendl;
      for (set<hobject_t>::iterator j = i->second.begin();
           j != i->second.end();
           ++j) {
        get_parent()->cancel_pull(*j);
        clear_pull(pulling.find(*j), false);
      }
      pull_from_peer.erase(i++);
    } else {
      ++i;
    }
  }
}

bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PULL:
    return true;
  default:
    return false;
  }
}

bool ReplicatedBackend::_handle_message(
  OpRequestRef op
  )
{
  dout(10) << __func__ << ": " << op << dendl;
  switch (op->get_req()->get_type()) {
  case MSG_OSD_PG_PUSH:
    do_push(op);
    return true;

  case MSG_OSD_PG_PULL:
    do_pull(op);
    return true;

  case MSG_OSD_PG_PUSH_REPLY:
    do_push_reply(op);
    return true;

  case MSG_OSD_SUBOP: {
    const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
    if (m->ops.size() == 0) {
      assert(0);
    }
    break;
  }

  case MSG_OSD_REPOP: {
    do_repop(op);
    return true;
  }

  case MSG_OSD_REPOPREPLY: {
    do_repop_reply(op);
    return true;
  }

  default:
    break;
  }
  return false;
}

void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();

  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}

void ReplicatedBackend::on_change()
{
  dout(10) << __func__ << dendl;
  for (map<ceph_tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
       i != in_progress_ops.end();
       in_progress_ops.erase(i++)) {
    if (i->second.on_commit)
      delete i->second.on_commit;
    if (i->second.on_applied)
      delete i->second.on_applied;
  }
  clear_recovery_state();
}

void ReplicatedBackend::on_flushed()
{
}

int ReplicatedBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
}

struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
  int r;
  Context *c;
  AsyncReadCallback(int r, Context *c) : r(r), c(c) {}
  void finish(ThreadPool::TPHandle&) override {
    c->complete(r);
    c = NULL;
  }
  ~AsyncReadCallback() override {
    delete c;
  }
};
void ReplicatedBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  // There is no fast read implementation for replication backend yet
  assert(!fast_read);

  int r = 0;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end() && r >= 0;
       ++i) {
    int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(),
                         i->first.get<1>(), *(i->second.first),
                         i->first.get<2>());
    if (i->second.second) {
      get_parent()->schedule_recovery_work(
        get_parent()->bless_gencontext(
          new AsyncReadCallback(_r, i->second.second)));
    }
    if (_r < 0)
      r = _r;
  }
  get_parent()->schedule_recovery_work(
    get_parent()->bless_gencontext(
      new AsyncReadCallback(r, on_complete)));
}

class C_OSD_OnOpCommit : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_commit(op);
  }
};

class C_OSD_OnOpApplied : public Context {
  ReplicatedBackend *pg;
  ReplicatedBackend::InProgressOp *op;
public:
  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
    : pg(pg), op(op) {}
  void finish(int) override {
    pg->op_applied(op);
  }
};

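// Translate a PGTransaction into a raw ObjectStore::Transaction,
// recording which temp objects it creates (added) or deletes (removed).
// Log entries are marked unrollbackable since the replicated backend
// cannot roll operations back.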
void generate_transaction(
  PGTransactionUPtr &pgt,
  const coll_t &coll,
  bool legacy_log_entries,
  vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction *t,
  set<hobject_t> *added,
  set<hobject_t> *removed)
{
  assert(t);
  assert(added);
  assert(removed);

  for (auto &&le: log_entries) {
    le.mark_unrollbackable();
    auto oiter = pgt->op_map.find(le.soid);
    if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
      bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
      ::encode(oiter->second.updated_snaps->second, bl);
      le.snaps.swap(bl);
      le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
    }
  }

  pgt->safe_create_traverse(
    [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
      const hobject_t &oid = obj_op.first;
      const ghobject_t goid =
        ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
      const PGTransaction::ObjectOperation &op = obj_op.second;

      if (oid.is_temp()) {
        if (op.is_fresh_object()) {
          added->insert(oid);
        } else if (op.is_delete()) {
          removed->insert(oid);
        }
      }

      if (op.delete_first) {
        t->remove(coll, goid);
      }

      match(
        op.init_type,
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
          t->touch(coll, goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            goid);
        },
        [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
          assert(op.source.is_temp());
          t->collection_move_rename(
            coll,
            ghobject_t(
              op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
            coll,
            goid);
        });

      if (op.truncate) {
        t->truncate(coll, goid, op.truncate->first);
        if (op.truncate->first != op.truncate->second)
          t->truncate(coll, goid, op.truncate->second);
      }

      if (!op.attr_updates.empty()) {
        map<string, bufferlist> attrs;
        for (auto &&p: op.attr_updates) {
          if (p.second)
            attrs[p.first] = *(p.second);
          else
            t->rmattr(coll, goid, p.first);
        }
        t->setattrs(coll, goid, attrs);
      }

      if (op.clear_omap)
        t->omap_clear(coll, goid);
      if (op.omap_header)
        t->omap_setheader(coll, goid, *(op.omap_header));

      for (auto &&up: op.omap_updates) {
        using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
        switch (up.first) {
        case UpdateType::Remove:
          t->omap_rmkeys(coll, goid, up.second);
          break;
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
        }
      }

      // updated_snaps doesn't matter since we marked unrollbackable

      if (op.alloc_hint) {
        auto &hint = *(op.alloc_hint);
        t->set_alloc_hint(
          coll,
          goid,
          hint.expected_object_size,
          hint.expected_write_size,
          hint.flags);
      }

      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
            t->write(
              coll,
              goid,
              extent.get_off(),
              extent.get_len(),
              op.buffer);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
              coll,
              goid,
              extent.get_off(),
              extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &op) {
            assert(op.len == extent.get_len());
            t->clone_range(
              coll,
              ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
              goid,
              op.offset,
              extent.get_len(),
              extent.get_off());
          });
      }
    });
}

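// Primary-side entry point for a write: build the local transaction,
// register the in-progress op, replicate it to the acting set via
// issue_op, and queue the local apply/commit callbacks.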
void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);

  assert(!in_progress_ops.count(tid));
  InProgressOp &op = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
        tid, on_all_commit, on_all_acked,
        orig_op, at_version)
      )
    ).first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());

  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);
}

void ReplicatedBackend::op_applied(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_applied");
    op->op->pg_trace.event("op applied");
  }

  op->waiting_for_applied.erase(get_parent()->whoami_shard());
  parent->op_applied(op->v);

  if (op->waiting_for_applied.empty()) {
    op->on_applied->complete(0);
    op->on_applied = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

void ReplicatedBackend::op_commit(
  InProgressOp *op)
{
  FUNCTRACE();
  OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
    op->op->pg_trace.event("op commit");
  }

  op->waiting_for_commit.erase(get_parent()->whoami_shard());

  if (op->waiting_for_commit.empty()) {
    op->on_commit->complete(0);
    op->on_commit = 0;
  }
  if (op->done()) {
    assert(!op->on_commit && !op->on_applied);
    in_progress_ops.erase(op->tid);
  }
}

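// Handle an ack/commit reply from a replica for an in-flight repop and
// fire on_applied/on_commit once every shard has responded.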
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
{
  static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
  assert(r->get_header().type == MSG_OSD_REPOPREPLY);

  op->mark_started();

  // must be replication.
  ceph_tid_t rep_tid = r->get_tid();
  pg_shard_t from = r->from;

  if (in_progress_ops.count(rep_tid)) {
    map<ceph_tid_t, InProgressOp>::iterator iter =
      in_progress_ops.find(rep_tid);
    InProgressOp &ip_op = iter->second;
    const MOSDOp *m = NULL;
    if (ip_op.op)
      m = static_cast<const MOSDOp *>(ip_op.op->get_req());

    if (m)
      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;
    else
      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
              << " ack_type " << (int)r->ack_type
              << " from " << from
              << dendl;

    // oh, good.

    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
      assert(ip_op.waiting_for_commit.count(from));
      ip_op.waiting_for_commit.erase(from);
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_commit_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_commit_rec");
      }
    } else {
      assert(ip_op.waiting_for_applied.count(from));
      if (ip_op.op) {
        ostringstream ss;
        ss << "sub_op_applied_rec from " << from;
        ip_op.op->mark_event_string(ss.str());
        ip_op.op->pg_trace.event("sub_op_applied_rec");
      }
    }
    ip_op.waiting_for_applied.erase(from);

    parent->update_peer_last_complete_ondisk(
      from,
      r->get_last_complete_ondisk());

    if (ip_op.waiting_for_applied.empty() &&
        ip_op.on_applied) {
      ip_op.on_applied->complete(0);
      ip_op.on_applied = 0;
    }
    if (ip_op.waiting_for_commit.empty() &&
        ip_op.on_commit) {
      ip_op.on_commit->complete(0);
      ip_op.on_commit = 0;
    }
    if (ip_op.done()) {
      assert(!ip_op.on_commit && !ip_op.on_applied);
      in_progress_ops.erase(iter);
    }
  }
}

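// Deep-scrub one object in resumable chunks: hash the data in
// osd_deep_scrub_stride pieces, then the omap header and keys.  Returns
// -EINPROGRESS until the object has been fully hashed.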
int ReplicatedBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
                           CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
                           CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  assert(poid == pos.ls[pos.pos]);
  if (!pos.data_done()) {
    if (pos.data_pos == 0) {
      pos.data_hash = bufferhash(-1);
    }

    bufferlist bl;
    r = store->read(
      ch,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      pos.data_pos,
      cct->_conf->osd_deep_scrub_stride, bl,
      fadvise_flags);
    if (r < 0) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r > 0) {
      pos.data_hash << bl;
    }
    pos.data_pos += r;
    if (r == cct->_conf->osd_deep_scrub_stride) {
      dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
               << std::hex << pos.data_hash.digest() << std::dec << dendl;
      return -EINPROGRESS;
    }
    // done with bytes
    pos.data_pos = -1;
    o.digest = pos.data_hash.digest();
    o.digest_present = true;
    dout(20) << __func__ << " " << poid << " done with data, digest 0x"
             << std::hex << o.digest << std::dec << dendl;
  }

  // omap header
  if (pos.omap_pos.empty()) {
    pos.omap_hash = bufferhash(-1);

    bufferlist hdrbl;
    r = store->omap_get_header(
      coll,
      ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &hdrbl, true);
    if (r == -EIO) {
      dout(20) << __func__ << " " << poid << " got "
               << r << " on omap header read, read_error" << dendl;
      o.read_error = true;
      return 0;
    }
    if (r == 0 && hdrbl.length()) {
      dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
               << dendl;
      pos.omap_hash << hdrbl;
    }
  }

  // omap
  ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
    coll,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  assert(iter);
  if (pos.omap_pos.length()) {
    iter->lower_bound(pos.omap_pos);
  } else {
    iter->seek_to_first();
  }
  int max = g_conf->osd_deep_scrub_keys;
  while (iter->status() == 0 && iter->valid()) {
    pos.omap_bytes += iter->value().length();
    ++pos.omap_keys;
    --max;
    // fixme: we can do this more efficiently.
    bufferlist bl;
    ::encode(iter->key(), bl);
    ::encode(iter->value(), bl);
    pos.omap_hash << bl;

    iter->next();

    if (iter->valid() && max == 0) {
      pos.omap_pos = iter->key();
      return -EINPROGRESS;
    }
    if (iter->status() < 0) {
      dout(25) << __func__ << " " << poid
               << " on omap scan, db status error" << dendl;
      o.read_error = true;
      return 0;
    }
  }

  if (pos.omap_keys > cct->_conf->
        osd_deep_scrub_large_omap_object_key_threshold ||
      pos.omap_bytes > cct->_conf->
        osd_deep_scrub_large_omap_object_value_sum_threshold) {
    dout(25) << __func__ << " " << poid
             << " large omap object detected. Object has " << pos.omap_keys
             << " keys and size " << pos.omap_bytes << " bytes" << dendl;
    o.large_omap_object_found = true;
    o.large_omap_object_key_count = pos.omap_keys;
    o.large_omap_object_value_size = pos.omap_bytes;
    map.has_large_omap_object_errors = true;
  }

  o.omap_digest = pos.omap_hash.digest();
  o.omap_digest_present = true;
  dout(20) << __func__ << " done with " << poid << " omap_digest "
           << std::hex << o.omap_digest << std::dec << dendl;

  // done!
  return 0;
}

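// Replica side: apply a batch of incoming pushes and reply with a
// MOSDPGPushReply once the transaction completes.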
void ReplicatedBackend::_do_push(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PushReplyOp> replies;
  ObjectStore::Transaction t;
  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
    ceph_abort();
  }
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    replies.push_back(PushReplyOp());
    handle_push(from, *i, &(replies.back()), &t);
  }

  MOSDPGPushReply *reply = new MOSDPGPushReply;
  reply->from = get_parent()->whoami_shard();
  reply->set_priority(m->get_priority());
  reply->pgid = get_info().pgid;
  reply->map_epoch = m->map_epoch;
  reply->min_epoch = m->min_epoch;
  reply->replies.swap(replies);
  reply->compute_cost(cct);

  t.register_on_complete(
    new PG_SendMessageOnConn(
      get_parent(), reply, m->get_connection()));

  get_parent()->queue_transaction(std::move(t));
}

struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
  ReplicatedBackend *bc;
  list<ReplicatedBackend::pull_complete_info> to_continue;
  int priority;
  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
    : bc(bc), priority(priority) {}

  void finish(ThreadPool::TPHandle &handle) override {
    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
    for (auto &&i: to_continue) {
      auto j = bc->pulling.find(i.hoid);
      assert(j != bc->pulling.end());
      ObjectContextRef obc = j->second.obc;
      bc->clear_pull(j, false /* already did it */);
      int started = bc->start_pushes(i.hoid, obc, h);
      if (started < 0) {
        bc->pushing[i.hoid].clear();
        bc->get_parent()->primary_failed(i.hoid);
        bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
      } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
      }
      handle.reset_tp_timeout();
    }
    bc->run_recovery_op(h, priority);
  }
};

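// Primary side: consume push data answering our earlier pulls; queue
// follow-on pushes for completed objects and re-pull anything still
// incomplete.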
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH);
  pg_shard_t from = m->from;

  op->mark_started();

  vector<PullOp> replies(1);

  ostringstream ss;
  if (get_parent()->check_failsafe_full(ss)) {
    dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl;
    ceph_abort();
  }

  ObjectStore::Transaction t;
  list<pull_complete_info> to_continue;
  for (vector<PushOp>::const_iterator i = m->pushes.begin();
       i != m->pushes.end();
       ++i) {
    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
    if (more)
      replies.push_back(PullOp());
  }
  if (!to_continue.empty()) {
    C_ReplicatedBackend_OnPullComplete *c =
      new C_ReplicatedBackend_OnPullComplete(
        this,
        m->get_priority());
    c->to_continue.swap(to_continue);
    t.register_on_complete(
      new PG_RecoveryQueueAsync(
        get_parent(),
        get_parent()->bless_gencontext(c)));
  }
  replies.erase(replies.end() - 1);

  if (replies.size()) {
    MOSDPGPull *reply = new MOSDPGPull;
    reply->from = parent->whoami_shard();
    reply->set_priority(m->get_priority());
    reply->pgid = get_info().pgid;
    reply->map_epoch = m->map_epoch;
    reply->min_epoch = m->min_epoch;
    reply->set_pulls(&replies);
    reply->compute_cost(cct);

    t.register_on_complete(
      new PG_SendMessageOnConn(
        get_parent(), reply, m->get_connection()));
  }

  get_parent()->queue_transaction(std::move(t));
}

void ReplicatedBackend::do_pull(OpRequestRef op)
{
  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
  assert(m->get_type() == MSG_OSD_PG_PULL);
  pg_shard_t from = m->from;

  map<pg_shard_t, vector<PushOp> > replies;
  vector<PullOp> pulls;
  m->take_pulls(&pulls);
  for (auto& i : pulls) {
    replies[from].push_back(PushOp());
    handle_pull(from, i, &(replies[from].back()));
  }
  send_pushes(m->get_priority(), replies);
}

void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
  pg_shard_t from = m->from;

  vector<PushOp> replies(1);
  for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
       i != m->replies.end();
       ++i) {
    bool more = handle_push_reply(from, *i, &(replies.back()));
    if (more)
      replies.push_back(PushOp());
  }
  replies.erase(replies.end() - 1);

  map<pg_shard_t, vector<PushOp> > _replies;
  _replies[from].swap(replies);
  send_pushes(m->get_priority(), _replies);
}

Message * ReplicatedBackend::generate_subop(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  ObjectStore::Transaction &op_t,
  pg_shard_t peer,
  const pg_info_t &pinfo)
{
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  // forward the write/update/whatever
  MOSDRepOp *wr = new MOSDRepOp(
    reqid, parent->whoami_shard(),
    spg_t(get_info().pgid.pgid, peer.shard),
    soid, acks_wanted,
    get_osdmap()->get_epoch(),
    parent->get_last_peering_reset_epoch(),
    tid, at_version);

  // ship resulting transaction, log entries, and pg_stats
  if (!parent->should_send_op(peer, soid)) {
    dout(10) << "issue_repop shipping empty opt to osd." << peer
             << ", object " << soid
             << " beyond MAX(last_backfill_started "
             << ", pinfo.last_backfill "
             << pinfo.last_backfill << ")" << dendl;
    ObjectStore::Transaction t;
    ::encode(t, wr->get_data());
  } else {
    ::encode(op_t, wr->get_data());
    wr->get_header().data_off = op_t.get_data_alignment();
  }

  ::encode(log_entries, wr->logbl);

  if (pinfo.is_incomplete())
    wr->pg_stats = pinfo.stats;  // reflects backfill progress
  else
    wr->pg_stats = get_info().stats;

  wr->pg_trim_to = pg_trim_to;
  wr->pg_roll_forward_to = pg_roll_forward_to;

  wr->new_temp_oid = new_temp_oid;
  wr->discard_temp_oid = discard_temp_oid;
  wr->updated_hit_set_history = hset_hist;
  return wr;
}

void ReplicatedBackend::issue_op(
  const hobject_t &soid,
  const eversion_t &at_version,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  eversion_t pg_trim_to,
  eversion_t pg_roll_forward_to,
  hobject_t new_temp_oid,
  hobject_t discard_temp_oid,
  const vector<pg_log_entry_t> &log_entries,
  boost::optional<pg_hit_set_history_t> &hset_hist,
  InProgressOp *op,
  ObjectStore::Transaction &op_t)
{
  if (op->op)
    op->op->pg_trace.event("issue replication ops");

  if (parent->get_actingbackfill_shards().size() > 1) {
    ostringstream ss;
    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
    replicas.erase(parent->whoami_shard());
    ss << "waiting for subops from " << replicas;
    if (op->op)
      op->op->mark_sub_op_sent(ss.str());
  }
  for (set<pg_shard_t>::const_iterator i =
         parent->get_actingbackfill_shards().begin();
       i != parent->get_actingbackfill_shards().end();
       ++i) {
    if (*i == parent->whoami_shard()) continue;
    pg_shard_t peer = *i;
    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;

    Message *wr;
    wr = generate_subop(
      soid,
      at_version,
      tid,
      reqid,
      pg_trim_to,
      pg_roll_forward_to,
      new_temp_oid,
      discard_temp_oid,
      log_entries,
      hset_hist,
      op_t,
      peer,
      pinfo);
    if (op->op)
      wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
    get_parent()->send_message_osd_cluster(
      peer.osd, wr, get_osdmap()->get_epoch());
  }
}

// sub op modify
void ReplicatedBackend::do_repop(OpRequestRef op)
{
  static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
  int msg_type = m->get_type();
  assert(MSG_OSD_REPOP == msg_type);

  const hobject_t& soid = m->poid;

  dout(10) << __func__ << " " << soid
           << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec)")
           << " " << m->logbl.length()
           << dendl;

  // sanity checks
  assert(m->map_epoch >= get_info().history.same_interval_since);

  // we better not be missing this.
  assert(!parent->get_log().get_missing().is_missing(soid));

  parent->maybe_preempt_replica_scrub(soid);

  int ackerosd = m->get_source().num();

  op->mark_started();

  RepModifyRef rm(std::make_shared<RepModify>());
  rm->op = op;
  rm->ackerosd = ackerosd;
  rm->last_complete = get_info().last_complete;
  rm->epoch_started = get_osdmap()->get_epoch();

  assert(m->logbl.length());
  // shipped transaction and log entries
  vector<pg_log_entry_t> log;

  bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
  ::decode(rm->opt, p);

  if (m->new_temp_oid != hobject_t()) {
    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
    add_temp_obj(m->new_temp_oid);
  }
  if (m->discard_temp_oid != hobject_t()) {
    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
    if (rm->opt.empty()) {
      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
               << " since we won't get the transaction" << dendl;
      rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
    }
    clear_temp_obj(m->discard_temp_oid);
  }

  p = const_cast<bufferlist&>(m->logbl).begin();
  ::decode(log, p);
  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  bool update_snaps = false;
  if (!rm->opt.empty()) {
    // If the opt is non-empty, we infer we are before
    // last_backfill (according to the primary, not our
    // not-quite-accurate value), and should update the
    // collections now.  Otherwise, we do it later on push.
    update_snaps = true;
  }
  parent->update_stats(m->pg_stats);
  parent->log_operation(
    log,
    m->updated_hit_set_history,
    m->pg_trim_to,
    m->pg_roll_forward_to,
    update_snaps,
    rm->localt);

  rm->opt.register_on_commit(
    parent->bless_context(
      new C_OSD_RepModifyCommit(this, rm)));
  rm->localt.register_on_applied(
    parent->bless_context(
      new C_OSD_RepModifyApply(this, rm)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(rm->localt));
  tls.push_back(std::move(rm->opt));
  parent->queue_transactions(tls, op);
  // op is cleaned up by oncommit/onapply when both are executed
}

void ReplicatedBackend::repop_applied(RepModifyRef rm)
{
  rm->op->mark_event("sub_op_applied");
  rm->applied = true;
  rm->op->pg_trace.event("sub_op_applied");

  dout(10) << __func__ << " on " << rm << " op "
           << *rm->op->get_req() << dendl;
  const Message *m = rm->op->get_req();
  const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
  eversion_t version = req->version;

  // send ack to acker only if we haven't sent a commit already
  if (!rm->committed) {
    Message *ack = new MOSDRepOpReply(
      req, parent->whoami_shard(),
      0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
    ack->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match commit priority!
    ack->trace = rm->op->pg_trace;
    get_parent()->send_message_osd_cluster(
      rm->ackerosd, ack, get_osdmap()->get_epoch());
  }

  parent->op_applied(version);
}

void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
  rm->op->mark_commit_sent();
  rm->op->pg_trace.event("sub_op_commit");
  rm->committed = true;

  // send commit.
  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
  assert(m->get_type() == MSG_OSD_REPOP);
  dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
           << dendl;
  assert(get_osdmap()->is_up(rm->ackerosd));

  get_parent()->update_last_complete_ondisk(rm->last_complete);

  MOSDRepOpReply *reply = new MOSDRepOpReply(
    m,
    get_parent()->whoami_shard(),
    0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
  reply->set_last_complete_ondisk(rm->last_complete);
  reply->set_priority(CEPH_MSG_PRIO_HIGH);  // this better match ack priority!
  reply->trace = rm->op->pg_trace;
  get_parent()->send_message_osd_cluster(
    rm->ackerosd, reply, get_osdmap()->get_epoch());

  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
}


// ===========================================================

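// Compute which byte ranges of head must actually be pushed, and which
// can be cloned locally from a snapshot the target already has.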
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
           << " clone_overlap " << snapset.clone_overlap << dendl;

  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
               << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
             << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
        c < last_backfill &&
        get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
               << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
             << " overlap " << next << dendl;
  }

  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
           << " data_subset " << data_subset
           << " clone_subsets " << clone_subsets << dendl;
}

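// Queue a pull for soid from a randomly chosen peer that has it,
// recording the pull state in pulling/pull_from_peer.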
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  assert(q != missing_loc.end());
  assert(!q->second.empty());

  // pick a pullee
  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
  random_shuffle(shuffle.begin(), shuffle.end());
  vector<pg_shard_t>::iterator p = shuffle.begin();
  assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
          << " v " << v
          << " on osds " << q->second
          << " from osd." << fromshard
          << dendl;

  assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    assert(pmissing.get_items().find(soid)->second.have != v);
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
             << " at version " << pmissing.get_items().find(soid)->second.have
             << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    assert(get_parent()->get_log().get_log().objects.count(soid) &&
           (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
            pg_log_entry_t::LOST_REVERT) &&
           (get_parent()->get_log().get_log().objects.find(
             soid)->second->reverting_to ==
            v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    assert(!get_parent()->get_local_missing().is_missing(
             soid.get_head()) ||
           !get_parent()->get_local_missing().is_missing(
             soid.get_snapdir()));
    assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}

/*
 * intelligently push an object to a replica.  make use of existing
 * clones/heads and dup data ranges where possible.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
           << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/precede poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }
    hobject_t snapdir = head;
    snapdir.snap = CEPH_SNAPDIR;
    if (get_parent()->get_local_missing().is_missing(snapdir)) {
      dout(15) << "push_to_replica missing snapdir " << snapdir
               << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    assert(pi != get_parent()->get_shard_info().end());
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this partially on the replica's clones?
    SnapSetContext *ssc = obc->ssc;
    assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}

int ReplicatedBackend::prep_push(ObjectContextRef obc,
                                 const hobject_t& soid, pg_shard_t peer,
                                 PushOp *pop, bool cache_dont_need)
{
  interval_set<uint64_t> data_subset;
  if (obc->obs.oi.size)
    data_subset.insert(0, obc->obs.oi.size);
  map<hobject_t, interval_set<uint64_t>> clone_subsets;

  return prep_push(obc, soid, peer,
                   obc->obs.oi.version, data_subset, clone_subsets,
                   pop, cache_dont_need, ObcLockManager());
}

int ReplicatedBackend::prep_push(
  ObjectContextRef obc,
  const hobject_t& soid, pg_shard_t peer,
  eversion_t version,
  interval_set<uint64_t> &data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  PushOp *pop,
  bool cache_dont_need,
  ObcLockManager &&lock_manager)
{
  get_parent()->begin_peer_recover(peer, soid);
  // take note.
  PushInfo &pi = pushing[soid][peer];
  pi.obc = obc;
  pi.recovery_info.size = obc->obs.oi.size;
  pi.recovery_info.copy_subset = data_subset;
  pi.recovery_info.clone_subset = clone_subsets;
  pi.recovery_info.soid = soid;
  pi.recovery_info.oi = obc->obs.oi;
  pi.recovery_info.ss = pop->recovery_info.ss;
  pi.recovery_info.version = version;
  pi.lock_manager = std::move(lock_manager);

  ObjectRecoveryProgress new_progress;
  int r = build_push_op(pi.recovery_info,
                        pi.recovery_progress,
                        &new_progress,
                        pop,
                        &(pi.stat), cache_dont_need);
  if (r < 0)
    return r;
  pi.recovery_progress = new_progress;
  return 0;
}

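// Stage pushed object data into the store.  Partial pushes go to a temp
// object that is renamed into place once the final piece arrives.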
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
                                                        recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
               << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
                      oi.expected_object_size,
                      oi.expected_write_size,
                      oi.alloc_hint_flags);
  }
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
             p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      dout(10) << __func__ << ": Removing oid "
               << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
                                coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}

void ReplicatedBackend::submit_push_complete(
  const ObjectRecoveryInfo &recovery_info,
  ObjectStore::Transaction *t)
{
  for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
         recovery_info.clone_subset.begin();
       p != recovery_info.clone_subset.end();
       ++p) {
    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
         q != p->second.end();
         ++q) {
      dout(15) << " clone_range " << p->first << " "
               << q.get_start() << "~" << q.get_len() << dendl;
      t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
                     q.get_start(), q.get_len(), q.get_start());
    }
  }
}

ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
  const ObjectRecoveryInfo& recovery_info,
  SnapSetContext *ssc,
  ObcLockManager &manager)
{
  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
    return recovery_info;
  ObjectRecoveryInfo new_info = recovery_info;
  new_info.copy_subset.clear();
  new_info.clone_subset.clear();
  assert(ssc);
  get_parent()->release_locks(manager);  // might already have locks
  calc_clone_subsets(
    ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
    get_info().last_backfill,
    new_info.copy_subset, new_info.clone_subset,
    manager);
  return new_info;
}

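// Primary side: integrate one PushOp answering a pull.  Returns true if
// more data remains and *response should be sent as a follow-up pull.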
1756 bool ReplicatedBackend::handle_pull_response(
1757 pg_shard_t from, const PushOp &pop, PullOp *response,
1758 list<pull_complete_info> *to_continue,
1759 ObjectStore::Transaction *t)
1760 {
1761 interval_set<uint64_t> data_included = pop.data_included;
1762 bufferlist data;
1763 data = pop.data;
1764 dout(10) << "handle_pull_response "
1765 << pop.recovery_info
1766 << pop.after_progress
1767 << " data.size() is " << data.length()
1768 << " data_included: " << data_included
1769 << dendl;
1770 if (pop.version == eversion_t()) {
1771 // replica doesn't have it!
1772 _failed_pull(from, pop.soid);
1773 return false;
1774 }
1775
1776 const hobject_t &hoid = pop.soid;
1777 assert((data_included.empty() && data.length() == 0) ||
1778 (!data_included.empty() && data.length() > 0));
1779
1780 auto piter = pulling.find(hoid);
1781 if (piter == pulling.end()) {
1782 return false;
1783 }
1784
1785 PullInfo &pi = piter->second;
1786 if (pi.recovery_info.size == (uint64_t(-1))) {
1787 pi.recovery_info.size = pop.recovery_info.size;
1788 pi.recovery_info.copy_subset.intersection_of(
1789 pop.recovery_info.copy_subset);
1790 }
1791 // If primary doesn't have object info and didn't know version
1792 if (pi.recovery_info.version == eversion_t()) {
1793 pi.recovery_info.version = pop.version;
1794 }
1795
1796 bool first = pi.recovery_progress.first;
1797 if (first) {
1798 // attrs only reference the origin bufferlist (decode from
1799 // MOSDPGPush message) whose size is much greater than attrs in
1800 // recovery. If obc cache it (get_obc maybe cache the attr), this
1801 // causes the whole origin bufferlist would not be free until obc
1802 // is evicted from obc cache. So rebuild the bufferlists before
1803 // cache it.
1804 auto attrset = pop.attrset;
1805 for (auto& a : attrset) {
1806 a.second.rebuild();
1807 }
1808 pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
1809 pi.recovery_info.oi = pi.obc->obs.oi;
1810 pi.recovery_info = recalc_subsets(
1811 pi.recovery_info,
1812 pi.obc->ssc,
1813 pi.lock_manager);
1814 }
1815
1816
1817 interval_set<uint64_t> usable_intervals;
1818 bufferlist usable_data;
1819 trim_pushed_data(pi.recovery_info.copy_subset,
1820 data_included,
1821 data,
1822 &usable_intervals,
1823 &usable_data);
1824 data_included = usable_intervals;
1825 data.claim(usable_data);
1826
1827
1828 pi.recovery_progress = pop.after_progress;
1829
1830 dout(10) << "new recovery_info " << pi.recovery_info
1831 << ", new progress " << pi.recovery_progress
1832 << dendl;
1833
1834 bool complete = pi.is_complete();
1835
1836 submit_push_data(pi.recovery_info, first,
1837 complete, pi.cache_dont_need,
1838 data_included, data,
1839 pop.omap_header,
1840 pop.attrset,
1841 pop.omap_entries,
1842 t);
1843
1844 pi.stat.num_keys_recovered += pop.omap_entries.size();
1845 pi.stat.num_bytes_recovered += data.length();
1846
1847 if (complete) {
1848 pi.stat.num_objects_recovered++;
1849 clear_pull_from(piter);
1850 to_continue->push_back({hoid, pi.stat});
1851 get_parent()->on_local_recover(
1852 hoid, pi.recovery_info, pi.obc, false, t);
1853 return false;
1854 } else {
1855 response->soid = pop.soid;
1856 response->recovery_info = pi.recovery_info;
1857 response->recovery_progress = pi.recovery_progress;
1858 return true;
1859 }
1860 }
1861
1862 void ReplicatedBackend::handle_push(
1863 pg_shard_t from, const PushOp &pop, PushReplyOp *response,
1864 ObjectStore::Transaction *t)
1865 {
1866 dout(10) << "handle_push "
1867 << pop.recovery_info
1868 << pop.after_progress
1869 << dendl;
1870 bufferlist data;
1871 data = pop.data;
1872 bool first = pop.before_progress.first;
1873 bool complete = pop.after_progress.data_complete &&
1874 pop.after_progress.omap_complete;
1875
1876 response->soid = pop.recovery_info.soid;
1877 submit_push_data(pop.recovery_info,
1878 first,
1879 complete,
1880 true, // must be a replica
1881 pop.data_included,
1882 data,
1883 pop.omap_header,
1884 pop.attrset,
1885 pop.omap_entries,
1886 t);
1887
1888 if (complete)
1889 get_parent()->on_local_recover(
1890 pop.recovery_info.soid,
1891 pop.recovery_info,
1892 ObjectContextRef(), // ok, is replica
1893 false,
1894 t);
1895 }
1896
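// Send the queued PushOps, batched into MOSDPGPush messages per peer.
// A message is cut off once its accumulated cost reaches
// osd_max_push_cost or it already carries osd_max_push_objects pushes;
// the remaining ops go into the next message.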
1897 void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1898 {
1899 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1900 i != pushes.end();
1901 ++i) {
1902 ConnectionRef con = get_parent()->get_con_osd_cluster(
1903 i->first.osd,
1904 get_osdmap()->get_epoch());
1905 if (!con)
1906 continue;
1907 vector<PushOp>::iterator j = i->second.begin();
1908 while (j != i->second.end()) {
1909 uint64_t cost = 0;
1910 uint64_t npushes = 0;
1911 MOSDPGPush *msg = new MOSDPGPush();
1912 msg->from = get_parent()->whoami_shard();
1913 msg->pgid = get_parent()->primary_spg_t();
1914 msg->map_epoch = get_osdmap()->get_epoch();
1915 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1916 msg->set_priority(prio);
1917 for (;
1918 (j != i->second.end() &&
1919 cost < cct->_conf->osd_max_push_cost &&
1920 npushes < cct->_conf->osd_max_push_objects);
1921 ++j) {
1922 dout(20) << __func__ << ": sending push " << *j
1923 << " to osd." << i->first << dendl;
1924 cost += j->cost(cct);
1925 npushes += 1;
1926 msg->pushes.push_back(*j);
1927 }
1928 msg->set_cost(cost);
1929 get_parent()->send_message_osd_cluster(msg, con);
1930 }
1931 }
1932 }
1933
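// Send the queued PullOps, one MOSDPGPull message per peer.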
1934 void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1935 {
1936 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1937 i != pulls.end();
1938 ++i) {
1939 ConnectionRef con = get_parent()->get_con_osd_cluster(
1940 i->first.osd,
1941 get_osdmap()->get_epoch());
1942 if (!con)
1943 continue;
1944 dout(20) << __func__ << ": sending pulls " << i->second
1945 << " to osd." << i->first << dendl;
1946 MOSDPGPull *msg = new MOSDPGPull();
1947 msg->from = get_parent()->whoami_shard();
1948 msg->set_priority(prio);
1949 msg->pgid = get_parent()->primary_spg_t();
1950 msg->map_epoch = get_osdmap()->get_epoch();
1951 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1952 msg->set_pulls(&i->second);
1953 msg->compute_cost(cct);
1954 get_parent()->send_message_osd_cluster(msg, con);
1955 }
1956 }
1957
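// Build one chunk of a push. On the first chunk this also captures the
// omap header and xattrs and validates the object version against our
// local object_info_t. Omap entries are filled in first, then object
// data, together budgeted by roughly osd_recovery_max_chunk bytes; the
// resulting PushOp records how far the push has progressed.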
1958 int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1959 const ObjectRecoveryProgress &progress,
1960 ObjectRecoveryProgress *out_progress,
1961 PushOp *out_op,
1962 object_stat_sum_t *stat,
1963 bool cache_dont_need)
1964 {
1965 ObjectRecoveryProgress _new_progress;
1966 if (!out_progress)
1967 out_progress = &_new_progress;
1968 ObjectRecoveryProgress &new_progress = *out_progress;
1969 new_progress = progress;
1970
1971 dout(7) << __func__ << " " << recovery_info.soid
1972 << " v " << recovery_info.version
1973 << " size " << recovery_info.size
1974 << " recovery_info: " << recovery_info
1975 << dendl;
1976
1977 eversion_t v = recovery_info.version;
1978 if (progress.first) {
1979 int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header);
1980 if (r < 0) {
1981 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1982 return r;
1983 }
1984 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
1985 if (r < 0) {
1986 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1987 return r;
1988 }
1989
1990 // Decode the object_info_t attr to sanity-check it and validate the version
1991 bufferlist bv = out_op->attrset[OI_ATTR];
1992 object_info_t oi;
1993 try {
1994 bufferlist::iterator bliter = bv.begin();
1995 ::decode(oi, bliter);
1996 } catch (...) {
1997 dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
1998 return -EINVAL;
1999 }
2000
2001 // If requestor didn't know the version, use ours
2002 if (v == eversion_t()) {
2003 v = oi.version;
2004 } else if (oi.version != v) {
2005 get_parent()->clog_error() << get_info().pgid << " push "
2006 << recovery_info.soid << " v "
2007 << recovery_info.version
2008 << " failed because local copy is "
2009 << oi.version;
2010 return -EINVAL;
2011 }
2012
2013 new_progress.first = false;
2014 }
2015 // Once we provide the version, subsequent requests will have it, so
2016 // at this point it must be known.
2017 assert(v != eversion_t());
2018
2019 uint64_t available = cct->_conf->osd_recovery_max_chunk;
2020 if (!progress.omap_complete) {
2021 ObjectMap::ObjectMapIterator iter =
2022 store->get_omap_iterator(coll,
2023 ghobject_t(recovery_info.soid));
2024 assert(iter);
2025 for (iter->lower_bound(progress.omap_recovered_to);
2026 iter->valid();
2027 iter->next(false)) {
2028 if (!out_op->omap_entries.empty() &&
2029 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
2030 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
2031 available <= iter->key().size() + iter->value().length()))
2032 break;
2033 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
2034
2035 if ((iter->key().size() + iter->value().length()) <= available)
2036 available -= (iter->key().size() + iter->value().length());
2037 else
2038 available = 0;
2039 }
2040 if (!iter->valid())
2041 new_progress.omap_complete = true;
2042 else
2043 new_progress.omap_recovered_to = iter->key();
2044 }
2045
2046 if (available > 0) {
2047 if (!recovery_info.copy_subset.empty()) {
2048 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
2049 map<uint64_t, uint64_t> m;
2050 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
2051 copy_subset.range_end(), m);
2052 if (r >= 0) {
2053 interval_set<uint64_t> fiemap_included(m);
2054 copy_subset.intersection_of(fiemap_included);
2055 } else {
2056 // intersection of copy_subset and empty interval_set would be empty anyway
2057 copy_subset.clear();
2058 }
2059
2060 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
2061 available);
2062 if (out_op->data_included.empty()) // zero-filled section, skip to the end!
2063 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
2064 else
2065 new_progress.data_recovered_to = out_op->data_included.range_end();
2066 }
2067 } else {
2068 out_op->data_included.clear();
2069 }
2070
2071 for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
2072 p != out_op->data_included.end();
2073 ++p) {
2074 bufferlist bit;
2075 int r = store->read(ch, ghobject_t(recovery_info.soid),
2076 p.get_start(), p.get_len(), bit,
2077 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
2078 if (cct->_conf->osd_debug_random_push_read_error &&
2079 (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
2080 dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
2081 r = -EIO;
2082 }
2083 if (r < 0) {
2084 return r;
2085 }
2086 if (p.get_len() != bit.length()) {
2087 dout(10) << " extent " << p.get_start() << "~" << p.get_len()
2088 << " is actually " << p.get_start() << "~" << bit.length()
2089 << dendl;
2090 interval_set<uint64_t>::iterator save = p++;
2091 if (bit.length() == 0)
2092 out_op->data_included.erase(save); // Remove this empty interval
2093 else
2094 save.set_len(bit.length());
2095 // Remove any other intervals present
2096 while (p != out_op->data_included.end()) {
2097 interval_set<uint64_t>::iterator to_erase = p++;
2098 out_op->data_included.erase(to_erase);
2099 }
2100 new_progress.data_complete = true;
2101 out_op->data.claim_append(bit);
2102 break;
2103 }
2104 out_op->data.claim_append(bit);
2105 }
2106
2107 if (new_progress.is_complete(recovery_info)) {
2108 new_progress.data_complete = true;
2109 if (stat)
2110 stat->num_objects_recovered++;
2111 }
2112
2113 if (stat) {
2114 stat->num_keys_recovered += out_op->omap_entries.size();
2115 stat->num_bytes_recovered += out_op->data.length();
2116 }
2117
2118 get_parent()->get_logger()->inc(l_osd_push);
2119 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2120
2121 // send
2122 out_op->version = v;
2123 out_op->soid = recovery_info.soid;
2124 out_op->recovery_info = recovery_info;
2125 out_op->after_progress = new_progress;
2126 out_op->before_progress = progress;
2127 return 0;
2128 }
2129
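// A blank PushOp (version == eversion_t()) tells the primary that we
// do not have the object; see the check in handle_pull_response().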
2130 void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2131 {
2132 op->recovery_info.version = eversion_t();
2133 op->version = eversion_t();
2134 op->soid = soid;
2135 }
2136
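// Handle a push ack from a replica. Returns true (and fills *reply
// with the next chunk) if there is more to send to this peer; returns
// false when the push to this peer is finished or was never tracked.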
2137 bool ReplicatedBackend::handle_push_reply(
2138 pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
2139 {
2140 const hobject_t &soid = op.soid;
2141 if (pushing.count(soid) == 0) {
2142 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2143 << ", or anybody else"
2144 << dendl;
2145 return false;
2146 } else if (pushing[soid].count(peer) == 0) {
2147 dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
2148 << dendl;
2149 return false;
2150 } else {
2151 PushInfo *pi = &pushing[soid][peer];
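// A previously seen error is parked on the first remaining entry for
// this object (see the note near the end of this function).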
2152 bool error = pushing[soid].begin()->second.recovery_progress.error;
2153
2154 if (!pi->recovery_progress.data_complete && !error) {
2155 dout(10) << " pushing more, from "
2156 << pi->recovery_progress.data_recovered_to
2157 << " of " << pi->recovery_info.copy_subset << dendl;
2158 ObjectRecoveryProgress new_progress;
2159 int r = build_push_op(
2160 pi->recovery_info,
2161 pi->recovery_progress, &new_progress, reply,
2162 &(pi->stat));
2163 // Handle the case of a read error right after we wrote, which is
2164 // hopefully extremely rare.
2165 if (r < 0) {
2166 dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
2167
2168 error = true;
2169 goto done;
2170 }
2171 pi->recovery_progress = new_progress;
2172 return true;
2173 } else {
2174 // done!
2175 done:
2176 if (!error)
2177 get_parent()->on_peer_recover(peer, soid, pi->recovery_info);
2178
2179 get_parent()->release_locks(pi->lock_manager);
2180 object_stat_sum_t stat = pi->stat;
2181 eversion_t v = pi->recovery_info.version;
2182 pushing[soid].erase(peer);
2183 pi = nullptr;
2184
2185 if (pushing[soid].empty()) {
2186 if (!error)
2187 get_parent()->on_global_recover(soid, stat, false);
2188 else
2189 get_parent()->on_primary_error(soid, v);
2190 pushing.erase(soid);
2191 } else {
2192 // This looks weird, but we erased the current peer, so we park the
2193 // error on one of the remaining peers while awaiting more acks.
2194 if (error)
2195 pushing[soid].begin()->second.recovery_progress.error = true;
2196 dout(10) << "pushed " << soid << ", still waiting for push ack from "
2197 << pushing[soid].size() << " others" << dendl;
2198 }
2199 return false;
2200 }
2201 }
2202 }
2203
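// Handle a pull request from the primary: stat the local object, fill
// in size and copy_subset on the first chunk, and build the push. On
// error, reply with a blank PushOp so the primary fails the pull.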
2204 void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2205 {
2206 const hobject_t &soid = op.soid;
2207 struct stat st;
2208 int r = store->stat(ch, ghobject_t(soid), &st);
2209 if (r != 0) {
2210 get_parent()->clog_error() << get_info().pgid << " "
2211 << peer << " tried to pull " << soid
2212 << " but got " << cpp_strerror(-r);
2213 prep_push_op_blank(soid, reply);
2214 } else {
2215 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2216 ObjectRecoveryProgress &progress = op.recovery_progress;
2217 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2218 // Adjust size and copy_subset
2219 recovery_info.size = st.st_size;
2220 recovery_info.copy_subset.clear();
2221 if (st.st_size)
2222 recovery_info.copy_subset.insert(0, st.st_size);
2223 assert(recovery_info.clone_subset.empty());
2224 }
2225
2226 r = build_push_op(recovery_info, progress, nullptr, reply);
2227 if (r < 0)
2228 prep_push_op_blank(soid, reply);
2229 }
2230 }
2231
2232 /**
2233 * trim received data to remove what we don't want
2234 *
2235 * @param copy_subset intervals we want
2236 * @param intervals_received intervals we got
2237 * @param data_received data we got
2238 * @param intervals_usable intervals we want to keep
2239 * @param data_usable matching data we want to keep
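*
* For example: with copy_subset = [0,8) and intervals_received =
* [4,12) carrying 8 bytes, the usable result is the interval [4,8)
* together with the first 4 bytes of the received data.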
2240 */
2241 void ReplicatedBackend::trim_pushed_data(
2242 const interval_set<uint64_t> &copy_subset,
2243 const interval_set<uint64_t> &intervals_received,
2244 bufferlist data_received,
2245 interval_set<uint64_t> *intervals_usable,
2246 bufferlist *data_usable)
2247 {
2248 if (intervals_received.subset_of(copy_subset)) {
2249 *intervals_usable = intervals_received;
2250 *data_usable = data_received;
2251 return;
2252 }
2253
2254 intervals_usable->intersection_of(copy_subset,
2255 intervals_received);
2256
2257 uint64_t off = 0;
2258 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2259 p != intervals_received.end();
2260 ++p) {
2261 interval_set<uint64_t> x;
2262 x.insert(p.get_start(), p.get_len());
2263 x.intersection_of(copy_subset);
2264 for (interval_set<uint64_t>::const_iterator q = x.begin();
2265 q != x.end();
2266 ++q) {
2267 bufferlist sub;
2268 uint64_t data_off = off + (q.get_start() - p.get_start());
2269 sub.substr_of(data_received, data_off, q.get_len());
2270 data_usable->claim_append(sub);
2271 }
2272 off += p.get_len();
2273 }
2274 }
2275
2276 void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
2277 {
2278 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
2279 list<pg_shard_t> fl = { from };
2280 get_parent()->failed_push(fl, soid);
2281
2282 clear_pull(pulling.find(soid));
2283 }
2284
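// Drop the bookkeeping that maps an in-progress pull back to the peer
// it is being pulled from.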
2285 void ReplicatedBackend::clear_pull_from(
2286 map<hobject_t, PullInfo>::iterator piter)
2287 {
2288 auto from = piter->second.from;
2289 pull_from_peer[from].erase(piter->second.soid);
2290 if (pull_from_peer[from].empty())
2291 pull_from_peer.erase(from);
2292 }
2293
2294 void ReplicatedBackend::clear_pull(
2295 map<hobject_t, PullInfo>::iterator piter,
2296 bool clear_pull_from_peer)
2297 {
2298 if (clear_pull_from_peer) {
2299 clear_pull_from(piter);
2300 }
2301 get_parent()->release_locks(piter->second.lock_manager);
2302 pulling.erase(piter);
2303 }
2304
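// Queue pushes of soid to every acting/backfill shard that is missing
// it. Returns the number of shards pushed to, or a negative error
// after backing out any pushes prepared earlier in this call.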
2305 int ReplicatedBackend::start_pushes(
2306 const hobject_t &soid,
2307 ObjectContextRef obc,
2308 RPGHandle *h)
2309 {
2310 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2311
2312 dout(20) << __func__ << " soid " << soid << dendl;
2313 // who needs it?
2314 assert(get_parent()->get_actingbackfill_shards().size() > 0);
2315 for (set<pg_shard_t>::iterator i =
2316 get_parent()->get_actingbackfill_shards().begin();
2317 i != get_parent()->get_actingbackfill_shards().end();
2318 ++i) {
2319 if (*i == get_parent()->whoami_shard()) continue;
2320 pg_shard_t peer = *i;
2321 map<pg_shard_t, pg_missing_t>::const_iterator j =
2322 get_parent()->get_shard_missing().find(peer);
2323 assert(j != get_parent()->get_shard_missing().end());
2324 if (j->second.is_missing(soid)) {
2325 shards.push_back(j);
2326 }
2327 }
2328
2329 // If more than one read will occur, ignore any request to skip caching
2330 bool cache_dont_need = shards.size() == 1 ? h->cache_dont_need : false;
2331
2332 for (auto j : shards) {
2333 pg_shard_t peer = j->first;
2334 h->pushes[peer].push_back(PushOp());
2335 int r = prep_push_to_replica(obc, soid, peer,
2336 &(h->pushes[peer].back()), cache_dont_need);
2337 if (r < 0) {
2338 // Back out all failed reads
2339 for (auto k : shards) {
2340 pg_shard_t p = k->first;
2341 dout(10) << __func__ << " clean up peer " << p << dendl;
2342 h->pushes[p].pop_back();
2343 if (p == peer) break;
2344 }
2345 return r;
2346 }
2347 }
2348 return shards.size();
2349 }