]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/ReplicatedBackend.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / osd / ReplicatedBackend.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14#include "common/errno.h"
15#include "ReplicatedBackend.h"
16#include "messages/MOSDOp.h"
7c673cae 17#include "messages/MOSDRepOp.h"
7c673cae
FG
18#include "messages/MOSDRepOpReply.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPull.h"
21#include "messages/MOSDPGPushReply.h"
22#include "common/EventTrace.h"
11fdf7f2
TL
23#include "include/random.h"
24#include "OSD.h"
7c673cae
FG
25
26#define dout_context cct
27#define dout_subsys ceph_subsys_osd
28#define DOUT_PREFIX_ARGS this
29#undef dout_prefix
30#define dout_prefix _prefix(_dout, this)
31static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
11fdf7f2 32 return pgb->get_parent()->gen_dbg_prefix(*_dout);
7c673cae
FG
33}
34
35namespace {
36class PG_SendMessageOnConn: public Context {
37 PGBackend::Listener *pg;
38 Message *reply;
39 ConnectionRef conn;
40 public:
41 PG_SendMessageOnConn(
42 PGBackend::Listener *pg,
43 Message *reply,
44 ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
45 void finish(int) override {
46 pg->send_message_osd_cluster(reply, conn.get());
47 }
48};
49
50class PG_RecoveryQueueAsync : public Context {
51 PGBackend::Listener *pg;
52 unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
53 public:
54 PG_RecoveryQueueAsync(
55 PGBackend::Listener *pg,
56 GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
57 void finish(int) override {
58 pg->schedule_recovery_work(c.release());
59 }
60};
61}
62
7c673cae
FG
63struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context {
64 ReplicatedBackend *pg;
65 RepModifyRef rm;
66 C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
67 : pg(pg), rm(r) {}
68 void finish(int r) override {
69 pg->repop_commit(rm);
70 }
71};
72
73static void log_subop_stats(
74 PerfCounters *logger,
75 OpRequestRef op, int subop)
76{
77 utime_t now = ceph_clock_now();
78 utime_t latency = now;
79 latency -= op->get_req()->get_recv_stamp();
80
81
82 logger->inc(l_osd_sop);
83 logger->tinc(l_osd_sop_lat, latency);
84 logger->inc(subop);
85
86 if (subop != l_osd_sop_pull) {
87 uint64_t inb = op->get_req()->get_data().length();
88 logger->inc(l_osd_sop_inb, inb);
89 if (subop == l_osd_sop_w) {
90 logger->inc(l_osd_sop_w_inb, inb);
91 logger->tinc(l_osd_sop_w_lat, latency);
92 } else if (subop == l_osd_sop_push) {
93 logger->inc(l_osd_sop_push_inb, inb);
94 logger->tinc(l_osd_sop_push_lat, latency);
95 } else
11fdf7f2 96 ceph_abort_msg("no support subop");
7c673cae
FG
97 } else {
98 logger->tinc(l_osd_sop_pull_lat, latency);
99 }
100}
101
102ReplicatedBackend::ReplicatedBackend(
103 PGBackend::Listener *pg,
11fdf7f2 104 const coll_t &coll,
7c673cae
FG
105 ObjectStore::CollectionHandle &c,
106 ObjectStore *store,
107 CephContext *cct) :
108 PGBackend(cct, pg, store, coll, c) {}
109
110void ReplicatedBackend::run_recovery_op(
111 PGBackend::RecoveryHandle *_h,
112 int priority)
113{
114 RPGHandle *h = static_cast<RPGHandle *>(_h);
115 send_pushes(priority, h->pushes);
116 send_pulls(priority, h->pulls);
c07f9fc5 117 send_recovery_deletes(priority, h->deletes);
7c673cae
FG
118 delete h;
119}
120
224ce89b 121int ReplicatedBackend::recover_object(
7c673cae
FG
122 const hobject_t &hoid,
123 eversion_t v,
124 ObjectContextRef head,
125 ObjectContextRef obc,
126 RecoveryHandle *_h
127 )
128{
129 dout(10) << __func__ << ": " << hoid << dendl;
130 RPGHandle *h = static_cast<RPGHandle *>(_h);
131 if (get_parent()->get_local_missing().is_missing(hoid)) {
11fdf7f2 132 ceph_assert(!obc);
7c673cae
FG
133 // pull
134 prepare_pull(
135 v,
136 hoid,
137 head,
138 h);
7c673cae 139 } else {
11fdf7f2 140 ceph_assert(obc);
7c673cae
FG
141 int started = start_pushes(
142 hoid,
143 obc,
144 h);
224ce89b
WB
145 if (started < 0) {
146 pushing[hoid].clear();
147 return started;
148 }
7c673cae 149 }
224ce89b 150 return 0;
7c673cae
FG
151}
152
153void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
154{
155 for(map<pg_shard_t, set<hobject_t> >::iterator i = pull_from_peer.begin();
156 i != pull_from_peer.end();
157 ) {
158 if (osdmap->is_down(i->first.osd)) {
159 dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
160 << ", osdmap has it marked down" << dendl;
161 for (set<hobject_t>::iterator j = i->second.begin();
162 j != i->second.end();
163 ++j) {
164 get_parent()->cancel_pull(*j);
165 clear_pull(pulling.find(*j), false);
166 }
167 pull_from_peer.erase(i++);
168 } else {
169 ++i;
170 }
171 }
172}
173
174bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
175{
176 dout(10) << __func__ << ": " << op << dendl;
177 switch (op->get_req()->get_type()) {
178 case MSG_OSD_PG_PULL:
179 return true;
180 default:
181 return false;
182 }
183}
184
c07f9fc5 185bool ReplicatedBackend::_handle_message(
7c673cae
FG
186 OpRequestRef op
187 )
188{
189 dout(10) << __func__ << ": " << op << dendl;
190 switch (op->get_req()->get_type()) {
191 case MSG_OSD_PG_PUSH:
192 do_push(op);
193 return true;
194
195 case MSG_OSD_PG_PULL:
196 do_pull(op);
197 return true;
198
199 case MSG_OSD_PG_PUSH_REPLY:
200 do_push_reply(op);
201 return true;
202
7c673cae
FG
203 case MSG_OSD_REPOP: {
204 do_repop(op);
205 return true;
206 }
207
208 case MSG_OSD_REPOPREPLY: {
209 do_repop_reply(op);
210 return true;
211 }
212
213 default:
214 break;
215 }
216 return false;
217}
218
219void ReplicatedBackend::clear_recovery_state()
220{
221 // clear pushing/pulling maps
222 for (auto &&i: pushing) {
223 for (auto &&j: i.second) {
224 get_parent()->release_locks(j.second.lock_manager);
225 }
226 }
227 pushing.clear();
228
229 for (auto &&i: pulling) {
230 get_parent()->release_locks(i.second.lock_manager);
231 }
232 pulling.clear();
233 pull_from_peer.clear();
234}
235
236void ReplicatedBackend::on_change()
237{
238 dout(10) << __func__ << dendl;
11fdf7f2
TL
239 for (auto& op : in_progress_ops) {
240 delete op.second->on_commit;
241 op.second->on_commit = nullptr;
7c673cae 242 }
11fdf7f2 243 in_progress_ops.clear();
7c673cae
FG
244 clear_recovery_state();
245}
246
7c673cae
FG
247int ReplicatedBackend::objects_read_sync(
248 const hobject_t &hoid,
249 uint64_t off,
250 uint64_t len,
251 uint32_t op_flags,
252 bufferlist *bl)
253{
254 return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
255}
256
7c673cae
FG
257void ReplicatedBackend::objects_read_async(
258 const hobject_t &hoid,
259 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
260 pair<bufferlist*, Context*> > > &to_read,
261 Context *on_complete,
262 bool fast_read)
263{
11fdf7f2 264 ceph_abort_msg("async read is not used by replica pool");
7c673cae
FG
265}
266
267class C_OSD_OnOpCommit : public Context {
268 ReplicatedBackend *pg;
11fdf7f2 269 ReplicatedBackend::InProgressOpRef op;
7c673cae
FG
270public:
271 C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
272 : pg(pg), op(op) {}
273 void finish(int) override {
274 pg->op_commit(op);
275 }
276};
277
7c673cae
FG
278void generate_transaction(
279 PGTransactionUPtr &pgt,
280 const coll_t &coll,
7c673cae
FG
281 vector<pg_log_entry_t> &log_entries,
282 ObjectStore::Transaction *t,
283 set<hobject_t> *added,
284 set<hobject_t> *removed)
285{
11fdf7f2
TL
286 ceph_assert(t);
287 ceph_assert(added);
288 ceph_assert(removed);
7c673cae
FG
289
290 for (auto &&le: log_entries) {
291 le.mark_unrollbackable();
292 auto oiter = pgt->op_map.find(le.soid);
293 if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
31f18b77 294 bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8);
11fdf7f2 295 encode(oiter->second.updated_snaps->second, bl);
31f18b77
FG
296 le.snaps.swap(bl);
297 le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
7c673cae
FG
298 }
299 }
300
301 pgt->safe_create_traverse(
302 [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
303 const hobject_t &oid = obj_op.first;
304 const ghobject_t goid =
305 ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
306 const PGTransaction::ObjectOperation &op = obj_op.second;
307
308 if (oid.is_temp()) {
309 if (op.is_fresh_object()) {
310 added->insert(oid);
311 } else if (op.is_delete()) {
312 removed->insert(oid);
313 }
314 }
315
316 if (op.delete_first) {
317 t->remove(coll, goid);
318 }
319
320 match(
321 op.init_type,
322 [&](const PGTransaction::ObjectOperation::Init::None &) {
323 },
324 [&](const PGTransaction::ObjectOperation::Init::Create &op) {
325 t->touch(coll, goid);
326 },
327 [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
328 t->clone(
329 coll,
330 ghobject_t(
331 op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
332 goid);
333 },
334 [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
11fdf7f2 335 ceph_assert(op.source.is_temp());
7c673cae
FG
336 t->collection_move_rename(
337 coll,
338 ghobject_t(
339 op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
340 coll,
341 goid);
342 });
343
344 if (op.truncate) {
345 t->truncate(coll, goid, op.truncate->first);
346 if (op.truncate->first != op.truncate->second)
347 t->truncate(coll, goid, op.truncate->second);
348 }
349
350 if (!op.attr_updates.empty()) {
351 map<string, bufferlist> attrs;
352 for (auto &&p: op.attr_updates) {
353 if (p.second)
354 attrs[p.first] = *(p.second);
355 else
356 t->rmattr(coll, goid, p.first);
357 }
358 t->setattrs(coll, goid, attrs);
359 }
360
361 if (op.clear_omap)
362 t->omap_clear(coll, goid);
363 if (op.omap_header)
364 t->omap_setheader(coll, goid, *(op.omap_header));
365
366 for (auto &&up: op.omap_updates) {
367 using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
368 switch (up.first) {
369 case UpdateType::Remove:
370 t->omap_rmkeys(coll, goid, up.second);
371 break;
372 case UpdateType::Insert:
373 t->omap_setkeys(coll, goid, up.second);
374 break;
375 }
376 }
377
378 // updated_snaps doesn't matter since we marked unrollbackable
379
380 if (op.alloc_hint) {
381 auto &hint = *(op.alloc_hint);
382 t->set_alloc_hint(
383 coll,
384 goid,
385 hint.expected_object_size,
386 hint.expected_write_size,
387 hint.flags);
388 }
389
390 for (auto &&extent: op.buffer_updates) {
391 using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
392 match(
393 extent.get_val(),
394 [&](const BufferUpdate::Write &op) {
395 t->write(
396 coll,
397 goid,
398 extent.get_off(),
399 extent.get_len(),
400 op.buffer);
401 },
402 [&](const BufferUpdate::Zero &op) {
403 t->zero(
404 coll,
405 goid,
406 extent.get_off(),
407 extent.get_len());
408 },
409 [&](const BufferUpdate::CloneRange &op) {
11fdf7f2 410 ceph_assert(op.len == extent.get_len());
7c673cae
FG
411 t->clone_range(
412 coll,
413 ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
414 goid,
415 op.offset,
416 extent.get_len(),
417 extent.get_off());
418 });
419 }
420 });
421}
422
423void ReplicatedBackend::submit_transaction(
424 const hobject_t &soid,
425 const object_stat_sum_t &delta_stats,
426 const eversion_t &at_version,
427 PGTransactionUPtr &&_t,
428 const eversion_t &trim_to,
429 const eversion_t &roll_forward_to,
430 const vector<pg_log_entry_t> &_log_entries,
431 boost::optional<pg_hit_set_history_t> &hset_history,
7c673cae
FG
432 Context *on_all_commit,
433 ceph_tid_t tid,
434 osd_reqid_t reqid,
435 OpRequestRef orig_op)
436{
437 parent->apply_stats(
438 soid,
439 delta_stats);
440
441 vector<pg_log_entry_t> log_entries(_log_entries);
442 ObjectStore::Transaction op_t;
443 PGTransactionUPtr t(std::move(_t));
444 set<hobject_t> added, removed;
445 generate_transaction(
446 t,
447 coll,
7c673cae
FG
448 log_entries,
449 &op_t,
450 &added,
451 &removed);
11fdf7f2
TL
452 ceph_assert(added.size() <= 1);
453 ceph_assert(removed.size() <= 1);
7c673cae 454
11fdf7f2 455 auto insert_res = in_progress_ops.insert(
7c673cae
FG
456 make_pair(
457 tid,
11fdf7f2
TL
458 new InProgressOp(
459 tid, on_all_commit,
7c673cae
FG
460 orig_op, at_version)
461 )
11fdf7f2
TL
462 );
463 ceph_assert(insert_res.second);
464 InProgressOp &op = *insert_res.first->second;
7c673cae 465
7c673cae 466 op.waiting_for_commit.insert(
11fdf7f2
TL
467 parent->get_acting_recovery_backfill_shards().begin(),
468 parent->get_acting_recovery_backfill_shards().end());
7c673cae
FG
469
470 issue_op(
471 soid,
472 at_version,
473 tid,
474 reqid,
475 trim_to,
476 at_version,
477 added.size() ? *(added.begin()) : hobject_t(),
478 removed.size() ? *(removed.begin()) : hobject_t(),
479 log_entries,
480 hset_history,
481 &op,
482 op_t);
483
484 add_temp_objs(added);
485 clear_temp_objs(removed);
486
487 parent->log_operation(
488 log_entries,
489 hset_history,
490 trim_to,
491 at_version,
492 true,
493 op_t);
494
7c673cae
FG
495 op_t.register_on_commit(
496 parent->bless_context(
497 new C_OSD_OnOpCommit(this, &op)));
498
499 vector<ObjectStore::Transaction> tls;
500 tls.push_back(std::move(op_t));
501
502 parent->queue_transactions(tls, op.op);
11fdf7f2
TL
503 if (at_version != eversion_t()) {
504 parent->op_applied(at_version);
7c673cae
FG
505 }
506}
507
508void ReplicatedBackend::op_commit(
11fdf7f2 509 InProgressOpRef& op)
7c673cae 510{
11fdf7f2
TL
511 if (op->on_commit == nullptr) {
512 // aborted
513 return;
514 }
515
516 FUNCTRACE(cct);
7c673cae
FG
517 OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true);
518 dout(10) << __func__ << ": " << op->tid << dendl;
519 if (op->op) {
520 op->op->mark_event("op_commit");
521 op->op->pg_trace.event("op commit");
522 }
523
524 op->waiting_for_commit.erase(get_parent()->whoami_shard());
525
526 if (op->waiting_for_commit.empty()) {
527 op->on_commit->complete(0);
528 op->on_commit = 0;
7c673cae
FG
529 in_progress_ops.erase(op->tid);
530 }
531}
532
533void ReplicatedBackend::do_repop_reply(OpRequestRef op)
534{
535 static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
536 const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
11fdf7f2 537 ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);
7c673cae
FG
538
539 op->mark_started();
540
541 // must be replication.
542 ceph_tid_t rep_tid = r->get_tid();
543 pg_shard_t from = r->from;
544
11fdf7f2
TL
545 auto iter = in_progress_ops.find(rep_tid);
546 if (iter != in_progress_ops.end()) {
547 InProgressOp &ip_op = *iter->second;
7c673cae
FG
548 const MOSDOp *m = NULL;
549 if (ip_op.op)
550 m = static_cast<const MOSDOp *>(ip_op.op->get_req());
551
552 if (m)
553 dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
554 << " ack_type " << (int)r->ack_type
555 << " from " << from
556 << dendl;
557 else
558 dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
559 << " ack_type " << (int)r->ack_type
560 << " from " << from
561 << dendl;
562
563 // oh, good.
564
565 if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
11fdf7f2 566 ceph_assert(ip_op.waiting_for_commit.count(from));
7c673cae
FG
567 ip_op.waiting_for_commit.erase(from);
568 if (ip_op.op) {
11fdf7f2 569 ip_op.op->mark_event("sub_op_commit_rec");
7c673cae
FG
570 ip_op.op->pg_trace.event("sub_op_commit_rec");
571 }
572 } else {
11fdf7f2 573 // legacy peer; ignore
7c673cae 574 }
7c673cae
FG
575
576 parent->update_peer_last_complete_ondisk(
577 from,
578 r->get_last_complete_ondisk());
579
7c673cae
FG
580 if (ip_op.waiting_for_commit.empty() &&
581 ip_op.on_commit) {
582 ip_op.on_commit->complete(0);
11fdf7f2 583 ip_op.on_commit = 0;
7c673cae
FG
584 in_progress_ops.erase(iter);
585 }
586 }
587}
588
28e407b8 589int ReplicatedBackend::be_deep_scrub(
7c673cae 590 const hobject_t &poid,
28e407b8
AA
591 ScrubMap &map,
592 ScrubMapBuilder &pos,
593 ScrubMap::object &o)
7c673cae 594{
28e407b8 595 dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
7c673cae 596 int r;
28e407b8 597 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
91327a77
AA
598 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
599 CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;
28e407b8
AA
600
601 utime_t sleeptime;
602 sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
603 if (sleeptime != utime_t()) {
604 lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
605 sleeptime.sleep();
606 }
7c673cae 607
11fdf7f2 608 ceph_assert(poid == pos.ls[pos.pos]);
28e407b8
AA
609 if (!pos.data_done()) {
610 if (pos.data_pos == 0) {
611 pos.data_hash = bufferhash(-1);
612 }
7c673cae 613
28e407b8 614 bufferlist bl;
7c673cae 615 r = store->read(
28e407b8
AA
616 ch,
617 ghobject_t(
618 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
619 pos.data_pos,
620 cct->_conf->osd_deep_scrub_stride, bl,
621 fadvise_flags);
622 if (r < 0) {
623 dout(20) << __func__ << " " << poid << " got "
624 << r << " on read, read_error" << dendl;
625 o.read_error = true;
626 return 0;
627 }
628 if (r > 0) {
629 pos.data_hash << bl;
630 }
631 pos.data_pos += r;
632 if (r == cct->_conf->osd_deep_scrub_stride) {
633 dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
634 << std::hex << pos.data_hash.digest() << std::dec << dendl;
635 return -EINPROGRESS;
636 }
637 // done with bytes
638 pos.data_pos = -1;
639 o.digest = pos.data_hash.digest();
640 o.digest_present = true;
641 dout(20) << __func__ << " " << poid << " done with data, digest 0x"
642 << std::hex << o.digest << std::dec << dendl;
7c673cae 643 }
7c673cae 644
28e407b8
AA
645 // omap header
646 if (pos.omap_pos.empty()) {
647 pos.omap_hash = bufferhash(-1);
648
649 bufferlist hdrbl;
650 r = store->omap_get_header(
11fdf7f2 651 ch,
28e407b8
AA
652 ghobject_t(
653 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
654 &hdrbl, true);
655 if (r == -EIO) {
656 dout(20) << __func__ << " " << poid << " got "
657 << r << " on omap header read, read_error" << dendl;
658 o.read_error = true;
659 return 0;
660 }
661 if (r == 0 && hdrbl.length()) {
662 dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
663 << dendl;
664 pos.omap_hash << hdrbl;
7c673cae 665 }
7c673cae
FG
666 }
667
28e407b8 668 // omap
7c673cae 669 ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
11fdf7f2 670 ch,
7c673cae
FG
671 ghobject_t(
672 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
11fdf7f2 673 ceph_assert(iter);
28e407b8
AA
674 if (pos.omap_pos.length()) {
675 iter->lower_bound(pos.omap_pos);
676 } else {
677 iter->seek_to_first();
678 }
11fdf7f2 679 int max = g_conf()->osd_deep_scrub_keys;
28e407b8
AA
680 while (iter->status() == 0 && iter->valid()) {
681 pos.omap_bytes += iter->value().length();
682 ++pos.omap_keys;
683 --max;
684 // fixme: we can do this more efficiently.
685 bufferlist bl;
11fdf7f2
TL
686 encode(iter->key(), bl);
687 encode(iter->value(), bl);
28e407b8
AA
688 pos.omap_hash << bl;
689
690 iter->next();
691
692 if (iter->valid() && max == 0) {
693 pos.omap_pos = iter->key();
694 return -EINPROGRESS;
695 }
696 if (iter->status() < 0) {
697 dout(25) << __func__ << " " << poid
698 << " on omap scan, db status error" << dendl;
699 o.read_error = true;
700 return 0;
701 }
7c673cae
FG
702 }
703
28e407b8
AA
704 if (pos.omap_keys > cct->_conf->
705 osd_deep_scrub_large_omap_object_key_threshold ||
706 pos.omap_bytes > cct->_conf->
707 osd_deep_scrub_large_omap_object_value_sum_threshold) {
708 dout(25) << __func__ << " " << poid
709 << " large omap object detected. Object has " << pos.omap_keys
710 << " keys and size " << pos.omap_bytes << " bytes" << dendl;
711 o.large_omap_object_found = true;
712 o.large_omap_object_key_count = pos.omap_keys;
713 o.large_omap_object_value_size = pos.omap_bytes;
714 map.has_large_omap_object_errors = true;
7c673cae
FG
715 }
716
28e407b8 717 o.omap_digest = pos.omap_hash.digest();
7c673cae 718 o.omap_digest_present = true;
28e407b8 719 dout(20) << __func__ << " done with " << poid << " omap_digest "
7c673cae 720 << std::hex << o.omap_digest << std::dec << dendl;
28e407b8 721
11fdf7f2
TL
722 // Sum up omap usage
723 if (pos.omap_keys > 0 || pos.omap_bytes > 0) {
724 dout(25) << __func__ << " adding " << pos.omap_keys << " keys and "
725 << pos.omap_bytes << " bytes to pg_stats sums" << dendl;
726 map.has_omap_keys = true;
727 o.object_omap_bytes = pos.omap_bytes;
728 o.object_omap_keys = pos.omap_keys;
729 }
730
28e407b8
AA
731 // done!
732 return 0;
7c673cae
FG
733}
734
735void ReplicatedBackend::_do_push(OpRequestRef op)
736{
737 const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
11fdf7f2 738 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
7c673cae
FG
739 pg_shard_t from = m->from;
740
741 op->mark_started();
742
743 vector<PushReplyOp> replies;
744 ObjectStore::Transaction t;
11fdf7f2
TL
745 if (get_parent()->check_failsafe_full()) {
746 dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
7c673cae
FG
747 ceph_abort();
748 }
749 for (vector<PushOp>::const_iterator i = m->pushes.begin();
750 i != m->pushes.end();
751 ++i) {
752 replies.push_back(PushReplyOp());
11fdf7f2 753 handle_push(from, *i, &(replies.back()), &t, m->is_repair);
7c673cae
FG
754 }
755
756 MOSDPGPushReply *reply = new MOSDPGPushReply;
757 reply->from = get_parent()->whoami_shard();
758 reply->set_priority(m->get_priority());
759 reply->pgid = get_info().pgid;
760 reply->map_epoch = m->map_epoch;
761 reply->min_epoch = m->min_epoch;
762 reply->replies.swap(replies);
763 reply->compute_cost(cct);
764
765 t.register_on_complete(
766 new PG_SendMessageOnConn(
767 get_parent(), reply, m->get_connection()));
768
769 get_parent()->queue_transaction(std::move(t));
770}
771
772struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
773 ReplicatedBackend *bc;
774 list<ReplicatedBackend::pull_complete_info> to_continue;
775 int priority;
776 C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
777 : bc(bc), priority(priority) {}
778
779 void finish(ThreadPool::TPHandle &handle) override {
780 ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
781 for (auto &&i: to_continue) {
782 auto j = bc->pulling.find(i.hoid);
11fdf7f2 783 ceph_assert(j != bc->pulling.end());
7c673cae
FG
784 ObjectContextRef obc = j->second.obc;
785 bc->clear_pull(j, false /* already did it */);
224ce89b
WB
786 int started = bc->start_pushes(i.hoid, obc, h);
787 if (started < 0) {
788 bc->pushing[i.hoid].clear();
789 bc->get_parent()->primary_failed(i.hoid);
790 bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
791 } else if (!started) {
7c673cae 792 bc->get_parent()->on_global_recover(
c07f9fc5 793 i.hoid, i.stat, false);
7c673cae
FG
794 }
795 handle.reset_tp_timeout();
796 }
797 bc->run_recovery_op(h, priority);
798 }
799};
800
801void ReplicatedBackend::_do_pull_response(OpRequestRef op)
802{
803 const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
11fdf7f2 804 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
7c673cae
FG
805 pg_shard_t from = m->from;
806
807 op->mark_started();
808
809 vector<PullOp> replies(1);
11fdf7f2
TL
810 if (get_parent()->check_failsafe_full()) {
811 dout(10) << __func__ << " Out of space (failsafe) processing pull response (push)." << dendl;
7c673cae
FG
812 ceph_abort();
813 }
814
815 ObjectStore::Transaction t;
816 list<pull_complete_info> to_continue;
817 for (vector<PushOp>::const_iterator i = m->pushes.begin();
818 i != m->pushes.end();
819 ++i) {
820 bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
821 if (more)
822 replies.push_back(PullOp());
823 }
824 if (!to_continue.empty()) {
825 C_ReplicatedBackend_OnPullComplete *c =
826 new C_ReplicatedBackend_OnPullComplete(
827 this,
828 m->get_priority());
829 c->to_continue.swap(to_continue);
830 t.register_on_complete(
831 new PG_RecoveryQueueAsync(
832 get_parent(),
11fdf7f2 833 get_parent()->bless_unlocked_gencontext(c)));
7c673cae
FG
834 }
835 replies.erase(replies.end() - 1);
836
837 if (replies.size()) {
838 MOSDPGPull *reply = new MOSDPGPull;
839 reply->from = parent->whoami_shard();
840 reply->set_priority(m->get_priority());
841 reply->pgid = get_info().pgid;
842 reply->map_epoch = m->map_epoch;
843 reply->min_epoch = m->min_epoch;
844 reply->set_pulls(&replies);
845 reply->compute_cost(cct);
846
847 t.register_on_complete(
848 new PG_SendMessageOnConn(
849 get_parent(), reply, m->get_connection()));
850 }
851
852 get_parent()->queue_transaction(std::move(t));
853}
854
855void ReplicatedBackend::do_pull(OpRequestRef op)
856{
857 MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
11fdf7f2 858 ceph_assert(m->get_type() == MSG_OSD_PG_PULL);
7c673cae
FG
859 pg_shard_t from = m->from;
860
861 map<pg_shard_t, vector<PushOp> > replies;
862 vector<PullOp> pulls;
863 m->take_pulls(&pulls);
864 for (auto& i : pulls) {
865 replies[from].push_back(PushOp());
866 handle_pull(from, i, &(replies[from].back()));
867 }
868 send_pushes(m->get_priority(), replies);
869}
870
871void ReplicatedBackend::do_push_reply(OpRequestRef op)
872{
873 const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
11fdf7f2 874 ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
7c673cae
FG
875 pg_shard_t from = m->from;
876
877 vector<PushOp> replies(1);
878 for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
879 i != m->replies.end();
880 ++i) {
881 bool more = handle_push_reply(from, *i, &(replies.back()));
882 if (more)
883 replies.push_back(PushOp());
884 }
885 replies.erase(replies.end() - 1);
886
887 map<pg_shard_t, vector<PushOp> > _replies;
888 _replies[from].swap(replies);
889 send_pushes(m->get_priority(), _replies);
890}
891
892Message * ReplicatedBackend::generate_subop(
893 const hobject_t &soid,
894 const eversion_t &at_version,
895 ceph_tid_t tid,
896 osd_reqid_t reqid,
897 eversion_t pg_trim_to,
898 eversion_t pg_roll_forward_to,
899 hobject_t new_temp_oid,
900 hobject_t discard_temp_oid,
11fdf7f2 901 const bufferlist &log_entries,
7c673cae
FG
902 boost::optional<pg_hit_set_history_t> &hset_hist,
903 ObjectStore::Transaction &op_t,
904 pg_shard_t peer,
905 const pg_info_t &pinfo)
906{
907 int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
908 // forward the write/update/whatever
909 MOSDRepOp *wr = new MOSDRepOp(
910 reqid, parent->whoami_shard(),
911 spg_t(get_info().pgid.pgid, peer.shard),
912 soid, acks_wanted,
11fdf7f2 913 get_osdmap_epoch(),
7c673cae
FG
914 parent->get_last_peering_reset_epoch(),
915 tid, at_version);
916
917 // ship resulting transaction, log entries, and pg_stats
918 if (!parent->should_send_op(peer, soid)) {
7c673cae 919 ObjectStore::Transaction t;
11fdf7f2 920 encode(t, wr->get_data());
7c673cae 921 } else {
11fdf7f2 922 encode(op_t, wr->get_data());
7c673cae
FG
923 wr->get_header().data_off = op_t.get_data_alignment();
924 }
925
11fdf7f2 926 wr->logbl = log_entries;
7c673cae
FG
927
928 if (pinfo.is_incomplete())
929 wr->pg_stats = pinfo.stats; // reflects backfill progress
930 else
931 wr->pg_stats = get_info().stats;
932
933 wr->pg_trim_to = pg_trim_to;
934 wr->pg_roll_forward_to = pg_roll_forward_to;
935
936 wr->new_temp_oid = new_temp_oid;
937 wr->discard_temp_oid = discard_temp_oid;
938 wr->updated_hit_set_history = hset_hist;
939 return wr;
940}
941
942void ReplicatedBackend::issue_op(
943 const hobject_t &soid,
944 const eversion_t &at_version,
945 ceph_tid_t tid,
946 osd_reqid_t reqid,
947 eversion_t pg_trim_to,
948 eversion_t pg_roll_forward_to,
949 hobject_t new_temp_oid,
950 hobject_t discard_temp_oid,
951 const vector<pg_log_entry_t> &log_entries,
952 boost::optional<pg_hit_set_history_t> &hset_hist,
953 InProgressOp *op,
954 ObjectStore::Transaction &op_t)
955{
11fdf7f2
TL
956 if (parent->get_acting_recovery_backfill_shards().size() > 1) {
957 if (op->op) {
958 op->op->pg_trace.event("issue replication ops");
959 ostringstream ss;
960 set<pg_shard_t> replicas = parent->get_acting_recovery_backfill_shards();
961 replicas.erase(parent->whoami_shard());
962 ss << "waiting for subops from " << replicas;
7c673cae 963 op->op->mark_sub_op_sent(ss.str());
11fdf7f2 964 }
7c673cae 965
11fdf7f2
TL
966 // avoid doing the same work in generate_subop
967 bufferlist logs;
968 encode(log_entries, logs);
969
970 for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
971 if (shard == parent->whoami_shard()) continue;
972 const pg_info_t &pinfo = parent->get_shard_info().find(shard)->second;
973
974 Message *wr;
975 wr = generate_subop(
976 soid,
977 at_version,
978 tid,
979 reqid,
980 pg_trim_to,
981 pg_roll_forward_to,
982 new_temp_oid,
983 discard_temp_oid,
984 logs,
985 hset_hist,
986 op_t,
987 shard,
988 pinfo);
989 if (op->op && op->op->pg_trace)
990 wr->trace.init("replicated op", nullptr, &op->op->pg_trace);
991 get_parent()->send_message_osd_cluster(
992 shard.osd, wr, get_osdmap_epoch());
993 }
7c673cae
FG
994 }
995}
996
997// sub op modify
998void ReplicatedBackend::do_repop(OpRequestRef op)
999{
1000 static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
1001 const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
1002 int msg_type = m->get_type();
11fdf7f2 1003 ceph_assert(MSG_OSD_REPOP == msg_type);
7c673cae
FG
1004
1005 const hobject_t& soid = m->poid;
1006
1007 dout(10) << __func__ << " " << soid
1008 << " v " << m->version
1009 << (m->logbl.length() ? " (transaction)" : " (parallel exec")
1010 << " " << m->logbl.length()
1011 << dendl;
1012
1013 // sanity checks
11fdf7f2 1014 ceph_assert(m->map_epoch >= get_info().history.same_interval_since);
7c673cae 1015
11fdf7f2 1016 dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
28e407b8
AA
1017 parent->maybe_preempt_replica_scrub(soid);
1018
7c673cae
FG
1019 int ackerosd = m->get_source().num();
1020
1021 op->mark_started();
1022
1023 RepModifyRef rm(std::make_shared<RepModify>());
1024 rm->op = op;
1025 rm->ackerosd = ackerosd;
1026 rm->last_complete = get_info().last_complete;
11fdf7f2 1027 rm->epoch_started = get_osdmap_epoch();
7c673cae 1028
11fdf7f2 1029 ceph_assert(m->logbl.length());
7c673cae
FG
1030 // shipped transaction and log entries
1031 vector<pg_log_entry_t> log;
1032
11fdf7f2
TL
1033 auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
1034 decode(rm->opt, p);
7c673cae
FG
1035
1036 if (m->new_temp_oid != hobject_t()) {
1037 dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
1038 add_temp_obj(m->new_temp_oid);
1039 }
1040 if (m->discard_temp_oid != hobject_t()) {
1041 dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
1042 if (rm->opt.empty()) {
1043 dout(10) << __func__ << ": removing object " << m->discard_temp_oid
1044 << " since we won't get the transaction" << dendl;
1045 rm->localt.remove(coll, ghobject_t(m->discard_temp_oid));
1046 }
1047 clear_temp_obj(m->discard_temp_oid);
1048 }
1049
1050 p = const_cast<bufferlist&>(m->logbl).begin();
11fdf7f2 1051 decode(log, p);
7c673cae
FG
1052 rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1053
1054 bool update_snaps = false;
1055 if (!rm->opt.empty()) {
1056 // If the opt is non-empty, we infer we are before
1057 // last_backfill (according to the primary, not our
1058 // not-quite-accurate value), and should update the
1059 // collections now. Otherwise, we do it later on push.
1060 update_snaps = true;
1061 }
11fdf7f2
TL
1062
1063 // flag set to true during async recovery
1064 bool async = false;
1065 pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
1066 if (pmissing.is_missing(soid)) {
1067 async = true;
1068 dout(30) << __func__ << " is_missing " << pmissing.is_missing(soid) << dendl;
1069 for (auto &&e: log) {
1070 dout(30) << " add_next_event entry " << e << dendl;
1071 get_parent()->add_local_next_event(e);
1072 dout(30) << " entry is_delete " << e.is_delete() << dendl;
1073 }
1074 }
1075
7c673cae
FG
1076 parent->update_stats(m->pg_stats);
1077 parent->log_operation(
1078 log,
1079 m->updated_hit_set_history,
1080 m->pg_trim_to,
1081 m->pg_roll_forward_to,
1082 update_snaps,
11fdf7f2
TL
1083 rm->localt,
1084 async);
7c673cae
FG
1085
1086 rm->opt.register_on_commit(
1087 parent->bless_context(
1088 new C_OSD_RepModifyCommit(this, rm)));
7c673cae
FG
1089 vector<ObjectStore::Transaction> tls;
1090 tls.reserve(2);
1091 tls.push_back(std::move(rm->localt));
1092 tls.push_back(std::move(rm->opt));
1093 parent->queue_transactions(tls, op);
1094 // op is cleaned up by oncommit/onapply when both are executed
11fdf7f2 1095 dout(30) << __func__ << " missing after" << get_parent()->get_log().get_missing().get_items() << dendl;
7c673cae
FG
1096}
1097
1098void ReplicatedBackend::repop_commit(RepModifyRef rm)
1099{
1100 rm->op->mark_commit_sent();
1101 rm->op->pg_trace.event("sup_op_commit");
1102 rm->committed = true;
1103
1104 // send commit.
1105 const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
11fdf7f2 1106 ceph_assert(m->get_type() == MSG_OSD_REPOP);
7c673cae
FG
1107 dout(10) << __func__ << " on op " << *m
1108 << ", sending commit to osd." << rm->ackerosd
1109 << dendl;
11fdf7f2 1110 ceph_assert(get_osdmap()->is_up(rm->ackerosd));
7c673cae
FG
1111
1112 get_parent()->update_last_complete_ondisk(rm->last_complete);
1113
1114 MOSDRepOpReply *reply = new MOSDRepOpReply(
1115 m,
1116 get_parent()->whoami_shard(),
11fdf7f2 1117 0, get_osdmap_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
7c673cae
FG
1118 reply->set_last_complete_ondisk(rm->last_complete);
1119 reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
1120 reply->trace = rm->op->pg_trace;
1121 get_parent()->send_message_osd_cluster(
11fdf7f2 1122 rm->ackerosd, reply, get_osdmap_epoch());
7c673cae
FG
1123
1124 log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
1125}
1126
1127
1128// ===========================================================
1129
// Compute which byte ranges of a head object must be shipped to the
// target and which ranges the target can reconstruct by cloning from a
// clone it already has.
//
// obc:           context of the head object (supplies its size)
// snapset:       snapset describing clones and their recorded overlaps
// head:          head object being recovered
// missing:       target's missing set (a missing clone cannot be used)
// last_backfill: only objects < last_backfill exist on the target
// data_subset:   out: ranges that must be shipped as raw data
// clone_subsets: out: per-clone ranges the target can clone locally
// manager:       accumulates read locks on any clone we borrow from
void ReplicatedBackend::calc_head_subsets(
  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_head_subsets " << head
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  // start by assuming the whole object must be shipped
  uint64_t size = obc->obs.oi.size;
  if (size)
    data_subset.insert(0, size);

  // Cache pools may have incomplete clones, making the overlap
  // bookkeeping unreliable; push everything.
  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }


  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);

  // Walk clones newest-first; 'prev' shrinks to the ranges still shared
  // with each successively older clone.  Use the first clone the target
  // has (and that we can read-lock), then stop.
  for (int j=snapset.clones.size()-1; j>=0; j--) {
    hobject_t c = head;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_head_subsets " << head << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }


  // Too fragmented: cloning many small extents costs more than a plain
  // push, so drop the clone plan (and its locks) and ship everything.
  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }

  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_head_subsets " << head
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
1193
// Like calc_head_subsets, but for a clone object: a clone may borrow
// ranges from its next-older neighbor AND from its next-newer neighbor,
// using the overlaps recorded in the snapset.
void ReplicatedBackend::calc_clone_subsets(
  SnapSet& snapset, const hobject_t& soid,
  const pg_missing_t& missing,
  const hobject_t &last_backfill,
  interval_set<uint64_t>& data_subset,
  map<hobject_t, interval_set<uint64_t>>& clone_subsets,
  ObcLockManager &manager)
{
  dout(10) << "calc_clone_subsets " << soid
	   << " clone_overlap " << snapset.clone_overlap << dendl;

  // start by assuming the whole clone must be shipped
  uint64_t size = snapset.clone_size[soid.snap];
  if (size)
    data_subset.insert(0, size);

  // Cache pools may have incomplete clones; overlap data is unreliable.
  if (get_parent()->get_pool().allow_incomplete_clones()) {
    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
    return;
  }

  if (!cct->_conf->osd_recover_clone_overlap) {
    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
    return;
  }

  // locate this clone's index in the (oldest-first) clone list
  unsigned i;
  for (i=0; i < snapset.clones.size(); i++)
    if (snapset.clones[i] == soid.snap)
      break;

  // any overlap with next older clone?
  interval_set<uint64_t> cloning;
  interval_set<uint64_t> prev;
  if (size)
    prev.insert(0, size);
  for (int j=i-1; j>=0; j--) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
	       << " overlap " << prev << dendl;
      clone_subsets[c] = prev;
      cloning.union_of(prev);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
	     << " overlap " << prev << dendl;
  }

  // overlap with next newest?
  // (overlap between clone j and j+1 is stored under clone j, hence the
  // clone_overlap[snapset.clones[j-1]] lookup below)
  interval_set<uint64_t> next;
  if (size)
    next.insert(0, size);
  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
    hobject_t c = soid;
    c.snap = snapset.clones[j];
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
    if (!missing.is_missing(c) &&
	c < last_backfill &&
	get_parent()->try_lock_for_read(c, manager)) {
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
	       << " overlap " << next << dendl;
      clone_subsets[c] = next;
      cloning.union_of(next);
      break;
    }
    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
	     << " overlap " << next << dendl;
  }

  // Too fragmented: cloning many tiny extents would cost more than a
  // raw push, so drop the clone plan (and its locks).
  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
    dout(10) << "skipping clone, too many holes" << dendl;
    get_parent()->release_locks(manager);
    clone_subsets.clear();
    cloning.clear();
  }


  // what's left for us to push?
  data_subset.subtract(cloning);

  dout(10) << "calc_clone_subsets " << soid
	   << " data_subset " << data_subset
	   << " clone_subsets " << clone_subsets << dendl;
}
1282
// Queue a pull of soid (expected at version v) from a randomly chosen
// peer known to have it: fill in a PullOp on h and register the
// in-flight pull in 'pulling' / 'pull_from_peer'.  For clone objects the
// copy/clone subsets are computed so shared ranges can be cloned from
// local objects instead of transferred over the wire.
void ReplicatedBackend::prepare_pull(
  eversion_t v,
  const hobject_t& soid,
  ObjectContextRef headctx,
  RPGHandle *h)
{
  // we must actually be missing the object, at exactly this version
  ceph_assert(get_parent()->get_local_missing().get_items().count(soid));
  eversion_t _v = get_parent()->get_local_missing().get_items().find(
    soid)->second.need;
  ceph_assert(_v == v);
  const map<hobject_t, set<pg_shard_t>> &missing_loc(
    get_parent()->get_missing_loc_shards());
  const map<pg_shard_t, pg_missing_t > &peer_missing(
    get_parent()->get_shard_missing());
  map<hobject_t, set<pg_shard_t>>::const_iterator q = missing_loc.find(soid);
  ceph_assert(q != missing_loc.end());
  ceph_assert(!q->second.empty());

  // pick a pullee: a uniformly random shard among those that have it
  auto p = q->second.begin();
  std::advance(p,
	       util::generate_random_number<int>(0,
						 q->second.size() - 1));
  ceph_assert(get_osdmap()->is_up(p->osd));
  pg_shard_t fromshard = *p;

  dout(7) << "pull " << soid
	  << " v " << v
	  << " on osds " << q->second
	  << " from osd." << fromshard
	  << dendl;

  ceph_assert(peer_missing.count(fromshard));
  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
  if (pmissing.is_missing(soid, v)) {
    ceph_assert(pmissing.get_items().find(soid)->second.have != v);
    // The peer only has an older version; pulling it is only valid if
    // the log records a LOST_REVERT back to exactly that version.
    dout(10) << "pulling soid " << soid << " from osd " << fromshard
	     << " at version " << pmissing.get_items().find(soid)->second.have
	     << " rather than at version " << v << dendl;
    v = pmissing.get_items().find(soid)->second.have;
    ceph_assert(get_parent()->get_log().get_log().objects.count(soid) &&
	   (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
	    pg_log_entry_t::LOST_REVERT) &&
	   (get_parent()->get_log().get_log().objects.find(
	     soid)->second->reverting_to ==
	    v));
  }

  ObjectRecoveryInfo recovery_info;
  ObcLockManager lock_manager;

  if (soid.is_snap()) {
    // Clone pull: the head (and thus the SnapSet) must already be local
    // so we can decide which ranges to copy vs clone.
    ceph_assert(!get_parent()->get_local_missing().is_missing(soid.get_head()));
    ceph_assert(headctx);
    // check snapset
    SnapSetContext *ssc = headctx->ssc;
    ceph_assert(ssc);
    dout(10) << " snapset " << ssc->snapset << dendl;
    recovery_info.ss = ssc->snapset;
    calc_clone_subsets(
      ssc->snapset, soid, get_parent()->get_local_missing(),
      get_info().last_backfill,
      recovery_info.copy_subset,
      recovery_info.clone_subset,
      lock_manager);
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
    dout(10) << " pulling " << recovery_info << dendl;

    ceph_assert(ssc->snapset.clone_size.count(soid.snap));
    recovery_info.size = ssc->snapset.clone_size[soid.snap];
  } else {
    // pulling head or unversioned object.
    // always pull the whole thing.
    recovery_info.copy_subset.insert(0, (uint64_t)-1);
    recovery_info.size = ((uint64_t)-1);
  }

  h->pulls[fromshard].push_back(PullOp());
  PullOp &op = h->pulls[fromshard].back();
  op.soid = soid;

  op.recovery_info = recovery_info;
  op.recovery_info.soid = soid;
  op.recovery_info.version = v;
  op.recovery_progress.data_complete = false;
  op.recovery_progress.omap_complete = false;
  op.recovery_progress.data_recovered_to = 0;
  op.recovery_progress.first = true;

  // record the in-flight pull so responses can be matched back up
  ceph_assert(!pulling.count(soid));
  pull_from_peer[fromshard].insert(soid);
  PullInfo &pi = pulling[soid];
  pi.from = fromshard;
  pi.soid = soid;
  pi.head_ctx = headctx;
  pi.recovery_info = op.recovery_info;
  pi.recovery_progress = op.recovery_progress;
  pi.cache_dont_need = h->cache_dont_need;
  pi.lock_manager = std::move(lock_manager);
}
1383
/*
 * intelligently push an object to a replica. make use of existing
 * clones/heads and dup data ranges where possible.
 *
 * Returns 0 on success or a negative errno from building the push op.
 */
