// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <iostream>
#include <sstream>

#include "ECBackend.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
#include "ECMsgTypes.h"

#include "PrimaryLogPG.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

using std::dec;
using std::hex;
using std::list;
using std::make_pair;
using std::map;
using std::pair;
using std::ostream;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using ceph::bufferhash;
using ceph::bufferlist;
using ceph::bufferptr;
using ceph::ErasureCodeInterfaceRef;
using ceph::Formatter;

static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}

struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
  list<ECBackend::RecoveryOp> ops;
};

ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
  switch (rhs.pipeline_state) {
  case ECBackend::pipeline_state_t::CACHE_VALID:
    return lhs << "CACHE_VALID";
  case ECBackend::pipeline_state_t::CACHE_INVALID:
    return lhs << "CACHE_INVALID";
  default:
    ceph_abort_msg("invalid pipeline state");
  }
  return lhs; // unreachable
}

static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
{
  lhs << "[";
  for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
{
  lhs << "[";
  for (map<int, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(
  ostream &lhs,
  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
{
  return lhs << "(" << rhs.get<0>() << ", "
             << rhs.get<1>() << ", " << rhs.get<2>() << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
{
  return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
             << ", need=" << rhs.need
             << ", want_attrs=" << rhs.want_attrs
             << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
{
  lhs << "read_result_t(r=" << rhs.r
      << ", errors=" << rhs.errors;
  if (rhs.attrs) {
    lhs << ", attrs=" << *(rhs.attrs);
  } else {
    lhs << ", noattrs";
  }
  return lhs << ", returned=" << rhs.returned << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
{
  lhs << "ReadOp(tid=" << rhs.tid;
  if (rhs.op && rhs.op->get_req()) {
    lhs << ", op=";
    rhs.op->get_req()->print(lhs);
  }
  return lhs << ", to_read=" << rhs.to_read
             << ", complete=" << rhs.complete
             << ", priority=" << rhs.priority
             << ", obj_to_source=" << rhs.obj_to_source
             << ", source_to_obj=" << rhs.source_to_obj
             << ", in_progress=" << rhs.in_progress << ")";
}

void ECBackend::ReadOp::dump(Formatter *f) const
{
  f->dump_unsigned("tid", tid);
  if (op && op->get_req()) {
    f->dump_stream("op") << *(op->get_req());
  }
  f->dump_stream("to_read") << to_read;
  f->dump_stream("complete") << complete;
  f->dump_int("priority", priority);
  f->dump_stream("obj_to_source") << obj_to_source;
  f->dump_stream("source_to_obj") << source_to_obj;
  f->dump_stream("in_progress") << in_progress;
}

ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
{
  lhs << "Op(" << rhs.hoid
      << " v=" << rhs.version
      << " tt=" << rhs.trim_to
      << " tid=" << rhs.tid
      << " reqid=" << rhs.reqid;
  if (rhs.client_op && rhs.client_op->get_req()) {
    lhs << " client_op=";
    rhs.client_op->get_req()->print(lhs);
  }
  lhs << " roll_forward_to=" << rhs.roll_forward_to
      << " temp_added=" << rhs.temp_added
      << " temp_cleared=" << rhs.temp_cleared
      << " pending_read=" << rhs.pending_read
      << " remote_read=" << rhs.remote_read
      << " remote_read_result=" << rhs.remote_read_result
      << " pending_apply=" << rhs.pending_apply
      << " pending_commit=" << rhs.pending_commit
      << " plan.to_read=" << rhs.plan.to_read
      << " plan.will_write=" << rhs.plan.will_write
      << ")";
  return lhs;
}

ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
{
  return lhs << "RecoveryOp("
             << "hoid=" << rhs.hoid
             << " v=" << rhs.v
             << " missing_on=" << rhs.missing_on
             << " missing_on_shards=" << rhs.missing_on_shards
             << " recovery_info=" << rhs.recovery_info
             << " recovery_progress=" << rhs.recovery_progress
             << " obc refcount=" << rhs.obc.use_count()
             << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
             << " waiting_on_pushes=" << rhs.waiting_on_pushes
             << " extent_requested=" << rhs.extent_requested
             << ")";
}

void ECBackend::RecoveryOp::dump(Formatter *f) const
{
  f->dump_stream("hoid") << hoid;
  f->dump_stream("v") << v;
  f->dump_stream("missing_on") << missing_on;
  f->dump_stream("missing_on_shards") << missing_on_shards;
  f->dump_stream("recovery_info") << recovery_info;
  f->dump_stream("recovery_progress") << recovery_progress;
  f->dump_stream("state") << tostr(state);
  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
  f->dump_stream("extent_requested") << extent_requested;
}

ECBackend::ECBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &ch,
  ObjectStore *store,
  CephContext *cct,
  ErasureCodeInterfaceRef ec_impl,
  uint64_t stripe_width)
  : PGBackend(cct, pg, store, coll, ch),
    ec_impl(ec_impl),
    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
  ceph_assert((ec_impl->get_data_chunk_count() *
               ec_impl->get_chunk_size(stripe_width)) == stripe_width);
}
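
// Illustrative sketch (not part of the original source): the assert above
// pins the stripe geometry that sinfo is built from. For a hypothetical
// k=4 data-chunk profile with stripe_width=4096, the plugin reports a
// chunk size of 1024, and k * chunk_size must equal stripe_width exactly:
//
//   unsigned k = ec_impl->get_data_chunk_count();    // e.g. 4
//   uint64_t chunk = ec_impl->get_chunk_size(4096);  // e.g. 1024
//   ceph_assert(k * chunk == 4096);                  // full stripe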

PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
{
  return new ECRecoveryHandle;
}

void ECBackend::_failed_push(const hobject_t &hoid,
  pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
{
  ECBackend::read_result_t &res = in.second;
  dout(10) << __func__ << ": Read error " << hoid << " r="
           << res.r << " errors=" << res.errors << dendl;
  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
           << dendl;
  ceph_assert(recovery_ops.count(hoid));
  eversion_t v = recovery_ops[hoid].v;
  recovery_ops.erase(hoid);

  set<pg_shard_t> fl;
  for (auto&& i : res.errors) {
    fl.insert(i.first);
  }
  get_parent()->on_failed_pull(fl, hoid, v);
}

struct OnRecoveryReadComplete :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  ECBackend *pg;
  hobject_t hoid;
  OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
    : pg(pg), hoid(hoid) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    if (!(res.r == 0 && res.errors.empty())) {
      pg->_failed_push(hoid, in);
      return;
    }
    ceph_assert(res.returned.size() == 1);
    pg->handle_recovery_read_complete(
      hoid,
      res.returned.back(),
      res.attrs,
      in.first);
  }
};

struct RecoveryMessages {
  map<hobject_t,
      ECBackend::read_request_t> reads;
  map<hobject_t, set<int>> want_to_read;
  void read(
    ECBackend *ec,
    const hobject_t &hoid, uint64_t off, uint64_t len,
    set<int> &&_want_to_read,
    const map<pg_shard_t, vector<pair<int, int>>> &need,
    bool attrs) {
    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
    to_read.push_back(boost::make_tuple(off, len, 0));
    ceph_assert(!reads.count(hoid));
    want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
    reads.insert(
      make_pair(
        hoid,
        ECBackend::read_request_t(
          to_read,
          need,
          attrs,
          new OnRecoveryReadComplete(
            ec,
            hoid))));
  }

  map<pg_shard_t, vector<PushOp> > pushes;
  map<pg_shard_t, vector<PushReplyOp> > push_replies;
  ObjectStore::Transaction t;
  RecoveryMessages() {}
  ~RecoveryMessages() {}
};
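
// Illustrative sketch (not part of the original source): RecoveryMessages is
// a per-call accumulator. Handlers append reads, pushes, push replies, and
// transaction ops to one instance, and dispatch_recovery_messages() later
// drains it in a single batch, e.g.:
//
//   RecoveryMessages rm;
//   rm.read(this, hoid, 0 /* off */, len, std::move(want), need, true);
//   dispatch_recovery_messages(rm, priority);  // sends reads/pushes at once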

void ECBackend::handle_recovery_push(
  const PushOp &op,
  RecoveryMessages *m,
  bool is_repair)
{
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }

  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
  ghobject_t tobj;
  if (oneshot) {
    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
                      get_parent()->whoami_shard().shard);
  } else {
    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
                                                             op.version),
                      ghobject_t::NO_GEN,
                      get_parent()->whoami_shard().shard);
    if (op.before_progress.first) {
      dout(10) << __func__ << ": Adding oid "
               << tobj.hobj << " in the temp collection" << dendl;
      add_temp_obj(tobj.hobj);
    }
  }

  if (op.before_progress.first) {
    m->t.remove(coll, tobj);
    m->t.touch(coll, tobj);
  }

  if (!op.data_included.empty()) {
    uint64_t start = op.data_included.range_start();
    uint64_t end = op.data_included.range_end();
    ceph_assert(op.data.length() == (end - start));

    m->t.write(
      coll,
      tobj,
      start,
      op.data.length(),
      op.data);
  } else {
    ceph_assert(op.data.length() == 0);
  }

  if (get_parent()->pg_is_remote_backfilling()) {
    get_parent()->pg_add_local_num_bytes(op.data.length());
    get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
    dout(10) << __func__ << " " << op.soid
             << " add new actual data by " << op.data.length()
             << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
             << dendl;
  }

  if (op.before_progress.first) {
    ceph_assert(op.attrset.count(string("_")));
    m->t.setattrs(
      coll,
      tobj,
      op.attrset);
  }

  if (op.after_progress.data_complete && !oneshot) {
    dout(10) << __func__ << ": Removing oid "
             << tobj.hobj << " from the temp collection" << dendl;
    clear_temp_obj(tobj.hobj);
    m->t.remove(coll, ghobject_t(
        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
    m->t.collection_move_rename(
      coll, tobj,
      coll, ghobject_t(
        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  }
  if (op.after_progress.data_complete) {
    if ((get_parent()->pgb_is_primary())) {
      ceph_assert(recovery_ops.count(op.soid));
      ceph_assert(recovery_ops[op.soid].obc);
      if (get_parent()->pg_is_repair())
        get_parent()->inc_osd_stat_repaired();
      get_parent()->on_local_recover(
        op.soid,
        op.recovery_info,
        recovery_ops[op.soid].obc,
        false,
        &m->t);
    } else {
      // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
      if (is_repair)
        get_parent()->inc_osd_stat_repaired();
      get_parent()->on_local_recover(
        op.soid,
        op.recovery_info,
        ObjectContextRef(),
        false,
        &m->t);
      if (get_parent()->pg_is_remote_backfilling()) {
        struct stat st;
        int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
                            get_parent()->whoami_shard().shard), &st);
        if (r == 0) {
          get_parent()->pg_sub_local_num_bytes(st.st_size);
          // XXX: This can be way overestimated for small objects
          get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
          dout(10) << __func__ << " " << op.soid
                   << " sub actual data by " << st.st_size
                   << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
                   << dendl;
        }
      }
    }
  }
  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
}
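
// Illustrative note (not part of the original source): the push above lands
// either straight on the final shard object ("oneshot", when a single PushOp
// carries the whole object) or on a temp object that is renamed into place
// once after_progress.data_complete is set. In sketch form:
//
//   first && data_complete    -> write final object directly (oneshot)
//   first && !data_complete   -> create temp object, start appending
//   !first                    -> keep appending to the temp object
//   data_complete && !oneshot -> collection_move_rename(temp -> final)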

void ECBackend::handle_recovery_push_reply(
  const PushReplyOp &op,
  pg_shard_t from,
  RecoveryMessages *m)
{
  if (!recovery_ops.count(op.soid))
    return;
  RecoveryOp &rop = recovery_ops[op.soid];
  ceph_assert(rop.waiting_on_pushes.count(from));
  rop.waiting_on_pushes.erase(from);
  continue_recovery_op(rop, m);
}

void ECBackend::handle_recovery_read_complete(
  const hobject_t &hoid,
  boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
  std::optional<map<string, bufferlist> > attrs,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": returned " << hoid << " "
           << "(" << to_read.get<0>()
           << ", " << to_read.get<1>()
           << ", " << to_read.get<2>()
           << ")"
           << dendl;
  ceph_assert(recovery_ops.count(hoid));
  RecoveryOp &op = recovery_ops[hoid];
  ceph_assert(op.returned_data.empty());
  map<int, bufferlist*> target;
  for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
       i != op.missing_on_shards.end();
       ++i) {
    target[*i] = &(op.returned_data[*i]);
  }
  map<int, bufferlist> from;
  for (map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
       i != to_read.get<2>().end();
       ++i) {
    from[i->first.shard] = std::move(i->second);
  }
  dout(10) << __func__ << ": " << from << dendl;
  int r = ECUtil::decode(sinfo, ec_impl, from, target);
  ceph_assert(r == 0);
  if (attrs) {
    op.xattrs.swap(*attrs);

    if (!op.obc) {
      // The attr bufferlists only reference the origin bufferlist (decoded
      // from the ECSubReadReply message), whose size is much greater than
      // the attrs needed for recovery. If the obc were to cache them
      // (get_obc may cache the attrs), the whole origin bufferlist could not
      // be freed until the obc is evicted from the obc cache. So rebuild the
      // bufferlists before caching them.
      for (map<string, bufferlist>::iterator it = op.xattrs.begin();
           it != op.xattrs.end();
           ++it) {
        it->second.rebuild();
      }
      // Need to remove ECUtil::get_hinfo_key() since it should not leak out
      // of the backend (see bug #12983)
      map<string, bufferlist> sanitized_attrs(op.xattrs);
      sanitized_attrs.erase(ECUtil::get_hinfo_key());
      op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
      ceph_assert(op.obc);
      op.recovery_info.size = op.obc->obs.oi.size;
      op.recovery_info.oi = op.obc->obs.oi;
    }

    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    if (op.obc->obs.oi.size > 0) {
      ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
      auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
      decode(hinfo, bp);
    }
    op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  ceph_assert(op.xattrs.size());
  ceph_assert(op.obc);
  continue_recovery_op(op, m);
}
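
// Illustrative sketch (not part of the original source): ECUtil::decode is
// driven by two maps keyed by shard id. With k=2,m=1 and shard 1 missing,
// the call above reduces to something like:
//
//   map<int, bufferlist> from;     // shards we managed to read: 0, 2
//   map<int, bufferlist*> target;  // shards to rebuild: 1
//   target[1] = &op.returned_data[1];
//   int r = ECUtil::decode(sinfo, ec_impl, from, target);  // fills shard 1
//   ceph_assert(r == 0);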

struct SendPushReplies : public Context {
  PGBackend::Listener *l;
  epoch_t epoch;
  map<int, MOSDPGPushReply*> replies;
  SendPushReplies(
    PGBackend::Listener *l,
    epoch_t epoch,
    map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
    replies.swap(in);
  }
  void finish(int) override {
    std::vector<std::pair<int, Message*>> messages;
    messages.reserve(replies.size());
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
         i != replies.end();
         ++i) {
      messages.push_back(std::make_pair(i->first, i->second));
    }
    if (!messages.empty()) {
      l->send_message_osd_cluster(messages, epoch);
    }
    replies.clear();
  }
  ~SendPushReplies() override {
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
         i != replies.end();
         ++i) {
      i->second->put();
    }
    replies.clear();
  }
};

void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
       i != m.pushes.end();
       m.pushes.erase(i++)) {
    MOSDPGPush *msg = new MOSDPGPush();
    msg->set_priority(priority);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->pushes.swap(i->second);
    msg->compute_cost(cct);
    msg->is_repair = get_parent()->pg_is_repair();
    get_parent()->send_message(
      i->first.osd,
      msg);
  }
  map<int, MOSDPGPushReply*> replies;
  for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
         m.push_replies.begin();
       i != m.push_replies.end();
       m.push_replies.erase(i++)) {
    MOSDPGPushReply *msg = new MOSDPGPushReply();
    msg->set_priority(priority);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->replies.swap(i->second);
    msg->compute_cost(cct);
    replies.insert(make_pair(i->first.osd, msg));
  }

  if (!replies.empty()) {
    (m.t).register_on_complete(
        get_parent()->bless_context(
          new SendPushReplies(
            get_parent(),
            get_osdmap_epoch(),
            replies)));
    get_parent()->queue_transaction(std::move(m.t));
  }

  if (m.reads.empty())
    return;
  start_read_op(
    priority,
    m.want_to_read,
    m.reads,
    OpRequestRef(),
    false, true);
}
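
// Illustrative note (not part of the original source): the two loops above
// use the pre-C++11 erase-while-iterating idiom for associative containers;
// the increment lives in the for-statement's third clause:
//
//   for (auto i = m.pushes.begin(); i != m.pushes.end(); m.pushes.erase(i++))
//     ...;  // i++ yields the current node, erase() then drops it,
//           // leaving the already-advanced iterator valid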

void ECBackend::continue_recovery_op(
  RecoveryOp &op,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": continuing " << op << dendl;
  while (1) {
    switch (op.state) {
    case RecoveryOp::IDLE: {
      // start read
      op.state = RecoveryOp::READING;
      ceph_assert(!op.recovery_progress.data_complete);
      set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
      uint64_t from = op.recovery_progress.data_recovered_to;
      uint64_t amount = get_recovery_chunk_size();

      if (op.recovery_progress.first && op.obc) {
        /* We've got the attrs and the hinfo, might as well use them */
        op.hinfo = get_hash_info(op.hoid);
        if (!op.hinfo) {
          derr << __func__ << ": " << op.hoid << " has inconsistent hinfo"
               << dendl;
          ceph_assert(recovery_ops.count(op.hoid));
          eversion_t v = recovery_ops[op.hoid].v;
          recovery_ops.erase(op.hoid);
          get_parent()->on_failed_pull({get_parent()->whoami_shard()},
                                       op.hoid, v);
          return;
        }
        op.xattrs = op.obc->attr_cache;
        encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
      }

      map<pg_shard_t, vector<pair<int, int>>> to_read;
      int r = get_min_avail_to_read_shards(
        op.hoid, want, true, false, &to_read);
      if (r != 0) {
        // we must have lost a recovery source
        ceph_assert(!op.recovery_progress.first);
        dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
                 << dendl;
        get_parent()->cancel_pull(op.hoid);
        recovery_ops.erase(op.hoid);
        return;
      }
      m->read(
        this,
        op.hoid,
        op.recovery_progress.data_recovered_to,
        amount,
        std::move(want),
        to_read,
        op.recovery_progress.first && !op.obc);
      op.extent_requested = make_pair(
        from,
        amount);
      dout(10) << __func__ << ": IDLE return " << op << dendl;
      return;
    }
    case RecoveryOp::READING: {
      // read completed, start write
      ceph_assert(op.xattrs.size());
      ceph_assert(op.returned_data.size());
      op.state = RecoveryOp::WRITING;
      ObjectRecoveryProgress after_progress = op.recovery_progress;
      after_progress.data_recovered_to += op.extent_requested.second;
      after_progress.first = false;
      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
        after_progress.data_recovered_to =
          sinfo.logical_to_next_stripe_offset(
            op.obc->obs.oi.size);
        after_progress.data_complete = true;
      }
      for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
           mi != op.missing_on.end();
           ++mi) {
        ceph_assert(op.returned_data.count(mi->shard));
        m->pushes[*mi].push_back(PushOp());
        PushOp &pop = m->pushes[*mi].back();
        pop.soid = op.hoid;
        pop.version = op.v;
        pop.data = op.returned_data[mi->shard];
        dout(10) << __func__ << ": before_progress=" << op.recovery_progress
                 << ", after_progress=" << after_progress
                 << ", pop.data.length()=" << pop.data.length()
                 << ", size=" << op.obc->obs.oi.size << dendl;
        ceph_assert(
          pop.data.length() ==
          sinfo.aligned_logical_offset_to_chunk_offset(
            after_progress.data_recovered_to -
            op.recovery_progress.data_recovered_to)
          );
        if (pop.data.length())
          pop.data_included.insert(
            sinfo.aligned_logical_offset_to_chunk_offset(
              op.recovery_progress.data_recovered_to),
            pop.data.length()
            );
        if (op.recovery_progress.first) {
          pop.attrset = op.xattrs;
        }
        pop.recovery_info = op.recovery_info;
        pop.before_progress = op.recovery_progress;
        pop.after_progress = after_progress;
        if (*mi != get_parent()->primary_shard())
          get_parent()->begin_peer_recover(
            *mi,
            op.hoid);
      }
      op.returned_data.clear();
      op.waiting_on_pushes = op.missing_on;
      op.recovery_progress = after_progress;
      dout(10) << __func__ << ": READING return " << op << dendl;
      return;
    }
    case RecoveryOp::WRITING: {
      if (op.waiting_on_pushes.empty()) {
        if (op.recovery_progress.data_complete) {
          op.state = RecoveryOp::COMPLETE;
          for (set<pg_shard_t>::iterator i = op.missing_on.begin();
               i != op.missing_on.end();
               ++i) {
            if (*i != get_parent()->primary_shard()) {
              dout(10) << __func__ << ": on_peer_recover on " << *i
                       << ", obj " << op.hoid << dendl;
              get_parent()->on_peer_recover(
                *i,
                op.hoid,
                op.recovery_info);
            }
          }
          object_stat_sum_t stat;
          stat.num_bytes_recovered = op.recovery_info.size;
          stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
          stat.num_objects_recovered = 1;
          if (get_parent()->pg_is_repair())
            stat.num_objects_repaired = 1;
          get_parent()->on_global_recover(op.hoid, stat, false);
          dout(10) << __func__ << ": WRITING return " << op << dendl;
          recovery_ops.erase(op.hoid);
          return;
        } else {
          op.state = RecoveryOp::IDLE;
          dout(10) << __func__ << ": WRITING continue " << op << dendl;
          continue;
        }
      }
      return;
    }
    // should never be called once complete
    case RecoveryOp::COMPLETE:
    default: {
      ceph_abort();
    };
    }
  }
}
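
// Illustrative summary (not part of the original source): continue_recovery_op
// drives a small per-object state machine, one recovery chunk at a time:
//
//   IDLE    --pick shards, queue read-->           READING
//   READING --decode, queue PushOps-->             WRITING
//   WRITING --all push replies, !data_complete-->  IDLE (next chunk)
//   WRITING --all push replies, data_complete-->   COMPLETE
//
// Each arrow is one pass through the switch; the function returns whenever it
// has queued work and is re-entered from the read/push-reply completion paths.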

void ECBackend::run_recovery_op(
  RecoveryHandle *_h,
  int priority)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  RecoveryMessages m;
  for (list<RecoveryOp>::iterator i = h->ops.begin();
       i != h->ops.end();
       ++i) {
    dout(10) << __func__ << ": starting " << *i << dendl;
    ceph_assert(!recovery_ops.count(i->hoid));
    RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
    continue_recovery_op(op, &m);
  }

  dispatch_recovery_messages(m, priority);
  send_recovery_deletes(priority, h->deletes);
  delete _h;
}

int ECBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  h->ops.push_back(RecoveryOp());
  h->ops.back().v = v;
  h->ops.back().hoid = hoid;
  h->ops.back().obc = obc;
  h->ops.back().recovery_info.soid = hoid;
  h->ops.back().recovery_info.version = v;
  if (obc) {
    h->ops.back().recovery_info.size = obc->obs.oi.size;
    h->ops.back().recovery_info.oi = obc->obs.oi;
  }
  if (hoid.is_snap()) {
    if (obc) {
      ceph_assert(obc->ssc);
      h->ops.back().recovery_info.ss = obc->ssc->snapset;
    } else if (head) {
      ceph_assert(head->ssc);
      h->ops.back().recovery_info.ss = head->ssc->snapset;
    } else {
      ceph_abort_msg("neither obc nor head set for a snap object");
    }
  }
  h->ops.back().recovery_progress.omap_complete = true;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    dout(10) << "checking " << *i << dendl;
    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
      h->ops.back().missing_on.insert(*i);
      h->ops.back().missing_on_shards.insert(i->shard);
    }
  }
  dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
  return 0;
}

bool ECBackend::can_handle_while_inactive(
  OpRequestRef _op)
{
  return false;
}

bool ECBackend::_handle_message(
  OpRequestRef _op)
{
  dout(10) << __func__ << ": " << *_op->get_req() << dendl;
  int priority = _op->get_req()->get_priority();
  switch (_op->get_req()->get_type()) {
  case MSG_OSD_EC_WRITE: {
    // NOTE: this is non-const because handle_sub_write modifies the embedded
    // ObjectStore::Transaction in place (and then std::move's it).  It does
    // not conflict with ECSubWrite's operator<<.
    MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
      _op->get_nonconst_req());
    parent->maybe_preempt_replica_scrub(op->op.soid);
    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_WRITE_REPLY: {
    const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
      _op->get_req());
    handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_READ: {
    auto op = _op->get_req<MOSDECSubOpRead>();
    MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
    reply->pgid = get_parent()->primary_spg_t();
    reply->map_epoch = get_osdmap_epoch();
    reply->min_epoch = get_parent()->get_interval_start_epoch();
    handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
    reply->trace = _op->pg_trace;
    get_parent()->send_message_osd_cluster(
      reply, _op->get_req()->get_connection());
    return true;
  }
  case MSG_OSD_EC_READ_REPLY: {
    // NOTE: this is non-const because handle_sub_read_reply steals resulting
    // buffers.  It does not conflict with ECSubReadReply operator<<.
    MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
      _op->get_nonconst_req());
    RecoveryMessages rm;
    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH: {
    auto op = _op->get_req<MOSDPGPush>();
    RecoveryMessages rm;
    for (vector<PushOp>::const_iterator i = op->pushes.begin();
         i != op->pushes.end();
         ++i) {
      handle_recovery_push(*i, &rm, op->is_repair);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH_REPLY: {
    const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
      _op->get_req());
    RecoveryMessages rm;
    for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
         i != op->replies.end();
         ++i) {
      handle_recovery_push_reply(*i, op->from, &rm);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  default:
    return false;
  }
  return false;
}

struct SubWriteCommitted : public Context {
  ECBackend *pg;
  OpRequestRef msg;
  ceph_tid_t tid;
  eversion_t version;
  eversion_t last_complete;
  const ZTracer::Trace trace;
  SubWriteCommitted(
    ECBackend *pg,
    OpRequestRef msg,
    ceph_tid_t tid,
    eversion_t version,
    eversion_t last_complete,
    const ZTracer::Trace &trace)
    : pg(pg), msg(msg), tid(tid),
      version(version), last_complete(last_complete), trace(trace) {}
  void finish(int) override {
    if (msg)
      msg->mark_event("sub_op_committed");
    pg->sub_write_committed(tid, version, last_complete, trace);
  }
};
void ECBackend::sub_write_committed(
  ceph_tid_t tid, eversion_t version, eversion_t last_complete,
  const ZTracer::Trace &trace) {
  if (get_parent()->pgb_is_primary()) {
    ECSubWriteReply reply;
    reply.tid = tid;
    reply.last_complete = last_complete;
    reply.committed = true;
    reply.applied = true;
    reply.from = get_parent()->whoami_shard();
    handle_sub_write_reply(
      get_parent()->whoami_shard(),
      reply, trace);
  } else {
    get_parent()->update_last_complete_ondisk(last_complete);
    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
    r->pgid = get_parent()->primary_spg_t();
    r->map_epoch = get_osdmap_epoch();
    r->min_epoch = get_parent()->get_interval_start_epoch();
    r->op.tid = tid;
    r->op.last_complete = last_complete;
    r->op.committed = true;
    r->op.applied = true;
    r->op.from = get_parent()->whoami_shard();
    r->set_priority(CEPH_MSG_PRIO_HIGH);
    r->trace = trace;
    r->trace.event("sending sub op commit");
    get_parent()->send_message_osd_cluster(
      get_parent()->primary_shard().osd, r, get_osdmap_epoch());
  }
}
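
// Illustrative note (not part of the original source): on the primary, the
// local commit is acked by synthesizing an ECSubWriteReply and feeding it
// straight into handle_sub_write_reply(), so the primary's own shard counts
// against pending_commit exactly like a remote shard's reply would:
//
//   // primary, in sketch form:
//   ECSubWriteReply reply;                         // committed = applied = true
//   handle_sub_write_reply(whoami, reply, trace);  // local short-circuit
//   // replica: send MOSDECSubOpWriteReply to primary_shard().osd instead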

void ECBackend::handle_sub_write(
  pg_shard_t from,
  OpRequestRef msg,
  ECSubWrite &op,
  const ZTracer::Trace &trace)
{
  if (msg)
    msg->mark_event("sub_op_started");
  trace.event("handle_sub_write");
#ifdef HAVE_JAEGER
  if (msg && msg->osd_parent_span) {
    auto ec_sub_trans = jaeger_tracing::child_span(__func__, msg->osd_parent_span);
  }
#endif
  if (!get_parent()->pgb_is_primary())
    get_parent()->update_stats(op.stats);
  ObjectStore::Transaction localt;
  if (!op.temp_added.empty()) {
    add_temp_objs(op.temp_added);
  }
  if (op.backfill_or_async_recovery) {
    for (set<hobject_t>::iterator i = op.temp_removed.begin();
         i != op.temp_removed.end();
         ++i) {
      dout(10) << __func__ << ": removing object " << *i
               << " since we won't get the transaction" << dendl;
      localt.remove(
        coll,
        ghobject_t(
          *i,
          ghobject_t::NO_GEN,
          get_parent()->whoami_shard().shard));
    }
  }
  clear_temp_objs(op.temp_removed);
  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(op.soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
    for (auto &&e: op.log_entries) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }
  get_parent()->log_operation(
    std::move(op.log_entries),
    op.updated_hit_set_history,
    op.trim_to,
    op.roll_forward_to,
    op.roll_forward_to,
    !op.backfill_or_async_recovery,
    localt,
    async);

  if (!get_parent()->pg_is_undersized() &&
      (unsigned)get_parent()->whoami_shard().shard >=
      ec_impl->get_data_chunk_count())
    op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  localt.register_on_commit(
    get_parent()->bless_context(
      new SubWriteCommitted(
        this, msg, op.tid,
        op.at_version,
        get_parent()->get_info().last_complete, trace)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(op.t));
  tls.push_back(std::move(localt));
  get_parent()->queue_transactions(tls, msg);
  dout(30) << __func__ << " missing after " << get_parent()->get_log().get_missing().get_items() << dendl;
  if (op.at_version != eversion_t()) {
    // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
    get_parent()->op_applied(op.at_version);
  }
}
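
// Illustrative sketch (not part of the original source): the fadvise hint
// above targets pure coding shards. With shards numbered 0..k+m-1 and the PG
// not undersized, this shard's id being >= k means it holds only parity,
// which no ordinary client read will touch, so its pagecache can be dropped:
//
//   unsigned k = ec_impl->get_data_chunk_count();  // data shards: 0..k-1
//   bool parity_shard = (unsigned)whoami_shard().shard >= k;
//   if (!pg_is_undersized() && parity_shard)
//     op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);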

void ECBackend::handle_sub_read(
  pg_shard_t from,
  const ECSubRead &op,
  ECSubReadReply *reply,
  const ZTracer::Trace &trace)
{
  trace.event("handle sub read");
  shard_id_t shard = get_parent()->whoami_shard().shard;
  for (auto i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    int r = 0;
    for (auto j = i->second.begin(); j != i->second.end(); ++j) {
      bufferlist bl;
      if ((op.subchunks.find(i->first)->second.size() == 1) &&
          (op.subchunks.find(i->first)->second.front().second ==
           ec_impl->get_sub_chunk_count())) {
        dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
        r = store->read(
          ch,
          ghobject_t(i->first, ghobject_t::NO_GEN, shard),
          j->get<0>(),
          j->get<1>(),
          bl, j->get<2>()); // Allow EIO return
      } else {
        dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
        int subchunk_size =
          sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
        bool error = false;
        for (int m = 0; m < (int)j->get<1>() && !error;
             m += sinfo.get_chunk_size()) {
          for (auto &&k:op.subchunks.find(i->first)->second) {
            bufferlist bl0;
            r = store->read(
              ch,
              ghobject_t(i->first, ghobject_t::NO_GEN, shard),
              j->get<0>() + m + (k.first)*subchunk_size,
              (k.second)*subchunk_size,
              bl0, j->get<2>());
            if (r < 0) {
              error = true;
              break;
            }
            bl.claim_append(bl0);
          }
        }
      }

      if (r < 0) {
        // if we are doing fast reads, it's possible for one of the shard
        // reads to cross paths with another update and get a (harmless)
        // ENOENT.  Suppress the message to the cluster log in that case.
        if (r == -ENOENT && get_parent()->get_pool().fast_read) {
          dout(5) << __func__ << ": Error " << r
                  << " reading " << i->first << ", fast read, probably ok"
                  << dendl;
        } else {
          get_parent()->clog_error() << "Error " << r
                                     << " reading object "
                                     << i->first;
          dout(5) << __func__ << ": Error " << r
                  << " reading " << i->first << dendl;
        }
        goto error;
      } else {
        dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
        reply->buffers_read[i->first].push_back(
          make_pair(
            j->get<0>(),
            bl)
          );
      }

      if (!get_parent()->get_pool().allows_ecoverwrites()) {
        // We still need deep scrub: large enough objects are read in
        // sections, so the digest check below won't always cover them.
        // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
        // the state of our chunk in case other chunks could substitute.
        ECUtil::HashInfoRef hinfo;
        hinfo = get_hash_info(i->first);
        if (!hinfo) {
          r = -EIO;
          get_parent()->clog_error() << "Corruption detected: object "
                                     << i->first
                                     << " is missing hash_info";
          dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
          goto error;
        }
        ceph_assert(hinfo->has_chunk_hash());
        if ((bl.length() == hinfo->get_total_chunk_size()) &&
            (j->get<0>() == 0)) {
          dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
          bufferhash h(-1);
          h << bl;
          if (h.digest() != hinfo->get_chunk_hash(shard)) {
            get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
                    << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
            dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
                    << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
            r = -EIO;
            goto error;
          }
        }
      }
    }
    continue;
error:
    // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
    // the state of our chunk in case other chunks could substitute.
    reply->buffers_read.erase(i->first);
    reply->errors[i->first] = r;
  }
  for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
       i != op.attrs_to_read.end();
       ++i) {
    dout(10) << __func__ << ": fulfilling attr request on "
             << *i << dendl;
    if (reply->errors.count(*i))
      continue;
    int r = store->getattrs(
      ch,
      ghobject_t(
        *i, ghobject_t::NO_GEN, shard),
      reply->attrs_read[*i]);
    if (r < 0) {
      // If we hit a read error, don't return the attrs either.
      reply->attrs_read.erase(*i);
      reply->buffers_read.erase(*i);
      reply->errors[*i] = r;
    }
  }
  reply->from = get_parent()->whoami_shard();
  reply->tid = op.tid;
}
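
// Illustrative sketch (not part of the original source): in the fragmented
// read path above, each requested extent is walked one chunk at a time and
// only the wanted subchunk runs are read. For a subchunk run (first, count)
// within the chunk starting at off + m, the store read covers:
//
//   subchunk_size = sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
//   read_off = off + m + first * subchunk_size;  // off is j->get<0>()
//   read_len = count * subchunk_size;
//
// e.g. chunk_size=8192, 4 subchunks, run (1,2): read bytes 2048..6143 of
// that chunk.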

void ECBackend::handle_sub_write_reply(
  pg_shard_t from,
  const ECSubWriteReply &op,
  const ZTracer::Trace &trace)
{
  map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
  ceph_assert(i != tid_to_op_map.end());
  if (op.committed) {
    trace.event("sub write committed");
    ceph_assert(i->second.pending_commit.count(from));
    i->second.pending_commit.erase(from);
    if (from != get_parent()->whoami_shard()) {
      get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
    }
  }
  if (op.applied) {
    trace.event("sub write applied");
    ceph_assert(i->second.pending_apply.count(from));
    i->second.pending_apply.erase(from);
  }

  if (i->second.pending_commit.empty() &&
      i->second.on_all_commit &&
      // also wait for apply, to preserve ordering with luminous peers.
      i->second.pending_apply.empty()) {
    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
    i->second.on_all_commit->complete(0);
    i->second.on_all_commit = 0;
    i->second.trace.event("ec write all committed");
  }
  check_ops();
}

void ECBackend::handle_sub_read_reply(
  pg_shard_t from,
  ECSubReadReply &op,
  RecoveryMessages *m,
  const ZTracer::Trace &trace)
{
  trace.event("ec sub read reply");
  dout(10) << __func__ << ": reply " << op << dendl;
  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
  if (iter == tid_to_read_map.end()) {
    //canceled
    dout(20) << __func__ << ": dropped " << op << dendl;
    return;
  }
  ReadOp &rop = iter->second;
  for (auto i = op.buffers_read.begin();
       i != op.buffers_read.end();
       ++i) {
    ceph_assert(!op.errors.count(i->first));  // If attribute error we better not have sent a buffer
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
      rop.to_read.find(i->first)->second.to_read.begin();
    list<
      boost::tuple<
        uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
      rop.complete[i->first].returned.begin();
    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
         j != i->second.end();
         ++j, ++req_iter, ++riter) {
      ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
      ceph_assert(riter != rop.complete[i->first].returned.end());
      pair<uint64_t, uint64_t> adjusted =
        sinfo.aligned_offset_len_to_chunk(
          make_pair(req_iter->get<0>(), req_iter->get<1>()));
      ceph_assert(adjusted.first == j->first);
      riter->get<2>()[from] = std::move(j->second);
    }
  }
  for (auto i = op.attrs_read.begin();
       i != op.attrs_read.end();
       ++i) {
    ceph_assert(!op.errors.count(i->first));  // if read error better not have sent an attribute
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    rop.complete[i->first].attrs = map<string, bufferlist>();
    (*(rop.complete[i->first].attrs)).swap(i->second);
  }
  for (auto i = op.errors.begin();
       i != op.errors.end();
       ++i) {
    rop.complete[i->first].errors.insert(
      make_pair(
        from,
        i->second));
    dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
  }

  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
    shard_to_read_map.find(from);
  ceph_assert(siter != shard_to_read_map.end());
  ceph_assert(siter->second.count(op.tid));
  siter->second.erase(op.tid);

  ceph_assert(rop.in_progress.count(from));
  rop.in_progress.erase(from);
  unsigned is_complete = 0;
  bool need_resend = false;
  // For redundant reads check for completion as each shard comes in,
  // or in a non-recovery read check for completion once all the shards read.
  if (rop.do_redundant_reads || rop.in_progress.empty()) {
    for (map<hobject_t, read_result_t>::const_iterator iter =
           rop.complete.begin();
         iter != rop.complete.end();
         ++iter) {
      set<int> have;
      for (map<pg_shard_t, bufferlist>::const_iterator j =
             iter->second.returned.front().get<2>().begin();
           j != iter->second.returned.front().get<2>().end();
           ++j) {
        have.insert(j->first.shard);
        dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
      }
      map<int, vector<pair<int, int>>> dummy_minimum;
      int err;
      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
        dout(20) << __func__ << " minimum_to_decode failed" << dendl;
        if (rop.in_progress.empty()) {
          // If we don't have enough copies, try other pg_shard_ts if available.
          // During recovery there may be multiple osds with copies of the same shard,
          // so getting EIO from one may result in multiple passes through this code path.
          if (!rop.do_redundant_reads) {
            int r = send_all_remaining_reads(iter->first, rop);
            if (r == 0) {
              // We changed the rop's to_read, so we are not incrementing is_complete
              need_resend = true;
              continue;
            }
            // Couldn't read any additional shards, so handle as completed with errors
          }
          // We don't want to confuse clients / RBD with objectstore error
          // values in particular ENOENT.  We may have different error returns
          // from different shards, so we'll return minimum_to_decode() error
          // (usually EIO) to reader.  It is likely an error here is due to a
          // damaged pg.
          rop.complete[iter->first].r = err;
          ++is_complete;
        }
      } else {
        ceph_assert(rop.complete[iter->first].r == 0);
        if (!rop.complete[iter->first].errors.empty()) {
          if (cct->_conf->osd_read_ec_check_for_errors) {
            dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
            err = rop.complete[iter->first].errors.begin()->second;
            rop.complete[iter->first].r = err;
          } else {
            get_parent()->clog_warn() << "Error(s) ignored for "
                                      << iter->first << " enough copies available";
            dout(10) << __func__ << " Error(s) ignored for " << iter->first
                     << " enough copies available" << dendl;
            rop.complete[iter->first].errors.clear();
          }
        }
        // avoid re-reading a completed object, as we may send remaining reads for uncompleted objects
        rop.to_read.at(iter->first).need.clear();
        rop.to_read.at(iter->first).want_attrs = false;
        ++is_complete;
      }
    }
  }
  if (need_resend) {
    do_read_op(rop);
  } else if (rop.in_progress.empty() ||
             is_complete == rop.complete.size()) {
    dout(20) << __func__ << " Complete: " << rop << dendl;
    rop.trace.event("ec read complete");
    complete_read_op(rop, m);
  } else {
    dout(10) << __func__ << " readop not complete: " << rop << dendl;
  }
}
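
// Illustrative sketch (not part of the original source): the completion check
// above asks the plugin whether the shards in hand already suffice. With
// k=2,m=1, want_to_read={0,1} and have={0,2}, minimum_to_decode() succeeds
// and returns a plan that rebuilds chunk 1 from chunks 0 and 2:
//
//   map<int, vector<pair<int, int>>> minimum;
//   int err = ec_impl->minimum_to_decode({0, 1}, {0, 2}, &minimum);
//   // err == 0: readable now; err < 0: wait for more shards or resend reads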

void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
{
  map<hobject_t, read_request_t>::iterator reqiter =
    rop.to_read.begin();
  map<hobject_t, read_result_t>::iterator resiter =
    rop.complete.begin();
  ceph_assert(rop.to_read.size() == rop.complete.size());
  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
    if (reqiter->second.cb) {
      pair<RecoveryMessages *, read_result_t &> arg(
        m, resiter->second);
      reqiter->second.cb->complete(arg);
      reqiter->second.cb = nullptr;
    }
  }
  // The read op is over; clean up all data for this tid.
  for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
       iter != rop.in_progress.end();
       iter++) {
    shard_to_read_map[*iter].erase(rop.tid);
  }
  rop.in_progress.clear();
  tid_to_read_map.erase(rop.tid);
}

struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
  ECBackend *ec;
  ceph_tid_t tid;
  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
  void finish(ThreadPool::TPHandle &handle) override {
    auto ropiter = ec->tid_to_read_map.find(tid);
    ceph_assert(ropiter != ec->tid_to_read_map.end());
    int priority = ropiter->second.priority;
    RecoveryMessages rm;
    ec->complete_read_op(ropiter->second, &rm);
    ec->dispatch_recovery_messages(rm, priority);
  }
};

void ECBackend::filter_read_op(
  const OSDMapRef& osdmap,
  ReadOp &op)
{
  set<hobject_t> to_cancel;
  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ++i) {
    if (osdmap->is_down(i->first.osd)) {
      to_cancel.insert(i->second.begin(), i->second.end());
      op.in_progress.erase(i->first);
      continue;
    }
  }

  if (to_cancel.empty())
    return;

  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ) {
    for (set<hobject_t>::iterator j = i->second.begin();
         j != i->second.end();
         ) {
      if (to_cancel.count(*j))
        i->second.erase(j++);
      else
        ++j;
    }
    if (i->second.empty()) {
      op.source_to_obj.erase(i++);
    } else {
      ceph_assert(!osdmap->is_down(i->first.osd));
      ++i;
    }
  }

  for (set<hobject_t>::iterator i = to_cancel.begin();
       i != to_cancel.end();
       ++i) {
    get_parent()->cancel_pull(*i);

    ceph_assert(op.to_read.count(*i));
    read_request_t &req = op.to_read.find(*i)->second;
    dout(10) << __func__ << ": canceling " << req
             << " for obj " << *i << dendl;
    ceph_assert(req.cb);
    delete req.cb;
    req.cb = nullptr;

    op.to_read.erase(*i);
    op.complete.erase(*i);
    recovery_ops.erase(*i);
  }

  if (op.in_progress.empty()) {
    get_parent()->schedule_recovery_work(
      get_parent()->bless_unlocked_gencontext(
        new FinishReadOp(this, op.tid)));
  }
}

void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  set<ceph_tid_t> tids_to_filter;
  for (map<pg_shard_t, set<ceph_tid_t> >::iterator
         i = shard_to_read_map.begin();
       i != shard_to_read_map.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      tids_to_filter.insert(i->second.begin(), i->second.end());
      shard_to_read_map.erase(i++);
    } else {
      ++i;
    }
  }
  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
       i != tids_to_filter.end();
       ++i) {
    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
    ceph_assert(j != tid_to_read_map.end());
    filter_read_op(osdmap, j->second);
  }
}

void ECBackend::on_change()
{
  dout(10) << __func__ << dendl;

  completed_to = eversion_t();
  committed_to = eversion_t();
  pipeline_state.clear();
  waiting_reads.clear();
  waiting_state.clear();
  waiting_commit.clear();
  for (auto &&op: tid_to_op_map) {
    cache.release_write_pin(op.second.pin);
  }
  tid_to_op_map.clear();

  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    dout(10) << __func__ << ": cancelling " << i->second << dendl;
    for (map<hobject_t, read_request_t>::iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      delete j->second.cb;
      j->second.cb = nullptr;
    }
  }
  tid_to_read_map.clear();
  in_progress_client_reads.clear();
  shard_to_read_map.clear();
  clear_recovery_state();
}

void ECBackend::clear_recovery_state()
{
  recovery_ops.clear();
}

void ECBackend::dump_recovery_info(Formatter *f) const
{
  f->open_array_section("recovery_ops");
  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
       i != recovery_ops.end();
       ++i) {
    f->open_object_section("op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
  f->open_array_section("read_ops");
  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    f->open_object_section("read_op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
}

void ECBackend::submit_transaction(
  const hobject_t &hoid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&t,
  const eversion_t &trim_to,
  const eversion_t &min_last_complete_ondisk,
  vector<pg_log_entry_t>&& log_entries,
  std::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef client_op
  )
{
  ceph_assert(!tid_to_op_map.count(tid));
  Op *op = &(tid_to_op_map[tid]);
  op->hoid = hoid;
  op->delta_stats = delta_stats;
  op->version = at_version;
  op->trim_to = trim_to;
  op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
  op->log_entries = log_entries;
  std::swap(op->updated_hit_set_history, hset_history);
  op->on_all_commit = on_all_commit;
  op->tid = tid;
  op->reqid = reqid;
  op->client_op = client_op;
  if (client_op)
    op->trace = client_op->pg_trace;

#ifdef HAVE_JAEGER
  if (client_op && client_op->osd_parent_span) {
    auto ec_sub_trans = jaeger_tracing::child_span("ECBackend::submit_transaction", client_op->osd_parent_span);
  }
#endif
  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
  start_rmw(op, std::move(t));
}

void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
  if (!waiting_state.empty()) {
    waiting_state.back().on_write.emplace_back(std::move(cb));
  } else if (!waiting_reads.empty()) {
    waiting_reads.back().on_write.emplace_back(std::move(cb));
  } else {
    // Nothing earlier in the pipeline, just call it
    cb();
  }
}

void ECBackend::get_all_avail_shards(
  const hobject_t &hoid,
  const set<pg_shard_t> &error_shards,
  set<int> &have,
  map<shard_id_t, pg_shard_t> &shards,
  bool for_recovery)
{
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_shards().begin();
       i != get_parent()->get_acting_shards().end();
       ++i) {
    dout(10) << __func__ << ": checking acting " << *i << dendl;
    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
    if (error_shards.find(*i) != error_shards.end())
      continue;
    if (!missing.is_missing(hoid)) {
      ceph_assert(!have.count(i->shard));
      have.insert(i->shard);
      ceph_assert(!shards.count(i->shard));
      shards.insert(make_pair(i->shard, *i));
    }
  }

  if (for_recovery) {
    for (set<pg_shard_t>::const_iterator i =
           get_parent()->get_backfill_shards().begin();
         i != get_parent()->get_backfill_shards().end();
         ++i) {
      if (error_shards.find(*i) != error_shards.end())
        continue;
      if (have.count(i->shard)) {
        ceph_assert(shards.count(i->shard));
        continue;
      }
      dout(10) << __func__ << ": checking backfill " << *i << dendl;
      ceph_assert(!shards.count(i->shard));
      const pg_info_t &info = get_parent()->get_shard_info(*i);
      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
      if (hoid < info.last_backfill &&
          !missing.is_missing(hoid)) {
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }

    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
      get_parent()->get_missing_loc_shards().find(hoid);
    if (miter != get_parent()->get_missing_loc_shards().end()) {
      for (set<pg_shard_t>::iterator i = miter->second.begin();
           i != miter->second.end();
           ++i) {
        dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
        auto m = get_parent()->maybe_get_shard_missing(*i);
        if (m) {
          ceph_assert(!(*m).is_missing(hoid));
        }
        if (error_shards.find(*i) != error_shards.end())
          continue;
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }
  }
}

int ECBackend::get_min_avail_to_read_shards(
  const hobject_t &hoid,
  const set<int> &want,
  bool for_recovery,
  bool do_redundant_reads,
  map<pg_shard_t, vector<pair<int, int>>> *to_read)
{
  // Make sure we don't do redundant reads for recovery
  ceph_assert(!for_recovery || !do_redundant_reads);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  map<int, vector<pair<int, int>>> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0)
    return r;

  if (do_redundant_reads) {
    vector<pair<int, int>> subchunks_list;
    subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
    for (auto &&i: have) {
      need[i] = subchunks_list;
    }
  }

  if (!to_read)
    return 0;

  for (auto &&i:need) {
    ceph_assert(shards.count(shard_id_t(i.first)));
    to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
  }
  return 0;
}
1678
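// After some shards returned errors, compute which not-yet-read
// shards must now be fetched (over their full sub-chunk range) for
// decode to still succeed; fails with -EIO once too few shards
// remain.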
int ECBackend::get_remaining_shards(
  const hobject_t &hoid,
  const set<int> &avail,
  const set<int> &want,
  const read_result_t &result,
  map<pg_shard_t, vector<pair<int, int>>> *to_read,
  bool for_recovery)
{
  ceph_assert(to_read);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;
  for (auto &p : result.errors) {
    error_shards.insert(p.first);
  }

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  map<int, vector<pair<int, int>>> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0) {
    dout(0) << __func__ << " not enough shards left to try for " << hoid
            << " read result was " << result << dendl;
    return -EIO;
  }

  set<int> shards_left;
  for (auto p : need) {
    if (avail.find(p.first) == avail.end()) {
      shards_left.insert(p.first);
    }
  }

  vector<pair<int, int>> subchunks;
  subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
  for (set<int>::iterator i = shards_left.begin();
       i != shards_left.end();
       ++i) {
    ceph_assert(shards.count(shard_id_t(*i)));
    ceph_assert(avail.find(*i) == avail.end());
    to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
  }
  return 0;
}

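// Register a new ReadOp under a fresh tid and immediately issue its
// per-shard sub-reads via do_read_op().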
void ECBackend::start_read_op(
  int priority,
  map<hobject_t, set<int>> &want_to_read,
  map<hobject_t, read_request_t> &to_read,
  OpRequestRef _op,
  bool do_redundant_reads,
  bool for_recovery)
{
  ceph_tid_t tid = get_parent()->get_tid();
  ceph_assert(!tid_to_read_map.count(tid));
  auto &op = tid_to_read_map.emplace(
    tid,
    ReadOp(
      priority,
      tid,
      do_redundant_reads,
      for_recovery,
      _op,
      std::move(want_to_read),
      std::move(to_read))).first->second;
  dout(10) << __func__ << ": starting " << op << dendl;
  if (_op) {
    op.trace = _op->pg_trace;
    op.trace.event("start ec read");
  }
  do_read_op(op);
}

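// Build one ECSubRead per target shard: note which objects (and
// whose attrs) each shard must return, translate the logical extents
// into chunk-aligned offset/length pairs, and send the batch of
// MOSDECSubOpRead messages.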
void ECBackend::do_read_op(ReadOp &op)
{
  int priority = op.priority;
  ceph_tid_t tid = op.tid;

  dout(10) << __func__ << ": starting read " << op << dendl;

  map<pg_shard_t, ECSubRead> messages;
  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    bool need_attrs = i->second.want_attrs;

    for (auto j = i->second.need.begin();
         j != i->second.need.end();
         ++j) {
      if (need_attrs) {
        messages[j->first].attrs_to_read.insert(i->first);
        need_attrs = false;
      }
      messages[j->first].subchunks[i->first] = j->second;
      op.obj_to_source[i->first].insert(j->first);
      op.source_to_obj[j->first].insert(i->first);
    }
    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      pair<uint64_t, uint64_t> chunk_off_len =
        sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
      for (auto k = i->second.need.begin();
           k != i->second.need.end();
           ++k) {
        messages[k->first].to_read[i->first].push_back(
          boost::make_tuple(
            chunk_off_len.first,
            chunk_off_len.second,
            j->get<2>()));
      }
      ceph_assert(!need_attrs);
    }
  }

  std::vector<std::pair<int, Message*>> m;
  m.reserve(messages.size());
  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
       i != messages.end();
       ++i) {
    op.in_progress.insert(i->first);
    shard_to_read_map[i->first].insert(op.tid);
    i->second.tid = tid;
    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
    msg->set_priority(priority);
    msg->pgid = spg_t(
      get_parent()->whoami_spg_t().pgid,
      i->first.shard);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_interval_start_epoch();
    msg->op = i->second;
    msg->op.from = get_parent()->whoami_shard();
    msg->op.tid = tid;
    if (op.trace) {
      // initialize a child span for this shard
      msg->trace.init("ec sub read", nullptr, &op.trace);
      msg->trace.keyval("shard", i->first.shard.id);
    }
    m.push_back(std::make_pair(i->first.osd, msg));
  }
  if (!m.empty()) {
    get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
  }

  dout(10) << __func__ << ": started " << op << dendl;
}

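// Look up the cached HashInfo for hoid, falling back to the hinfo
// xattr on disk (taken from the supplied attrs if given, otherwise
// via getattr).  Returns a null ref when the attr fails to decode or
// its total_chunk_size disagrees with the on-disk object size.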
ECUtil::HashInfoRef ECBackend::get_hash_info(
  const hobject_t &hoid, bool create, const map<string, bufferptr> *attrs)
{
  dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
  ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
  if (!ref) {
    dout(10) << __func__ << ": not in cache " << hoid << dendl;
    struct stat st;
    int r = store->stat(
      ch,
      ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &st);
    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    if (r >= 0) {
      dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
      bufferlist bl;
      if (attrs) {
        map<string, bufferptr>::const_iterator k =
          attrs->find(ECUtil::get_hinfo_key());
        if (k == attrs->end()) {
          dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
        } else {
          bl.push_back(k->second);
        }
      } else {
        r = store->getattr(
          ch,
          ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
          ECUtil::get_hinfo_key(),
          bl);
        if (r < 0) {
          dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
          bl.clear(); // just in case
        }
      }
      if (bl.length() > 0) {
        auto bp = bl.cbegin();
        try {
          decode(hinfo, bp);
        } catch(...) {
          dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
          return ECUtil::HashInfoRef();
        }
        if (hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
          dout(0) << __func__ << ": Mismatch of total_chunk_size "
                  << hinfo.get_total_chunk_size() << " and on-disk size "
                  << st.st_size << dendl;
          return ECUtil::HashInfoRef();
        }
      } else if (st.st_size > 0) {
        // object has data but no hinfo: treat as an error
        return ECUtil::HashInfoRef();
      }
      // an empty object with no hinfo falls through and gets a fresh one
    } else if (r != -ENOENT || !create) {
      derr << __func__ << ": stat " << hoid << " failed: " << cpp_strerror(r)
           << dendl;
      return ECUtil::HashInfoRef();
    }
    ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  return ref;
}

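// Entry point of the write pipeline: compute the write plan (which
// extents must be read before their stripes can be rewritten), queue
// the op on waiting_state, and advance the state machine.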
void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
{
  ceph_assert(op);

  op->plan = ECTransaction::get_write_plan(
    sinfo,
    std::move(t),
    [&](const hobject_t &i) {
      ECUtil::HashInfoRef ref = get_hash_info(i, true);
      if (!ref) {
        derr << __func__ << ": get_hash_info(" << i << ")"
             << " returned a null pointer and there is no "
             << " way to recover from such an error in this "
             << " context" << dendl;
        ceph_abort();
      }
      return ref;
    },
    get_parent()->get_dpp());

  dout(10) << __func__ << ": " << *op << dendl;

  waiting_state.push_back(*op);
  check_ops();
}

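// Pipeline stage 1: move the front op from waiting_state to
// waiting_reads.  Ops that need read-modify-write are held back while
// the extent cache is invalid; otherwise the extents the plan says we
// must read are served from the cache where possible and fetched
// remotely where not.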
bool ECBackend::try_state_to_reads()
{
  if (waiting_state.empty())
    return false;

  Op *op = &(waiting_state.front());
  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
    ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
    dout(20) << __func__ << ": blocking " << *op
             << " because it requires an rmw and the cache is invalid "
             << pipeline_state
             << dendl;
    return false;
  }

  if (!pipeline_state.caching_enabled()) {
    op->using_cache = false;
  } else if (op->invalidates_cache()) {
    dout(20) << __func__ << ": invalidating cache after this op"
             << dendl;
    pipeline_state.invalidate();
  }

  waiting_state.pop_front();
  waiting_reads.push_back(*op);

  if (op->using_cache) {
    cache.open_write_pin(op->pin);

    extent_set empty;
    for (auto &&hpair: op->plan.will_write) {
      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
      const extent_set &to_read_plan =
        to_read_plan_iter == op->plan.to_read.end() ?
        empty :
        to_read_plan_iter->second;

      extent_set remote_read = cache.reserve_extents_for_rmw(
        hpair.first,
        op->pin,
        hpair.second,
        to_read_plan);

      extent_set pending_read = to_read_plan;
      pending_read.subtract(remote_read);

      if (!remote_read.empty()) {
        op->remote_read[hpair.first] = std::move(remote_read);
      }
      if (!pending_read.empty()) {
        op->pending_read[hpair.first] = std::move(pending_read);
      }
    }
  } else {
    op->remote_read = op->plan.to_read;
  }

  dout(10) << __func__ << ": " << *op << dendl;

  if (!op->remote_read.empty()) {
    ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
    objects_read_async_no_cache(
      op->remote_read,
      [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
        for (auto &&i: results) {
          op->remote_read_result.emplace(i.first, i.second.second);
        }
        check_ops();
      });
  }

  return true;
}

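// Pipeline stage 2: once any preliminary reads are in, generate the
// per-shard transactions and send an ECSubWrite to every
// acting/recovery/backfill shard, applying our own shard's write
// locally through handle_sub_write().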
bool ECBackend::try_reads_to_commit()
{
  if (waiting_reads.empty())
    return false;
  Op *op = &(waiting_reads.front());
  if (op->read_in_progress())
    return false;
  waiting_reads.pop_front();
  waiting_commit.push_back(*op);

  dout(10) << __func__ << ": starting commit on " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  get_parent()->apply_stats(
    op->hoid,
    op->delta_stats);

  if (op->using_cache) {
    for (auto &&hpair: op->pending_read) {
      op->remote_read_result[hpair.first].insert(
        cache.get_remaining_extents_for_rmw(
          hpair.first,
          op->pin,
          hpair.second));
    }
    op->pending_read.clear();
  } else {
    ceph_assert(op->pending_read.empty());
  }

  map<shard_id_t, ObjectStore::Transaction> trans;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    trans[i->shard];
  }

  op->trace.event("start ec write");

  map<hobject_t,extent_map> written;
  if (op->plan.t) {
    ECTransaction::generate_transactions(
      op->plan,
      ec_impl,
      get_parent()->get_info().pgid.pgid,
      sinfo,
      op->remote_read_result,
      op->log_entries,
      &written,
      &trans,
      &(op->temp_added),
      &(op->temp_cleared),
      get_parent()->get_dpp(),
      get_osdmap()->require_osd_release);
  }

  dout(20) << __func__ << ": " << cache << dendl;
  dout(20) << __func__ << ": written: " << written << dendl;
  dout(20) << __func__ << ": op: " << *op << dendl;

  if (!get_parent()->get_pool().allows_ecoverwrites()) {
    for (auto &&i: op->log_entries) {
      if (i.requires_kraken()) {
        derr << __func__ << ": log entry " << i << " requires kraken"
             << " but overwrites are not enabled!" << dendl;
        ceph_abort();
      }
    }
  }

  map<hobject_t,extent_set> written_set;
  for (auto &&i: written) {
    written_set[i.first] = i.second.get_interval_set();
  }
  dout(20) << __func__ << ": written_set: " << written_set << dendl;
  ceph_assert(written_set == op->plan.will_write);

  if (op->using_cache) {
    for (auto &&hpair: written) {
      dout(20) << __func__ << ": " << hpair << dendl;
      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
    }
  }
  op->remote_read.clear();
  op->remote_read_result.clear();

  ObjectStore::Transaction empty;
  bool should_write_local = false;
  ECSubWrite local_write_op;
  std::vector<std::pair<int, Message*>> messages;
  messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
  set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    op->pending_apply.insert(*i);
    op->pending_commit.insert(*i);
    map<shard_id_t, ObjectStore::Transaction>::iterator iter =
      trans.find(i->shard);
    ceph_assert(iter != trans.end());
    bool should_send = get_parent()->should_send_op(*i, op->hoid);
    const pg_stat_t &stats =
      (should_send || !backfill_shards.count(*i)) ?
      get_info().stats :
      parent->get_shard_info().find(*i)->second.stats;

    ECSubWrite sop(
      get_parent()->whoami_shard(),
      op->tid,
      op->reqid,
      op->hoid,
      stats,
      should_send ? iter->second : empty,
      op->version,
      op->trim_to,
      op->roll_forward_to,
      op->log_entries,
      op->updated_hit_set_history,
      op->temp_added,
      op->temp_cleared,
      !should_send);

    ZTracer::Trace trace;
    if (op->trace) {
      // initialize a child span for this shard
      trace.init("ec sub write", nullptr, &op->trace);
      trace.keyval("shard", i->shard.id);
    }

    if (*i == get_parent()->whoami_shard()) {
      should_write_local = true;
      local_write_op.claim(sop);
    } else {
      MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
      r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
      r->map_epoch = get_osdmap_epoch();
      r->min_epoch = get_parent()->get_interval_start_epoch();
      r->trace = trace;
      messages.push_back(std::make_pair(i->osd, r));
    }
  }

#ifdef HAVE_JAEGER
  if (op->client_op && op->client_op->osd_parent_span) {
    auto sub_write_span = jaeger_tracing::child_span(
      "EC sub write", op->client_op->osd_parent_span);
  }
#endif
  if (!messages.empty()) {
    get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
  }

  if (should_write_local) {
    handle_sub_write(
      get_parent()->whoami_shard(),
      op->client_op,
      local_write_op,
      op->trace);
  }

  for (auto i = op->on_write.begin();
       i != op->on_write.end();
       op->on_write.erase(i++)) {
    (*i)();
  }

  return true;
}

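// Pipeline stage 3: retire committed ops, advancing completed_to and
// committed_to; post-kraken, queue a dummy op to roll the log forward
// once nothing else is in flight.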
bool ECBackend::try_finish_rmw()
{
  if (waiting_commit.empty())
    return false;
  Op *op = &(waiting_commit.front());
  if (op->write_in_progress())
    return false;
  waiting_commit.pop_front();

  dout(10) << __func__ << ": " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  if (op->roll_forward_to > completed_to)
    completed_to = op->roll_forward_to;
  if (op->version > committed_to)
    committed_to = op->version;

  if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
        waiting_reads.empty() &&
        waiting_commit.empty()) {
      // submit a dummy transaction to kick the rollforward
      auto tid = get_parent()->get_tid();
      Op *nop = &(tid_to_op_map[tid]);
      nop->hoid = op->hoid;
      nop->trim_to = op->trim_to;
      nop->roll_forward_to = op->version;
      nop->tid = tid;
      nop->reqid = op->reqid;
      waiting_reads.push_back(*nop);
    }
  }

  if (op->using_cache) {
    cache.release_write_pin(op->pin);
  }
  tid_to_op_map.erase(op->tid);

  if (waiting_reads.empty() &&
      waiting_commit.empty()) {
    pipeline_state.clear();
    dout(20) << __func__ << ": clearing pipeline_state "
             << pipeline_state
             << dendl;
  }
  return true;
}

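// Run the three pipeline stages until none of them makes progress.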
void ECBackend::check_ops()
{
  while (try_state_to_reads() ||
         try_reads_to_commit() ||
         try_finish_rmw());
}

int ECBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  // Synchronous reads cannot be served by an EC backend: the data
  // has to be gathered and decoded from multiple shards first.
  return -EOPNOTSUPP;
}

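// Client-facing async read: round each requested extent out to
// stripe bounds, issue one reconstructing read, then trim the decoded
// stripes back to the ranges originally asked for and complete the
// per-extent and whole-op Contexts.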
void ECBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
    reads;

  uint32_t flags = 0;
  extent_set es;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
         pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end();
       ++i) {
    pair<uint64_t, uint64_t> tmp =
      sinfo.offset_len_to_stripe_bounds(
        make_pair(i->first.get<0>(), i->first.get<1>()));

    es.union_insert(tmp.first, tmp.second);
    flags |= i->first.get<2>();
  }

  if (!es.empty()) {
    auto &offsets = reads[hoid];
    for (auto j = es.begin();
         j != es.end();
         ++j) {
      offsets.push_back(
        boost::make_tuple(
          j.get_start(),
          j.get_len(),
          flags));
    }
  }

  struct cb {
    ECBackend *ec;
    hobject_t hoid;
    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
              pair<bufferlist*, Context*> > > to_read;
    unique_ptr<Context> on_complete;
    cb(const cb&) = delete;
    cb(cb &&) = default;
    cb(ECBackend *ec,
       const hobject_t &hoid,
       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                       pair<bufferlist*, Context*> > > &to_read,
       Context *on_complete)
      : ec(ec),
        hoid(hoid),
        to_read(to_read),
        on_complete(on_complete) {}
    void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
      auto dpp = ec->get_parent()->get_dpp();
      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
                         << dendl;
      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
                         << dendl;

      auto &got = results[hoid];

      int r = 0;
      for (auto &&read: to_read) {
        if (got.first < 0) {
          if (read.second.second) {
            read.second.second->complete(got.first);
          }
          if (r == 0)
            r = got.first;
        } else {
          ceph_assert(read.second.first);
          uint64_t offset = read.first.get<0>();
          uint64_t length = read.first.get<1>();
          auto range = got.second.get_containing_range(offset, length);
          ceph_assert(range.first != range.second);
          ceph_assert(range.first.get_off() <= offset);
          ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
          ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off() << dendl;
          ldpp_dout(dpp, 30) << "length: " << length << dendl;
          ldpp_dout(dpp, 30) << "range length: " << range.first.get_len() << dendl;
          ceph_assert(
            (offset + length) <=
            (range.first.get_off() + range.first.get_len()));
          read.second.first->substr_of(
            range.first.get_val(),
            offset - range.first.get_off(),
            length);
          if (read.second.second) {
            read.second.second->complete(length);
            read.second.second = nullptr;
          }
        }
      }
      to_read.clear();
      if (on_complete) {
        on_complete.release()->complete(r);
      }
    }
    ~cb() {
      for (auto &&i: to_read) {
        delete i.second.second;
      }
      to_read.clear();
    }
  };
  objects_read_and_reconstruct(
    reads,
    fast_read,
    make_gen_lambda_context<
      map<hobject_t,pair<int, extent_map> > &&, cb>(
        cb(this,
           hoid,
           to_read,
           on_complete)));
}

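// Completion for objects_read_and_reconstruct(): decodes the returned
// shard chunks stripe by stripe, trims each decoded stripe to the
// logical extent requested, and hands the assembled extent_map to the
// waiting ClientAsyncReadStatus.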
struct CallClientContexts :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  hobject_t hoid;
  ECBackend *ec;
  ECBackend::ClientAsyncReadStatus *status;
  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
  CallClientContexts(
    hobject_t hoid,
    ECBackend *ec,
    ECBackend::ClientAsyncReadStatus *status,
    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    extent_map result;
    if (res.r != 0)
      goto out;
    ceph_assert(res.returned.size() == to_read.size());
    ceph_assert(res.errors.empty());
    for (auto &&read: to_read) {
      pair<uint64_t, uint64_t> adjusted =
        ec->sinfo.offset_len_to_stripe_bounds(
          make_pair(read.get<0>(), read.get<1>()));
      ceph_assert(res.returned.front().get<0>() == adjusted.first &&
                  res.returned.front().get<1>() == adjusted.second);
      map<int, bufferlist> to_decode;
      bufferlist bl;
      for (map<pg_shard_t, bufferlist>::iterator j =
             res.returned.front().get<2>().begin();
           j != res.returned.front().get<2>().end();
           ++j) {
        to_decode[j->first.shard] = std::move(j->second);
      }
      int r = ECUtil::decode(
        ec->sinfo,
        ec->ec_impl,
        to_decode,
        &bl);
      if (r < 0) {
        res.r = r;
        goto out;
      }
      bufferlist trimmed;
      trimmed.substr_of(
        bl,
        read.get<0>() - adjusted.first,
        std::min(read.get<1>(),
                 bl.length() - (read.get<0>() - adjusted.first)));
      result.insert(
        read.get<0>(), trimmed.length(), std::move(trimmed));
      res.returned.pop_front();
    }
out:
    status->complete_object(hoid, res.r, std::move(result));
    ec->kick_reads();
  }
};

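// For each object, pick the minimal shard set (or all available
// shards when fast_read is set), attach a CallClientContexts
// completion, and launch a single batched read op covering every
// object.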
void ECBackend::objects_read_and_reconstruct(
  const map<hobject_t,
            std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
            > &reads,
  bool fast_read,
  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
{
  in_progress_client_reads.emplace_back(
    reads.size(), std::move(func));
  if (!reads.size()) {
    kick_reads();
    return;
  }

  map<hobject_t, set<int>> obj_want_to_read;
  set<int> want_to_read;
  get_want_to_read_shards(&want_to_read);

  map<hobject_t, read_request_t> for_read_op;
  for (auto &&to_read: reads) {
    map<pg_shard_t, vector<pair<int, int>>> shards;
    int r = get_min_avail_to_read_shards(
      to_read.first,
      want_to_read,
      false,
      fast_read,
      &shards);
    ceph_assert(r == 0);

    CallClientContexts *c = new CallClientContexts(
      to_read.first,
      this,
      &(in_progress_client_reads.back()),
      to_read.second);
    for_read_op.insert(
      make_pair(
        to_read.first,
        read_request_t(
          to_read.second,
          shards,
          false,
          c)));
    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
  }

  start_read_op(
    CEPH_MSG_PRIO_DEFAULT,
    obj_want_to_read,
    for_read_op,
    OpRequestRef(),
    fast_read, false);
  return;
}

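// Called after a read came back with errors: recompute the shard set
// from whatever has not been tried yet and requeue the request,
// asking for attrs again if the first attempt failed to return them.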
int ECBackend::send_all_remaining_reads(
  const hobject_t &hoid,
  ReadOp &rop)
{
  set<int> already_read;
  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
    already_read.insert(i->shard);
  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
  map<pg_shard_t, vector<pair<int, int>>> shards;
  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
                               rop.complete[hoid], &shards, rop.for_recovery);
  if (r)
    return r;

  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
    rop.to_read.find(hoid)->second.to_read;
  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
    rop.to_read.find(hoid)->second.cb;

  // If we needed to read attrs and the previous read failed to return
  // them, ask for them again on the retry.
  bool want_attrs =
    rop.to_read.find(hoid)->second.want_attrs &&
    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
  if (want_attrs) {
    dout(10) << __func__ << " want attrs again" << dendl;
  }

  rop.to_read.erase(hoid);
  rop.to_read.insert(make_pair(
    hoid,
    read_request_t(
      offsets,
      shards,
      want_attrs,
      c)));
  return 0;
}

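// Fetch the object's xattrs from the local shard, stripping the
// EC-internal hinfo key before returning them to the caller.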
int ECBackend::objects_get_attrs(
  const hobject_t &hoid,
  map<string, bufferlist> *out)
{
  int r = store->getattrs(
    ch,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    *out);
  if (r < 0)
    return r;

  for (map<string, bufferlist>::iterator i = out->begin();
       i != out->end();
       ) {
    if (ECUtil::is_hinfo_key_string(i->first))
      out->erase(i++);
    else
      ++i;
  }
  return r;
}

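// Undo an append by truncating the shard back to the chunk offset
// that corresponds to the old (stripe-aligned) logical size.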
void ECBackend::rollback_append(
  const hobject_t &hoid,
  uint64_t old_size,
  ObjectStore::Transaction *t)
{
  ceph_assert(old_size % sinfo.get_stripe_width() == 0);
  t->truncate(
    coll,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    sinfo.aligned_logical_offset_to_chunk_offset(
      old_size));
}

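// Incremental deep scrub of the local shard: each call reads one
// chunk-aligned stride and returns -EINPROGRESS until EOF, then
// compares the accumulated hash against the stored hinfo.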
int ECBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
                           CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  if (pos.data_pos == 0) {
    pos.data_hash = bufferhash(-1);
  }

  uint64_t stride = cct->_conf->osd_deep_scrub_stride;
  if (stride % sinfo.get_chunk_size())
    stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());

  bufferlist bl;
  r = store->read(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    pos.data_pos,
    stride, bl,
    fadvise_flags);
  if (r < 0) {
    dout(20) << __func__ << " " << poid << " got "
             << r << " on read, read_error" << dendl;
    o.read_error = true;
    return 0;
  }
  if (bl.length() % sinfo.get_chunk_size()) {
    dout(20) << __func__ << " " << poid << " got "
             << r << " on read, not chunk size " << sinfo.get_chunk_size()
             << " aligned" << dendl;
    o.read_error = true;
    return 0;
  }
  if (r > 0) {
    pos.data_hash << bl;
  }
  pos.data_pos += r;
  if (r == (int)stride) {
    return -EINPROGRESS;
  }

  ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
  if (!hinfo) {
    dout(0) << __func__ << " " << poid << " could not retrieve hash info" << dendl;
    o.read_error = true;
    o.digest_present = false;
    return 0;
  } else {
    if (!get_parent()->get_pool().allows_ecoverwrites()) {
      if (!hinfo->has_chunk_hash()) {
        dout(0) << __func__ << " " << poid << " got invalid hash info" << dendl;
        o.ec_size_mismatch = true;
        return 0;
      }
      if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
        dout(0) << __func__ << " " << poid << " got incorrect size on read 0x"
                << std::hex << pos.data_pos
                << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
                << dendl;
        o.ec_size_mismatch = true;
        return 0;
      }

      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
          pos.data_hash.digest()) {
        dout(0) << __func__ << " " << poid << " got incorrect hash on read 0x"
                << std::hex << pos.data_hash.digest() << " != expected 0x"
                << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
                << std::dec << dendl;
        o.ec_hash_mismatch = true;
        return 0;
      }

      /* We checked above that we match our own stored hash.  We cannot
       * send a hash of the actual object, so instead we simply send
       * our locally stored hash of shard 0 on the assumption that if
       * we match our chunk hash and our recollection of the hash for
       * chunk 0 matches that of our peers, there is likely no corruption.
       */
      o.digest = hinfo->get_chunk_hash(0);
      o.digest_present = true;
    } else {
      /* Hack! We must be using partial overwrites, and partial overwrites
       * don't support deep-scrub yet
       */
      o.digest = 0;
      o.digest_present = true;
    }
  }

  o.omap_digest = -1;
  o.omap_digest_present = true;
  return 0;
}