ceph/src/osd/ECBackend.cc (Ceph 15.2.0 Octopus source)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <iostream>
16 #include <sstream>
17
18 #include "ECBackend.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPushReply.h"
21 #include "messages/MOSDECSubOpWrite.h"
22 #include "messages/MOSDECSubOpWriteReply.h"
23 #include "messages/MOSDECSubOpRead.h"
24 #include "messages/MOSDECSubOpReadReply.h"
25 #include "ECMsgTypes.h"
26
27 #include "PrimaryLogPG.h"
28
29 #define dout_context cct
30 #define dout_subsys ceph_subsys_osd
31 #define DOUT_PREFIX_ARGS this
32 #undef dout_prefix
33 #define dout_prefix _prefix(_dout, this)
34 static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35 return pgb->get_parent()->gen_dbg_prefix(*_dout);
36 }
37
38 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39 list<ECBackend::RecoveryOp> ops;
40 };
41
42 ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43 switch (rhs.pipeline_state) {
44 case ECBackend::pipeline_state_t::CACHE_VALID:
45 return lhs << "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID:
47 return lhs << "CACHE_INVALID";
48 default:
49 ceph_abort_msg("invalid pipeline state");
50 }
51 return lhs; // unreachable
52 }
53
54 static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
55 {
56 lhs << "[";
57 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
58 i != rhs.end();
59 ++i) {
60 if (i != rhs.begin())
61 lhs << ", ";
62 lhs << make_pair(i->first, i->second.length());
63 }
64 return lhs << "]";
65 }
66
67 static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
68 {
69 lhs << "[";
70 for (map<int, bufferlist>::const_iterator i = rhs.begin();
71 i != rhs.end();
72 ++i) {
73 if (i != rhs.begin())
74 lhs << ", ";
75 lhs << make_pair(i->first, i->second.length());
76 }
77 return lhs << "]";
78 }
79
80 static ostream &operator<<(
81 ostream &lhs,
82 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
83 {
84 return lhs << "(" << rhs.get<0>() << ", "
85 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
86 }
87
88 ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
89 {
90 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91 << ", need=" << rhs.need
92 << ", want_attrs=" << rhs.want_attrs
93 << ")";
94 }
95
96 ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
97 {
98 lhs << "read_result_t(r=" << rhs.r
99 << ", errors=" << rhs.errors;
100 if (rhs.attrs) {
101 lhs << ", attrs=" << *(rhs.attrs);
102 } else {
103 lhs << ", noattrs";
104 }
105 return lhs << ", returned=" << rhs.returned << ")";
106 }
107
108 ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
109 {
110 lhs << "ReadOp(tid=" << rhs.tid;
111 if (rhs.op && rhs.op->get_req()) {
112 lhs << ", op=";
113 rhs.op->get_req()->print(lhs);
114 }
115 return lhs << ", to_read=" << rhs.to_read
116 << ", complete=" << rhs.complete
117 << ", priority=" << rhs.priority
118 << ", obj_to_source=" << rhs.obj_to_source
119 << ", source_to_obj=" << rhs.source_to_obj
120 << ", in_progress=" << rhs.in_progress << ")";
121 }
122
123 void ECBackend::ReadOp::dump(Formatter *f) const
124 {
125 f->dump_unsigned("tid", tid);
126 if (op && op->get_req()) {
127 f->dump_stream("op") << *(op->get_req());
128 }
129 f->dump_stream("to_read") << to_read;
130 f->dump_stream("complete") << complete;
131 f->dump_int("priority", priority);
132 f->dump_stream("obj_to_source") << obj_to_source;
133 f->dump_stream("source_to_obj") << source_to_obj;
134 f->dump_stream("in_progress") << in_progress;
135 }
136
137 ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
138 {
139 lhs << "Op(" << rhs.hoid
140 << " v=" << rhs.version
141 << " tt=" << rhs.trim_to
142 << " tid=" << rhs.tid
143 << " reqid=" << rhs.reqid;
144 if (rhs.client_op && rhs.client_op->get_req()) {
145 lhs << " client_op=";
146 rhs.client_op->get_req()->print(lhs);
147 }
148 lhs << " roll_forward_to=" << rhs.roll_forward_to
149 << " temp_added=" << rhs.temp_added
150 << " temp_cleared=" << rhs.temp_cleared
151 << " pending_read=" << rhs.pending_read
152 << " remote_read=" << rhs.remote_read
153 << " remote_read_result=" << rhs.remote_read_result
154 << " pending_apply=" << rhs.pending_apply
155 << " pending_commit=" << rhs.pending_commit
156 << " plan.to_read=" << rhs.plan.to_read
157 << " plan.will_write=" << rhs.plan.will_write
158 << ")";
159 return lhs;
160 }
161
162 ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
163 {
164 return lhs << "RecoveryOp("
165 << "hoid=" << rhs.hoid
166 << " v=" << rhs.v
167 << " missing_on=" << rhs.missing_on
168 << " missing_on_shards=" << rhs.missing_on_shards
169 << " recovery_info=" << rhs.recovery_info
170 << " recovery_progress=" << rhs.recovery_progress
171 << " obc refcount=" << rhs.obc.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173 << " waiting_on_pushes=" << rhs.waiting_on_pushes
174 << " extent_requested=" << rhs.extent_requested
175 << ")";
176 }
177
178 void ECBackend::RecoveryOp::dump(Formatter *f) const
179 {
180 f->dump_stream("hoid") << hoid;
181 f->dump_stream("v") << v;
182 f->dump_stream("missing_on") << missing_on;
183 f->dump_stream("missing_on_shards") << missing_on_shards;
184 f->dump_stream("recovery_info") << recovery_info;
185 f->dump_stream("recovery_progress") << recovery_progress;
186 f->dump_stream("state") << tostr(state);
187 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188 f->dump_stream("extent_requested") << extent_requested;
189 }
190
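// ECBackend couples a PG with an erasure-code plugin instance. sinfo
// captures the stripe layout: with k data chunks and a stripe width of
// W bytes, each shard holds W / k bytes per stripe (for example, k = 2
// and W = 8192 give 4096-byte chunks). The assert below enforces that
// the configured stripe width is an exact multiple of the chunk size.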
191 ECBackend::ECBackend(
192 PGBackend::Listener *pg,
193 const coll_t &coll,
194 ObjectStore::CollectionHandle &ch,
195 ObjectStore *store,
196 CephContext *cct,
197 ErasureCodeInterfaceRef ec_impl,
198 uint64_t stripe_width)
199 : PGBackend(cct, pg, store, coll, ch),
200 ec_impl(ec_impl),
201 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
202 ceph_assert((ec_impl->get_data_chunk_count() *
203 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
204 }
205
206 PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
207 {
208 return new ECRecoveryHandle;
209 }
210
211 void ECBackend::_failed_push(const hobject_t &hoid,
212 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
213 {
214 ECBackend::read_result_t &res = in.second;
215 dout(10) << __func__ << ": Read error " << hoid << " r="
216 << res.r << " errors=" << res.errors << dendl;
217 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
218 << dendl;
219 ceph_assert(recovery_ops.count(hoid));
220 eversion_t v = recovery_ops[hoid].v;
221 recovery_ops.erase(hoid);
222
223 set<pg_shard_t> fl;
224 for (auto&& i : res.errors) {
225 fl.insert(i.first);
226 }
227 get_parent()->on_failed_pull(fl, hoid, v);
228 }
229
230 struct OnRecoveryReadComplete :
231 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
232 ECBackend *pg;
233 hobject_t hoid;
234 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
235 : pg(pg), hoid(hoid) {}
236 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
237 ECBackend::read_result_t &res = in.second;
238 if (!(res.r == 0 && res.errors.empty())) {
239 pg->_failed_push(hoid, in);
240 return;
241 }
242 ceph_assert(res.returned.size() == 1);
243 pg->handle_recovery_read_complete(
244 hoid,
245 res.returned.back(),
246 res.attrs,
247 in.first);
248 }
249 };
250
251 struct RecoveryMessages {
252 map<hobject_t,
253 ECBackend::read_request_t> reads;
254 map<hobject_t, set<int>> want_to_read;
255 void read(
256 ECBackend *ec,
257 const hobject_t &hoid, uint64_t off, uint64_t len,
258 set<int> &&_want_to_read,
259 const map<pg_shard_t, vector<pair<int, int>>> &need,
260 bool attrs) {
261 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
262 to_read.push_back(boost::make_tuple(off, len, 0));
263 ceph_assert(!reads.count(hoid));
264 want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
265 reads.insert(
266 make_pair(
267 hoid,
268 ECBackend::read_request_t(
269 to_read,
270 need,
271 attrs,
272 new OnRecoveryReadComplete(
273 ec,
274 hoid))));
275 }
276
277 map<pg_shard_t, vector<PushOp> > pushes;
278 map<pg_shard_t, vector<PushReplyOp> > push_replies;
279 ObjectStore::Transaction t;
280 RecoveryMessages() {}
281 ~RecoveryMessages(){}
282 };
283
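// Apply a single PushOp received during recovery/backfill. Data is
// written either directly to the final shard object (when the push
// covers the whole object in one shot) or into a temp recovery object
// that is renamed into place once the last push arrives. The resulting
// transaction is accumulated in RecoveryMessages::t and queued later by
// dispatch_recovery_messages().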
284 void ECBackend::handle_recovery_push(
285 const PushOp &op,
286 RecoveryMessages *m,
287 bool is_repair)
288 {
289 if (get_parent()->check_failsafe_full()) {
290 dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
291 ceph_abort();
292 }
293
294 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
295 ghobject_t tobj;
296 if (oneshot) {
297 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
298 get_parent()->whoami_shard().shard);
299 } else {
300 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
301 op.version),
302 ghobject_t::NO_GEN,
303 get_parent()->whoami_shard().shard);
304 if (op.before_progress.first) {
305 dout(10) << __func__ << ": Adding oid "
306 << tobj.hobj << " in the temp collection" << dendl;
307 add_temp_obj(tobj.hobj);
308 }
309 }
310
311 if (op.before_progress.first) {
312 m->t.remove(coll, tobj);
313 m->t.touch(coll, tobj);
314 }
315
316 if (!op.data_included.empty()) {
317 uint64_t start = op.data_included.range_start();
318 uint64_t end = op.data_included.range_end();
319 ceph_assert(op.data.length() == (end - start));
320
321 m->t.write(
322 coll,
323 tobj,
324 start,
325 op.data.length(),
326 op.data);
327 } else {
328 ceph_assert(op.data.length() == 0);
329 }
330
331 if (get_parent()->pg_is_remote_backfilling()) {
332 get_parent()->pg_add_local_num_bytes(op.data.length());
333 get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
334 dout(10) << __func__ << " " << op.soid
335 << " add new actual data by " << op.data.length()
336 << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
337 << dendl;
338 }
339
340 if (op.before_progress.first) {
341 ceph_assert(op.attrset.count(string("_")));
342 m->t.setattrs(
343 coll,
344 tobj,
345 op.attrset);
346 }
347
348 if (op.after_progress.data_complete && !oneshot) {
349 dout(10) << __func__ << ": Removing oid "
350 << tobj.hobj << " from the temp collection" << dendl;
351 clear_temp_obj(tobj.hobj);
352 m->t.remove(coll, ghobject_t(
353 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
354 m->t.collection_move_rename(
355 coll, tobj,
356 coll, ghobject_t(
357 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
358 }
359 if (op.after_progress.data_complete) {
360 if ((get_parent()->pgb_is_primary())) {
361 ceph_assert(recovery_ops.count(op.soid));
362 ceph_assert(recovery_ops[op.soid].obc);
363 if (get_parent()->pg_is_repair())
364 get_parent()->inc_osd_stat_repaired();
365 get_parent()->on_local_recover(
366 op.soid,
367 op.recovery_info,
368 recovery_ops[op.soid].obc,
369 false,
370 &m->t);
371 } else {
372 // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
373 if (is_repair)
374 get_parent()->inc_osd_stat_repaired();
375 get_parent()->on_local_recover(
376 op.soid,
377 op.recovery_info,
378 ObjectContextRef(),
379 false,
380 &m->t);
381 if (get_parent()->pg_is_remote_backfilling()) {
382 struct stat st;
383 int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
384 get_parent()->whoami_shard().shard), &st);
385 if (r == 0) {
386 get_parent()->pg_sub_local_num_bytes(st.st_size);
387 // XXX: This can be way overestimated for small objects
388 get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
389 dout(10) << __func__ << " " << op.soid
390 << " sub actual data by " << st.st_size
391 << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
392 << dendl;
393 }
394 }
395 }
396 }
397 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
398 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
399 }
400
401 void ECBackend::handle_recovery_push_reply(
402 const PushReplyOp &op,
403 pg_shard_t from,
404 RecoveryMessages *m)
405 {
406 if (!recovery_ops.count(op.soid))
407 return;
408 RecoveryOp &rop = recovery_ops[op.soid];
409 ceph_assert(rop.waiting_on_pushes.count(from));
410 rop.waiting_on_pushes.erase(from);
411 continue_recovery_op(rop, m);
412 }
413
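// Called once the recovery read for hoid has gathered enough shards.
// The surviving chunks are decoded into the shards that are missing,
// and (on the first read) the object's attrs and hash info are used to
// build an object context before the recovery state machine resumes.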
414 void ECBackend::handle_recovery_read_complete(
415 const hobject_t &hoid,
416 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
417 std::optional<map<string, bufferlist> > attrs,
418 RecoveryMessages *m)
419 {
420 dout(10) << __func__ << ": returned " << hoid << " "
421 << "(" << to_read.get<0>()
422 << ", " << to_read.get<1>()
423 << ", " << to_read.get<2>()
424 << ")"
425 << dendl;
426 ceph_assert(recovery_ops.count(hoid));
427 RecoveryOp &op = recovery_ops[hoid];
428 ceph_assert(op.returned_data.empty());
429 map<int, bufferlist*> target;
430 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
431 i != op.missing_on_shards.end();
432 ++i) {
433 target[*i] = &(op.returned_data[*i]);
434 }
435 map<int, bufferlist> from;
436 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
437 i != to_read.get<2>().end();
438 ++i) {
439 from[i->first.shard].claim(i->second);
440 }
441 dout(10) << __func__ << ": " << from << dendl;
442 int r;
443 r = ECUtil::decode(sinfo, ec_impl, from, target);
444 ceph_assert(r == 0);
445 if (attrs) {
446 op.xattrs.swap(*attrs);
447
448 if (!op.obc) {
449       // The attrs only reference the original bufferlist (decoded from
450       // the ECSubReadReply message), which is much larger than the attrs
451       // themselves. If the obc caches them (get_obc may cache the attrs),
452       // the whole original bufferlist cannot be freed until the obc is
453       // evicted from the obc cache. So rebuild the bufferlists before
454       // caching them.
455 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
456 it != op.xattrs.end();
457 ++it) {
458 it->second.rebuild();
459 }
460 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
461 // of the backend (see bug #12983)
462 map<string, bufferlist> sanitized_attrs(op.xattrs);
463 sanitized_attrs.erase(ECUtil::get_hinfo_key());
464 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
465 ceph_assert(op.obc);
466 op.recovery_info.size = op.obc->obs.oi.size;
467 op.recovery_info.oi = op.obc->obs.oi;
468 }
469
470 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
471 if (op.obc->obs.oi.size > 0) {
472 ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
473 auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
474 decode(hinfo, bp);
475 }
476 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
477 }
478 ceph_assert(op.xattrs.size());
479 ceph_assert(op.obc);
480 continue_recovery_op(op, m);
481 }
482
483 struct SendPushReplies : public Context {
484 PGBackend::Listener *l;
485 epoch_t epoch;
486 map<int, MOSDPGPushReply*> replies;
487 SendPushReplies(
488 PGBackend::Listener *l,
489 epoch_t epoch,
490 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
491 replies.swap(in);
492 }
493 void finish(int) override {
494 std::vector<std::pair<int, Message*>> messages;
495 messages.reserve(replies.size());
496 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
497 i != replies.end();
498 ++i) {
499 messages.push_back(std::make_pair(i->first, i->second));
500 }
501 if (!messages.empty()) {
502 l->send_message_osd_cluster(messages, epoch);
503 }
504 replies.clear();
505 }
506 ~SendPushReplies() override {
507 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
508 i != replies.end();
509 ++i) {
510 i->second->put();
511 }
512 replies.clear();
513 }
514 };
515
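// Flush everything accumulated in a RecoveryMessages batch: push ops go
// out as MOSDPGPush messages, push replies are sent once the local
// transaction commits (via SendPushReplies), and any queued recovery
// reads are started as a single read op.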
516 void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
517 {
518 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
519 i != m.pushes.end();
520 m.pushes.erase(i++)) {
521 MOSDPGPush *msg = new MOSDPGPush();
522 msg->set_priority(priority);
523 msg->map_epoch = get_osdmap_epoch();
524 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
525 msg->from = get_parent()->whoami_shard();
526 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
527 msg->pushes.swap(i->second);
528 msg->compute_cost(cct);
529 msg->is_repair = get_parent()->pg_is_repair();
530 get_parent()->send_message(
531 i->first.osd,
532 msg);
533 }
534 map<int, MOSDPGPushReply*> replies;
535 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
536 m.push_replies.begin();
537 i != m.push_replies.end();
538 m.push_replies.erase(i++)) {
539 MOSDPGPushReply *msg = new MOSDPGPushReply();
540 msg->set_priority(priority);
541 msg->map_epoch = get_osdmap_epoch();
542 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
543 msg->from = get_parent()->whoami_shard();
544 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
545 msg->replies.swap(i->second);
546 msg->compute_cost(cct);
547 replies.insert(make_pair(i->first.osd, msg));
548 }
549
550 if (!replies.empty()) {
551 (m.t).register_on_complete(
552 get_parent()->bless_context(
553 new SendPushReplies(
554 get_parent(),
555 get_osdmap_epoch(),
556 replies)));
557 get_parent()->queue_transaction(std::move(m.t));
558 }
559
560 if (m.reads.empty())
561 return;
562 start_read_op(
563 priority,
564 m.want_to_read,
565 m.reads,
566 OpRequestRef(),
567 false, true);
568 }
569
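// Drive the per-object recovery state machine:
//   IDLE    -> issue a read of the next chunk-sized extent from enough
//              surviving shards to decode the missing ones
//   READING -> decode arrived, build PushOps for every missing shard
//   WRITING -> all pushes acked; either loop back to IDLE for the next
//              extent or mark the object recovered (COMPLETE)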
570 void ECBackend::continue_recovery_op(
571 RecoveryOp &op,
572 RecoveryMessages *m)
573 {
574 dout(10) << __func__ << ": continuing " << op << dendl;
575 while (1) {
576 switch (op.state) {
577 case RecoveryOp::IDLE: {
578 // start read
579 op.state = RecoveryOp::READING;
580 ceph_assert(!op.recovery_progress.data_complete);
581 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
582 uint64_t from = op.recovery_progress.data_recovered_to;
583 uint64_t amount = get_recovery_chunk_size();
584
585 if (op.recovery_progress.first && op.obc) {
586 /* We've got the attrs and the hinfo, might as well use them */
587 op.hinfo = get_hash_info(op.hoid);
588 ceph_assert(op.hinfo);
589 op.xattrs = op.obc->attr_cache;
590 encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
591 }
592
593 map<pg_shard_t, vector<pair<int, int>>> to_read;
594 int r = get_min_avail_to_read_shards(
595 op.hoid, want, true, false, &to_read);
596 if (r != 0) {
597 // we must have lost a recovery source
598 ceph_assert(!op.recovery_progress.first);
599 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
600 << dendl;
601 get_parent()->cancel_pull(op.hoid);
602 recovery_ops.erase(op.hoid);
603 return;
604 }
605 m->read(
606 this,
607 op.hoid,
608 op.recovery_progress.data_recovered_to,
609 amount,
610 std::move(want),
611 to_read,
612 op.recovery_progress.first && !op.obc);
613 op.extent_requested = make_pair(
614 from,
615 amount);
616 dout(10) << __func__ << ": IDLE return " << op << dendl;
617 return;
618 }
619 case RecoveryOp::READING: {
620 // read completed, start write
621 ceph_assert(op.xattrs.size());
622 ceph_assert(op.returned_data.size());
623 op.state = RecoveryOp::WRITING;
624 ObjectRecoveryProgress after_progress = op.recovery_progress;
625 after_progress.data_recovered_to += op.extent_requested.second;
626 after_progress.first = false;
627 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
628 after_progress.data_recovered_to =
629 sinfo.logical_to_next_stripe_offset(
630 op.obc->obs.oi.size);
631 after_progress.data_complete = true;
632 }
633 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
634 mi != op.missing_on.end();
635 ++mi) {
636 ceph_assert(op.returned_data.count(mi->shard));
637 m->pushes[*mi].push_back(PushOp());
638 PushOp &pop = m->pushes[*mi].back();
639 pop.soid = op.hoid;
640 pop.version = op.v;
641 pop.data = op.returned_data[mi->shard];
642 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
643 << ", after_progress=" << after_progress
644 << ", pop.data.length()=" << pop.data.length()
645 << ", size=" << op.obc->obs.oi.size << dendl;
646 ceph_assert(
647 pop.data.length() ==
648 sinfo.aligned_logical_offset_to_chunk_offset(
649 after_progress.data_recovered_to -
650 op.recovery_progress.data_recovered_to)
651 );
652 if (pop.data.length())
653 pop.data_included.insert(
654 sinfo.aligned_logical_offset_to_chunk_offset(
655 op.recovery_progress.data_recovered_to),
656 pop.data.length()
657 );
658 if (op.recovery_progress.first) {
659 pop.attrset = op.xattrs;
660 }
661 pop.recovery_info = op.recovery_info;
662 pop.before_progress = op.recovery_progress;
663 pop.after_progress = after_progress;
664 if (*mi != get_parent()->primary_shard())
665 get_parent()->begin_peer_recover(
666 *mi,
667 op.hoid);
668 }
669 op.returned_data.clear();
670 op.waiting_on_pushes = op.missing_on;
671 op.recovery_progress = after_progress;
672 dout(10) << __func__ << ": READING return " << op << dendl;
673 return;
674 }
675 case RecoveryOp::WRITING: {
676 if (op.waiting_on_pushes.empty()) {
677 if (op.recovery_progress.data_complete) {
678 op.state = RecoveryOp::COMPLETE;
679 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
680 i != op.missing_on.end();
681 ++i) {
682 if (*i != get_parent()->primary_shard()) {
683 dout(10) << __func__ << ": on_peer_recover on " << *i
684 << ", obj " << op.hoid << dendl;
685 get_parent()->on_peer_recover(
686 *i,
687 op.hoid,
688 op.recovery_info);
689 }
690 }
691 object_stat_sum_t stat;
692 stat.num_bytes_recovered = op.recovery_info.size;
693 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
694 stat.num_objects_recovered = 1;
695 if (get_parent()->pg_is_repair())
696 stat.num_objects_repaired = 1;
697 get_parent()->on_global_recover(op.hoid, stat, false);
698 dout(10) << __func__ << ": WRITING return " << op << dendl;
699 recovery_ops.erase(op.hoid);
700 return;
701 } else {
702 op.state = RecoveryOp::IDLE;
703 dout(10) << __func__ << ": WRITING continue " << op << dendl;
704 continue;
705 }
706 }
707 return;
708 }
709 // should never be called once complete
710 case RecoveryOp::COMPLETE:
711 default: {
712 ceph_abort();
713 };
714 }
715 }
716 }
717
718 void ECBackend::run_recovery_op(
719 RecoveryHandle *_h,
720 int priority)
721 {
722 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
723 RecoveryMessages m;
724 for (list<RecoveryOp>::iterator i = h->ops.begin();
725 i != h->ops.end();
726 ++i) {
727 dout(10) << __func__ << ": starting " << *i << dendl;
728 ceph_assert(!recovery_ops.count(i->hoid));
729 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
730 continue_recovery_op(op, &m);
731 }
732
733 dispatch_recovery_messages(m, priority);
734 send_recovery_deletes(priority, h->deletes);
735 delete _h;
736 }
737
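// Queue recovery of a single object version: record which shards of the
// acting/backfill set are missing it and stash the object context so the
// state machine started by run_recovery_op() can read, decode and push
// the missing chunks.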
738 int ECBackend::recover_object(
739 const hobject_t &hoid,
740 eversion_t v,
741 ObjectContextRef head,
742 ObjectContextRef obc,
743 RecoveryHandle *_h)
744 {
745 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
746 h->ops.push_back(RecoveryOp());
747 h->ops.back().v = v;
748 h->ops.back().hoid = hoid;
749 h->ops.back().obc = obc;
750 h->ops.back().recovery_info.soid = hoid;
751 h->ops.back().recovery_info.version = v;
752 if (obc) {
753 h->ops.back().recovery_info.size = obc->obs.oi.size;
754 h->ops.back().recovery_info.oi = obc->obs.oi;
755 }
756 if (hoid.is_snap()) {
757 if (obc) {
758 ceph_assert(obc->ssc);
759 h->ops.back().recovery_info.ss = obc->ssc->snapset;
760 } else if (head) {
761 ceph_assert(head->ssc);
762 h->ops.back().recovery_info.ss = head->ssc->snapset;
763 } else {
764 ceph_abort_msg("neither obc nor head set for a snap object");
765 }
766 }
767 h->ops.back().recovery_progress.omap_complete = true;
768 for (set<pg_shard_t>::const_iterator i =
769 get_parent()->get_acting_recovery_backfill_shards().begin();
770 i != get_parent()->get_acting_recovery_backfill_shards().end();
771 ++i) {
772 dout(10) << "checking " << *i << dendl;
773 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
774 h->ops.back().missing_on.insert(*i);
775 h->ops.back().missing_on_shards.insert(i->shard);
776 }
777 }
778 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
779 return 0;
780 }
781
782 bool ECBackend::can_handle_while_inactive(
783 OpRequestRef _op)
784 {
785 return false;
786 }
787
788 bool ECBackend::_handle_message(
789 OpRequestRef _op)
790 {
791 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
792 int priority = _op->get_req()->get_priority();
793 switch (_op->get_req()->get_type()) {
794 case MSG_OSD_EC_WRITE: {
795 // NOTE: this is non-const because handle_sub_write modifies the embedded
796 // ObjectStore::Transaction in place (and then std::move's it). It does
797 // not conflict with ECSubWrite's operator<<.
798 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
799 _op->get_nonconst_req());
800 parent->maybe_preempt_replica_scrub(op->op.soid);
801 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
802 return true;
803 }
804 case MSG_OSD_EC_WRITE_REPLY: {
805 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
806 _op->get_req());
807 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
808 return true;
809 }
810 case MSG_OSD_EC_READ: {
811 auto op = _op->get_req<MOSDECSubOpRead>();
812 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
813 reply->pgid = get_parent()->primary_spg_t();
814 reply->map_epoch = get_osdmap_epoch();
815 reply->min_epoch = get_parent()->get_interval_start_epoch();
816 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
817 reply->trace = _op->pg_trace;
818 get_parent()->send_message_osd_cluster(
819 reply, _op->get_req()->get_connection());
820 return true;
821 }
822 case MSG_OSD_EC_READ_REPLY: {
823 // NOTE: this is non-const because handle_sub_read_reply steals resulting
824 // buffers. It does not conflict with ECSubReadReply operator<<.
825 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
826 _op->get_nonconst_req());
827 RecoveryMessages rm;
828 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
829 dispatch_recovery_messages(rm, priority);
830 return true;
831 }
832 case MSG_OSD_PG_PUSH: {
833 auto op = _op->get_req<MOSDPGPush>();
834 RecoveryMessages rm;
835 for (vector<PushOp>::const_iterator i = op->pushes.begin();
836 i != op->pushes.end();
837 ++i) {
838 handle_recovery_push(*i, &rm, op->is_repair);
839 }
840 dispatch_recovery_messages(rm, priority);
841 return true;
842 }
843 case MSG_OSD_PG_PUSH_REPLY: {
844 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
845 _op->get_req());
846 RecoveryMessages rm;
847 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
848 i != op->replies.end();
849 ++i) {
850 handle_recovery_push_reply(*i, op->from, &rm);
851 }
852 dispatch_recovery_messages(rm, priority);
853 return true;
854 }
855 default:
856 return false;
857 }
858 return false;
859 }
860
861 struct SubWriteCommitted : public Context {
862 ECBackend *pg;
863 OpRequestRef msg;
864 ceph_tid_t tid;
865 eversion_t version;
866 eversion_t last_complete;
867 const ZTracer::Trace trace;
868 SubWriteCommitted(
869 ECBackend *pg,
870 OpRequestRef msg,
871 ceph_tid_t tid,
872 eversion_t version,
873 eversion_t last_complete,
874 const ZTracer::Trace &trace)
875 : pg(pg), msg(msg), tid(tid),
876 version(version), last_complete(last_complete), trace(trace) {}
877 void finish(int) override {
878 if (msg)
879 msg->mark_event("sub_op_committed");
880 pg->sub_write_committed(tid, version, last_complete, trace);
881 }
882 };
883 void ECBackend::sub_write_committed(
884 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
885 const ZTracer::Trace &trace) {
886 if (get_parent()->pgb_is_primary()) {
887 ECSubWriteReply reply;
888 reply.tid = tid;
889 reply.last_complete = last_complete;
890 reply.committed = true;
891 reply.applied = true;
892 reply.from = get_parent()->whoami_shard();
893 handle_sub_write_reply(
894 get_parent()->whoami_shard(),
895 reply, trace);
896 } else {
897 get_parent()->update_last_complete_ondisk(last_complete);
898 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
899 r->pgid = get_parent()->primary_spg_t();
900 r->map_epoch = get_osdmap_epoch();
901 r->min_epoch = get_parent()->get_interval_start_epoch();
902 r->op.tid = tid;
903 r->op.last_complete = last_complete;
904 r->op.committed = true;
905 r->op.applied = true;
906 r->op.from = get_parent()->whoami_shard();
907 r->set_priority(CEPH_MSG_PRIO_HIGH);
908 r->trace = trace;
909 r->trace.event("sending sub op commit");
910 get_parent()->send_message_osd_cluster(
911 get_parent()->primary_shard().osd, r, get_osdmap_epoch());
912 }
913 }
914
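// Apply an ECSubWrite on this shard: log the transaction's log entries,
// remove any temp objects we will never see the rest of, and queue the
// shard's transaction. SubWriteCommitted sends the commit ack back to
// the primary (or short-circuits it if we are the primary).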
915 void ECBackend::handle_sub_write(
916 pg_shard_t from,
917 OpRequestRef msg,
918 ECSubWrite &op,
919 const ZTracer::Trace &trace)
920 {
921 if (msg)
922 msg->mark_event("sub_op_started");
923 trace.event("handle_sub_write");
924 if (!get_parent()->pgb_is_primary())
925 get_parent()->update_stats(op.stats);
926 ObjectStore::Transaction localt;
927 if (!op.temp_added.empty()) {
928 add_temp_objs(op.temp_added);
929 }
930 if (op.backfill_or_async_recovery) {
931 for (set<hobject_t>::iterator i = op.temp_removed.begin();
932 i != op.temp_removed.end();
933 ++i) {
934 dout(10) << __func__ << ": removing object " << *i
935 << " since we won't get the transaction" << dendl;
936 localt.remove(
937 coll,
938 ghobject_t(
939 *i,
940 ghobject_t::NO_GEN,
941 get_parent()->whoami_shard().shard));
942 }
943 }
944 clear_temp_objs(op.temp_removed);
945 dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
946 // flag set to true during async recovery
947 bool async = false;
948 pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
949 if (pmissing.is_missing(op.soid)) {
950 async = true;
951 dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
952 for (auto &&e: op.log_entries) {
953 dout(30) << " add_next_event entry " << e << dendl;
954 get_parent()->add_local_next_event(e);
955 dout(30) << " entry is_delete " << e.is_delete() << dendl;
956 }
957 }
958 get_parent()->log_operation(
959 op.log_entries,
960 op.updated_hit_set_history,
961 op.trim_to,
962 op.roll_forward_to,
963 op.roll_forward_to,
964 !op.backfill_or_async_recovery,
965 localt,
966 async);
967
968 if (!get_parent()->pg_is_undersized() &&
969 (unsigned)get_parent()->whoami_shard().shard >=
970 ec_impl->get_data_chunk_count())
971 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
972
973 localt.register_on_commit(
974 get_parent()->bless_context(
975 new SubWriteCommitted(
976 this, msg, op.tid,
977 op.at_version,
978 get_parent()->get_info().last_complete, trace)));
979 vector<ObjectStore::Transaction> tls;
980 tls.reserve(2);
981 tls.push_back(std::move(op.t));
982 tls.push_back(std::move(localt));
983 get_parent()->queue_transactions(tls, msg);
984   dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
985 if (op.at_version != eversion_t()) {
986 // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
987 get_parent()->op_applied(op.at_version);
988 }
989 }
990
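// Service an ECSubRead against the locally stored chunks. Reads are
// either whole-chunk or fragmented into sub-chunks depending on what the
// plugin asked for; when overwrites are disabled, the stored chunk hash
// is verified for reads covering the whole chunk before data is returned.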
991 void ECBackend::handle_sub_read(
992 pg_shard_t from,
993 const ECSubRead &op,
994 ECSubReadReply *reply,
995 const ZTracer::Trace &trace)
996 {
997 trace.event("handle sub read");
998 shard_id_t shard = get_parent()->whoami_shard().shard;
999 for(auto i = op.to_read.begin();
1000 i != op.to_read.end();
1001 ++i) {
1002 int r = 0;
1003 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1004 bufferlist bl;
1005 if ((op.subchunks.find(i->first)->second.size() == 1) &&
1006 (op.subchunks.find(i->first)->second.front().second ==
1007 ec_impl->get_sub_chunk_count())) {
1008 dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
1009 r = store->read(
1010 ch,
1011 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1012 j->get<0>(),
1013 j->get<1>(),
1014 bl, j->get<2>()); // Allow EIO return
1015 } else {
1016 dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
1017 int subchunk_size =
1018 sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
1019 bool error = false;
1020 for (int m = 0; m < (int)j->get<1>() && !error;
1021 m += sinfo.get_chunk_size()) {
1022 for (auto &&k:op.subchunks.find(i->first)->second) {
1023 bufferlist bl0;
1024 r = store->read(
1025 ch,
1026 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1027 j->get<0>() + m + (k.first)*subchunk_size,
1028 (k.second)*subchunk_size,
1029 bl0, j->get<2>());
1030 if (r < 0) {
1031 error = true;
1032 break;
1033 }
1034 bl.claim_append(bl0);
1035 }
1036 }
1037 }
1038
1039 if (r < 0) {
1040 // if we are doing fast reads, it's possible for one of the shard
1041 // reads to cross paths with another update and get a (harmless)
1042 // ENOENT. Suppress the message to the cluster log in that case.
1043 if (r == -ENOENT && get_parent()->get_pool().fast_read) {
1044 dout(5) << __func__ << ": Error " << r
1045 << " reading " << i->first << ", fast read, probably ok"
1046 << dendl;
1047 } else {
1048 get_parent()->clog_error() << "Error " << r
1049 << " reading object "
1050 << i->first;
1051 dout(5) << __func__ << ": Error " << r
1052 << " reading " << i->first << dendl;
1053 }
1054 goto error;
1055 } else {
1056 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1057 reply->buffers_read[i->first].push_back(
1058 make_pair(
1059 j->get<0>(),
1060 bl)
1061 );
1062 }
1063
1064 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1065 // This shows that we still need deep scrub because large enough files
1066         // are read in sections, so the digest check won't be done here.
1067 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1068 // the state of our chunk in case other chunks could substitute.
1069 ECUtil::HashInfoRef hinfo;
1070 hinfo = get_hash_info(i->first);
1071 if (!hinfo) {
1072 r = -EIO;
1073 get_parent()->clog_error() << "Corruption detected: object "
1074 << i->first
1075 << " is missing hash_info";
1076 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1077 goto error;
1078 }
1079 ceph_assert(hinfo->has_chunk_hash());
1080 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1081 (j->get<0>() == 0)) {
1082 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1083 bufferhash h(-1);
1084 h << bl;
1085 if (h.digest() != hinfo->get_chunk_hash(shard)) {
1086 get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1087 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1088 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1089 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1090 r = -EIO;
1091 goto error;
1092 }
1093 }
1094 }
1095 }
1096 continue;
1097 error:
1098 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1099 // the state of our chunk in case other chunks could substitute.
1100 reply->buffers_read.erase(i->first);
1101 reply->errors[i->first] = r;
1102 }
1103 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1104 i != op.attrs_to_read.end();
1105 ++i) {
1106 dout(10) << __func__ << ": fulfilling attr request on "
1107 << *i << dendl;
1108 if (reply->errors.count(*i))
1109 continue;
1110 int r = store->getattrs(
1111 ch,
1112 ghobject_t(
1113 *i, ghobject_t::NO_GEN, shard),
1114 reply->attrs_read[*i]);
1115 if (r < 0) {
1116       // If we hit a read error, we should not return the attrs either.
1117 reply->attrs_read.erase(*i);
1118 reply->buffers_read.erase(*i);
1119 reply->errors[*i] = r;
1120 }
1121 }
1122 reply->from = get_parent()->whoami_shard();
1123 reply->tid = op.tid;
1124 }
1125
1126 void ECBackend::handle_sub_write_reply(
1127 pg_shard_t from,
1128 const ECSubWriteReply &op,
1129 const ZTracer::Trace &trace)
1130 {
1131 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1132 ceph_assert(i != tid_to_op_map.end());
1133 if (op.committed) {
1134 trace.event("sub write committed");
1135 ceph_assert(i->second.pending_commit.count(from));
1136 i->second.pending_commit.erase(from);
1137 if (from != get_parent()->whoami_shard()) {
1138 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1139 }
1140 }
1141 if (op.applied) {
1142 trace.event("sub write applied");
1143 ceph_assert(i->second.pending_apply.count(from));
1144 i->second.pending_apply.erase(from);
1145 }
1146
1147 if (i->second.pending_commit.empty() &&
1148 i->second.on_all_commit &&
1149 // also wait for apply, to preserve ordering with luminous peers.
1150 i->second.pending_apply.empty()) {
1151 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1152 i->second.on_all_commit->complete(0);
1153 i->second.on_all_commit = 0;
1154 i->second.trace.event("ec write all committed");
1155 }
1156 check_ops();
1157 }
1158
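// Fold one shard's read reply into the in-flight ReadOp: stash returned
// buffers and attrs, record errors, and once enough shards have replied
// (all of them, or each one when doing redundant reads) check with
// minimum_to_decode() whether every object can be reconstructed. Objects
// that cannot be decoded may trigger reads of the remaining shards
// before the read is finally completed with an error.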
1159 void ECBackend::handle_sub_read_reply(
1160 pg_shard_t from,
1161 ECSubReadReply &op,
1162 RecoveryMessages *m,
1163 const ZTracer::Trace &trace)
1164 {
1165 trace.event("ec sub read reply");
1166 dout(10) << __func__ << ": reply " << op << dendl;
1167 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1168 if (iter == tid_to_read_map.end()) {
1169 //canceled
1170 dout(20) << __func__ << ": dropped " << op << dendl;
1171 return;
1172 }
1173 ReadOp &rop = iter->second;
1174 for (auto i = op.buffers_read.begin();
1175 i != op.buffers_read.end();
1176 ++i) {
1177     ceph_assert(!op.errors.count(i->first)); // if there was an error for this object, we had better not have sent a buffer
1178 if (!rop.to_read.count(i->first)) {
1179 // We canceled this read! @see filter_read_op
1180 dout(20) << __func__ << " to_read skipping" << dendl;
1181 continue;
1182 }
1183 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1184 rop.to_read.find(i->first)->second.to_read.begin();
1185 list<
1186 boost::tuple<
1187 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1188 rop.complete[i->first].returned.begin();
1189 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1190 j != i->second.end();
1191 ++j, ++req_iter, ++riter) {
1192 ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1193 ceph_assert(riter != rop.complete[i->first].returned.end());
1194 pair<uint64_t, uint64_t> adjusted =
1195 sinfo.aligned_offset_len_to_chunk(
1196 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1197 ceph_assert(adjusted.first == j->first);
1198 riter->get<2>()[from].claim(j->second);
1199 }
1200 }
1201 for (auto i = op.attrs_read.begin();
1202 i != op.attrs_read.end();
1203 ++i) {
1204     ceph_assert(!op.errors.count(i->first)); // if there was a read error, we had better not have sent an attribute
1205 if (!rop.to_read.count(i->first)) {
1206 // We canceled this read! @see filter_read_op
1207 dout(20) << __func__ << " to_read skipping" << dendl;
1208 continue;
1209 }
1210 rop.complete[i->first].attrs = map<string, bufferlist>();
1211 (*(rop.complete[i->first].attrs)).swap(i->second);
1212 }
1213 for (auto i = op.errors.begin();
1214 i != op.errors.end();
1215 ++i) {
1216 rop.complete[i->first].errors.insert(
1217 make_pair(
1218 from,
1219 i->second));
1220 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1221 }
1222
1223 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1224 shard_to_read_map.find(from);
1225 ceph_assert(siter != shard_to_read_map.end());
1226 ceph_assert(siter->second.count(op.tid));
1227 siter->second.erase(op.tid);
1228
1229 ceph_assert(rop.in_progress.count(from));
1230 rop.in_progress.erase(from);
1231 unsigned is_complete = 0;
1232 // For redundant reads check for completion as each shard comes in,
1233 // or in a non-recovery read check for completion once all the shards read.
1234 if (rop.do_redundant_reads || rop.in_progress.empty()) {
1235 for (map<hobject_t, read_result_t>::const_iterator iter =
1236 rop.complete.begin();
1237 iter != rop.complete.end();
1238 ++iter) {
1239 set<int> have;
1240 for (map<pg_shard_t, bufferlist>::const_iterator j =
1241 iter->second.returned.front().get<2>().begin();
1242 j != iter->second.returned.front().get<2>().end();
1243 ++j) {
1244 have.insert(j->first.shard);
1245 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1246 }
1247 map<int, vector<pair<int, int>>> dummy_minimum;
1248 int err;
1249 if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
1250 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1251 if (rop.in_progress.empty()) {
1252 // If we don't have enough copies, try other pg_shard_ts if available.
1253 // During recovery there may be multiple osds with copies of the same shard,
1254 // so getting EIO from one may result in multiple passes through this code path.
1255 if (!rop.do_redundant_reads) {
1256 int r = send_all_remaining_reads(iter->first, rop);
1257 if (r == 0) {
1258 // We added to in_progress and not incrementing is_complete
1259 continue;
1260 }
1261 // Couldn't read any additional shards so handle as completed with errors
1262 }
1263           // We don't want to confuse clients / RBD with objectstore error
1264           // values, in particular ENOENT. We may have different error returns
1265           // from different shards, so we return the minimum_to_decode() error
1266           // (usually EIO) to the reader. An error here is most likely due to a
1267           // damaged pg.
1268 rop.complete[iter->first].r = err;
1269 ++is_complete;
1270 }
1271 } else {
1272 ceph_assert(rop.complete[iter->first].r == 0);
1273 if (!rop.complete[iter->first].errors.empty()) {
1274 if (cct->_conf->osd_read_ec_check_for_errors) {
1275 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1276 err = rop.complete[iter->first].errors.begin()->second;
1277 rop.complete[iter->first].r = err;
1278 } else {
1279 get_parent()->clog_warn() << "Error(s) ignored for "
1280 << iter->first << " enough copies available";
1281 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1282 << " enough copies available" << dendl;
1283 rop.complete[iter->first].errors.clear();
1284 }
1285 }
1286 ++is_complete;
1287 }
1288 }
1289 }
1290 if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1291 dout(20) << __func__ << " Complete: " << rop << dendl;
1292 rop.trace.event("ec read complete");
1293 complete_read_op(rop, m);
1294 } else {
1295 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1296 }
1297 }
1298
1299 void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1300 {
1301 map<hobject_t, read_request_t>::iterator reqiter =
1302 rop.to_read.begin();
1303 map<hobject_t, read_result_t>::iterator resiter =
1304 rop.complete.begin();
1305 ceph_assert(rop.to_read.size() == rop.complete.size());
1306 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1307 if (reqiter->second.cb) {
1308 pair<RecoveryMessages *, read_result_t &> arg(
1309 m, resiter->second);
1310 reqiter->second.cb->complete(arg);
1311 reqiter->second.cb = nullptr;
1312 }
1313 }
1314   // The read op is over; clean up all the data for this tid.
1315 for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
1316 iter != rop.in_progress.end();
1317 iter++) {
1318 shard_to_read_map[*iter].erase(rop.tid);
1319 }
1320 rop.in_progress.clear();
1321 tid_to_read_map.erase(rop.tid);
1322 }
1323
1324 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1325 ECBackend *ec;
1326 ceph_tid_t tid;
1327 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1328 void finish(ThreadPool::TPHandle &handle) override {
1329 auto ropiter = ec->tid_to_read_map.find(tid);
1330 ceph_assert(ropiter != ec->tid_to_read_map.end());
1331 int priority = ropiter->second.priority;
1332 RecoveryMessages rm;
1333 ec->complete_read_op(ropiter->second, &rm);
1334 ec->dispatch_recovery_messages(rm, priority);
1335 }
1336 };
1337
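// Strip shards that have gone down (per the new osdmap) out of an
// in-flight ReadOp, cancelling any pulls that depended on them; if
// nothing is left in progress the op is completed from the recovery
// work queue via FinishReadOp.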
1338 void ECBackend::filter_read_op(
1339 const OSDMapRef& osdmap,
1340 ReadOp &op)
1341 {
1342 set<hobject_t> to_cancel;
1343 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1344 i != op.source_to_obj.end();
1345 ++i) {
1346 if (osdmap->is_down(i->first.osd)) {
1347 to_cancel.insert(i->second.begin(), i->second.end());
1348 op.in_progress.erase(i->first);
1349 continue;
1350 }
1351 }
1352
1353 if (to_cancel.empty())
1354 return;
1355
1356 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1357 i != op.source_to_obj.end();
1358 ) {
1359 for (set<hobject_t>::iterator j = i->second.begin();
1360 j != i->second.end();
1361 ) {
1362 if (to_cancel.count(*j))
1363 i->second.erase(j++);
1364 else
1365 ++j;
1366 }
1367 if (i->second.empty()) {
1368 op.source_to_obj.erase(i++);
1369 } else {
1370 ceph_assert(!osdmap->is_down(i->first.osd));
1371 ++i;
1372 }
1373 }
1374
1375 for (set<hobject_t>::iterator i = to_cancel.begin();
1376 i != to_cancel.end();
1377 ++i) {
1378 get_parent()->cancel_pull(*i);
1379
1380 ceph_assert(op.to_read.count(*i));
1381 read_request_t &req = op.to_read.find(*i)->second;
1382 dout(10) << __func__ << ": canceling " << req
1383 << " for obj " << *i << dendl;
1384 ceph_assert(req.cb);
1385 delete req.cb;
1386 req.cb = nullptr;
1387
1388 op.to_read.erase(*i);
1389 op.complete.erase(*i);
1390 recovery_ops.erase(*i);
1391 }
1392
1393 if (op.in_progress.empty()) {
1394 get_parent()->schedule_recovery_work(
1395 get_parent()->bless_unlocked_gencontext(
1396 new FinishReadOp(this, op.tid)));
1397 }
1398 }
1399
1400 void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1401 {
1402 set<ceph_tid_t> tids_to_filter;
1403 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1404 i = shard_to_read_map.begin();
1405 i != shard_to_read_map.end();
1406 ) {
1407 if (osdmap->is_down(i->first.osd)) {
1408 tids_to_filter.insert(i->second.begin(), i->second.end());
1409 shard_to_read_map.erase(i++);
1410 } else {
1411 ++i;
1412 }
1413 }
1414 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1415 i != tids_to_filter.end();
1416 ++i) {
1417 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1418 ceph_assert(j != tid_to_read_map.end());
1419 filter_read_op(osdmap, j->second);
1420 }
1421 }
1422
1423 void ECBackend::on_change()
1424 {
1425 dout(10) << __func__ << dendl;
1426
1427 completed_to = eversion_t();
1428 committed_to = eversion_t();
1429 pipeline_state.clear();
1430 waiting_reads.clear();
1431 waiting_state.clear();
1432 waiting_commit.clear();
1433 for (auto &&op: tid_to_op_map) {
1434 cache.release_write_pin(op.second.pin);
1435 }
1436 tid_to_op_map.clear();
1437
1438 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1439 i != tid_to_read_map.end();
1440 ++i) {
1441 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1442 for (map<hobject_t, read_request_t>::iterator j =
1443 i->second.to_read.begin();
1444 j != i->second.to_read.end();
1445 ++j) {
1446 delete j->second.cb;
1447 j->second.cb = nullptr;
1448 }
1449 }
1450 tid_to_read_map.clear();
1451 in_progress_client_reads.clear();
1452 shard_to_read_map.clear();
1453 clear_recovery_state();
1454 }
1455
1456 void ECBackend::clear_recovery_state()
1457 {
1458 recovery_ops.clear();
1459 }
1460
1461 void ECBackend::dump_recovery_info(Formatter *f) const
1462 {
1463 f->open_array_section("recovery_ops");
1464 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1465 i != recovery_ops.end();
1466 ++i) {
1467 f->open_object_section("op");
1468 i->second.dump(f);
1469 f->close_section();
1470 }
1471 f->close_section();
1472 f->open_array_section("read_ops");
1473 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1474 i != tid_to_read_map.end();
1475 ++i) {
1476 f->open_object_section("read_op");
1477 i->second.dump(f);
1478 f->close_section();
1479 }
1480 f->close_section();
1481 }
1482
1483 void ECBackend::submit_transaction(
1484 const hobject_t &hoid,
1485 const object_stat_sum_t &delta_stats,
1486 const eversion_t &at_version,
1487 PGTransactionUPtr &&t,
1488 const eversion_t &trim_to,
1489 const eversion_t &min_last_complete_ondisk,
1490 const vector<pg_log_entry_t> &log_entries,
1491 std::optional<pg_hit_set_history_t> &hset_history,
1492 Context *on_all_commit,
1493 ceph_tid_t tid,
1494 osd_reqid_t reqid,
1495 OpRequestRef client_op
1496 )
1497 {
1498 ceph_assert(!tid_to_op_map.count(tid));
1499 Op *op = &(tid_to_op_map[tid]);
1500 op->hoid = hoid;
1501 op->delta_stats = delta_stats;
1502 op->version = at_version;
1503 op->trim_to = trim_to;
1504 op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
1505 op->log_entries = log_entries;
1506 std::swap(op->updated_hit_set_history, hset_history);
1507 op->on_all_commit = on_all_commit;
1508 op->tid = tid;
1509 op->reqid = reqid;
1510 op->client_op = client_op;
1511 if (client_op)
1512 op->trace = client_op->pg_trace;
1513
1514 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1515 start_rmw(op, std::move(t));
1516 }
1517
1518 void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1519 if (!waiting_state.empty()) {
1520 waiting_state.back().on_write.emplace_back(std::move(cb));
1521 } else if (!waiting_reads.empty()) {
1522 waiting_reads.back().on_write.emplace_back(std::move(cb));
1523 } else {
1524 // Nothing earlier in the pipeline, just call it
1525 cb();
1526 }
1527 }
1528
1529 void ECBackend::get_all_avail_shards(
1530 const hobject_t &hoid,
1531 const set<pg_shard_t> &error_shards,
1532 set<int> &have,
1533 map<shard_id_t, pg_shard_t> &shards,
1534 bool for_recovery)
1535 {
1536 for (set<pg_shard_t>::const_iterator i =
1537 get_parent()->get_acting_shards().begin();
1538 i != get_parent()->get_acting_shards().end();
1539 ++i) {
1540 dout(10) << __func__ << ": checking acting " << *i << dendl;
1541 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1542 if (error_shards.find(*i) != error_shards.end())
1543 continue;
1544 if (!missing.is_missing(hoid)) {
1545 ceph_assert(!have.count(i->shard));
1546 have.insert(i->shard);
1547 ceph_assert(!shards.count(i->shard));
1548 shards.insert(make_pair(i->shard, *i));
1549 }
1550 }
1551
1552 if (for_recovery) {
1553 for (set<pg_shard_t>::const_iterator i =
1554 get_parent()->get_backfill_shards().begin();
1555 i != get_parent()->get_backfill_shards().end();
1556 ++i) {
1557 if (error_shards.find(*i) != error_shards.end())
1558 continue;
1559 if (have.count(i->shard)) {
1560 ceph_assert(shards.count(i->shard));
1561 continue;
1562 }
1563 dout(10) << __func__ << ": checking backfill " << *i << dendl;
1564 ceph_assert(!shards.count(i->shard));
1565 const pg_info_t &info = get_parent()->get_shard_info(*i);
1566 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1567 if (hoid < info.last_backfill &&
1568 !missing.is_missing(hoid)) {
1569 have.insert(i->shard);
1570 shards.insert(make_pair(i->shard, *i));
1571 }
1572 }
1573
1574 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1575 get_parent()->get_missing_loc_shards().find(hoid);
1576 if (miter != get_parent()->get_missing_loc_shards().end()) {
1577 for (set<pg_shard_t>::iterator i = miter->second.begin();
1578 i != miter->second.end();
1579 ++i) {
1580 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1581 auto m = get_parent()->maybe_get_shard_missing(*i);
1582 if (m) {
1583 ceph_assert(!(*m).is_missing(hoid));
1584 }
1585 if (error_shards.find(*i) != error_shards.end())
1586 continue;
1587 have.insert(i->shard);
1588 shards.insert(make_pair(i->shard, *i));
1589 }
1590 }
1591 }
1592 }
1593
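// Ask the plugin for a minimal set of shards that can satisfy 'want'
// given the shards currently readable for hoid, and map the answer back
// to concrete pg_shard_t targets. With do_redundant_reads, every
// available shard is requested instead of just the minimum set.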
1594 int ECBackend::get_min_avail_to_read_shards(
1595 const hobject_t &hoid,
1596 const set<int> &want,
1597 bool for_recovery,
1598 bool do_redundant_reads,
1599 map<pg_shard_t, vector<pair<int, int>>> *to_read)
1600 {
1601 // Make sure we don't do redundant reads for recovery
1602 ceph_assert(!for_recovery || !do_redundant_reads);
1603
1604 set<int> have;
1605 map<shard_id_t, pg_shard_t> shards;
1606 set<pg_shard_t> error_shards;
1607
1608 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1609
1610 map<int, vector<pair<int, int>>> need;
1611 int r = ec_impl->minimum_to_decode(want, have, &need);
1612 if (r < 0)
1613 return r;
1614
1615 if (do_redundant_reads) {
1616 vector<pair<int, int>> subchunks_list;
1617 subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
1618 for (auto &&i: have) {
1619 need[i] = subchunks_list;
1620 }
1621 }
1622
1623 if (!to_read)
1624 return 0;
1625
1626 for (auto &&i:need) {
1627 ceph_assert(shards.count(shard_id_t(i.first)));
1628 to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
1629 }
1630 return 0;
1631 }
1632
1633 int ECBackend::get_remaining_shards(
1634 const hobject_t &hoid,
1635 const set<int> &avail,
1636 const set<int> &want,
1637 const read_result_t &result,
1638 map<pg_shard_t, vector<pair<int, int>>> *to_read,
1639 bool for_recovery)
1640 {
1641 ceph_assert(to_read);
1642
1643 set<int> have;
1644 map<shard_id_t, pg_shard_t> shards;
1645 set<pg_shard_t> error_shards;
1646 for (auto &p : result.errors) {
1647 error_shards.insert(p.first);
1648 }
1649
1650 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1651
1652 map<int, vector<pair<int, int>>> need;
1653 int r = ec_impl->minimum_to_decode(want, have, &need);
1654 if (r < 0) {
1655 dout(0) << __func__ << " not enough shards left to try for " << hoid
1656 << " read result was " << result << dendl;
1657 return -EIO;
1658 }
1659
1660 set<int> shards_left;
1661 for (auto p : need) {
1662 if (avail.find(p.first) == avail.end()) {
1663 shards_left.insert(p.first);
1664 }
1665 }
1666
1667 vector<pair<int, int>> subchunks;
1668 subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
1669 for (set<int>::iterator i = shards_left.begin();
1670 i != shards_left.end();
1671 ++i) {
1672 ceph_assert(shards.count(shard_id_t(*i)));
1673 ceph_assert(avail.find(*i) == avail.end());
1674 to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
1675 }
1676 return 0;
1677 }
1678
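// Create a ReadOp for the supplied per-object read requests and fan it
// out: do_read_op() builds one MOSDECSubOpRead per target shard,
// records the tid in shard_to_read_map so replies and map changes can
// be matched back to the op, and sends the messages in a single batch.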
1679 void ECBackend::start_read_op(
1680 int priority,
1681 map<hobject_t, set<int>> &want_to_read,
1682 map<hobject_t, read_request_t> &to_read,
1683 OpRequestRef _op,
1684 bool do_redundant_reads,
1685 bool for_recovery)
1686 {
1687 ceph_tid_t tid = get_parent()->get_tid();
1688 ceph_assert(!tid_to_read_map.count(tid));
1689 auto &op = tid_to_read_map.emplace(
1690 tid,
1691 ReadOp(
1692 priority,
1693 tid,
1694 do_redundant_reads,
1695 for_recovery,
1696 _op,
1697 std::move(want_to_read),
1698 std::move(to_read))).first->second;
1699 dout(10) << __func__ << ": starting " << op << dendl;
1700 if (_op) {
1701 op.trace = _op->pg_trace;
1702 op.trace.event("start ec read");
1703 }
1704 do_read_op(op);
1705 }
1706
1707 void ECBackend::do_read_op(ReadOp &op)
1708 {
1709 int priority = op.priority;
1710 ceph_tid_t tid = op.tid;
1711
1712 dout(10) << __func__ << ": starting read " << op << dendl;
1713
1714 map<pg_shard_t, ECSubRead> messages;
1715 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1716 i != op.to_read.end();
1717 ++i) {
1718 bool need_attrs = i->second.want_attrs;
1719
1720 for (auto j = i->second.need.begin();
1721 j != i->second.need.end();
1722 ++j) {
1723 if (need_attrs) {
1724 messages[j->first].attrs_to_read.insert(i->first);
1725 need_attrs = false;
1726 }
1727 messages[j->first].subchunks[i->first] = j->second;
1728 op.obj_to_source[i->first].insert(j->first);
1729 op.source_to_obj[j->first].insert(i->first);
1730 }
1731 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1732 i->second.to_read.begin();
1733 j != i->second.to_read.end();
1734 ++j) {
1735 pair<uint64_t, uint64_t> chunk_off_len =
1736 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1737 for (auto k = i->second.need.begin();
1738 k != i->second.need.end();
1739 ++k) {
1740 messages[k->first].to_read[i->first].push_back(
1741 boost::make_tuple(
1742 chunk_off_len.first,
1743 chunk_off_len.second,
1744 j->get<2>()));
1745 }
1746 ceph_assert(!need_attrs);
1747 }
1748 }
1749
1750 std::vector<std::pair<int, Message*>> m;
1751 m.reserve(messages.size());
1752 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1753 i != messages.end();
1754 ++i) {
1755 op.in_progress.insert(i->first);
1756 shard_to_read_map[i->first].insert(op.tid);
1757 i->second.tid = tid;
1758 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1759 msg->set_priority(priority);
1760 msg->pgid = spg_t(
1761 get_parent()->whoami_spg_t().pgid,
1762 i->first.shard);
1763 msg->map_epoch = get_osdmap_epoch();
1764 msg->min_epoch = get_parent()->get_interval_start_epoch();
1765 msg->op = i->second;
1766 msg->op.from = get_parent()->whoami_shard();
1767 msg->op.tid = tid;
1768 if (op.trace) {
1769 // initialize a child span for this shard
1770 msg->trace.init("ec sub read", nullptr, &op.trace);
1771 msg->trace.keyval("shard", i->first.shard.id);
1772 }
1773 m.push_back(std::make_pair(i->first.osd, msg));
1774 }
1775 if (!m.empty()) {
1776 get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
1777 }
1778
1779 dout(10) << __func__ << ": started " << op << dendl;
1780 }
1781
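// Return the cached HashInfo for hoid, or load it from the hinfo xattr (taken
// from the supplied attrs or read from the store).  Returns a null ref if the
// attr cannot be decoded, if a non-empty object has no hinfo at all, or if
// (with checks) the recorded total chunk size disagrees with the on-disk size.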
1782 ECUtil::HashInfoRef ECBackend::get_hash_info(
1783 const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1784 {
1785 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1786 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1787 if (!ref) {
1788 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1789 struct stat st;
1790 int r = store->stat(
1791 ch,
1792 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1793 &st);
1794 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1795 // XXX: What does it mean if there is no object on disk?
1796 if (r >= 0) {
1797 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1798 bufferlist bl;
1799 if (attrs) {
1800 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1801 if (k == attrs->end()) {
1802 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1803 } else {
1804 bl.push_back(k->second);
1805 }
1806 } else {
1807 r = store->getattr(
1808 ch,
1809 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1810 ECUtil::get_hinfo_key(),
1811 bl);
1812 if (r < 0) {
1813 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1814 bl.clear(); // just in case
1815 }
1816 }
1817 if (bl.length() > 0) {
1818 auto bp = bl.cbegin();
1819 try {
1820 decode(hinfo, bp);
1821 } catch(...) {
1822 dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
1823 return ECUtil::HashInfoRef();
1824 }
1825 if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1826 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1827 << hinfo.get_total_chunk_size() << " and st_size " << st.st_size << dendl;
1828 return ECUtil::HashInfoRef();
1829 }
1830 } else if (st.st_size > 0) { // data on disk but no hinfo attr: error (an empty object just gets a fresh hinfo below)
1831 return ECUtil::HashInfoRef();
1832 }
1833 }
1834 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1835 }
1836 return ref;
1837 }
1838
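// Entry point for a client write: compute the read-modify-write plan for the
// transaction, queue the Op on waiting_state, and let check_ops() drive it
// through the pipeline below.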
1839 void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1840 {
1841 ceph_assert(op);
1842
1843 op->plan = ECTransaction::get_write_plan(
1844 sinfo,
1845 std::move(t),
1846 [&](const hobject_t &i) {
1847 ECUtil::HashInfoRef ref = get_hash_info(i, false);
1848 if (!ref) {
1849 derr << __func__ << ": get_hash_info(" << i << ")"
1850 << " returned a null pointer and there is no"
1851 << " way to recover from such an error in this"
1852 << " context" << dendl;
1853 ceph_abort();
1854 }
1855 return ref;
1856 },
1857 get_parent()->get_dpp());
1858
1859 dout(10) << __func__ << ": " << *op << dendl;
1860
1861 waiting_state.push_back(*op);
1862 check_ops();
1863 }
1864
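// Pipeline stage 1: move the front op from waiting_state to waiting_reads.
// Decides whether the extent cache may be used, reserves cache extents for the
// RMW, and issues any reads that must come from the shards.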
1865 bool ECBackend::try_state_to_reads()
1866 {
1867 if (waiting_state.empty())
1868 return false;
1869
1870 Op *op = &(waiting_state.front());
1871 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1872 ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
1873 dout(20) << __func__ << ": blocking " << *op
1874 << " because it requires an rmw and the cache is invalid "
1875 << pipeline_state
1876 << dendl;
1877 return false;
1878 }
1879
1880 if (!pipeline_state.caching_enabled()) {
1881 op->using_cache = false;
1882 } else if (op->invalidates_cache()) {
1883 dout(20) << __func__ << ": invalidating cache after this op"
1884 << dendl;
1885 pipeline_state.invalidate();
1886 }
1887
1888 waiting_state.pop_front();
1889 waiting_reads.push_back(*op);
1890
1891 if (op->using_cache) {
1892 cache.open_write_pin(op->pin);
1893
1894 extent_set empty;
1895 for (auto &&hpair: op->plan.will_write) {
1896 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1897 const extent_set &to_read_plan =
1898 to_read_plan_iter == op->plan.to_read.end() ?
1899 empty :
1900 to_read_plan_iter->second;
1901
1902 extent_set remote_read = cache.reserve_extents_for_rmw(
1903 hpair.first,
1904 op->pin,
1905 hpair.second,
1906 to_read_plan);
1907
1908 extent_set pending_read = to_read_plan;
1909 pending_read.subtract(remote_read);
1910
1911 if (!remote_read.empty()) {
1912 op->remote_read[hpair.first] = std::move(remote_read);
1913 }
1914 if (!pending_read.empty()) {
1915 op->pending_read[hpair.first] = std::move(pending_read);
1916 }
1917 }
1918 } else {
1919 op->remote_read = op->plan.to_read;
1920 }
1921
1922 dout(10) << __func__ << ": " << *op << dendl;
1923
1924 if (!op->remote_read.empty()) {
1925 ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
1926 objects_read_async_no_cache(
1927 op->remote_read,
1928 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1929 for (auto &&i: results) {
1930 op->remote_read_result.emplace(i.first, i.second.second);
1931 }
1932 check_ops();
1933 });
1934 }
1935
1936 return true;
1937 }
1938
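// Pipeline stage 2: once the front op's reads have completed, merge any cached
// extents, generate the per-shard transactions, send MOSDECSubOpWrite to the
// remote shards, apply the local write, and move the op to waiting_commit.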
1939 bool ECBackend::try_reads_to_commit()
1940 {
1941 if (waiting_reads.empty())
1942 return false;
1943 Op *op = &(waiting_reads.front());
1944 if (op->read_in_progress())
1945 return false;
1946 waiting_reads.pop_front();
1947 waiting_commit.push_back(*op);
1948
1949 dout(10) << __func__ << ": starting commit on " << *op << dendl;
1950 dout(20) << __func__ << ": " << cache << dendl;
1951
1952 get_parent()->apply_stats(
1953 op->hoid,
1954 op->delta_stats);
1955
1956 if (op->using_cache) {
1957 for (auto &&hpair: op->pending_read) {
1958 op->remote_read_result[hpair.first].insert(
1959 cache.get_remaining_extents_for_rmw(
1960 hpair.first,
1961 op->pin,
1962 hpair.second));
1963 }
1964 op->pending_read.clear();
1965 } else {
1966 ceph_assert(op->pending_read.empty());
1967 }
1968
1969 map<shard_id_t, ObjectStore::Transaction> trans;
1970 for (set<pg_shard_t>::const_iterator i =
1971 get_parent()->get_acting_recovery_backfill_shards().begin();
1972 i != get_parent()->get_acting_recovery_backfill_shards().end();
1973 ++i) {
1974 trans[i->shard];
1975 }
1976
1977 op->trace.event("start ec write");
1978
1979 map<hobject_t,extent_map> written;
1980 if (op->plan.t) {
1981 ECTransaction::generate_transactions(
1982 op->plan,
1983 ec_impl,
1984 get_parent()->get_info().pgid.pgid,
1985 sinfo,
1986 op->remote_read_result,
1987 op->log_entries,
1988 &written,
1989 &trans,
1990 &(op->temp_added),
1991 &(op->temp_cleared),
1992 get_parent()->get_dpp(),
1993 get_osdmap()->require_osd_release);
1994 }
1995
1996 dout(20) << __func__ << ": " << cache << dendl;
1997 dout(20) << __func__ << ": written: " << written << dendl;
1998 dout(20) << __func__ << ": op: " << *op << dendl;
1999
2000 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2001 for (auto &&i: op->log_entries) {
2002 if (i.requires_kraken()) {
2003 derr << __func__ << ": log entry " << i << " requires kraken"
2004 << " but overwrites are not enabled!" << dendl;
2005 ceph_abort();
2006 }
2007 }
2008 }
2009
2010 map<hobject_t,extent_set> written_set;
2011 for (auto &&i: written) {
2012 written_set[i.first] = i.second.get_interval_set();
2013 }
2014 dout(20) << __func__ << ": written_set: " << written_set << dendl;
2015 ceph_assert(written_set == op->plan.will_write);
2016
2017 if (op->using_cache) {
2018 for (auto &&hpair: written) {
2019 dout(20) << __func__ << ": " << hpair << dendl;
2020 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
2021 }
2022 }
2023 op->remote_read.clear();
2024 op->remote_read_result.clear();
2025
2026 ObjectStore::Transaction empty;
2027 bool should_write_local = false;
2028 ECSubWrite local_write_op;
2029 std::vector<std::pair<int, Message*>> messages;
2030 messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
2031 set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
2032 for (set<pg_shard_t>::const_iterator i =
2033 get_parent()->get_acting_recovery_backfill_shards().begin();
2034 i != get_parent()->get_acting_recovery_backfill_shards().end();
2035 ++i) {
2036 op->pending_apply.insert(*i);
2037 op->pending_commit.insert(*i);
2038 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
2039 trans.find(i->shard);
2040 ceph_assert(iter != trans.end());
2041 bool should_send = get_parent()->should_send_op(*i, op->hoid);
2042 const pg_stat_t &stats =
2043 (should_send || !backfill_shards.count(*i)) ?
2044 get_info().stats :
2045 parent->get_shard_info().find(*i)->second.stats;
2046
2047 ECSubWrite sop(
2048 get_parent()->whoami_shard(),
2049 op->tid,
2050 op->reqid,
2051 op->hoid,
2052 stats,
2053 should_send ? iter->second : empty,
2054 op->version,
2055 op->trim_to,
2056 op->roll_forward_to,
2057 op->log_entries,
2058 op->updated_hit_set_history,
2059 op->temp_added,
2060 op->temp_cleared,
2061 !should_send);
2062
2063 ZTracer::Trace trace;
2064 if (op->trace) {
2065 // initialize a child span for this shard
2066 trace.init("ec sub write", nullptr, &op->trace);
2067 trace.keyval("shard", i->shard.id);
2068 }
2069
2070 if (*i == get_parent()->whoami_shard()) {
2071 should_write_local = true;
2072 local_write_op.claim(sop);
2073 } else {
2074 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
2075 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
2076 r->map_epoch = get_osdmap_epoch();
2077 r->min_epoch = get_parent()->get_interval_start_epoch();
2078 r->trace = trace;
2079 messages.push_back(std::make_pair(i->osd, r));
2080 }
2081 }
2082 if (!messages.empty()) {
2083 get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
2084 }
2085
2086 if (should_write_local) {
2087 handle_sub_write(
2088 get_parent()->whoami_shard(),
2089 op->client_op,
2090 local_write_op,
2091 op->trace);
2092 }
2093
2094 for (auto i = op->on_write.begin();
2095 i != op->on_write.end();
2096 op->on_write.erase(i++)) {
2097 (*i)();
2098 }
2099
2100 return true;
2101 }
2102
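// Pipeline stage 3: once every shard has committed the front op, advance
// completed_to/committed_to, optionally queue a dummy op to roll the log
// forward, drop the op, and clear the pipeline state when the queues drain.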
2103 bool ECBackend::try_finish_rmw()
2104 {
2105 if (waiting_commit.empty())
2106 return false;
2107 Op *op = &(waiting_commit.front());
2108 if (op->write_in_progress())
2109 return false;
2110 waiting_commit.pop_front();
2111
2112 dout(10) << __func__ << ": " << *op << dendl;
2113 dout(20) << __func__ << ": " << cache << dendl;
2114
2115 if (op->roll_forward_to > completed_to)
2116 completed_to = op->roll_forward_to;
2117 if (op->version > committed_to)
2118 committed_to = op->version;
2119
2120 if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
2121 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2122 waiting_reads.empty() &&
2123 waiting_commit.empty()) {
2124 // submit a dummy transaction to kick the rollforward
2125 auto tid = get_parent()->get_tid();
2126 Op *nop = &(tid_to_op_map[tid]);
2127 nop->hoid = op->hoid;
2128 nop->trim_to = op->trim_to;
2129 nop->roll_forward_to = op->version;
2130 nop->tid = tid;
2131 nop->reqid = op->reqid;
2132 waiting_reads.push_back(*nop);
2133 }
2134 }
2135
2136 if (op->using_cache) {
2137 cache.release_write_pin(op->pin);
2138 }
2139 tid_to_op_map.erase(op->tid);
2140
2141 if (waiting_reads.empty() &&
2142 waiting_commit.empty()) {
2143 pipeline_state.clear();
2144 dout(20) << __func__ << ": clearing pipeline_state "
2145 << pipeline_state
2146 << dendl;
2147 }
2148 return true;
2149 }
2150
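// Run the three pipeline stages until none of them makes further progress.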
2151 void ECBackend::check_ops()
2152 {
2153 while (try_state_to_reads() ||
2154 try_reads_to_commit() ||
2155 try_finish_rmw());
2156 }
2157
2158 int ECBackend::objects_read_sync(
2159 const hobject_t &hoid,
2160 uint64_t off,
2161 uint64_t len,
2162 uint32_t op_flags,
2163 bufferlist *bl)
2164 {
2165 return -EOPNOTSUPP;
2166 }
2167
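// Client-facing async read: round the requested extents out to stripe bounds,
// issue a single reconstructing read for the object, then copy the requested
// sub-ranges into the callers' bufferlists and complete their contexts.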
2168 void ECBackend::objects_read_async(
2169 const hobject_t &hoid,
2170 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2171 pair<bufferlist*, Context*> > > &to_read,
2172 Context *on_complete,
2173 bool fast_read)
2174 {
2175 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2176 reads;
2177
2178 uint32_t flags = 0;
2179 extent_set es;
2180 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2181 pair<bufferlist*, Context*> > >::const_iterator i =
2182 to_read.begin();
2183 i != to_read.end();
2184 ++i) {
2185 pair<uint64_t, uint64_t> tmp =
2186 sinfo.offset_len_to_stripe_bounds(
2187 make_pair(i->first.get<0>(), i->first.get<1>()));
2188
2189 es.union_insert(tmp.first, tmp.second);
2190 flags |= i->first.get<2>();
2191 }
2192
2193 if (!es.empty()) {
2194 auto &offsets = reads[hoid];
2195 for (auto j = es.begin();
2196 j != es.end();
2197 ++j) {
2198 offsets.push_back(
2199 boost::make_tuple(
2200 j.get_start(),
2201 j.get_len(),
2202 flags));
2203 }
2204 }
2205
2206 struct cb {
2207 ECBackend *ec;
2208 hobject_t hoid;
2209 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2210 pair<bufferlist*, Context*> > > to_read;
2211 unique_ptr<Context> on_complete;
2212 cb(const cb&) = delete;
2213 cb(cb &&) = default;
2214 cb(ECBackend *ec,
2215 const hobject_t &hoid,
2216 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2217 pair<bufferlist*, Context*> > > &to_read,
2218 Context *on_complete)
2219 : ec(ec),
2220 hoid(hoid),
2221 to_read(to_read),
2222 on_complete(on_complete) {}
2223 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2224 auto dpp = ec->get_parent()->get_dpp();
2225 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2226 << dendl;
2227 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2228 << dendl;
2229
2230 auto &got = results[hoid];
2231
2232 int r = 0;
2233 for (auto &&read: to_read) {
2234 if (got.first < 0) {
2235 if (read.second.second) {
2236 read.second.second->complete(got.first);
2237 }
2238 if (r == 0)
2239 r = got.first;
2240 } else {
2241 ceph_assert(read.second.first);
2242 uint64_t offset = read.first.get<0>();
2243 uint64_t length = read.first.get<1>();
2244 auto range = got.second.get_containing_range(offset, length);
2245 ceph_assert(range.first != range.second);
2246 ceph_assert(range.first.get_off() <= offset);
2247 ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
2248 ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off() << dendl;
2249 ldpp_dout(dpp, 30) << "length: " << length << dendl;
2250 ldpp_dout(dpp, 30) << "range length: " << range.first.get_len() << dendl;
2251 ceph_assert(
2252 (offset + length) <=
2253 (range.first.get_off() + range.first.get_len()));
2254 read.second.first->substr_of(
2255 range.first.get_val(),
2256 offset - range.first.get_off(),
2257 length);
2258 if (read.second.second) {
2259 read.second.second->complete(length);
2260 read.second.second = nullptr;
2261 }
2262 }
2263 }
2264 to_read.clear();
2265 if (on_complete) {
2266 on_complete.release()->complete(r);
2267 }
2268 }
2269 ~cb() {
2270 for (auto &&i: to_read) {
2271 delete i.second.second;
2272 }
2273 to_read.clear();
2274 }
2275 };
2276 objects_read_and_reconstruct(
2277 reads,
2278 fast_read,
2279 make_gen_lambda_context<
2280 map<hobject_t,pair<int, extent_map> > &&, cb>(
2281 cb(this,
2282 hoid,
2283 to_read,
2284 on_complete)));
2285 }
2286
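// Completion context for a reconstructing read: decodes the returned per-shard
// chunks back into the logical ranges the client asked for and hands the
// result to the ClientAsyncReadStatus.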
2287 struct CallClientContexts :
2288 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2289 hobject_t hoid;
2290 ECBackend *ec;
2291 ECBackend::ClientAsyncReadStatus *status;
2292 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2293 CallClientContexts(
2294 hobject_t hoid,
2295 ECBackend *ec,
2296 ECBackend::ClientAsyncReadStatus *status,
2297 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2298 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2299 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2300 ECBackend::read_result_t &res = in.second;
2301 extent_map result;
2302 if (res.r != 0)
2303 goto out;
2304 ceph_assert(res.returned.size() == to_read.size());
2305 ceph_assert(res.errors.empty());
2306 for (auto &&read: to_read) {
2307 pair<uint64_t, uint64_t> adjusted =
2308 ec->sinfo.offset_len_to_stripe_bounds(
2309 make_pair(read.get<0>(), read.get<1>()));
2310 ceph_assert(res.returned.front().get<0>() == adjusted.first &&
2311 res.returned.front().get<1>() == adjusted.second);
2312 map<int, bufferlist> to_decode;
2313 bufferlist bl;
2314 for (map<pg_shard_t, bufferlist>::iterator j =
2315 res.returned.front().get<2>().begin();
2316 j != res.returned.front().get<2>().end();
2317 ++j) {
2318 to_decode[j->first.shard].claim(j->second);
2319 }
2320 int r = ECUtil::decode(
2321 ec->sinfo,
2322 ec->ec_impl,
2323 to_decode,
2324 &bl);
2325 if (r < 0) {
2326 res.r = r;
2327 goto out;
2328 }
2329 bufferlist trimmed;
2330 trimmed.substr_of(
2331 bl,
2332 read.get<0>() - adjusted.first,
2333 std::min(read.get<1>(),
2334 bl.length() - (read.get<0>() - adjusted.first)));
2335 result.insert(
2336 read.get<0>(), trimmed.length(), std::move(trimmed));
2337 res.returned.pop_front();
2338 }
2339 out:
2340 status->complete_object(hoid, res.r, std::move(result));
2341 ec->kick_reads();
2342 }
2343 };
2344
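// Issue reads for the given logical extents: pick a minimal set of shards per
// object (redundant reads when fast_read is set), reconstruct the data through
// CallClientContexts, and invoke func with the per-object results.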
2345 void ECBackend::objects_read_and_reconstruct(
2346 const map<hobject_t,
2347 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2348 > &reads,
2349 bool fast_read,
2350 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2351 {
2352 in_progress_client_reads.emplace_back(
2353 reads.size(), std::move(func));
2354 if (!reads.size()) {
2355 kick_reads();
2356 return;
2357 }
2358
2359 map<hobject_t, set<int>> obj_want_to_read;
2360 set<int> want_to_read;
2361 get_want_to_read_shards(&want_to_read);
2362
2363 map<hobject_t, read_request_t> for_read_op;
2364 for (auto &&to_read: reads) {
2365 map<pg_shard_t, vector<pair<int, int>>> shards;
2366 int r = get_min_avail_to_read_shards(
2367 to_read.first,
2368 want_to_read,
2369 false,
2370 fast_read,
2371 &shards);
2372 ceph_assert(r == 0);
2373
2374 CallClientContexts *c = new CallClientContexts(
2375 to_read.first,
2376 this,
2377 &(in_progress_client_reads.back()),
2378 to_read.second);
2379 for_read_op.insert(
2380 make_pair(
2381 to_read.first,
2382 read_request_t(
2383 to_read.second,
2384 shards,
2385 false,
2386 c)));
2387 obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
2388 }
2389
2390 start_read_op(
2391 CEPH_MSG_PRIO_DEFAULT,
2392 obj_want_to_read,
2393 for_read_op,
2394 OpRequestRef(),
2395 fast_read, false);
2396 return;
2397 }
2398
2399
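// Called after a sub-read error: work out which shards have not been tried yet,
// rebuild the read_request for hoid (re-requesting attrs if the first attempt
// did not return them), and resend via do_read_op().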
2400 int ECBackend::send_all_remaining_reads(
2401 const hobject_t &hoid,
2402 ReadOp &rop)
2403 {
2404 set<int> already_read;
2405 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2406 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2407 already_read.insert(i->shard);
2408 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2409 map<pg_shard_t, vector<pair<int, int>>> shards;
2410 int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
2411 rop.complete[hoid], &shards, rop.for_recovery);
2412 if (r)
2413 return r;
2414
2415 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2416 rop.to_read.find(hoid)->second.to_read;
2417 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2418 rop.to_read.find(hoid)->second.cb;
2419
2420 // (Note cuixf) If we needed to read attrs and that read failed, try reading them again.
2421 bool want_attrs =
2422 rop.to_read.find(hoid)->second.want_attrs &&
2423 (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
2424 if (want_attrs) {
2425 dout(10) << __func__ << " want attrs again" << dendl;
2426 }
2427
2428 rop.to_read.erase(hoid);
2429 rop.to_read.insert(make_pair(
2430 hoid,
2431 read_request_t(
2432 offsets,
2433 shards,
2434 want_attrs,
2435 c)));
2436 do_read_op(rop);
2437 return 0;
2438 }
2439
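// Fetch the object's xattrs from the local shard, filtering out the internal
// hinfo key so it is not exposed to callers.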
2440 int ECBackend::objects_get_attrs(
2441 const hobject_t &hoid,
2442 map<string, bufferlist> *out)
2443 {
2444 int r = store->getattrs(
2445 ch,
2446 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2447 *out);
2448 if (r < 0)
2449 return r;
2450
2451 for (map<string, bufferlist>::iterator i = out->begin();
2452 i != out->end();
2453 ) {
2454 if (ECUtil::is_hinfo_key_string(i->first))
2455 out->erase(i++);
2456 else
2457 ++i;
2458 }
2459 return r;
2460 }
2461
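// Roll back an append by truncating the local shard back to the chunk offset
// corresponding to the old (stripe-aligned) logical size.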
2462 void ECBackend::rollback_append(
2463 const hobject_t &hoid,
2464 uint64_t old_size,
2465 ObjectStore::Transaction *t)
2466 {
2467 ceph_assert(old_size % sinfo.get_stripe_width() == 0);
2468 t->truncate(
2469 coll,
2470 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2471 sinfo.aligned_logical_offset_to_chunk_offset(
2472 old_size));
2473 }
2474
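// Deep scrub of the local chunk: read it in stride-sized pieces (returning
// -EINPROGRESS until the object is exhausted), then check the accumulated size
// and hash against the stored HashInfo.  Pools allowing EC overwrites have no
// real deep-scrub support yet, so they report a fixed digest.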
2475 int ECBackend::be_deep_scrub(
2476 const hobject_t &poid,
2477 ScrubMap &map,
2478 ScrubMapBuilder &pos,
2479 ScrubMap::object &o)
2480 {
2481 dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
2482 int r;
2483
2484 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2485 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2486
2487 utime_t sleeptime;
2488 sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
2489 if (sleeptime != utime_t()) {
2490 lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
2491 sleeptime.sleep();
2492 }
2493
2494 if (pos.data_pos == 0) {
2495 pos.data_hash = bufferhash(-1);
2496 }
2497
2498 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2499 if (stride % sinfo.get_chunk_size())
2500 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
2501
2502 bufferlist bl;
2503 r = store->read(
2504 ch,
2505 ghobject_t(
2506 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2507 pos.data_pos,
2508 stride, bl,
2509 fadvise_flags);
2510 if (r < 0) {
2511 dout(20) << __func__ << " " << poid << " got "
2512 << r << " on read, read_error" << dendl;
2513 o.read_error = true;
2514 return 0;
2515 }
2516 if (bl.length() % sinfo.get_chunk_size()) {
2517 dout(20) << __func__ << " " << poid << " got "
2518 << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
2519 << dendl;
2520 o.read_error = true;
2521 return 0;
2522 }
2523 if (r > 0) {
2524 pos.data_hash << bl;
2525 }
2526 pos.data_pos += r;
2527 if (r == (int)stride) {
2528 return -EINPROGRESS;
2529 }
2530
2531 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2532 if (!hinfo) {
2533 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2534 o.read_error = true;
2535 o.digest_present = false;
2536 return 0;
2537 } else {
2538 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2539 ceph_assert(hinfo->has_chunk_hash());
2540 if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
2541 dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
2542 << std::hex << pos.data_pos
2543 << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
2544 << dendl;
2545 o.ec_size_mismatch = true;
2546 return 0;
2547 }
2548
2549 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
2550 pos.data_hash.digest()) {
2551 dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
2552 << std::hex << pos.data_hash.digest() << " != expected 0x"
2553 << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
2554 << std::dec << dendl;
2555 o.ec_hash_mismatch = true;
2556 return 0;
2557 }
2558
2559 /* We checked above that we match our own stored hash. We cannot
2560 * send a hash of the actual object, so instead we simply send
2561 * our locally stored hash of shard 0 on the assumption that if
2562 * we match our chunk hash and our recollection of the hash for
2563 * chunk 0 matches that of our peers, there is likely no corruption.
2564 */
2565 o.digest = hinfo->get_chunk_hash(0);
2566 o.digest_present = true;
2567 } else {
2568 /* Hack! We must be using partial overwrites, and partial overwrites
2569 * don't support deep-scrub yet
2570 */
2571 o.digest = 0;
2572 o.digest_present = true;
2573 }
2574 }
2575
2576 o.omap_digest = -1;
2577 o.omap_digest_present = true;
2578 return 0;
2579 }