int ReplicatedBackend::prep_push_to_replica(
  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
  PushOp *pop, bool cache_dont_need)
{
  const object_info_t& oi = obc->obs.oi;
  uint64_t size = obc->obs.oi.size;

  dout(10) << __func__ << ": " << soid << " v" << oi.version
	   << " size " << size << " to osd." << peer << dendl;

  map<hobject_t, interval_set<uint64_t>> clone_subsets;
  interval_set<uint64_t> data_subset;

  ObcLockManager lock_manager;
  // are we doing a clone on the replica?
  if (soid.snap && soid.snap < CEPH_NOSNAP) {
    hobject_t head = soid;
    head.snap = CEPH_NOSNAP;

    // try to base push off of clones that succeed/preceed poid
    // we need the head (and current SnapSet) locally to do that.
    if (get_parent()->get_local_missing().is_missing(head)) {
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
      return prep_push(obc, soid, peer, pop, cache_dont_need);
    }

    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    pop->recovery_info.ss = ssc->snapset;
    map<pg_shard_t, pg_missing_t>::const_iterator pm =
      get_parent()->get_shard_missing().find(peer);
    ceph_assert(pm != get_parent()->get_shard_missing().end());
    map<pg_shard_t, pg_info_t>::const_iterator pi =
      get_parent()->get_shard_info().find(peer);
    ceph_assert(pi != get_parent()->get_shard_info().end());
    // ranges the peer can reconstruct by cloning objects it already has
    calc_clone_subsets(
      ssc->snapset, soid,
      pm->second,
      pi->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  } else if (soid.snap == CEPH_NOSNAP) {
    // pushing head or unversioned object.
    // base this on partially on replica's clones?
    SnapSetContext *ssc = obc->ssc;
    ceph_assert(ssc);
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
    calc_head_subsets(
      obc,
      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
      get_parent()->get_shard_info().find(peer)->second.last_backfill,
      data_subset, clone_subsets,
      lock_manager);
  }

  return prep_push(
    obc,
    soid,
    peer,
    oi.version,
    data_subset,
    clone_subsets,
    pop,
    cache_dont_need,
    std::move(lock_manager));
}
1455
224ce89b 1456int ReplicatedBackend::prep_push(ObjectContextRef obc,
7c673cae
FG
1457 const hobject_t& soid, pg_shard_t peer,
1458 PushOp *pop, bool cache_dont_need)
1459{
1460 interval_set<uint64_t> data_subset;
1461 if (obc->obs.oi.size)
1462 data_subset.insert(0, obc->obs.oi.size);
1463 map<hobject_t, interval_set<uint64_t>> clone_subsets;
1464
224ce89b 1465 return prep_push(obc, soid, peer,
7c673cae
FG
1466 obc->obs.oi.version, data_subset, clone_subsets,
1467 pop, cache_dont_need, ObcLockManager());
1468}
1469
224ce89b 1470int ReplicatedBackend::prep_push(
7c673cae
FG
1471 ObjectContextRef obc,
1472 const hobject_t& soid, pg_shard_t peer,
1473 eversion_t version,
1474 interval_set<uint64_t> &data_subset,
1475 map<hobject_t, interval_set<uint64_t>>& clone_subsets,
1476 PushOp *pop,
1477 bool cache_dont_need,
1478 ObcLockManager &&lock_manager)
1479{
1480 get_parent()->begin_peer_recover(peer, soid);
1481 // take note.
1482 PushInfo &pi = pushing[soid][peer];
1483 pi.obc = obc;
1484 pi.recovery_info.size = obc->obs.oi.size;
1485 pi.recovery_info.copy_subset = data_subset;
1486 pi.recovery_info.clone_subset = clone_subsets;
1487 pi.recovery_info.soid = soid;
1488 pi.recovery_info.oi = obc->obs.oi;
1489 pi.recovery_info.ss = pop->recovery_info.ss;
1490 pi.recovery_info.version = version;
1491 pi.lock_manager = std::move(lock_manager);
1492
1493 ObjectRecoveryProgress new_progress;
1494 int r = build_push_op(pi.recovery_info,
1495 pi.recovery_progress,
1496 &new_progress,
1497 pop,
1498 &(pi.stat), cache_dont_need);
224ce89b
WB
1499 if (r < 0)
1500 return r;
7c673cae 1501 pi.recovery_progress = new_progress;
224ce89b 1502 return 0;
7c673cae
FG
1503}
1504
// Apply one chunk of pushed object data to the local store via *t.
//
// The first chunk of a multi-chunk push is written to a temp recovery
// object; the final chunk renames it into place and reconstructs any
// clone_subset ranges (submit_push_complete).  A single-chunk push
// (first && complete) writes the real object directly.
void ReplicatedBackend::submit_push_data(
  const ObjectRecoveryInfo &recovery_info,
  bool first,
  bool complete,
  bool cache_dont_need,
  const interval_set<uint64_t> &intervals_included,
  bufferlist data_included,
  bufferlist omap_header,
  const map<string, bufferlist> &attrs,
  const map<string, bufferlist> &omap_entries,
  ObjectStore::Transaction *t)
{
  hobject_t target_oid;
  if (first && complete) {
    target_oid = recovery_info.soid;
  } else {
    target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid,
							recovery_info.version);
    if (first) {
      dout(10) << __func__ << ": Adding oid "
	       << target_oid << " in the temp collection" << dendl;
      add_temp_obj(target_oid);
    }
  }

  if (first) {
    // start from a clean object of the right size, with the pushed
    // omap header, alloc hints, and (below) attrs
    t->remove(coll, ghobject_t(target_oid));
    t->touch(coll, ghobject_t(target_oid));
    t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
    if (omap_header.length())
      t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

    bufferlist bv = attrs.at(OI_ATTR);
    object_info_t oi(bv);
    t->set_alloc_hint(coll, ghobject_t(target_oid),
		      oi.expected_object_size,
		      oi.expected_write_size,
		      oi.alloc_hint_flags);
    if (get_parent()->pg_is_remote_backfilling()) {
      // keep backfill byte accounting in sync with the size delta
      // between whatever we had and what is being pushed
      struct stat st;
      uint64_t size = 0;
      int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
      if (r == 0) {
	size = st.st_size;
      }
      // Don't need to do anything if object is still the same size
      if (size != recovery_info.oi.size) {
	get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	get_parent()->pg_add_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
	dout(10) << __func__ << " " << recovery_info.soid
		 << " backfill size " << recovery_info.oi.size
		 << " previous size " << size
		 << " net size " << recovery_info.oi.size - size
		 << dendl;
      }
    }
  }
  // write this chunk's extents at their proper offsets; data_included
  // is the concatenated payload for intervals_included, in order
  uint64_t off = 0;
  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
  if (cache_dont_need)
    fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
       p != intervals_included.end();
       ++p) {
    bufferlist bit;
    bit.substr_of(data_included, off, p.get_len());
    t->write(coll, ghobject_t(target_oid),
	     p.get_start(), p.get_len(), bit, fadvise_flags);
    off += p.get_len();
  }

  if (!omap_entries.empty())
    t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries);
  if (!attrs.empty())
    t->setattrs(coll, ghobject_t(target_oid), attrs);

  if (complete) {
    if (!first) {
      // multi-chunk push finished: move the temp object into place
      dout(10) << __func__ << ": Removing oid "
	       << target_oid << " from the temp collection" << dendl;
      clear_temp_obj(target_oid);
      t->remove(coll, ghobject_t(recovery_info.soid));
      t->collection_move_rename(coll, ghobject_t(target_oid),
				coll, ghobject_t(recovery_info.soid));
    }

    submit_push_complete(recovery_info, t);
  }
}
1594
1595void ReplicatedBackend::submit_push_complete(
1596 const ObjectRecoveryInfo &recovery_info,
1597 ObjectStore::Transaction *t)
1598{
1599 for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
1600 recovery_info.clone_subset.begin();
1601 p != recovery_info.clone_subset.end();
1602 ++p) {
1603 for (interval_set<uint64_t>::const_iterator q = p->second.begin();
1604 q != p->second.end();
1605 ++q) {
1606 dout(15) << " clone_range " << p->first << " "
1607 << q.get_start() << "~" << q.get_len() << dendl;
1608 t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid),
1609 q.get_start(), q.get_len(), q.get_start());
1610 }
1611 }
1612}
1613
1614ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
1615 const ObjectRecoveryInfo& recovery_info,
1616 SnapSetContext *ssc,
1617 ObcLockManager &manager)
1618{
1619 if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
1620 return recovery_info;
1621 ObjectRecoveryInfo new_info = recovery_info;
1622 new_info.copy_subset.clear();
1623 new_info.clone_subset.clear();
11fdf7f2 1624 ceph_assert(ssc);
7c673cae
FG
1625 get_parent()->release_locks(manager); // might already have locks
1626 calc_clone_subsets(
1627 ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
1628 get_info().last_backfill,
1629 new_info.copy_subset, new_info.clone_subset,
1630 manager);
1631 return new_info;
1632}
1633
// Primary side: process one chunk of a pull response from 'from'.
//
// Returns true (with *response filled in) when another chunk must be
// requested; returns false when the object is complete (its stats are
// queued on to_continue) or when the response must be dropped (peer
// didn't have the object, or the pull is no longer registered).
bool ReplicatedBackend::handle_pull_response(
  pg_shard_t from, const PushOp &pop, PullOp *response,
  list<pull_complete_info> *to_continue,
  ObjectStore::Transaction *t)
{
  interval_set<uint64_t> data_included = pop.data_included;
  bufferlist data;
  data = pop.data;
  dout(10) << "handle_pull_response "
	   << pop.recovery_info
	   << pop.after_progress
	   << " data.size() is " << data.length()
	   << " data_included: " << data_included
	   << dendl;
  if (pop.version == eversion_t()) {
    // replica doesn't have it!
    _failed_pull(from, pop.soid);
    return false;
  }

  const hobject_t &hoid = pop.soid;
  // data and its interval map must agree: both empty or both non-empty
  ceph_assert((data_included.empty() && data.length() == 0) ||
	 (!data_included.empty() && data.length() > 0));

  auto piter = pulling.find(hoid);
  if (piter == pulling.end()) {
    // pull no longer registered (e.g. canceled); drop the response
    return false;
  }

  PullInfo &pi = piter->second;
  if (pi.recovery_info.size == (uint64_t(-1))) {
    // first response reveals the real object size; clip our copy plan
    pi.recovery_info.size = pop.recovery_info.size;
    pi.recovery_info.copy_subset.intersection_of(
      pop.recovery_info.copy_subset);
  }
  // If primary doesn't have object info and didn't know version
  if (pi.recovery_info.version == eversion_t()) {
    pi.recovery_info.version = pop.version;
  }

  bool first = pi.recovery_progress.first;
  if (first) {
    // attrs only reference the origin bufferlist (decode from
    // MOSDPGPush message) whose size is much greater than attrs in
    // recovery. If obc cache it (get_obc maybe cache the attr), this
    // causes the whole origin bufferlist would not be free until obc
    // is evicted from obc cache. So rebuild the bufferlists before
    // cache it.
    auto attrset = pop.attrset;
    for (auto& a : attrset) {
      a.second.rebuild();
    }
    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
    pi.recovery_info.oi = pi.obc->obs.oi;
    pi.recovery_info = recalc_subsets(
      pi.recovery_info,
      pi.obc->ssc,
      pi.lock_manager);
  }


  // drop any pushed ranges we did not actually ask for
  interval_set<uint64_t> usable_intervals;
  bufferlist usable_data;
  trim_pushed_data(pi.recovery_info.copy_subset,
		   data_included,
		   data,
		   &usable_intervals,
		   &usable_data);
  data_included = usable_intervals;
  data.claim(usable_data);


  pi.recovery_progress = pop.after_progress;

  dout(10) << "new recovery_info " << pi.recovery_info
	   << ", new progress " << pi.recovery_progress
	   << dendl;

  bool complete = pi.is_complete();

  submit_push_data(pi.recovery_info, first,
		   complete, pi.cache_dont_need,
		   data_included, data,
		   pop.omap_header,
		   pop.attrset,
		   pop.omap_entries,
		   t);

  pi.stat.num_keys_recovered += pop.omap_entries.size();
  pi.stat.num_bytes_recovered += data.length();
  get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());

  if (complete) {
    pi.stat.num_objects_recovered++;
    // XXX: This could overcount if regular recovery is needed right after a repair
    if (get_parent()->pg_is_repair()) {
      pi.stat.num_objects_repaired++;
      get_parent()->inc_osd_stat_repaired();
    }
    clear_pull_from(piter);
    to_continue->push_back({hoid, pi.stat});
    get_parent()->on_local_recover(
      hoid, pi.recovery_info, pi.obc, false, t);
    return false;
  } else {
    response->soid = pop.soid;
    response->recovery_info = pi.recovery_info;
    response->recovery_progress = pi.recovery_progress;
    return true;
  }
}
1745
1746void ReplicatedBackend::handle_push(
1747 pg_shard_t from, const PushOp &pop, PushReplyOp *response,
11fdf7f2 1748 ObjectStore::Transaction *t, bool is_repair)
7c673cae
FG
1749{
1750 dout(10) << "handle_push "
1751 << pop.recovery_info
1752 << pop.after_progress
1753 << dendl;
1754 bufferlist data;
1755 data = pop.data;
1756 bool first = pop.before_progress.first;
1757 bool complete = pop.after_progress.data_complete &&
1758 pop.after_progress.omap_complete;
1759
1760 response->soid = pop.recovery_info.soid;
1761 submit_push_data(pop.recovery_info,
1762 first,
1763 complete,
1764 true, // must be replicate
1765 pop.data_included,
1766 data,
1767 pop.omap_header,
1768 pop.attrset,
1769 pop.omap_entries,
1770 t);
1771
11fdf7f2
TL
1772 if (complete) {
1773 if (is_repair) {
1774 get_parent()->inc_osd_stat_repaired();
1775 dout(20) << __func__ << " repair complete" << dendl;
1776 }
7c673cae
FG
1777 get_parent()->on_local_recover(
1778 pop.recovery_info.soid,
1779 pop.recovery_info,
1780 ObjectContextRef(), // ok, is replica
c07f9fc5 1781 false,
7c673cae 1782 t);
11fdf7f2 1783 }
7c673cae
FG
1784}
1785
1786void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
1787{
1788 for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
1789 i != pushes.end();
1790 ++i) {
1791 ConnectionRef con = get_parent()->get_con_osd_cluster(
1792 i->first.osd,
11fdf7f2 1793 get_osdmap_epoch());
7c673cae
FG
1794 if (!con)
1795 continue;
1796 vector<PushOp>::iterator j = i->second.begin();
1797 while (j != i->second.end()) {
1798 uint64_t cost = 0;
1799 uint64_t pushes = 0;
1800 MOSDPGPush *msg = new MOSDPGPush();
1801 msg->from = get_parent()->whoami_shard();
1802 msg->pgid = get_parent()->primary_spg_t();
11fdf7f2 1803 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1804 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1805 msg->set_priority(prio);
11fdf7f2 1806 msg->is_repair = get_parent()->pg_is_repair();
7c673cae
FG
1807 for (;
1808 (j != i->second.end() &&
1809 cost < cct->_conf->osd_max_push_cost &&
1810 pushes < cct->_conf->osd_max_push_objects) ;
1811 ++j) {
1812 dout(20) << __func__ << ": sending push " << *j
1813 << " to osd." << i->first << dendl;
1814 cost += j->cost(cct);
1815 pushes += 1;
1816 msg->pushes.push_back(*j);
1817 }
1818 msg->set_cost(cost);
1819 get_parent()->send_message_osd_cluster(msg, con);
1820 }
1821 }
1822}
1823
1824void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
1825{
1826 for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
1827 i != pulls.end();
1828 ++i) {
1829 ConnectionRef con = get_parent()->get_con_osd_cluster(
1830 i->first.osd,
11fdf7f2 1831 get_osdmap_epoch());
7c673cae
FG
1832 if (!con)
1833 continue;
1834 dout(20) << __func__ << ": sending pulls " << i->second
1835 << " to osd." << i->first << dendl;
1836 MOSDPGPull *msg = new MOSDPGPull();
1837 msg->from = parent->whoami_shard();
1838 msg->set_priority(prio);
1839 msg->pgid = get_parent()->primary_spg_t();
11fdf7f2 1840 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1841 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
1842 msg->set_pulls(&i->second);
1843 msg->compute_cost(cct);
1844 get_parent()->send_message_osd_cluster(msg, con);
1845 }
1846}
1847
1848int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
1849 const ObjectRecoveryProgress &progress,
1850 ObjectRecoveryProgress *out_progress,
1851 PushOp *out_op,
1852 object_stat_sum_t *stat,
1853 bool cache_dont_need)
1854{
1855 ObjectRecoveryProgress _new_progress;
1856 if (!out_progress)
1857 out_progress = &_new_progress;
1858 ObjectRecoveryProgress &new_progress = *out_progress;
1859 new_progress = progress;
1860
224ce89b 1861 dout(7) << __func__ << " " << recovery_info.soid
7c673cae
FG
1862 << " v " << recovery_info.version
1863 << " size " << recovery_info.size
1864 << " recovery_info: " << recovery_info
1865 << dendl;
1866
224ce89b 1867 eversion_t v = recovery_info.version;
11fdf7f2 1868 object_info_t oi;
7c673cae 1869 if (progress.first) {
11fdf7f2 1870 int r = store->omap_get_header(ch, ghobject_t(recovery_info.soid), &out_op->omap_header);
7c673cae
FG
1871 if(r < 0) {
1872 dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl;
1873 return r;
1874 }
1875 r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset);
1876 if(r < 0) {
1877 dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl;
1878 return r;
1879 }
1880
1881 // Debug
1882 bufferlist bv = out_op->attrset[OI_ATTR];
224ce89b 1883 try {
11fdf7f2
TL
1884 auto bliter = bv.cbegin();
1885 decode(oi, bliter);
224ce89b
WB
1886 } catch (...) {
1887 dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl;
1888 return -EINVAL;
1889 }
7c673cae 1890
224ce89b
WB
1891 // If requestor didn't know the version, use ours
1892 if (v == eversion_t()) {
1893 v = oi.version;
1894 } else if (oi.version != v) {
7c673cae
FG
1895 get_parent()->clog_error() << get_info().pgid << " push "
1896 << recovery_info.soid << " v "
1897 << recovery_info.version
1898 << " failed because local copy is "
1899 << oi.version;
1900 return -EINVAL;
1901 }
1902
1903 new_progress.first = false;
1904 }
224ce89b
WB
1905 // Once we provide the version subsequent requests will have it, so
1906 // at this point it must be known.
11fdf7f2 1907 ceph_assert(v != eversion_t());
7c673cae
FG
1908
1909 uint64_t available = cct->_conf->osd_recovery_max_chunk;
1910 if (!progress.omap_complete) {
1911 ObjectMap::ObjectMapIterator iter =
11fdf7f2 1912 store->get_omap_iterator(ch,
7c673cae 1913 ghobject_t(recovery_info.soid));
11fdf7f2 1914 ceph_assert(iter);
7c673cae
FG
1915 for (iter->lower_bound(progress.omap_recovered_to);
1916 iter->valid();
11fdf7f2 1917 iter->next()) {
7c673cae
FG
1918 if (!out_op->omap_entries.empty() &&
1919 ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 &&
1920 out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) ||
1921 available <= iter->key().size() + iter->value().length()))
1922 break;
1923 out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
1924
1925 if ((iter->key().size() + iter->value().length()) <= available)
1926 available -= (iter->key().size() + iter->value().length());
1927 else
1928 available = 0;
1929 }
1930 if (!iter->valid())
1931 new_progress.omap_complete = true;
1932 else
1933 new_progress.omap_recovered_to = iter->key();
1934 }
1935
1936 if (available > 0) {
1937 if (!recovery_info.copy_subset.empty()) {
1938 interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
1939 map<uint64_t, uint64_t> m;
1940 int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
1941 copy_subset.range_end(), m);
1942 if (r >= 0) {
1943 interval_set<uint64_t> fiemap_included(m);
1944 copy_subset.intersection_of(fiemap_included);
1945 } else {
1946 // intersection of copy_subset and empty interval_set would be empty anyway
1947 copy_subset.clear();
1948 }
1949
1950 out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
1951 available);
1952 if (out_op->data_included.empty()) // zero filled section, skip to end!
1953 new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
1954 else
1955 new_progress.data_recovered_to = out_op->data_included.range_end();
1956 }
1957 } else {
1958 out_op->data_included.clear();
1959 }
1960
1961 for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
1962 p != out_op->data_included.end();
1963 ++p) {
1964 bufferlist bit;
224ce89b 1965 int r = store->read(ch, ghobject_t(recovery_info.soid),
7c673cae
FG
1966 p.get_start(), p.get_len(), bit,
1967 cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
224ce89b
WB
1968 if (cct->_conf->osd_debug_random_push_read_error &&
1969 (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
1970 dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
1971 r = -EIO;
1972 }
1973 if (r < 0) {
1974 return r;
1975 }
7c673cae
FG
1976 if (p.get_len() != bit.length()) {
1977 dout(10) << " extent " << p.get_start() << "~" << p.get_len()
1978 << " is actually " << p.get_start() << "~" << bit.length()
1979 << dendl;
1980 interval_set<uint64_t>::iterator save = p++;
1981 if (bit.length() == 0)
1982 out_op->data_included.erase(save); //Remove this empty interval
1983 else
1984 save.set_len(bit.length());
1985 // Remove any other intervals present
1986 while (p != out_op->data_included.end()) {
1987 interval_set<uint64_t>::iterator save = p++;
1988 out_op->data_included.erase(save);
1989 }
1990 new_progress.data_complete = true;
1991 out_op->data.claim_append(bit);
1992 break;
1993 }
1994 out_op->data.claim_append(bit);
1995 }
11fdf7f2
TL
1996 if (progress.first && out_op->data_included.begin().get_start() == 0 &&
1997 out_op->data.length() == oi.size && oi.is_data_digest()) {
1998 uint32_t crc = out_op->data.crc32c(-1);
1999 if (oi.data_digest != crc) {
2000 dout(0) << __func__ << " " << coll << std::hex
2001 << " full-object read crc 0x" << crc
2002 << " != expected 0x" << oi.data_digest
2003 << std::dec << " on " << recovery_info.soid << dendl;
2004 return -EIO;
2005 }
2006 }
7c673cae
FG
2007
2008 if (new_progress.is_complete(recovery_info)) {
2009 new_progress.data_complete = true;
11fdf7f2 2010 if (stat) {
7c673cae 2011 stat->num_objects_recovered++;
11fdf7f2
TL
2012 if (get_parent()->pg_is_repair())
2013 stat->num_objects_repaired++;
2014 }
7c673cae
FG
2015 }
2016
2017 if (stat) {
2018 stat->num_keys_recovered += out_op->omap_entries.size();
2019 stat->num_bytes_recovered += out_op->data.length();
11fdf7f2 2020 get_parent()->get_logger()->inc(l_osd_rbytes, out_op->omap_entries.size() + out_op->data.length());
7c673cae
FG
2021 }
2022
2023 get_parent()->get_logger()->inc(l_osd_push);
2024 get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
2025
2026 // send
224ce89b 2027 out_op->version = v;
7c673cae
FG
2028 out_op->soid = recovery_info.soid;
2029 out_op->recovery_info = recovery_info;
2030 out_op->after_progress = new_progress;
2031 out_op->before_progress = progress;
2032 return 0;
2033}
2034
2035void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
2036{
2037 op->recovery_info.version = eversion_t();
2038 op->version = eversion_t();
2039 op->soid = soid;
2040}
2041
// Handle a push acknowledgement from a replica we are pushing `op.soid` to.
// Returns true iff another chunk was built into *reply and must be sent to
// the same peer; returns false when this peer (or the whole object) is done,
// or when we have no record of pushing soid to this peer.
bool ReplicatedBackend::handle_push_reply(
  pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
  const hobject_t &soid = op.soid;
  if (pushing.count(soid) == 0) {
    // stale or duplicate ack: we are not pushing this object at all
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << ", or anybody else"
	     << dendl;
    return false;
  } else if (pushing[soid].count(peer) == 0) {
    // we are pushing soid, but not to this particular peer
    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
	     << dendl;
    return false;
  } else {
    PushInfo *pi = &pushing[soid][peer];
    // A read error for this object is remembered on the *first* entry of
    // pushing[soid] (see the comment near the bottom of this function).
    bool error = pushing[soid].begin()->second.recovery_progress.error;

    if (!pi->recovery_progress.data_complete && !error) {
      // More data remains: build the next PushOp chunk into *reply.
      dout(10) << " pushing more from, "
	       << pi->recovery_progress.data_recovered_to
	       << " of " << pi->recovery_info.copy_subset << dendl;
      ObjectRecoveryProgress new_progress;
      int r = build_push_op(
	pi->recovery_info,
	pi->recovery_progress, &new_progress, reply,
	&(pi->stat));
      // Handle the case of a read error right after we wrote, which is
      // hopefully extremely rare.
      if (r < 0) {
	dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;

	error = true;
	goto done;  // take the completion path below with error set
      }
      pi->recovery_progress = new_progress;
      return true;
    } else {
      // done!
done:
      if (!error)
	get_parent()->on_peer_recover( peer, soid, pi->recovery_info);

      get_parent()->release_locks(pi->lock_manager);
      // Copy out what we still need before erasing pi's owning map entry.
      object_stat_sum_t stat = pi->stat;
      eversion_t v = pi->recovery_info.version;
      pushing[soid].erase(peer);
      pi = NULL;

      if (pushing[soid].empty()) {
	// That was the last outstanding peer for this object.
	if (!error)
	  get_parent()->on_global_recover(soid, stat, false);
	else
	  get_parent()->on_primary_error(soid, v);
	pushing.erase(soid);
      } else {
	// This looks weird, but we erased the current peer and need to remember
	// the error on any other one, while getting more acks.
	if (error)
	  pushing[soid].begin()->second.recovery_progress.error = true;
	dout(10) << "pushed " << soid << ", still waiting for push ack from "
		 << pushing[soid].size() << " others" << dendl;
      }
      return false;
    }
  }
}
2108
2109void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
2110{
2111 const hobject_t &soid = op.soid;
2112 struct stat st;
2113 int r = store->stat(ch, ghobject_t(soid), &st);
2114 if (r != 0) {
2115 get_parent()->clog_error() << get_info().pgid << " "
2116 << peer << " tried to pull " << soid
2117 << " but got " << cpp_strerror(-r);
2118 prep_push_op_blank(soid, reply);
2119 } else {
2120 ObjectRecoveryInfo &recovery_info = op.recovery_info;
2121 ObjectRecoveryProgress &progress = op.recovery_progress;
2122 if (progress.first && recovery_info.size == ((uint64_t)-1)) {
2123 // Adjust size and copy_subset
2124 recovery_info.size = st.st_size;
2125 recovery_info.copy_subset.clear();
2126 if (st.st_size)
2127 recovery_info.copy_subset.insert(0, st.st_size);
11fdf7f2 2128 ceph_assert(recovery_info.clone_subset.empty());
7c673cae
FG
2129 }
2130
2131 r = build_push_op(recovery_info, progress, 0, reply);
2132 if (r < 0)
2133 prep_push_op_blank(soid, reply);
2134 }
2135}
2136
2137/**
2138 * trim received data to remove what we don't want
2139 *
2140 * @param copy_subset intervals we want
2141 * @param data_included intervals we got
2142 * @param data_recieved data we got
2143 * @param intervals_usable intervals we want to keep
2144 * @param data_usable matching data we want to keep
2145 */
2146void ReplicatedBackend::trim_pushed_data(
2147 const interval_set<uint64_t> &copy_subset,
2148 const interval_set<uint64_t> &intervals_received,
2149 bufferlist data_received,
2150 interval_set<uint64_t> *intervals_usable,
2151 bufferlist *data_usable)
2152{
2153 if (intervals_received.subset_of(copy_subset)) {
2154 *intervals_usable = intervals_received;
2155 *data_usable = data_received;
2156 return;
2157 }
2158
2159 intervals_usable->intersection_of(copy_subset,
2160 intervals_received);
2161
2162 uint64_t off = 0;
2163 for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
2164 p != intervals_received.end();
2165 ++p) {
2166 interval_set<uint64_t> x;
2167 x.insert(p.get_start(), p.get_len());
2168 x.intersection_of(copy_subset);
2169 for (interval_set<uint64_t>::const_iterator q = x.begin();
2170 q != x.end();
2171 ++q) {
2172 bufferlist sub;
2173 uint64_t data_off = off + (q.get_start() - p.get_start());
2174 sub.substr_of(data_received, data_off, q.get_len());
2175 data_usable->claim_append(sub);
2176 }
2177 off += p.get_len();
2178 }
2179}
2180
224ce89b 2181void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
7c673cae 2182{
224ce89b 2183 dout(20) << __func__ << ": " << soid << " from " << from << dendl;
7c673cae
FG
2184 list<pg_shard_t> fl = { from };
2185 get_parent()->failed_push(fl, soid);
2186
2187 clear_pull(pulling.find(soid));
2188}
2189
2190void ReplicatedBackend::clear_pull_from(
2191 map<hobject_t, PullInfo>::iterator piter)
2192{
2193 auto from = piter->second.from;
2194 pull_from_peer[from].erase(piter->second.soid);
2195 if (pull_from_peer[from].empty())
2196 pull_from_peer.erase(from);
2197}
2198
2199void ReplicatedBackend::clear_pull(
2200 map<hobject_t, PullInfo>::iterator piter,
2201 bool clear_pull_from_peer)
2202{
2203 if (clear_pull_from_peer) {
2204 clear_pull_from(piter);
2205 }
2206 get_parent()->release_locks(piter->second.lock_manager);
2207 pulling.erase(piter);
2208}
2209
2210int ReplicatedBackend::start_pushes(
2211 const hobject_t &soid,
2212 ObjectContextRef obc,
2213 RPGHandle *h)
2214{
224ce89b
WB
2215 list< map<pg_shard_t, pg_missing_t>::const_iterator > shards;
2216
2217 dout(20) << __func__ << " soid " << soid << dendl;
7c673cae 2218 // who needs it?
11fdf7f2 2219 ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
7c673cae 2220 for (set<pg_shard_t>::iterator i =
11fdf7f2
TL
2221 get_parent()->get_acting_recovery_backfill_shards().begin();
2222 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2223 ++i) {
2224 if (*i == get_parent()->whoami_shard()) continue;
2225 pg_shard_t peer = *i;
2226 map<pg_shard_t, pg_missing_t>::const_iterator j =
2227 get_parent()->get_shard_missing().find(peer);
11fdf7f2 2228 ceph_assert(j != get_parent()->get_shard_missing().end());
7c673cae 2229 if (j->second.is_missing(soid)) {
224ce89b
WB
2230 shards.push_back(j);
2231 }
2232 }
2233
2234 // If more than 1 read will occur ignore possible request to not cache
2235 bool cache = shards.size() == 1 ? h->cache_dont_need : false;
2236
2237 for (auto j : shards) {
2238 pg_shard_t peer = j->first;
2239 h->pushes[peer].push_back(PushOp());
2240 int r = prep_push_to_replica(obc, soid, peer,
2241 &(h->pushes[peer].back()), cache);
2242 if (r < 0) {
2243 // Back out all failed reads
2244 for (auto k : shards) {
2245 pg_shard_t p = k->first;
2246 dout(10) << __func__ << " clean up peer " << p << dendl;
2247 h->pushes[p].pop_back();
2248 if (p == peer) break;
2249 }
2250 return r;
7c673cae
FG
2251 }
2252 }
224ce89b 2253 return shards.size();
7c673cae 2254}