1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <sstream>
17
18#include "ECBackend.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPushReply.h"
21#include "messages/MOSDECSubOpWrite.h"
22#include "messages/MOSDECSubOpWriteReply.h"
23#include "messages/MOSDECSubOpRead.h"
24#include "messages/MOSDECSubOpReadReply.h"
25#include "ECMsgTypes.h"
26
27#include "PrimaryLogPG.h"
28
29#define dout_context cct
30#define dout_subsys ceph_subsys_osd
31#define DOUT_PREFIX_ARGS this
32#undef dout_prefix
33#define dout_prefix _prefix(_dout, this)
34
35using std::dec;
36using std::hex;
37using std::list;
38using std::make_pair;
39using std::map;
40using std::pair;
41using std::ostream;
42using std::set;
43using std::string;
44using std::unique_ptr;
45using std::vector;
46
47using ceph::bufferhash;
48using ceph::bufferlist;
49using ceph::bufferptr;
50using ceph::ErasureCodeInterfaceRef;
51using ceph::Formatter;
52
53static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
54 return pgb->get_parent()->gen_dbg_prefix(*_dout);
55}
56
57struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
58 list<ECBackend::RecoveryOp> ops;
59};
60
61ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
62 switch (rhs.pipeline_state) {
63 case ECBackend::pipeline_state_t::CACHE_VALID:
64 return lhs << "CACHE_VALID";
65 case ECBackend::pipeline_state_t::CACHE_INVALID:
66 return lhs << "CACHE_INVALID";
67 default:
68 ceph_abort_msg("invalid pipeline state");
69 }
70 return lhs; // unreachable
71}
72
73static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
74{
75 lhs << "[";
76 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
77 i != rhs.end();
78 ++i) {
79 if (i != rhs.begin())
80 lhs << ", ";
81 lhs << make_pair(i->first, i->second.length());
82 }
83 return lhs << "]";
84}
85
86static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
87{
88 lhs << "[";
89 for (map<int, bufferlist>::const_iterator i = rhs.begin();
90 i != rhs.end();
91 ++i) {
92 if (i != rhs.begin())
93 lhs << ", ";
94 lhs << make_pair(i->first, i->second.length());
95 }
96 return lhs << "]";
97}
98
99static ostream &operator<<(
100 ostream &lhs,
101 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
102{
103 return lhs << "(" << rhs.get<0>() << ", "
104 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
105}
106
107ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
108{
109 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
110 << ", need=" << rhs.need
111 << ", want_attrs=" << rhs.want_attrs
112 << ")";
113}
114
115ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
116{
117 lhs << "read_result_t(r=" << rhs.r
118 << ", errors=" << rhs.errors;
119 if (rhs.attrs) {
120 lhs << ", attrs=" << *(rhs.attrs);
121 } else {
122 lhs << ", noattrs";
123 }
124 return lhs << ", returned=" << rhs.returned << ")";
125}
126
127ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
128{
129 lhs << "ReadOp(tid=" << rhs.tid;
130 if (rhs.op && rhs.op->get_req()) {
131 lhs << ", op=";
132 rhs.op->get_req()->print(lhs);
133 }
134 return lhs << ", to_read=" << rhs.to_read
135 << ", complete=" << rhs.complete
136 << ", priority=" << rhs.priority
137 << ", obj_to_source=" << rhs.obj_to_source
138 << ", source_to_obj=" << rhs.source_to_obj
139 << ", in_progress=" << rhs.in_progress << ")";
140}
141
142void ECBackend::ReadOp::dump(Formatter *f) const
143{
144 f->dump_unsigned("tid", tid);
145 if (op && op->get_req()) {
146 f->dump_stream("op") << *(op->get_req());
147 }
148 f->dump_stream("to_read") << to_read;
149 f->dump_stream("complete") << complete;
150 f->dump_int("priority", priority);
151 f->dump_stream("obj_to_source") << obj_to_source;
152 f->dump_stream("source_to_obj") << source_to_obj;
153 f->dump_stream("in_progress") << in_progress;
154}
155
156ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
157{
158 lhs << "Op(" << rhs.hoid
159 << " v=" << rhs.version
160 << " tt=" << rhs.trim_to
161 << " tid=" << rhs.tid
162 << " reqid=" << rhs.reqid;
163 if (rhs.client_op && rhs.client_op->get_req()) {
164 lhs << " client_op=";
165 rhs.client_op->get_req()->print(lhs);
166 }
167 lhs << " roll_forward_to=" << rhs.roll_forward_to
168 << " temp_added=" << rhs.temp_added
169 << " temp_cleared=" << rhs.temp_cleared
170 << " pending_read=" << rhs.pending_read
171 << " remote_read=" << rhs.remote_read
172 << " remote_read_result=" << rhs.remote_read_result
173 << " pending_apply=" << rhs.pending_apply
174 << " pending_commit=" << rhs.pending_commit
175 << " plan.to_read=" << rhs.plan.to_read
176 << " plan.will_write=" << rhs.plan.will_write
177 << ")";
178 return lhs;
179}
180
181ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
182{
183 return lhs << "RecoveryOp("
184 << "hoid=" << rhs.hoid
185 << " v=" << rhs.v
186 << " missing_on=" << rhs.missing_on
187 << " missing_on_shards=" << rhs.missing_on_shards
188 << " recovery_info=" << rhs.recovery_info
189 << " recovery_progress=" << rhs.recovery_progress
190 << " obc refcount=" << rhs.obc.use_count()
191 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
192 << " waiting_on_pushes=" << rhs.waiting_on_pushes
193 << " extent_requested=" << rhs.extent_requested
194 << ")";
195}
196
197void ECBackend::RecoveryOp::dump(Formatter *f) const
198{
199 f->dump_stream("hoid") << hoid;
200 f->dump_stream("v") << v;
201 f->dump_stream("missing_on") << missing_on;
202 f->dump_stream("missing_on_shards") << missing_on_shards;
203 f->dump_stream("recovery_info") << recovery_info;
204 f->dump_stream("recovery_progress") << recovery_progress;
205 f->dump_stream("state") << tostr(state);
206 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
207 f->dump_stream("extent_requested") << extent_requested;
208}
209
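// The stripe geometry (sinfo) is derived from the plugin's data chunk count
// and the pool's stripe_width; the assert below requires stripe_width to
// split evenly into one chunk per data shard.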
210ECBackend::ECBackend(
211 PGBackend::Listener *pg,
212 const coll_t &coll,
213 ObjectStore::CollectionHandle &ch,
214 ObjectStore *store,
215 CephContext *cct,
216 ErasureCodeInterfaceRef ec_impl,
217 uint64_t stripe_width)
218 : PGBackend(cct, pg, store, coll, ch),
219 ec_impl(ec_impl),
220 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
221 ceph_assert((ec_impl->get_data_chunk_count() *
222 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
223}
224
225PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
226{
227 return new ECRecoveryHandle;
228}
229
230void ECBackend::_failed_push(const hobject_t &hoid,
231 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
232{
233 ECBackend::read_result_t &res = in.second;
234 dout(10) << __func__ << ": Read error " << hoid << " r="
235 << res.r << " errors=" << res.errors << dendl;
236 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
237 << dendl;
238 ceph_assert(recovery_ops.count(hoid));
239 eversion_t v = recovery_ops[hoid].v;
240 recovery_ops.erase(hoid);
241
242 set<pg_shard_t> fl;
243 for (auto&& i : res.errors) {
244 fl.insert(i.first);
245 }
246 get_parent()->on_failed_pull(fl, hoid, v);
247}
248
249struct OnRecoveryReadComplete :
250 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
251 ECBackend *pg;
252 hobject_t hoid;
253 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
254 : pg(pg), hoid(hoid) {}
255 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
256 ECBackend::read_result_t &res = in.second;
257 if (!(res.r == 0 && res.errors.empty())) {
258 pg->_failed_push(hoid, in);
259 return;
260 }
261 ceph_assert(res.returned.size() == 1);
262 pg->handle_recovery_read_complete(
263 hoid,
264 res.returned.back(),
265 res.attrs,
266 in.first);
267 }
268};
269
270struct RecoveryMessages {
271 map<hobject_t,
272 ECBackend::read_request_t> reads;
273 map<hobject_t, set<int>> want_to_read;
274 void read(
275 ECBackend *ec,
276 const hobject_t &hoid, uint64_t off, uint64_t len,
277 set<int> &&_want_to_read,
278 const map<pg_shard_t, vector<pair<int, int>>> &need,
279 bool attrs) {
280 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
281 to_read.push_back(boost::make_tuple(off, len, 0));
282 ceph_assert(!reads.count(hoid));
283 want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
284 reads.insert(
285 make_pair(
286 hoid,
287 ECBackend::read_request_t(
288 to_read,
289 need,
290 attrs,
291 new OnRecoveryReadComplete(
292 ec,
293 hoid))));
294 }
295
296 map<pg_shard_t, vector<PushOp> > pushes;
297 map<pg_shard_t, vector<PushReplyOp> > push_replies;
298 ObjectStore::Transaction t;
299 RecoveryMessages() {}
300 ~RecoveryMessages() {}
301};
302
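// Apply a PushOp received during recovery to the local shard. If the push
// covers the whole object ("oneshot"), it is written directly to the final
// object; otherwise data accumulates in a temp object that is moved into
// place once after_progress reports data_complete.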
303void ECBackend::handle_recovery_push(
304 const PushOp &op,
305 RecoveryMessages *m,
306 bool is_repair)
307{
308 if (get_parent()->check_failsafe_full()) {
309 dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
310 ceph_abort();
311 }
312
313 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
314 ghobject_t tobj;
315 if (oneshot) {
316 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
317 get_parent()->whoami_shard().shard);
318 } else {
319 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
320 op.version),
321 ghobject_t::NO_GEN,
322 get_parent()->whoami_shard().shard);
323 if (op.before_progress.first) {
324 dout(10) << __func__ << ": Adding oid "
325 << tobj.hobj << " in the temp collection" << dendl;
326 add_temp_obj(tobj.hobj);
327 }
328 }
329
330 if (op.before_progress.first) {
331 m->t.remove(coll, tobj);
332 m->t.touch(coll, tobj);
333 }
334
335 if (!op.data_included.empty()) {
336 uint64_t start = op.data_included.range_start();
337 uint64_t end = op.data_included.range_end();
338 ceph_assert(op.data.length() == (end - start));
339
340 m->t.write(
341 coll,
342 tobj,
343 start,
344 op.data.length(),
345 op.data);
346 } else {
347 ceph_assert(op.data.length() == 0);
348 }
349
350 if (get_parent()->pg_is_remote_backfilling()) {
351 get_parent()->pg_add_local_num_bytes(op.data.length());
352 get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
353 dout(10) << __func__ << " " << op.soid
354 << " add new actual data by " << op.data.length()
355 << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
356 << dendl;
357 }
358
359 if (op.before_progress.first) {
360 ceph_assert(op.attrset.count(string("_")));
361 m->t.setattrs(
362 coll,
363 tobj,
364 op.attrset);
365 }
366
367 if (op.after_progress.data_complete && !oneshot) {
368 dout(10) << __func__ << ": Removing oid "
369 << tobj.hobj << " from the temp collection" << dendl;
370 clear_temp_obj(tobj.hobj);
371 m->t.remove(coll, ghobject_t(
372 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
373 m->t.collection_move_rename(
374 coll, tobj,
375 coll, ghobject_t(
376 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
377 }
378 if (op.after_progress.data_complete) {
379 if ((get_parent()->pgb_is_primary())) {
380 ceph_assert(recovery_ops.count(op.soid));
381 ceph_assert(recovery_ops[op.soid].obc);
382 if (get_parent()->pg_is_repair())
383 get_parent()->inc_osd_stat_repaired();
384 get_parent()->on_local_recover(
385 op.soid,
386 op.recovery_info,
387 recovery_ops[op.soid].obc,
388 false,
389 &m->t);
390 } else {
391 // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
392 if (is_repair)
393 get_parent()->inc_osd_stat_repaired();
394 get_parent()->on_local_recover(
395 op.soid,
396 op.recovery_info,
397 ObjectContextRef(),
398 false,
399 &m->t);
400 if (get_parent()->pg_is_remote_backfilling()) {
401 struct stat st;
402 int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
403 get_parent()->whoami_shard().shard), &st);
404 if (r == 0) {
405 get_parent()->pg_sub_local_num_bytes(st.st_size);
406 // XXX: This can be way overestimated for small objects
407 get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
408 dout(10) << __func__ << " " << op.soid
409 << " sub actual data by " << st.st_size
410 << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
411 << dendl;
412 }
413 }
414 }
415 }
416 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
417 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
418}
419
420void ECBackend::handle_recovery_push_reply(
421 const PushReplyOp &op,
422 pg_shard_t from,
423 RecoveryMessages *m)
424{
425 if (!recovery_ops.count(op.soid))
426 return;
427 RecoveryOp &rop = recovery_ops[op.soid];
428 ceph_assert(rop.waiting_on_pushes.count(from));
429 rop.waiting_on_pushes.erase(from);
430 continue_recovery_op(rop, m);
431}
432
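// Called when the recovery reads for hoid return: decode the shards that are
// missing on the recovery targets from the buffers we did read, and (on the
// first pass) rebuild attrs/hinfo so the object context can be recreated.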
433void ECBackend::handle_recovery_read_complete(
434 const hobject_t &hoid,
435 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
436 std::optional<map<string, bufferlist> > attrs,
437 RecoveryMessages *m)
438{
439 dout(10) << __func__ << ": returned " << hoid << " "
440 << "(" << to_read.get<0>()
441 << ", " << to_read.get<1>()
442 << ", " << to_read.get<2>()
443 << ")"
444 << dendl;
445 ceph_assert(recovery_ops.count(hoid));
446 RecoveryOp &op = recovery_ops[hoid];
447 ceph_assert(op.returned_data.empty());
448 map<int, bufferlist*> target;
449 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
450 i != op.missing_on_shards.end();
451 ++i) {
452 target[*i] = &(op.returned_data[*i]);
453 }
454 map<int, bufferlist> from;
455 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
456 i != to_read.get<2>().end();
457 ++i) {
458 from[i->first.shard] = std::move(i->second);
459 }
460 dout(10) << __func__ << ": " << from << dendl;
461 int r;
462 r = ECUtil::decode(sinfo, ec_impl, from, target);
463 ceph_assert(r == 0);
464 if (attrs) {
465 op.xattrs.swap(*attrs);
466
467 if (!op.obc) {
468 // attrs still reference the original bufferlist (decoded from the
469 // ECSubReadReply message), which is much larger than the attrs
470 // themselves. If the obc caches them (get_obc may cache the attrs),
471 // the whole original bufferlist cannot be freed until the obc is
472 // evicted from the obc cache, so rebuild the bufferlists before
473 // caching them.
474 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
475 it != op.xattrs.end();
476 ++it) {
477 it->second.rebuild();
478 }
479 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
480 // of the backend (see bug #12983)
481 map<string, bufferlist> sanitized_attrs(op.xattrs);
482 sanitized_attrs.erase(ECUtil::get_hinfo_key());
483 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
484 ceph_assert(op.obc);
485 op.recovery_info.size = op.obc->obs.oi.size;
486 op.recovery_info.oi = op.obc->obs.oi;
487 }
488
489 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
490 if (op.obc->obs.oi.size > 0) {
491 ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
492 auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
493 decode(hinfo, bp);
494 }
495 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
496 }
497 ceph_assert(op.xattrs.size());
498 ceph_assert(op.obc);
499 continue_recovery_op(op, m);
500}
501
502struct SendPushReplies : public Context {
503 PGBackend::Listener *l;
504 epoch_t epoch;
505 map<int, MOSDPGPushReply*> replies;
506 SendPushReplies(
507 PGBackend::Listener *l,
508 epoch_t epoch,
509 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
510 replies.swap(in);
511 }
512 void finish(int) override {
513 std::vector<std::pair<int, Message*>> messages;
514 messages.reserve(replies.size());
515 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
516 i != replies.end();
517 ++i) {
518 messages.push_back(std::make_pair(i->first, i->second));
519 }
520 if (!messages.empty()) {
521 l->send_message_osd_cluster(messages, epoch);
522 }
523 replies.clear();
524 }
525 ~SendPushReplies() override {
526 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
527 i != replies.end();
528 ++i) {
529 i->second->put();
530 }
531 replies.clear();
532 }
533};
534
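// Flush everything queued in RecoveryMessages: send the accumulated pushes
// and push replies to their peers, queue the shared transaction, and start
// any reads that recovery still needs.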
535void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
536{
537 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
538 i != m.pushes.end();
539 m.pushes.erase(i++)) {
540 MOSDPGPush *msg = new MOSDPGPush();
541 msg->set_priority(priority);
542 msg->map_epoch = get_osdmap_epoch();
543 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
544 msg->from = get_parent()->whoami_shard();
545 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
546 msg->pushes.swap(i->second);
547 msg->compute_cost(cct);
548 msg->is_repair = get_parent()->pg_is_repair();
549 get_parent()->send_message(
550 i->first.osd,
551 msg);
552 }
553 map<int, MOSDPGPushReply*> replies;
554 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
555 m.push_replies.begin();
556 i != m.push_replies.end();
557 m.push_replies.erase(i++)) {
558 MOSDPGPushReply *msg = new MOSDPGPushReply();
559 msg->set_priority(priority);
560 msg->map_epoch = get_osdmap_epoch();
561 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
562 msg->from = get_parent()->whoami_shard();
563 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
564 msg->replies.swap(i->second);
565 msg->compute_cost(cct);
566 replies.insert(make_pair(i->first.osd, msg));
567 }
568
569 if (!replies.empty()) {
570 (m.t).register_on_complete(
571 get_parent()->bless_context(
572 new SendPushReplies(
573 get_parent(),
574 get_osdmap_epoch(),
575 replies)));
576 get_parent()->queue_transaction(std::move(m.t));
577 }
578
579 if (m.reads.empty())
580 return;
581 start_read_op(
582 priority,
583 m.want_to_read,
584 m.reads,
585 OpRequestRef(),
586 false, true);
587}
588
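// Drive a single RecoveryOp through its state machine:
//   IDLE    -> issue reads for the next chunk of the object
//   READING -> decode and queue pushes to every shard still missing it
//   WRITING -> wait for push replies, then loop back to IDLE or finish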
589void ECBackend::continue_recovery_op(
590 RecoveryOp &op,
591 RecoveryMessages *m)
592{
593 dout(10) << __func__ << ": continuing " << op << dendl;
594 while (1) {
595 switch (op.state) {
596 case RecoveryOp::IDLE: {
597 // start read
598 op.state = RecoveryOp::READING;
599 ceph_assert(!op.recovery_progress.data_complete);
600 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
601 uint64_t from = op.recovery_progress.data_recovered_to;
602 uint64_t amount = get_recovery_chunk_size();
603
604 if (op.recovery_progress.first && op.obc) {
605 /* We've got the attrs and the hinfo, might as well use them */
606 op.hinfo = get_hash_info(op.hoid);
607 ceph_assert(op.hinfo);
608 op.xattrs = op.obc->attr_cache;
609 encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
610 }
611
612 map<pg_shard_t, vector<pair<int, int>>> to_read;
613 int r = get_min_avail_to_read_shards(
614 op.hoid, want, true, false, &to_read);
615 if (r != 0) {
616 // we must have lost a recovery source
617 ceph_assert(!op.recovery_progress.first);
618 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
619 << dendl;
620 get_parent()->cancel_pull(op.hoid);
621 recovery_ops.erase(op.hoid);
622 return;
623 }
624 m->read(
625 this,
626 op.hoid,
627 op.recovery_progress.data_recovered_to,
628 amount,
629 std::move(want),
630 to_read,
631 op.recovery_progress.first && !op.obc);
632 op.extent_requested = make_pair(
633 from,
634 amount);
635 dout(10) << __func__ << ": IDLE return " << op << dendl;
636 return;
637 }
638 case RecoveryOp::READING: {
639 // read completed, start write
640 ceph_assert(op.xattrs.size());
641 ceph_assert(op.returned_data.size());
642 op.state = RecoveryOp::WRITING;
643 ObjectRecoveryProgress after_progress = op.recovery_progress;
644 after_progress.data_recovered_to += op.extent_requested.second;
645 after_progress.first = false;
646 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
647 after_progress.data_recovered_to =
648 sinfo.logical_to_next_stripe_offset(
649 op.obc->obs.oi.size);
650 after_progress.data_complete = true;
651 }
652 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
653 mi != op.missing_on.end();
654 ++mi) {
655 ceph_assert(op.returned_data.count(mi->shard));
656 m->pushes[*mi].push_back(PushOp());
657 PushOp &pop = m->pushes[*mi].back();
658 pop.soid = op.hoid;
659 pop.version = op.v;
660 pop.data = op.returned_data[mi->shard];
661 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
662 << ", after_progress=" << after_progress
663 << ", pop.data.length()=" << pop.data.length()
664 << ", size=" << op.obc->obs.oi.size << dendl;
665 ceph_assert(
666 pop.data.length() ==
667 sinfo.aligned_logical_offset_to_chunk_offset(
668 after_progress.data_recovered_to -
669 op.recovery_progress.data_recovered_to)
670 );
671 if (pop.data.length())
672 pop.data_included.insert(
673 sinfo.aligned_logical_offset_to_chunk_offset(
674 op.recovery_progress.data_recovered_to),
675 pop.data.length()
676 );
677 if (op.recovery_progress.first) {
678 pop.attrset = op.xattrs;
679 }
680 pop.recovery_info = op.recovery_info;
681 pop.before_progress = op.recovery_progress;
682 pop.after_progress = after_progress;
683 if (*mi != get_parent()->primary_shard())
684 get_parent()->begin_peer_recover(
685 *mi,
686 op.hoid);
687 }
688 op.returned_data.clear();
689 op.waiting_on_pushes = op.missing_on;
690 op.recovery_progress = after_progress;
691 dout(10) << __func__ << ": READING return " << op << dendl;
692 return;
693 }
694 case RecoveryOp::WRITING: {
695 if (op.waiting_on_pushes.empty()) {
696 if (op.recovery_progress.data_complete) {
697 op.state = RecoveryOp::COMPLETE;
698 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
699 i != op.missing_on.end();
700 ++i) {
701 if (*i != get_parent()->primary_shard()) {
702 dout(10) << __func__ << ": on_peer_recover on " << *i
703 << ", obj " << op.hoid << dendl;
704 get_parent()->on_peer_recover(
705 *i,
706 op.hoid,
707 op.recovery_info);
708 }
709 }
710 object_stat_sum_t stat;
711 stat.num_bytes_recovered = op.recovery_info.size;
712 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
713 stat.num_objects_recovered = 1;
714 if (get_parent()->pg_is_repair())
715 stat.num_objects_repaired = 1;
716 get_parent()->on_global_recover(op.hoid, stat, false);
717 dout(10) << __func__ << ": WRITING return " << op << dendl;
718 recovery_ops.erase(op.hoid);
719 return;
720 } else {
721 op.state = RecoveryOp::IDLE;
722 dout(10) << __func__ << ": WRITING continue " << op << dendl;
723 continue;
724 }
725 }
726 return;
727 }
728 // should never be called once complete
729 case RecoveryOp::COMPLETE:
730 default: {
731 ceph_abort();
732 };
733 }
734 }
735}
736
737void ECBackend::run_recovery_op(
738 RecoveryHandle *_h,
739 int priority)
740{
741 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
742 RecoveryMessages m;
743 for (list<RecoveryOp>::iterator i = h->ops.begin();
744 i != h->ops.end();
745 ++i) {
746 dout(10) << __func__ << ": starting " << *i << dendl;
747 ceph_assert(!recovery_ops.count(i->hoid));
748 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
749 continue_recovery_op(op, &m);
750 }
751
752 dispatch_recovery_messages(m, priority);
753 send_recovery_deletes(priority, h->deletes);
754 delete _h;
755}
756
757int ECBackend::recover_object(
758 const hobject_t &hoid,
759 eversion_t v,
760 ObjectContextRef head,
761 ObjectContextRef obc,
762 RecoveryHandle *_h)
763{
764 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
765 h->ops.push_back(RecoveryOp());
766 h->ops.back().v = v;
767 h->ops.back().hoid = hoid;
768 h->ops.back().obc = obc;
769 h->ops.back().recovery_info.soid = hoid;
770 h->ops.back().recovery_info.version = v;
771 if (obc) {
772 h->ops.back().recovery_info.size = obc->obs.oi.size;
773 h->ops.back().recovery_info.oi = obc->obs.oi;
774 }
775 if (hoid.is_snap()) {
776 if (obc) {
777 ceph_assert(obc->ssc);
778 h->ops.back().recovery_info.ss = obc->ssc->snapset;
779 } else if (head) {
780 ceph_assert(head->ssc);
781 h->ops.back().recovery_info.ss = head->ssc->snapset;
782 } else {
783 ceph_abort_msg("neither obc nor head set for a snap object");
784 }
785 }
786 h->ops.back().recovery_progress.omap_complete = true;
787 for (set<pg_shard_t>::const_iterator i =
788 get_parent()->get_acting_recovery_backfill_shards().begin();
789 i != get_parent()->get_acting_recovery_backfill_shards().end();
790 ++i) {
791 dout(10) << "checking " << *i << dendl;
792 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
793 h->ops.back().missing_on.insert(*i);
794 h->ops.back().missing_on_shards.insert(i->shard);
795 }
796 }
797 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
798 return 0;
799}
800
801bool ECBackend::can_handle_while_inactive(
802 OpRequestRef _op)
803{
804 return false;
805}
806
807bool ECBackend::_handle_message(
808 OpRequestRef _op)
809{
810 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
811 int priority = _op->get_req()->get_priority();
812 switch (_op->get_req()->get_type()) {
813 case MSG_OSD_EC_WRITE: {
814 // NOTE: this is non-const because handle_sub_write modifies the embedded
815 // ObjectStore::Transaction in place (and then std::move's it). It does
816 // not conflict with ECSubWrite's operator<<.
817 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
818 _op->get_nonconst_req());
819 parent->maybe_preempt_replica_scrub(op->op.soid);
820 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
821 return true;
822 }
823 case MSG_OSD_EC_WRITE_REPLY: {
824 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
825 _op->get_req());
826 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
827 return true;
828 }
829 case MSG_OSD_EC_READ: {
830 auto op = _op->get_req<MOSDECSubOpRead>();
831 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
832 reply->pgid = get_parent()->primary_spg_t();
833 reply->map_epoch = get_osdmap_epoch();
834 reply->min_epoch = get_parent()->get_interval_start_epoch();
835 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
836 reply->trace = _op->pg_trace;
837 get_parent()->send_message_osd_cluster(
838 reply, _op->get_req()->get_connection());
839 return true;
840 }
841 case MSG_OSD_EC_READ_REPLY: {
842 // NOTE: this is non-const because handle_sub_read_reply steals resulting
843 // buffers. It does not conflict with ECSubReadReply operator<<.
844 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
845 _op->get_nonconst_req());
846 RecoveryMessages rm;
847 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
848 dispatch_recovery_messages(rm, priority);
849 return true;
850 }
851 case MSG_OSD_PG_PUSH: {
852 auto op = _op->get_req<MOSDPGPush>();
853 RecoveryMessages rm;
854 for (vector<PushOp>::const_iterator i = op->pushes.begin();
855 i != op->pushes.end();
856 ++i) {
857 handle_recovery_push(*i, &rm, op->is_repair);
858 }
859 dispatch_recovery_messages(rm, priority);
860 return true;
861 }
862 case MSG_OSD_PG_PUSH_REPLY: {
863 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
864 _op->get_req());
865 RecoveryMessages rm;
866 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
867 i != op->replies.end();
868 ++i) {
869 handle_recovery_push_reply(*i, op->from, &rm);
870 }
871 dispatch_recovery_messages(rm, priority);
872 return true;
873 }
874 default:
875 return false;
876 }
877 return false;
878}
879
880struct SubWriteCommitted : public Context {
881 ECBackend *pg;
882 OpRequestRef msg;
883 ceph_tid_t tid;
884 eversion_t version;
885 eversion_t last_complete;
886 const ZTracer::Trace trace;
887 SubWriteCommitted(
888 ECBackend *pg,
889 OpRequestRef msg,
890 ceph_tid_t tid,
891 eversion_t version,
892 eversion_t last_complete,
893 const ZTracer::Trace &trace)
894 : pg(pg), msg(msg), tid(tid),
895 version(version), last_complete(last_complete), trace(trace) {}
896 void finish(int) override {
897 if (msg)
898 msg->mark_event("sub_op_committed");
899 pg->sub_write_committed(tid, version, last_complete, trace);
900 }
901};
902void ECBackend::sub_write_committed(
903 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
904 const ZTracer::Trace &trace) {
905 if (get_parent()->pgb_is_primary()) {
906 ECSubWriteReply reply;
907 reply.tid = tid;
908 reply.last_complete = last_complete;
909 reply.committed = true;
910 reply.applied = true;
911 reply.from = get_parent()->whoami_shard();
912 handle_sub_write_reply(
913 get_parent()->whoami_shard(),
914 reply, trace);
915 } else {
916 get_parent()->update_last_complete_ondisk(last_complete);
917 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
918 r->pgid = get_parent()->primary_spg_t();
919 r->map_epoch = get_osdmap_epoch();
920 r->min_epoch = get_parent()->get_interval_start_epoch();
921 r->op.tid = tid;
922 r->op.last_complete = last_complete;
923 r->op.committed = true;
924 r->op.applied = true;
925 r->op.from = get_parent()->whoami_shard();
926 r->set_priority(CEPH_MSG_PRIO_HIGH);
927 r->trace = trace;
928 r->trace.event("sending sub op commit");
929 get_parent()->send_message_osd_cluster(
930 get_parent()->primary_shard().osd, r, get_osdmap_epoch());
931 }
932}
933
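// Apply a write prepared by the primary to this shard: record the log
// entries, queue the shard's transaction, and register SubWriteCommitted so
// a commit reply is sent back once the transaction hits disk.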
934void ECBackend::handle_sub_write(
935 pg_shard_t from,
936 OpRequestRef msg,
937 ECSubWrite &op,
938 const ZTracer::Trace &trace)
939{
940 if (msg)
941 msg->mark_event("sub_op_started");
942 trace.event("handle_sub_write");
943#ifdef HAVE_JAEGER
944 if (msg->osd_parent_span) {
945 auto ec_sub_trans = jaeger_tracing::child_span(__func__, msg->osd_parent_span);
946 }
947#endif
948 if (!get_parent()->pgb_is_primary())
949 get_parent()->update_stats(op.stats);
950 ObjectStore::Transaction localt;
951 if (!op.temp_added.empty()) {
952 add_temp_objs(op.temp_added);
953 }
954 if (op.backfill_or_async_recovery) {
955 for (set<hobject_t>::iterator i = op.temp_removed.begin();
956 i != op.temp_removed.end();
957 ++i) {
958 dout(10) << __func__ << ": removing object " << *i
959 << " since we won't get the transaction" << dendl;
960 localt.remove(
961 coll,
962 ghobject_t(
963 *i,
964 ghobject_t::NO_GEN,
965 get_parent()->whoami_shard().shard));
966 }
967 }
968 clear_temp_objs(op.temp_removed);
969 dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
970 // flag set to true during async recovery
971 bool async = false;
972 pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
973 if (pmissing.is_missing(op.soid)) {
974 async = true;
975 dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
976 for (auto &&e: op.log_entries) {
977 dout(30) << " add_next_event entry " << e << dendl;
978 get_parent()->add_local_next_event(e);
979 dout(30) << " entry is_delete " << e.is_delete() << dendl;
980 }
981 }
982 get_parent()->log_operation(
983 std::move(op.log_entries),
984 op.updated_hit_set_history,
985 op.trim_to,
986 op.roll_forward_to,
987 op.roll_forward_to,
988 !op.backfill_or_async_recovery,
989 localt,
990 async);
991
992 if (!get_parent()->pg_is_undersized() &&
993 (unsigned)get_parent()->whoami_shard().shard >=
994 ec_impl->get_data_chunk_count())
995 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
996
997 localt.register_on_commit(
998 get_parent()->bless_context(
999 new SubWriteCommitted(
1000 this, msg, op.tid,
1001 op.at_version,
1002 get_parent()->get_info().last_complete, trace)));
1003 vector<ObjectStore::Transaction> tls;
1004 tls.reserve(2);
1005 tls.push_back(std::move(op.t));
1006 tls.push_back(std::move(localt));
1007 get_parent()->queue_transactions(tls, msg);
1008 dout(30) << __func__ << " missing after" << get_parent()->get_log().get_missing().get_items() << dendl;
1009 if (op.at_version != eversion_t()) {
1010 // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
1011 get_parent()->op_applied(op.at_version);
1012 }
1013}
1014
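// Service a shard-local read requested by the primary. A request that covers
// whole chunks is read directly; otherwise the selected sub-chunks are read
// and stitched together. Reads that return the entire chunk from offset 0
// are also checked against the stored hash_info digest to catch corruption.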
1015void ECBackend::handle_sub_read(
1016 pg_shard_t from,
1017 const ECSubRead &op,
1018 ECSubReadReply *reply,
1019 const ZTracer::Trace &trace)
1020{
1021 trace.event("handle sub read");
1022 shard_id_t shard = get_parent()->whoami_shard().shard;
1023 for(auto i = op.to_read.begin();
1024 i != op.to_read.end();
1025 ++i) {
1026 int r = 0;
1027 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1028 bufferlist bl;
1029 if ((op.subchunks.find(i->first)->second.size() == 1) &&
1030 (op.subchunks.find(i->first)->second.front().second ==
1031 ec_impl->get_sub_chunk_count())) {
1032 dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
1033 r = store->read(
1034 ch,
1035 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1036 j->get<0>(),
1037 j->get<1>(),
1038 bl, j->get<2>()); // Allow EIO return
1039 } else {
1040 dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
1041 int subchunk_size =
1042 sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
1043 bool error = false;
1044 for (int m = 0; m < (int)j->get<1>() && !error;
1045 m += sinfo.get_chunk_size()) {
1046 for (auto &&k:op.subchunks.find(i->first)->second) {
1047 bufferlist bl0;
1048 r = store->read(
1049 ch,
1050 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1051 j->get<0>() + m + (k.first)*subchunk_size,
1052 (k.second)*subchunk_size,
1053 bl0, j->get<2>());
1054 if (r < 0) {
1055 error = true;
1056 break;
1057 }
1058 bl.claim_append(bl0);
1059 }
1060 }
1061 }
1062
1063 if (r < 0) {
1064 // if we are doing fast reads, it's possible for one of the shard
1065 // reads to cross paths with another update and get a (harmless)
1066 // ENOENT. Suppress the message to the cluster log in that case.
1067 if (r == -ENOENT && get_parent()->get_pool().fast_read) {
1068 dout(5) << __func__ << ": Error " << r
1069 << " reading " << i->first << ", fast read, probably ok"
1070 << dendl;
1071 } else {
1072 get_parent()->clog_error() << "Error " << r
1073 << " reading object "
1074 << i->first;
1075 dout(5) << __func__ << ": Error " << r
1076 << " reading " << i->first << dendl;
1077 }
1078 goto error;
1079 } else {
1080 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1081 reply->buffers_read[i->first].push_back(
1082 make_pair(
1083 j->get<0>(),
1084 bl)
1085 );
1086 }
1087
1088 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1089 // This shows that we still need deep scrub because large enough files
1090 // are read in sections, so the digest check won't be done here.
1091 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1092 // the state of our chunk in case other chunks could substitute.
1093 ECUtil::HashInfoRef hinfo;
1094 hinfo = get_hash_info(i->first);
1095 if (!hinfo) {
1096 r = -EIO;
1097 get_parent()->clog_error() << "Corruption detected: object "
1098 << i->first
1099 << " is missing hash_info";
1100 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1101 goto error;
1102 }
1103 ceph_assert(hinfo->has_chunk_hash());
1104 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1105 (j->get<0>() == 0)) {
1106 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1107 bufferhash h(-1);
1108 h << bl;
1109 if (h.digest() != hinfo->get_chunk_hash(shard)) {
1110 get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1111 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1112 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1113 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1114 r = -EIO;
1115 goto error;
1116 }
1117 }
1118 }
1119 }
1120 continue;
1121error:
1122 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1123 // the state of our chunk in case other chunks could substitute.
1124 reply->buffers_read.erase(i->first);
1125 reply->errors[i->first] = r;
1126 }
1127 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1128 i != op.attrs_to_read.end();
1129 ++i) {
1130 dout(10) << __func__ << ": fulfilling attr request on "
1131 << *i << dendl;
1132 if (reply->errors.count(*i))
1133 continue;
1134 int r = store->getattrs(
1135 ch,
1136 ghobject_t(
1137 *i, ghobject_t::NO_GEN, shard),
1138 reply->attrs_read[*i]);
1139 if (r < 0) {
1140 // If we read error, we should not return the attrs too.
1141 reply->attrs_read.erase(*i);
1142 reply->buffers_read.erase(*i);
1143 reply->errors[*i] = r;
1144 }
1145 }
1146 reply->from = get_parent()->whoami_shard();
1147 reply->tid = op.tid;
1148}
1149
1150void ECBackend::handle_sub_write_reply(
1151 pg_shard_t from,
1152 const ECSubWriteReply &op,
1153 const ZTracer::Trace &trace)
1154{
1155 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1156 ceph_assert(i != tid_to_op_map.end());
1157 if (op.committed) {
1158 trace.event("sub write committed");
1159 ceph_assert(i->second.pending_commit.count(from));
1160 i->second.pending_commit.erase(from);
1161 if (from != get_parent()->whoami_shard()) {
1162 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1163 }
1164 }
1165 if (op.applied) {
1166 trace.event("sub write applied");
1167 ceph_assert(i->second.pending_apply.count(from));
1168 i->second.pending_apply.erase(from);
1169 }
1170
1171 if (i->second.pending_commit.empty() &&
1172 i->second.on_all_commit &&
1173 // also wait for apply, to preserve ordering with luminous peers.
1174 i->second.pending_apply.empty()) {
1175 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1176 i->second.on_all_commit->complete(0);
1177 i->second.on_all_commit = 0;
1178 i->second.trace.event("ec write all committed");
1179 }
1180 check_ops();
1181}
1182
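// Fold one shard's read reply into the matching ReadOp: stash buffers, attrs
// and errors, and once enough shards have answered either re-issue reads to
// the remaining shards (need_resend) or complete the op.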
1183void ECBackend::handle_sub_read_reply(
1184 pg_shard_t from,
1185 ECSubReadReply &op,
1186 RecoveryMessages *m,
1187 const ZTracer::Trace &trace)
1188{
1189 trace.event("ec sub read reply");
1190 dout(10) << __func__ << ": reply " << op << dendl;
1191 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1192 if (iter == tid_to_read_map.end()) {
1193 //canceled
1194 dout(20) << __func__ << ": dropped " << op << dendl;
1195 return;
1196 }
1197 ReadOp &rop = iter->second;
1198 for (auto i = op.buffers_read.begin();
1199 i != op.buffers_read.end();
1200 ++i) {
1201 ceph_assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1202 if (!rop.to_read.count(i->first)) {
1203 // We canceled this read! @see filter_read_op
1204 dout(20) << __func__ << " to_read skipping" << dendl;
1205 continue;
1206 }
1207 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1208 rop.to_read.find(i->first)->second.to_read.begin();
1209 list<
1210 boost::tuple<
1211 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1212 rop.complete[i->first].returned.begin();
1213 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1214 j != i->second.end();
1215 ++j, ++req_iter, ++riter) {
1216 ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1217 ceph_assert(riter != rop.complete[i->first].returned.end());
1218 pair<uint64_t, uint64_t> adjusted =
1219 sinfo.aligned_offset_len_to_chunk(
1220 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1221 ceph_assert(adjusted.first == j->first);
1222 riter->get<2>()[from] = std::move(j->second);
1223 }
1224 }
1225 for (auto i = op.attrs_read.begin();
1226 i != op.attrs_read.end();
1227 ++i) {
1228 ceph_assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1229 if (!rop.to_read.count(i->first)) {
1230 // We canceled this read! @see filter_read_op
1231 dout(20) << __func__ << " to_read skipping" << dendl;
1232 continue;
1233 }
1234 rop.complete[i->first].attrs = map<string, bufferlist>();
1235 (*(rop.complete[i->first].attrs)).swap(i->second);
1236 }
1237 for (auto i = op.errors.begin();
1238 i != op.errors.end();
1239 ++i) {
1240 rop.complete[i->first].errors.insert(
1241 make_pair(
1242 from,
1243 i->second));
1244 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1245 }
1246
1247 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1248 shard_to_read_map.find(from);
1249 ceph_assert(siter != shard_to_read_map.end());
1250 ceph_assert(siter->second.count(op.tid));
1251 siter->second.erase(op.tid);
1252
1253 ceph_assert(rop.in_progress.count(from));
1254 rop.in_progress.erase(from);
1255 unsigned is_complete = 0;
1256 bool need_resend = false;
1257 // For redundant reads check for completion as each shard comes in,
1258 // or in a non-recovery read check for completion once all the shards read.
1259 if (rop.do_redundant_reads || rop.in_progress.empty()) {
1260 for (map<hobject_t, read_result_t>::const_iterator iter =
1261 rop.complete.begin();
1262 iter != rop.complete.end();
1263 ++iter) {
1264 set<int> have;
1265 for (map<pg_shard_t, bufferlist>::const_iterator j =
1266 iter->second.returned.front().get<2>().begin();
1267 j != iter->second.returned.front().get<2>().end();
1268 ++j) {
1269 have.insert(j->first.shard);
1270 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1271 }
1272 map<int, vector<pair<int, int>>> dummy_minimum;
1273 int err;
1274 if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
1275 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1276 if (rop.in_progress.empty()) {
1277 // If we don't have enough copies, try other pg_shard_ts if available.
1278 // During recovery there may be multiple osds with copies of the same shard,
1279 // so getting EIO from one may result in multiple passes through this code path.
1280 if (!rop.do_redundant_reads) {
1281 int r = send_all_remaining_reads(iter->first, rop);
1282 if (r == 0) {
1283 // We changed the rop's to_read and not incrementing is_complete
1284 need_resend = true;
1285 continue;
1286 }
1287 // Couldn't read any additional shards so handle as completed with errors
1288 }
1289 // We don't want to confuse clients / RBD with objectstore error
1290 // values in particular ENOENT. We may have different error returns
1291 // from different shards, so we'll return minimum_to_decode() error
1292 // (usually EIO) to reader. It is likely an error here is due to a
1293 // damaged pg.
1294 rop.complete[iter->first].r = err;
1295 ++is_complete;
1296 }
1297 } else {
1298 ceph_assert(rop.complete[iter->first].r == 0);
1299 if (!rop.complete[iter->first].errors.empty()) {
1300 if (cct->_conf->osd_read_ec_check_for_errors) {
1301 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1302 err = rop.complete[iter->first].errors.begin()->second;
1303 rop.complete[iter->first].r = err;
1304 } else {
1305 get_parent()->clog_warn() << "Error(s) ignored for "
1306 << iter->first << " enough copies available";
1307 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1308 << " enough copies available" << dendl;
1309 rop.complete[iter->first].errors.clear();
1310 }
1311 }
1312 // avoid re-read for completed object as we may send remaining reads for uncompleted objects
1313 rop.to_read.at(iter->first).need.clear();
1314 rop.to_read.at(iter->first).want_attrs = false;
1315 ++is_complete;
1316 }
1317 }
1318 }
1319 if (need_resend) {
1320 do_read_op(rop);
1321 } else if (rop.in_progress.empty() ||
1322 is_complete == rop.complete.size()) {
1323 dout(20) << __func__ << " Complete: " << rop << dendl;
1324 rop.trace.event("ec read complete");
1325 complete_read_op(rop, m);
1326 } else {
1327 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1328 }
1329}
1330
1331void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1332{
1333 map<hobject_t, read_request_t>::iterator reqiter =
1334 rop.to_read.begin();
1335 map<hobject_t, read_result_t>::iterator resiter =
1336 rop.complete.begin();
1337 ceph_assert(rop.to_read.size() == rop.complete.size());
1338 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1339 if (reqiter->second.cb) {
1340 pair<RecoveryMessages *, read_result_t &> arg(
1341 m, resiter->second);
1342 reqiter->second.cb->complete(arg);
1343 reqiter->second.cb = nullptr;
1344 }
1345 }
1346 // the read op is over; clean up all the data for this tid.
1347 for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
1348 iter != rop.in_progress.end();
1349 iter++) {
1350 shard_to_read_map[*iter].erase(rop.tid);
1351 }
1352 rop.in_progress.clear();
1353 tid_to_read_map.erase(rop.tid);
1354}
1355
1356struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1357 ECBackend *ec;
1358 ceph_tid_t tid;
1359 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1360 void finish(ThreadPool::TPHandle &handle) override {
1361 auto ropiter = ec->tid_to_read_map.find(tid);
1362 ceph_assert(ropiter != ec->tid_to_read_map.end());
1363 int priority = ropiter->second.priority;
1364 RecoveryMessages rm;
1365 ec->complete_read_op(ropiter->second, &rm);
1366 ec->dispatch_recovery_messages(rm, priority);
1367 }
1368};
1369
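// Drop the parts of an in-flight ReadOp that targeted OSDs which are now
// down: affected objects have their pulls cancelled, and if nothing remains
// in progress the rest of the op is completed via a FinishReadOp callback.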
1370void ECBackend::filter_read_op(
1371 const OSDMapRef& osdmap,
1372 ReadOp &op)
1373{
1374 set<hobject_t> to_cancel;
1375 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1376 i != op.source_to_obj.end();
1377 ++i) {
1378 if (osdmap->is_down(i->first.osd)) {
1379 to_cancel.insert(i->second.begin(), i->second.end());
1380 op.in_progress.erase(i->first);
1381 continue;
1382 }
1383 }
1384
1385 if (to_cancel.empty())
1386 return;
1387
1388 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1389 i != op.source_to_obj.end();
1390 ) {
1391 for (set<hobject_t>::iterator j = i->second.begin();
1392 j != i->second.end();
1393 ) {
1394 if (to_cancel.count(*j))
1395 i->second.erase(j++);
1396 else
1397 ++j;
1398 }
1399 if (i->second.empty()) {
1400 op.source_to_obj.erase(i++);
1401 } else {
1402 ceph_assert(!osdmap->is_down(i->first.osd));
1403 ++i;
1404 }
1405 }
1406
1407 for (set<hobject_t>::iterator i = to_cancel.begin();
1408 i != to_cancel.end();
1409 ++i) {
1410 get_parent()->cancel_pull(*i);
1411
1412 ceph_assert(op.to_read.count(*i));
1413 read_request_t &req = op.to_read.find(*i)->second;
1414 dout(10) << __func__ << ": canceling " << req
1415 << " for obj " << *i << dendl;
1416 ceph_assert(req.cb);
1417 delete req.cb;
1418 req.cb = nullptr;
1419
1420 op.to_read.erase(*i);
1421 op.complete.erase(*i);
1422 recovery_ops.erase(*i);
1423 }
1424
1425 if (op.in_progress.empty()) {
1426 get_parent()->schedule_recovery_work(
1427 get_parent()->bless_unlocked_gencontext(
1428 new FinishReadOp(this, op.tid)));
1429 }
1430}
1431
1432void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1433{
1434 set<ceph_tid_t> tids_to_filter;
1435 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1436 i = shard_to_read_map.begin();
1437 i != shard_to_read_map.end();
1438 ) {
1439 if (osdmap->is_down(i->first.osd)) {
1440 tids_to_filter.insert(i->second.begin(), i->second.end());
1441 shard_to_read_map.erase(i++);
1442 } else {
1443 ++i;
1444 }
1445 }
1446 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1447 i != tids_to_filter.end();
1448 ++i) {
1449 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1450 ceph_assert(j != tid_to_read_map.end());
1451 filter_read_op(osdmap, j->second);
1452 }
1453}
1454
1455void ECBackend::on_change()
1456{
1457 dout(10) << __func__ << dendl;
1458
1459 completed_to = eversion_t();
1460 committed_to = eversion_t();
1461 pipeline_state.clear();
1462 waiting_reads.clear();
1463 waiting_state.clear();
1464 waiting_commit.clear();
1465 for (auto &&op: tid_to_op_map) {
1466 cache.release_write_pin(op.second.pin);
1467 }
1468 tid_to_op_map.clear();
1469
1470 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1471 i != tid_to_read_map.end();
1472 ++i) {
1473 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1474 for (map<hobject_t, read_request_t>::iterator j =
1475 i->second.to_read.begin();
1476 j != i->second.to_read.end();
1477 ++j) {
1478 delete j->second.cb;
1479 j->second.cb = nullptr;
1480 }
1481 }
1482 tid_to_read_map.clear();
1483 in_progress_client_reads.clear();
1484 shard_to_read_map.clear();
1485 clear_recovery_state();
1486}
1487
1488void ECBackend::clear_recovery_state()
1489{
1490 recovery_ops.clear();
1491}
1492
1493void ECBackend::dump_recovery_info(Formatter *f) const
1494{
1495 f->open_array_section("recovery_ops");
1496 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1497 i != recovery_ops.end();
1498 ++i) {
1499 f->open_object_section("op");
1500 i->second.dump(f);
1501 f->close_section();
1502 }
1503 f->close_section();
1504 f->open_array_section("read_ops");
1505 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1506 i != tid_to_read_map.end();
1507 ++i) {
1508 f->open_object_section("read_op");
1509 i->second.dump(f);
1510 f->close_section();
1511 }
1512 f->close_section();
1513}
1514
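// Primary-side entry point for a client write: build an Op from the
// transaction and log entries, then feed it into the read-modify-write
// pipeline via start_rmw().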
1515void ECBackend::submit_transaction(
1516 const hobject_t &hoid,
1517 const object_stat_sum_t &delta_stats,
1518 const eversion_t &at_version,
1519 PGTransactionUPtr &&t,
1520 const eversion_t &trim_to,
1521 const eversion_t &min_last_complete_ondisk,
1522 vector<pg_log_entry_t>&& log_entries,
1523 std::optional<pg_hit_set_history_t> &hset_history,
1524 Context *on_all_commit,
1525 ceph_tid_t tid,
1526 osd_reqid_t reqid,
1527 OpRequestRef client_op
1528 )
1529{
1530 ceph_assert(!tid_to_op_map.count(tid));
1531 Op *op = &(tid_to_op_map[tid]);
1532 op->hoid = hoid;
1533 op->delta_stats = delta_stats;
1534 op->version = at_version;
1535 op->trim_to = trim_to;
1536 op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
1537 op->log_entries = log_entries;
1538 std::swap(op->updated_hit_set_history, hset_history);
1539 op->on_all_commit = on_all_commit;
1540 op->tid = tid;
1541 op->reqid = reqid;
1542 op->client_op = client_op;
1543 if (client_op)
1544 op->trace = client_op->pg_trace;
1545
1546#ifdef HAVE_JAEGER
1547 if (client_op->osd_parent_span) {
1548 auto ec_sub_trans = jaeger_tracing::child_span("ECBackend::submit_transaction", client_op->osd_parent_span);
1549 }
1550#endif
1551 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1552 start_rmw(op, std::move(t));
1553}
1554
1555void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1556 if (!waiting_state.empty()) {
1557 waiting_state.back().on_write.emplace_back(std::move(cb));
1558 } else if (!waiting_reads.empty()) {
1559 waiting_reads.back().on_write.emplace_back(std::move(cb));
1560 } else {
1561 // Nothing earlier in the pipeline, just call it
1562 cb();
1563 }
1564}
1565
1566void ECBackend::get_all_avail_shards(
1567 const hobject_t &hoid,
1568 const set<pg_shard_t> &error_shards,
1569 set<int> &have,
1570 map<shard_id_t, pg_shard_t> &shards,
1571 bool for_recovery)
1572{
1573 for (set<pg_shard_t>::const_iterator i =
1574 get_parent()->get_acting_shards().begin();
1575 i != get_parent()->get_acting_shards().end();
1576 ++i) {
1577 dout(10) << __func__ << ": checking acting " << *i << dendl;
1578 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1579 if (error_shards.find(*i) != error_shards.end())
1580 continue;
1581 if (!missing.is_missing(hoid)) {
1582 ceph_assert(!have.count(i->shard));
1583 have.insert(i->shard);
1584 ceph_assert(!shards.count(i->shard));
1585 shards.insert(make_pair(i->shard, *i));
1586 }
1587 }
1588
1589 if (for_recovery) {
1590 for (set<pg_shard_t>::const_iterator i =
1591 get_parent()->get_backfill_shards().begin();
1592 i != get_parent()->get_backfill_shards().end();
1593 ++i) {
1594 if (error_shards.find(*i) != error_shards.end())
1595 continue;
1596 if (have.count(i->shard)) {
1597 ceph_assert(shards.count(i->shard));
1598 continue;
1599 }
1600 dout(10) << __func__ << ": checking backfill " << *i << dendl;
1601 ceph_assert(!shards.count(i->shard));
1602 const pg_info_t &info = get_parent()->get_shard_info(*i);
1603 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1604 if (hoid < info.last_backfill &&
1605 !missing.is_missing(hoid)) {
1606 have.insert(i->shard);
1607 shards.insert(make_pair(i->shard, *i));
1608 }
1609 }
1610
1611 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1612 get_parent()->get_missing_loc_shards().find(hoid);
1613 if (miter != get_parent()->get_missing_loc_shards().end()) {
1614 for (set<pg_shard_t>::iterator i = miter->second.begin();
1615 i != miter->second.end();
1616 ++i) {
1617 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1618 auto m = get_parent()->maybe_get_shard_missing(*i);
1619 if (m) {
1620 ceph_assert(!(*m).is_missing(hoid));
1621 }
1622 if (error_shards.find(*i) != error_shards.end())
1623 continue;
1624 have.insert(i->shard);
1625 shards.insert(make_pair(i->shard, *i));
1626 }
1627 }
1628 }
1629}
1630
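// Ask the erasure-code plugin (minimum_to_decode) for the cheapest set of
// available shards that can reconstruct the wanted chunks, and map the
// answer back to the pg_shard_ts holding those shards.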
1631int ECBackend::get_min_avail_to_read_shards(
1632 const hobject_t &hoid,
1633 const set<int> &want,
1634 bool for_recovery,
1635 bool do_redundant_reads,
1636 map<pg_shard_t, vector<pair<int, int>>> *to_read)
1637{
1638 // Make sure we don't do redundant reads for recovery
1639 ceph_assert(!for_recovery || !do_redundant_reads);
1640
1641 set<int> have;
1642 map<shard_id_t, pg_shard_t> shards;
1643 set<pg_shard_t> error_shards;
1644
1645 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1646
1647 map<int, vector<pair<int, int>>> need;
1648 int r = ec_impl->minimum_to_decode(want, have, &need);
1649 if (r < 0)
1650 return r;
1651
1652 if (do_redundant_reads) {
11fdf7f2
TL
1653 vector<pair<int, int>> subchunks_list;
1654 subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
1655 for (auto &&i: have) {
1656 need[i] = subchunks_list;
1657 }
7c673cae
FG
1658 }
1659
1660 if (!to_read)
1661 return 0;
1662
11fdf7f2
TL
1663 for (auto &&i:need) {
1664 ceph_assert(shards.count(shard_id_t(i.first)));
1665 to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
7c673cae
FG
1666 }
1667 return 0;
1668}
1669
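// get_remaining_shards(): after a read came back with errors, work out which
// not-yet-read shards must additionally be fetched so decode can still
// succeed.  Shards that already returned an error are excluded before
// minimum_to_decode is re-evaluated; if even that fails, -EIO is returned.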
1670int ECBackend::get_remaining_shards(
1671 const hobject_t &hoid,
1672 const set<int> &avail,
28e407b8
AA
1673 const set<int> &want,
1674 const read_result_t &result,
11fdf7f2 1675 map<pg_shard_t, vector<pair<int, int>>> *to_read,
b32b8144 1676 bool for_recovery)
7c673cae 1677{
11fdf7f2 1678 ceph_assert(to_read);
7c673cae 1679
b32b8144
FG
1680 set<int> have;
1681 map<shard_id_t, pg_shard_t> shards;
28e407b8
AA
1682 set<pg_shard_t> error_shards;
1683 for (auto &p : result.errors) {
1684 error_shards.insert(p.first);
1685 }
1686
1687 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1688
11fdf7f2 1689 map<int, vector<pair<int, int>>> need;
28e407b8
AA
1690 int r = ec_impl->minimum_to_decode(want, have, &need);
1691 if (r < 0) {
1692 dout(0) << __func__ << " not enough shards left to try for " << hoid
1693 << " read result was " << result << dendl;
1694 return -EIO;
1695 }
7c673cae 1696
28e407b8
AA
1697 set<int> shards_left;
1698 for (auto p : need) {
11fdf7f2
TL
1699 if (avail.find(p.first) == avail.end()) {
1700 shards_left.insert(p.first);
28e407b8
AA
1701 }
1702 }
7c673cae 1703
11fdf7f2
TL
1704 vector<pair<int, int>> subchunks;
1705 subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
28e407b8
AA
1706 for (set<int>::iterator i = shards_left.begin();
1707 i != shards_left.end();
7c673cae 1708 ++i) {
11fdf7f2
TL
1709 ceph_assert(shards.count(shard_id_t(*i)));
1710 ceph_assert(avail.find(*i) == avail.end());
1711 to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
7c673cae
FG
1712 }
1713 return 0;
1714}
1715
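// start_read_op(): allocate a tid, register the ReadOp in tid_to_read_map
// (attaching a trace span when the read originates from a client op) and
// hand it straight to do_read_op(), which builds the per-shard messages.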
1716void ECBackend::start_read_op(
1717 int priority,
28e407b8 1718 map<hobject_t, set<int>> &want_to_read,
7c673cae
FG
1719 map<hobject_t, read_request_t> &to_read,
1720 OpRequestRef _op,
1721 bool do_redundant_reads,
1722 bool for_recovery)
1723{
1724 ceph_tid_t tid = get_parent()->get_tid();
11fdf7f2 1725 ceph_assert(!tid_to_read_map.count(tid));
7c673cae
FG
1726 auto &op = tid_to_read_map.emplace(
1727 tid,
1728 ReadOp(
1729 priority,
1730 tid,
1731 do_redundant_reads,
1732 for_recovery,
1733 _op,
28e407b8 1734 std::move(want_to_read),
7c673cae
FG
1735 std::move(to_read))).first->second;
1736 dout(10) << __func__ << ": starting " << op << dendl;
1737 if (_op) {
1738 op.trace = _op->pg_trace;
1739 op.trace.event("start ec read");
1740 }
1741 do_read_op(op);
1742}
1743
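// do_read_op(): fan a ReadOp out into one MOSDECSubOpRead per shard.  Object
// attrs are requested from only the first shard serving each object, and
// every logical (offset, length) extent is converted to a per-shard chunk
// extent via sinfo.aligned_offset_len_to_chunk().  As a rough sketch of that
// mapping (assuming stripe-aligned input and stripe_width = k * chunk_size):
//   k = 4, chunk_size = 4096  =>  stripe_width = 16384
//   logical (32768, 16384)    =>  per-shard chunk (8192, 4096)
// i.e. offset and length are both divided by the number of data chunks.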
1744void ECBackend::do_read_op(ReadOp &op)
1745{
1746 int priority = op.priority;
1747 ceph_tid_t tid = op.tid;
1748
1749 dout(10) << __func__ << ": starting read " << op << dendl;
1750
1751 map<pg_shard_t, ECSubRead> messages;
1752 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1753 i != op.to_read.end();
1754 ++i) {
1755 bool need_attrs = i->second.want_attrs;
11fdf7f2
TL
1756
1757 for (auto j = i->second.need.begin();
7c673cae
FG
1758 j != i->second.need.end();
1759 ++j) {
1760 if (need_attrs) {
11fdf7f2 1761 messages[j->first].attrs_to_read.insert(i->first);
7c673cae
FG
1762 need_attrs = false;
1763 }
11fdf7f2
TL
1764 messages[j->first].subchunks[i->first] = j->second;
1765 op.obj_to_source[i->first].insert(j->first);
1766 op.source_to_obj[j->first].insert(i->first);
7c673cae
FG
1767 }
1768 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1769 i->second.to_read.begin();
1770 j != i->second.to_read.end();
1771 ++j) {
1772 pair<uint64_t, uint64_t> chunk_off_len =
1773 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
11fdf7f2 1774 for (auto k = i->second.need.begin();
7c673cae
FG
1775 k != i->second.need.end();
1776 ++k) {
11fdf7f2 1777 messages[k->first].to_read[i->first].push_back(
7c673cae
FG
1778 boost::make_tuple(
1779 chunk_off_len.first,
1780 chunk_off_len.second,
1781 j->get<2>()));
1782 }
11fdf7f2 1783 ceph_assert(!need_attrs);
7c673cae
FG
1784 }
1785 }
1786
9f95a23c
TL
1787 std::vector<std::pair<int, Message*>> m;
1788 m.reserve(messages.size());
7c673cae
FG
1789 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1790 i != messages.end();
1791 ++i) {
1792 op.in_progress.insert(i->first);
1793 shard_to_read_map[i->first].insert(op.tid);
1794 i->second.tid = tid;
1795 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1796 msg->set_priority(priority);
1797 msg->pgid = spg_t(
1798 get_parent()->whoami_spg_t().pgid,
1799 i->first.shard);
11fdf7f2 1800 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1801 msg->min_epoch = get_parent()->get_interval_start_epoch();
1802 msg->op = i->second;
1803 msg->op.from = get_parent()->whoami_shard();
1804 msg->op.tid = tid;
1805 if (op.trace) {
1806 // initialize a child span for this shard
1807 msg->trace.init("ec sub read", nullptr, &op.trace);
1808 msg->trace.keyval("shard", i->first.shard.id);
1809 }
9f95a23c 1810 m.push_back(std::make_pair(i->first.osd, msg));
7c673cae 1811 }
9f95a23c
TL
1812 if (!m.empty()) {
1813 get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
1814 }
1815
7c673cae
FG
1816 dout(10) << __func__ << ": started " << op << dendl;
1817}
1818
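// get_hash_info(): return the cached HashInfo for hoid, loading it from the
// hinfo xattr (or from the caller-supplied attrs map) on a cache miss.  A
// null ref signals trouble: the attr failed to decode, a non-empty object
// has no hinfo at all, or (with checks enabled) the recorded
// total_chunk_size disagrees with the on-disk shard size.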
1819ECUtil::HashInfoRef ECBackend::get_hash_info(
1820 const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1821{
1822 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1823 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1824 if (!ref) {
1825 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1826 struct stat st;
1827 int r = store->stat(
1828 ch,
1829 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1830 &st);
1831 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1832 // XXX: What does it mean if there is no object on disk?
1833 if (r >= 0) {
1834 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1835 bufferlist bl;
1836 if (attrs) {
1837 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1838 if (k == attrs->end()) {
1839 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1840 } else {
1841 bl.push_back(k->second);
1842 }
1843 } else {
1844 r = store->getattr(
1845 ch,
1846 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1847 ECUtil::get_hinfo_key(),
1848 bl);
1849 if (r < 0) {
1850 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1851 bl.clear(); // just in case
1852 }
1853 }
1854 if (bl.length() > 0) {
11fdf7f2 1855 auto bp = bl.cbegin();
94b18763 1856 try {
11fdf7f2 1857 decode(hinfo, bp);
94b18763
FG
1858 } catch(...) {
1859 dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
1860 return ECUtil::HashInfoRef();
1861 }
7c673cae
FG
1862 if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1863 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1864 << hinfo.get_total_chunk_size() << " vs on-disk size " << st.st_size << dendl;
1865 return ECUtil::HashInfoRef();
1866 }
1867 } else if (st.st_size > 0) { // non-empty object but no hinfo: nothing to trust, bail out
1868 return ECUtil::HashInfoRef();
1869 }
1870 }
1871 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1872 }
1873 return ref;
1874}
1875
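// start_rmw(): build the write plan for a client transaction -- which
// extents must be read and which will be written -- then append the op to
// waiting_state and let check_ops() drive it through the pipeline below.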
1876void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1877{
11fdf7f2 1878 ceph_assert(op);
7c673cae
FG
1879
1880 op->plan = ECTransaction::get_write_plan(
1881 sinfo,
1882 std::move(t),
1883 [&](const hobject_t &i) {
1884 ECUtil::HashInfoRef ref = get_hash_info(i, false);
1885 if (!ref) {
1886 derr << __func__ << ": get_hash_info(" << i << ")"
1887 << " returned a null pointer and there is no"
1888 << " way to recover from such an error in this"
1889 << " context" << dendl;
1890 ceph_abort();
1891 }
1892 return ref;
1893 },
1894 get_parent()->get_dpp());
1895
1896 dout(10) << __func__ << ": " << *op << dendl;
1897
1898 waiting_state.push_back(*op);
1899 check_ops();
1900}
1901
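// The write pipeline moves each Op through three queues:
//   waiting_state   -> try_state_to_reads()  : plan/issue any needed reads
//   waiting_reads   -> try_reads_to_commit() : reads done, send sub-writes
//   waiting_commit  -> try_finish_rmw()      : every shard has committed
// Each try_* helper returns true when it advanced an op, so check_ops()
// simply loops until nothing moves.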
1902bool ECBackend::try_state_to_reads()
1903{
1904 if (waiting_state.empty())
1905 return false;
1906
1907 Op *op = &(waiting_state.front());
1908 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
11fdf7f2 1909 ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
7c673cae
FG
1910 dout(20) << __func__ << ": blocking " << *op
1911 << " because it requires an rmw and the cache is invalid "
1912 << pipeline_state
1913 << dendl;
1914 return false;
1915 }
1916
11fdf7f2
TL
1917 if (!pipeline_state.caching_enabled()) {
1918 op->using_cache = false;
1919 } else if (op->invalidates_cache()) {
7c673cae
FG
1920 dout(20) << __func__ << ": invalidating cache after this op"
1921 << dendl;
1922 pipeline_state.invalidate();
7c673cae
FG
1923 }
1924
1925 waiting_state.pop_front();
1926 waiting_reads.push_back(*op);
1927
1928 if (op->using_cache) {
1929 cache.open_write_pin(op->pin);
1930
1931 extent_set empty;
1932 for (auto &&hpair: op->plan.will_write) {
1933 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1934 const extent_set &to_read_plan =
1935 to_read_plan_iter == op->plan.to_read.end() ?
1936 empty :
1937 to_read_plan_iter->second;
1938
1939 extent_set remote_read = cache.reserve_extents_for_rmw(
1940 hpair.first,
1941 op->pin,
1942 hpair.second,
1943 to_read_plan);
1944
1945 extent_set pending_read = to_read_plan;
1946 pending_read.subtract(remote_read);
1947
1948 if (!remote_read.empty()) {
1949 op->remote_read[hpair.first] = std::move(remote_read);
1950 }
1951 if (!pending_read.empty()) {
1952 op->pending_read[hpair.first] = std::move(pending_read);
1953 }
1954 }
1955 } else {
1956 op->remote_read = op->plan.to_read;
1957 }
1958
1959 dout(10) << __func__ << ": " << *op << dendl;
1960
1961 if (!op->remote_read.empty()) {
11fdf7f2 1962 ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
7c673cae
FG
1963 objects_read_async_no_cache(
1964 op->remote_read,
1965 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1966 for (auto &&i: results) {
1967 op->remote_read_result.emplace(i.first, i.second.second);
1968 }
1969 check_ops();
1970 });
1971 }
1972
1973 return true;
1974}
1975
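// try_reads_to_commit(): once the front op's reads have completed, merge in
// any extents held in the cache, generate the per-shard transactions with
// ECTransaction::generate_transactions(), send an MOSDECSubOpWrite to every
// other acting/backfill shard and apply the local shard's portion directly
// through handle_sub_write().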
1976bool ECBackend::try_reads_to_commit()
1977{
1978 if (waiting_reads.empty())
1979 return false;
1980 Op *op = &(waiting_reads.front());
1981 if (op->read_in_progress())
1982 return false;
1983 waiting_reads.pop_front();
1984 waiting_commit.push_back(*op);
1985
1986 dout(10) << __func__ << ": starting commit on " << *op << dendl;
1987 dout(20) << __func__ << ": " << cache << dendl;
1988
1989 get_parent()->apply_stats(
1990 op->hoid,
1991 op->delta_stats);
1992
1993 if (op->using_cache) {
1994 for (auto &&hpair: op->pending_read) {
1995 op->remote_read_result[hpair.first].insert(
1996 cache.get_remaining_extents_for_rmw(
1997 hpair.first,
1998 op->pin,
1999 hpair.second));
2000 }
2001 op->pending_read.clear();
2002 } else {
11fdf7f2 2003 ceph_assert(op->pending_read.empty());
7c673cae
FG
2004 }
2005
2006 map<shard_id_t, ObjectStore::Transaction> trans;
2007 for (set<pg_shard_t>::const_iterator i =
11fdf7f2
TL
2008 get_parent()->get_acting_recovery_backfill_shards().begin();
2009 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2010 ++i) {
2011 trans[i->shard];
2012 }
2013
2014 op->trace.event("start ec write");
2015
2016 map<hobject_t,extent_map> written;
2017 if (op->plan.t) {
2018 ECTransaction::generate_transactions(
2019 op->plan,
2020 ec_impl,
2021 get_parent()->get_info().pgid.pgid,
7c673cae
FG
2022 sinfo,
2023 op->remote_read_result,
2024 op->log_entries,
2025 &written,
2026 &trans,
2027 &(op->temp_added),
2028 &(op->temp_cleared),
9f95a23c
TL
2029 get_parent()->get_dpp(),
2030 get_osdmap()->require_osd_release);
7c673cae
FG
2031 }
2032
2033 dout(20) << __func__ << ": " << cache << dendl;
2034 dout(20) << __func__ << ": written: " << written << dendl;
2035 dout(20) << __func__ << ": op: " << *op << dendl;
2036
2037 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2038 for (auto &&i: op->log_entries) {
2039 if (i.requires_kraken()) {
2040 derr << __func__ << ": log entry " << i << " requires kraken"
2041 << " but overwrites are not enabled!" << dendl;
2042 ceph_abort();
2043 }
2044 }
2045 }
2046
2047 map<hobject_t,extent_set> written_set;
2048 for (auto &&i: written) {
2049 written_set[i.first] = i.second.get_interval_set();
2050 }
2051 dout(20) << __func__ << ": written_set: " << written_set << dendl;
11fdf7f2 2052 ceph_assert(written_set == op->plan.will_write);
7c673cae
FG
2053
2054 if (op->using_cache) {
2055 for (auto &&hpair: written) {
2056 dout(20) << __func__ << ": " << hpair << dendl;
2057 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
2058 }
2059 }
2060 op->remote_read.clear();
2061 op->remote_read_result.clear();
2062
7c673cae
FG
2063 ObjectStore::Transaction empty;
2064 bool should_write_local = false;
2065 ECSubWrite local_write_op;
9f95a23c
TL
2066 std::vector<std::pair<int, Message*>> messages;
2067 messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
11fdf7f2 2068 set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
7c673cae 2069 for (set<pg_shard_t>::const_iterator i =
11fdf7f2
TL
2070 get_parent()->get_acting_recovery_backfill_shards().begin();
2071 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2072 ++i) {
2073 op->pending_apply.insert(*i);
2074 op->pending_commit.insert(*i);
2075 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
2076 trans.find(i->shard);
11fdf7f2 2077 ceph_assert(iter != trans.end());
7c673cae
FG
2078 bool should_send = get_parent()->should_send_op(*i, op->hoid);
2079 const pg_stat_t &stats =
11fdf7f2 2080 (should_send || !backfill_shards.count(*i)) ?
7c673cae
FG
2081 get_info().stats :
2082 parent->get_shard_info().find(*i)->second.stats;
2083
2084 ECSubWrite sop(
2085 get_parent()->whoami_shard(),
2086 op->tid,
2087 op->reqid,
2088 op->hoid,
2089 stats,
2090 should_send ? iter->second : empty,
2091 op->version,
2092 op->trim_to,
2093 op->roll_forward_to,
2094 op->log_entries,
2095 op->updated_hit_set_history,
2096 op->temp_added,
2097 op->temp_cleared,
2098 !should_send);
2099
2100 ZTracer::Trace trace;
2101 if (op->trace) {
2102 // initialize a child span for this shard
2103 trace.init("ec sub write", nullptr, &op->trace);
2104 trace.keyval("shard", i->shard.id);
2105 }
2106
2107 if (*i == get_parent()->whoami_shard()) {
2108 should_write_local = true;
2109 local_write_op.claim(sop);
2110 } else {
2111 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
2112 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
11fdf7f2 2113 r->map_epoch = get_osdmap_epoch();
7c673cae
FG
2114 r->min_epoch = get_parent()->get_interval_start_epoch();
2115 r->trace = trace;
9f95a23c 2116 messages.push_back(std::make_pair(i->osd, r));
7c673cae
FG
2117 }
2118 }
f67539c2
TL
2119
2120#ifdef HAVE_JAEGER
2121 if (op->client_op && op->client_op->osd_parent_span) {
2122 auto sub_write_span = jaeger_tracing::child_span("EC sub write", op->client_op->osd_parent_span);
2123 }
2124#endif
9f95a23c
TL
2125 if (!messages.empty()) {
2126 get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
2127 }
2128
7c673cae 2129 if (should_write_local) {
11fdf7f2
TL
2130 handle_sub_write(
2131 get_parent()->whoami_shard(),
2132 op->client_op,
2133 local_write_op,
2134 op->trace);
7c673cae
FG
2135 }
2136
2137 for (auto i = op->on_write.begin();
2138 i != op->on_write.end();
2139 op->on_write.erase(i++)) {
2140 (*i)();
2141 }
2142
2143 return true;
2144}
2145
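// try_finish_rmw(): retire the front of waiting_commit once all shards have
// committed, advance completed_to/committed_to and, on post-kraken clusters,
// queue a dummy op when needed so the PG log's can_rollback_to keeps rolling
// forward past this write.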
2146bool ECBackend::try_finish_rmw()
2147{
2148 if (waiting_commit.empty())
2149 return false;
2150 Op *op = &(waiting_commit.front());
2151 if (op->write_in_progress())
2152 return false;
2153 waiting_commit.pop_front();
2154
2155 dout(10) << __func__ << ": " << *op << dendl;
2156 dout(20) << __func__ << ": " << cache << dendl;
2157
2158 if (op->roll_forward_to > completed_to)
2159 completed_to = op->roll_forward_to;
2160 if (op->version > committed_to)
2161 committed_to = op->version;
2162
9f95a23c 2163 if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
7c673cae
FG
2164 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2165 waiting_reads.empty() &&
2166 waiting_commit.empty()) {
2167 // submit a dummy transaction to kick the rollforward
2168 auto tid = get_parent()->get_tid();
2169 Op *nop = &(tid_to_op_map[tid]);
2170 nop->hoid = op->hoid;
2171 nop->trim_to = op->trim_to;
2172 nop->roll_forward_to = op->version;
2173 nop->tid = tid;
2174 nop->reqid = op->reqid;
2175 waiting_reads.push_back(*nop);
2176 }
2177 }
2178
2179 if (op->using_cache) {
2180 cache.release_write_pin(op->pin);
2181 }
2182 tid_to_op_map.erase(op->tid);
2183
2184 if (waiting_reads.empty() &&
2185 waiting_commit.empty()) {
2186 pipeline_state.clear();
2187 dout(20) << __func__ << ": clearing pipeline_state "
2188 << pipeline_state
2189 << dendl;
2190 }
2191 return true;
2192}
2193
2194void ECBackend::check_ops()
2195{
2196 while (try_state_to_reads() ||
2197 try_reads_to_commit() ||
2198 try_finish_rmw());
2199}
2200
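// Synchronous reads are not supported on erasure-coded pools: satisfying a
// read may require data from several remote shards, so callers must go
// through the asynchronous paths below.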
2201int ECBackend::objects_read_sync(
2202 const hobject_t &hoid,
2203 uint64_t off,
2204 uint64_t len,
2205 uint32_t op_flags,
2206 bufferlist *bl)
2207{
2208 return -EOPNOTSUPP;
2209}
2210
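// objects_read_async(): round each requested extent out to stripe
// boundaries, merge them into a single extent_set, and issue one
// objects_read_and_reconstruct() call; the cb functor below then trims the
// reconstructed data back to the exact extents asked for and completes the
// per-extent Contexts and the final on_complete with the first error, if any.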
2211void ECBackend::objects_read_async(
2212 const hobject_t &hoid,
2213 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2214 pair<bufferlist*, Context*> > > &to_read,
2215 Context *on_complete,
2216 bool fast_read)
2217{
2218 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2219 reads;
2220
2221 uint32_t flags = 0;
2222 extent_set es;
2223 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2224 pair<bufferlist*, Context*> > >::const_iterator i =
2225 to_read.begin();
2226 i != to_read.end();
2227 ++i) {
2228 pair<uint64_t, uint64_t> tmp =
2229 sinfo.offset_len_to_stripe_bounds(
2230 make_pair(i->first.get<0>(), i->first.get<1>()));
2231
11fdf7f2 2232 es.union_insert(tmp.first, tmp.second);
7c673cae
FG
2233 flags |= i->first.get<2>();
2234 }
2235
2236 if (!es.empty()) {
2237 auto &offsets = reads[hoid];
2238 for (auto j = es.begin();
2239 j != es.end();
2240 ++j) {
2241 offsets.push_back(
2242 boost::make_tuple(
2243 j.get_start(),
2244 j.get_len(),
2245 flags));
2246 }
2247 }
2248
2249 struct cb {
2250 ECBackend *ec;
2251 hobject_t hoid;
2252 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2253 pair<bufferlist*, Context*> > > to_read;
2254 unique_ptr<Context> on_complete;
2255 cb(const cb&) = delete;
2256 cb(cb &&) = default;
2257 cb(ECBackend *ec,
2258 const hobject_t &hoid,
2259 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2260 pair<bufferlist*, Context*> > > &to_read,
2261 Context *on_complete)
2262 : ec(ec),
2263 hoid(hoid),
2264 to_read(to_read),
2265 on_complete(on_complete) {}
2266 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2267 auto dpp = ec->get_parent()->get_dpp();
2268 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2269 << dendl;
2270 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2271 << dendl;
2272
2273 auto &got = results[hoid];
2274
2275 int r = 0;
2276 for (auto &&read: to_read) {
2277 if (got.first < 0) {
2278 if (read.second.second) {
2279 read.second.second->complete(got.first);
2280 }
2281 if (r == 0)
2282 r = got.first;
2283 } else {
11fdf7f2 2284 ceph_assert(read.second.first);
7c673cae
FG
2285 uint64_t offset = read.first.get<0>();
2286 uint64_t length = read.first.get<1>();
2287 auto range = got.second.get_containing_range(offset, length);
11fdf7f2
TL
2288 ceph_assert(range.first != range.second);
2289 ceph_assert(range.first.get_off() <= offset);
91327a77
AA
2290 ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
2291 ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off() << dendl;
2292 ldpp_dout(dpp, 30) << "length: " << length << dendl;
2293 ldpp_dout(dpp, 30) << "range length: " << range.first.get_len() << dendl;
11fdf7f2 2294 ceph_assert(
7c673cae
FG
2295 (offset + length) <=
2296 (range.first.get_off() + range.first.get_len()));
2297 read.second.first->substr_of(
2298 range.first.get_val(),
2299 offset - range.first.get_off(),
2300 length);
2301 if (read.second.second) {
2302 read.second.second->complete(length);
2303 read.second.second = nullptr;
2304 }
2305 }
2306 }
2307 to_read.clear();
2308 if (on_complete) {
2309 on_complete.release()->complete(r);
2310 }
2311 }
2312 ~cb() {
2313 for (auto &&i: to_read) {
2314 delete i.second.second;
2315 }
2316 to_read.clear();
2317 }
2318 };
2319 objects_read_and_reconstruct(
2320 reads,
2321 fast_read,
2322 make_gen_lambda_context<
2323 map<hobject_t,pair<int, extent_map> > &&, cb>(
2324 cb(this,
2325 hoid,
2326 to_read,
2327 on_complete)));
2328}
2329
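// CallClientContexts glues a finished ReadOp back to a client read: for each
// requested extent it decodes the per-shard buffers returned for the
// corresponding stripe-aligned range, trims the result down to the exact
// (offset, length) the client asked for, and completes the
// ClientAsyncReadStatus before kicking any queued reads.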
2330struct CallClientContexts :
2331 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2332 hobject_t hoid;
2333 ECBackend *ec;
2334 ECBackend::ClientAsyncReadStatus *status;
2335 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2336 CallClientContexts(
2337 hobject_t hoid,
2338 ECBackend *ec,
2339 ECBackend::ClientAsyncReadStatus *status,
2340 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2341 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2342 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2343 ECBackend::read_result_t &res = in.second;
2344 extent_map result;
2345 if (res.r != 0)
2346 goto out;
11fdf7f2
TL
2347 ceph_assert(res.returned.size() == to_read.size());
2348 ceph_assert(res.errors.empty());
7c673cae
FG
2349 for (auto &&read: to_read) {
2350 pair<uint64_t, uint64_t> adjusted =
2351 ec->sinfo.offset_len_to_stripe_bounds(
2352 make_pair(read.get<0>(), read.get<1>()));
11fdf7f2 2353 ceph_assert(res.returned.front().get<0>() == adjusted.first &&
7c673cae
FG
2354 res.returned.front().get<1>() == adjusted.second);
2355 map<int, bufferlist> to_decode;
2356 bufferlist bl;
2357 for (map<pg_shard_t, bufferlist>::iterator j =
2358 res.returned.front().get<2>().begin();
2359 j != res.returned.front().get<2>().end();
2360 ++j) {
f67539c2 2361 to_decode[j->first.shard] = std::move(j->second);
7c673cae
FG
2362 }
2363 int r = ECUtil::decode(
2364 ec->sinfo,
2365 ec->ec_impl,
2366 to_decode,
2367 &bl);
2368 if (r < 0) {
2369 res.r = r;
2370 goto out;
2371 }
2372 bufferlist trimmed;
2373 trimmed.substr_of(
2374 bl,
2375 read.get<0>() - adjusted.first,
11fdf7f2 2376 std::min(read.get<1>(),
7c673cae
FG
2377 bl.length() - (read.get<0>() - adjusted.first)));
2378 result.insert(
2379 read.get<0>(), trimmed.length(), std::move(trimmed));
2380 res.returned.pop_front();
2381 }
2382out:
2383 status->complete_object(hoid, res.r, std::move(result));
2384 ec->kick_reads();
2385 }
2386};
2387
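// objects_read_and_reconstruct(): entry point for client reads.  Register a
// ClientAsyncReadStatus for the batch, pick the preferred shards to read
// (get_want_to_read_shards), compute the minimal available shard set per
// object and start a single read op covering all of them; fast_read maps to
// redundant reads.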
2388void ECBackend::objects_read_and_reconstruct(
2389 const map<hobject_t,
2390 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2391 > &reads,
2392 bool fast_read,
2393 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2394{
2395 in_progress_client_reads.emplace_back(
2396 reads.size(), std::move(func));
2397 if (!reads.size()) {
2398 kick_reads();
2399 return;
2400 }
2401
28e407b8 2402 map<hobject_t, set<int>> obj_want_to_read;
7c673cae
FG
2403 set<int> want_to_read;
2404 get_want_to_read_shards(&want_to_read);
2405
2406 map<hobject_t, read_request_t> for_read_op;
2407 for (auto &&to_read: reads) {
11fdf7f2 2408 map<pg_shard_t, vector<pair<int, int>>> shards;
7c673cae
FG
2409 int r = get_min_avail_to_read_shards(
2410 to_read.first,
2411 want_to_read,
2412 false,
2413 fast_read,
2414 &shards);
11fdf7f2 2415 ceph_assert(r == 0);
7c673cae
FG
2416
2417 CallClientContexts *c = new CallClientContexts(
2418 to_read.first,
2419 this,
2420 &(in_progress_client_reads.back()),
2421 to_read.second);
2422 for_read_op.insert(
2423 make_pair(
2424 to_read.first,
2425 read_request_t(
2426 to_read.second,
2427 shards,
2428 false,
2429 c)));
28e407b8 2430 obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
7c673cae
FG
2431 }
2432
2433 start_read_op(
2434 CEPH_MSG_PRIO_DEFAULT,
28e407b8 2435 obj_want_to_read,
7c673cae
FG
2436 for_read_op,
2437 OpRequestRef(),
2438 fast_read, false);
2439 return;
2440}
2441
2442
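// send_all_remaining_reads(): after a failed read, rebuild the object's
// read_request using only shards that have not been tried yet (see
// get_remaining_shards() above); attrs are requested again if the first
// attempt did not return them.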
2443int ECBackend::send_all_remaining_reads(
2444 const hobject_t &hoid,
2445 ReadOp &rop)
2446{
2447 set<int> already_read;
2448 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2449 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2450 already_read.insert(i->shard);
2451 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
11fdf7f2 2452 map<pg_shard_t, vector<pair<int, int>>> shards;
28e407b8
AA
2453 int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
2454 rop.complete[hoid], &shards, rop.for_recovery);
7c673cae
FG
2455 if (r)
2456 return r;
7c673cae 2457
7c673cae
FG
2458 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2459 rop.to_read.find(hoid)->second.to_read;
2460 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2461 rop.to_read.find(hoid)->second.cb;
2462
91327a77
AA
2463 // (Note cuixf) If we needed to read attrs and the read failed, try to read them again.
2464 bool want_attrs =
2465 rop.to_read.find(hoid)->second.want_attrs &&
2466 (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
2467 if (want_attrs) {
2468 dout(10) << __func__ << " want attrs again" << dendl;
2469 }
2470
28e407b8
AA
2471 rop.to_read.erase(hoid);
2472 rop.to_read.insert(make_pair(
7c673cae
FG
2473 hoid,
2474 read_request_t(
2475 offsets,
2476 shards,
91327a77 2477 want_attrs,
7c673cae 2478 c)));
7c673cae
FG
2479 return 0;
2480}
2481
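// objects_get_attrs(): plain getattrs on the local shard object, with the
// internal hinfo key filtered out so it is never exposed to callers.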
2482int ECBackend::objects_get_attrs(
2483 const hobject_t &hoid,
2484 map<string, bufferlist> *out)
2485{
2486 int r = store->getattrs(
2487 ch,
2488 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2489 *out);
2490 if (r < 0)
2491 return r;
2492
2493 for (map<string, bufferlist>::iterator i = out->begin();
2494 i != out->end();
2495 ) {
2496 if (ECUtil::is_hinfo_key_string(i->first))
2497 out->erase(i++);
2498 else
2499 ++i;
2500 }
2501 return r;
2502}
2503
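// rollback_append(): undo an append by truncating the local shard object
// back to the chunk offset corresponding to the old logical size, which must
// be stripe-aligned.  For instance (assuming k = 4 data chunks), rolling a
// 64 KiB logical object back truncates this shard's object to 16 KiB.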
2504void ECBackend::rollback_append(
2505 const hobject_t &hoid,
2506 uint64_t old_size,
2507 ObjectStore::Transaction *t)
2508{
11fdf7f2 2509 ceph_assert(old_size % sinfo.get_stripe_width() == 0);
7c673cae
FG
2510 t->truncate(
2511 coll,
2512 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2513 sinfo.aligned_logical_offset_to_chunk_offset(
2514 old_size));
2515}
2516
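// be_deep_scrub(): deep-scrub the local shard object in
// osd_deep_scrub_stride sized steps (rounded up to a multiple of the chunk
// size), hashing the data as it goes; -EINPROGRESS asks the caller to come
// back for the next stride.  Once the object is exhausted the accumulated
// hash is checked against the chunk hash stored in hinfo, and the shard-0
// chunk hash is reported as the object digest so peers can be compared.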
28e407b8 2517int ECBackend::be_deep_scrub(
7c673cae 2518 const hobject_t &poid,
28e407b8
AA
2519 ScrubMap &map,
2520 ScrubMapBuilder &pos,
2521 ScrubMap::object &o)
2522{
2523 dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
7c673cae 2524 int r;
28e407b8
AA
2525
2526 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2527 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2528
2529 utime_t sleeptime;
2530 sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
2531 if (sleeptime != utime_t()) {
2532 lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
2533 sleeptime.sleep();
2534 }
2535
2536 if (pos.data_pos == 0) {
2537 pos.data_hash = bufferhash(-1);
2538 }
2539
7c673cae
FG
2540 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2541 if (stride % sinfo.get_chunk_size())
2542 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
7c673cae 2543
28e407b8
AA
2544 bufferlist bl;
2545 r = store->read(
2546 ch,
2547 ghobject_t(
2548 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2549 pos.data_pos,
2550 stride, bl,
2551 fadvise_flags);
2552 if (r < 0) {
2553 dout(20) << __func__ << " " << poid << " got "
2554 << r << " on read, read_error" << dendl;
2555 o.read_error = true;
2556 return 0;
7c673cae 2557 }
28e407b8
AA
2558 if (bl.length() % sinfo.get_chunk_size()) {
2559 dout(20) << __func__ << " " << poid << " got "
2560 << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
2561 << dendl;
7c673cae 2562 o.read_error = true;
28e407b8
AA
2563 return 0;
2564 }
2565 if (r > 0) {
2566 pos.data_hash << bl;
2567 }
2568 pos.data_pos += r;
2569 if (r == (int)stride) {
2570 return -EINPROGRESS;
7c673cae
FG
2571 }
2572
2573 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2574 if (!hinfo) {
2575 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2576 o.read_error = true;
2577 o.digest_present = false;
28e407b8 2578 return 0;
7c673cae
FG
2579 } else {
2580 if (!get_parent()->get_pool().allows_ecoverwrites()) {
11fdf7f2 2581 ceph_assert(hinfo->has_chunk_hash());
28e407b8
AA
2582 if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
2583 dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
2584 << std::hex << pos.data_pos
2585 << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
2586 << dendl;
7c673cae 2587 o.ec_size_mismatch = true;
28e407b8 2588 return 0;
7c673cae
FG
2589 }
2590
28e407b8
AA
2591 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
2592 pos.data_hash.digest()) {
2593 dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
2594 << std::hex << pos.data_hash.digest() << " != expected 0x"
2595 << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
2596 << std::dec << dendl;
7c673cae 2597 o.ec_hash_mismatch = true;
28e407b8 2598 return 0;
7c673cae
FG
2599 }
2600
2601 /* We checked above that we match our own stored hash. We cannot
2602 * send a hash of the actual object, so instead we simply send
2603 * our locally stored hash of shard 0 on the assumption that if
2604 * we match our chunk hash and our recollection of the hash for
2605 * chunk 0 matches that of our peers, there is likely no corruption.
2606 */
2607 o.digest = hinfo->get_chunk_hash(0);
2608 o.digest_present = true;
2609 } else {
2610 /* Hack! We must be using partial overwrites, and partial overwrites
2611 * don't support deep-scrub yet
2612 */
2613 o.digest = 0;
2614 o.digest_present = true;
2615 }
2616 }
2617
28e407b8 2618 o.omap_digest = -1;
7c673cae 2619 o.omap_digest_present = true;
28e407b8 2620 return 0;
7c673cae 2621}