// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <iostream>
#include <sstream>

#include "ECBackend.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
#include "ECMsgTypes.h"

#include "PrimaryLogPG.h"
#include "osd_tracer.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

using std::dec;
using std::hex;
using std::less;
using std::list;
using std::make_pair;
using std::map;
using std::pair;
using std::ostream;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using ceph::bufferhash;
using ceph::bufferlist;
using ceph::bufferptr;
using ceph::ErasureCodeInterfaceRef;
using ceph::Formatter;

static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
  return pgb->get_parent()->gen_dbg_prefix(*_dout);
}

struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
  list<ECBackend::RecoveryOp> ops;
};

ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
  switch (rhs.pipeline_state) {
  case ECBackend::pipeline_state_t::CACHE_VALID:
    return lhs << "CACHE_VALID";
  case ECBackend::pipeline_state_t::CACHE_INVALID:
    return lhs << "CACHE_INVALID";
  default:
    ceph_abort_msg("invalid pipeline state");
  }
  return lhs; // unreachable
}

static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
{
  lhs << "[";
  for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
{
  lhs << "[";
  for (map<int, bufferlist>::const_iterator i = rhs.begin();
       i != rhs.end();
       ++i) {
    if (i != rhs.begin())
      lhs << ", ";
    lhs << make_pair(i->first, i->second.length());
  }
  return lhs << "]";
}

static ostream &operator<<(
  ostream &lhs,
  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
{
  return lhs << "(" << rhs.get<0>() << ", "
             << rhs.get<1>() << ", " << rhs.get<2>() << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
{
  return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
             << ", need=" << rhs.need
             << ", want_attrs=" << rhs.want_attrs
             << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
{
  lhs << "read_result_t(r=" << rhs.r
      << ", errors=" << rhs.errors;
  if (rhs.attrs) {
    lhs << ", attrs=" << *(rhs.attrs);
  } else {
    lhs << ", noattrs";
  }
  return lhs << ", returned=" << rhs.returned << ")";
}

ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
{
  lhs << "ReadOp(tid=" << rhs.tid;
  if (rhs.op && rhs.op->get_req()) {
    lhs << ", op=";
    rhs.op->get_req()->print(lhs);
  }
  return lhs << ", to_read=" << rhs.to_read
             << ", complete=" << rhs.complete
             << ", priority=" << rhs.priority
             << ", obj_to_source=" << rhs.obj_to_source
             << ", source_to_obj=" << rhs.source_to_obj
             << ", in_progress=" << rhs.in_progress << ")";
}

void ECBackend::ReadOp::dump(Formatter *f) const
{
  f->dump_unsigned("tid", tid);
  if (op && op->get_req()) {
    f->dump_stream("op") << *(op->get_req());
  }
  f->dump_stream("to_read") << to_read;
  f->dump_stream("complete") << complete;
  f->dump_int("priority", priority);
  f->dump_stream("obj_to_source") << obj_to_source;
  f->dump_stream("source_to_obj") << source_to_obj;
  f->dump_stream("in_progress") << in_progress;
}

ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
{
  lhs << "Op(" << rhs.hoid
      << " v=" << rhs.version
      << " tt=" << rhs.trim_to
      << " tid=" << rhs.tid
      << " reqid=" << rhs.reqid;
  if (rhs.client_op && rhs.client_op->get_req()) {
    lhs << " client_op=";
    rhs.client_op->get_req()->print(lhs);
  }
  lhs << " roll_forward_to=" << rhs.roll_forward_to
      << " temp_added=" << rhs.temp_added
      << " temp_cleared=" << rhs.temp_cleared
      << " pending_read=" << rhs.pending_read
      << " remote_read=" << rhs.remote_read
      << " remote_read_result=" << rhs.remote_read_result
      << " pending_apply=" << rhs.pending_apply
      << " pending_commit=" << rhs.pending_commit
      << " plan.to_read=" << rhs.plan.to_read
      << " plan.will_write=" << rhs.plan.will_write
      << ")";
  return lhs;
}

ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
{
  return lhs << "RecoveryOp("
             << "hoid=" << rhs.hoid
             << " v=" << rhs.v
             << " missing_on=" << rhs.missing_on
             << " missing_on_shards=" << rhs.missing_on_shards
             << " recovery_info=" << rhs.recovery_info
             << " recovery_progress=" << rhs.recovery_progress
             << " obc refcount=" << rhs.obc.use_count()
             << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
             << " waiting_on_pushes=" << rhs.waiting_on_pushes
             << " extent_requested=" << rhs.extent_requested
             << ")";
}

void ECBackend::RecoveryOp::dump(Formatter *f) const
{
  f->dump_stream("hoid") << hoid;
  f->dump_stream("v") << v;
  f->dump_stream("missing_on") << missing_on;
  f->dump_stream("missing_on_shards") << missing_on_shards;
  f->dump_stream("recovery_info") << recovery_info;
  f->dump_stream("recovery_progress") << recovery_progress;
  f->dump_stream("state") << tostr(state);
  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
  f->dump_stream("extent_requested") << extent_requested;
}

ECBackend::ECBackend(
  PGBackend::Listener *pg,
  const coll_t &coll,
  ObjectStore::CollectionHandle &ch,
  ObjectStore *store,
  CephContext *cct,
  ErasureCodeInterfaceRef ec_impl,
  uint64_t stripe_width)
  : PGBackend(cct, pg, store, coll, ch),
    ec_impl(ec_impl),
    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
  ceph_assert((ec_impl->get_data_chunk_count() *
               ec_impl->get_chunk_size(stripe_width)) == stripe_width);
}

PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
{
  return new ECRecoveryHandle;
}

void ECBackend::_failed_push(const hobject_t &hoid,
                             pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
{
  ECBackend::read_result_t &res = in.second;
  dout(10) << __func__ << ": Read error " << hoid << " r="
           << res.r << " errors=" << res.errors << dendl;
  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
           << dendl;
  ceph_assert(recovery_ops.count(hoid));
  eversion_t v = recovery_ops[hoid].v;
  recovery_ops.erase(hoid);

  set<pg_shard_t> fl;
  for (auto&& i : res.errors) {
    fl.insert(i.first);
  }
  get_parent()->on_failed_pull(fl, hoid, v);
}

struct OnRecoveryReadComplete :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  ECBackend *pg;
  hobject_t hoid;
  OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
    : pg(pg), hoid(hoid) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    if (!(res.r == 0 && res.errors.empty())) {
      pg->_failed_push(hoid, in);
      return;
    }
    ceph_assert(res.returned.size() == 1);
    pg->handle_recovery_read_complete(
      hoid,
      res.returned.back(),
      res.attrs,
      in.first);
  }
};

struct RecoveryMessages {
  map<hobject_t,
      ECBackend::read_request_t> reads;
  map<hobject_t, set<int>> want_to_read;
  void read(
    ECBackend *ec,
    const hobject_t &hoid, uint64_t off, uint64_t len,
    set<int> &&_want_to_read,
    const map<pg_shard_t, vector<pair<int, int>>> &need,
    bool attrs) {
    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
    to_read.push_back(boost::make_tuple(off, len, 0));
    ceph_assert(!reads.count(hoid));
    want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
    reads.insert(
      make_pair(
        hoid,
        ECBackend::read_request_t(
          to_read,
          need,
          attrs,
          new OnRecoveryReadComplete(
            ec,
            hoid))));
  }

  map<pg_shard_t, vector<PushOp> > pushes;
  map<pg_shard_t, vector<PushReplyOp> > push_replies;
  ObjectStore::Transaction t;
  RecoveryMessages() {}
  ~RecoveryMessages() {}
};

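// Apply a single PushOp to the local store. If the push covers the whole
// object (before_progress.first && after_progress.data_complete) the data is
// written directly to the final object ("oneshot"); otherwise it accumulates
// in a temp recovery object that is renamed into place once the last piece
// arrives. A PushReplyOp for the primary is queued in either case.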
void ECBackend::handle_recovery_push(
  const PushOp &op,
  RecoveryMessages *m,
  bool is_repair)
{
  if (get_parent()->check_failsafe_full()) {
    dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
    ceph_abort();
  }

  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
  ghobject_t tobj;
  if (oneshot) {
    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
                      get_parent()->whoami_shard().shard);
  } else {
    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
                                                             op.version),
                      ghobject_t::NO_GEN,
                      get_parent()->whoami_shard().shard);
    if (op.before_progress.first) {
      dout(10) << __func__ << ": Adding oid "
               << tobj.hobj << " in the temp collection" << dendl;
      add_temp_obj(tobj.hobj);
    }
  }

  if (op.before_progress.first) {
    m->t.remove(coll, tobj);
    m->t.touch(coll, tobj);
  }

  if (!op.data_included.empty()) {
    uint64_t start = op.data_included.range_start();
    uint64_t end = op.data_included.range_end();
    ceph_assert(op.data.length() == (end - start));

    m->t.write(
      coll,
      tobj,
      start,
      op.data.length(),
      op.data);
  } else {
    ceph_assert(op.data.length() == 0);
  }

  if (get_parent()->pg_is_remote_backfilling()) {
    get_parent()->pg_add_local_num_bytes(op.data.length());
    get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
    dout(10) << __func__ << " " << op.soid
             << " add new actual data by " << op.data.length()
             << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
             << dendl;
  }

  if (op.before_progress.first) {
    ceph_assert(op.attrset.count(string("_")));
    m->t.setattrs(
      coll,
      tobj,
      op.attrset);
  }

  if (op.after_progress.data_complete && !oneshot) {
    dout(10) << __func__ << ": Removing oid "
             << tobj.hobj << " from the temp collection" << dendl;
    clear_temp_obj(tobj.hobj);
    m->t.remove(coll, ghobject_t(
        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
    m->t.collection_move_rename(
      coll, tobj,
      coll, ghobject_t(
        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
  }
  if (op.after_progress.data_complete) {
    if ((get_parent()->pgb_is_primary())) {
      ceph_assert(recovery_ops.count(op.soid));
      ceph_assert(recovery_ops[op.soid].obc);
      if (get_parent()->pg_is_repair())
        get_parent()->inc_osd_stat_repaired();
      get_parent()->on_local_recover(
        op.soid,
        op.recovery_info,
        recovery_ops[op.soid].obc,
        false,
        &m->t);
    } else {
      // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
      if (is_repair)
        get_parent()->inc_osd_stat_repaired();
      get_parent()->on_local_recover(
        op.soid,
        op.recovery_info,
        ObjectContextRef(),
        false,
        &m->t);
      if (get_parent()->pg_is_remote_backfilling()) {
        struct stat st;
        int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
                            get_parent()->whoami_shard().shard), &st);
        if (r == 0) {
          get_parent()->pg_sub_local_num_bytes(st.st_size);
          // XXX: This can be way overestimated for small objects
          get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
          dout(10) << __func__ << " " << op.soid
                   << " sub actual data by " << st.st_size
                   << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
                   << dendl;
        }
      }
    }
  }
  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
}

void ECBackend::handle_recovery_push_reply(
  const PushReplyOp &op,
  pg_shard_t from,
  RecoveryMessages *m)
{
  if (!recovery_ops.count(op.soid))
    return;
  RecoveryOp &rop = recovery_ops[op.soid];
  ceph_assert(rop.waiting_on_pushes.count(from));
  rop.waiting_on_pushes.erase(from);
  continue_recovery_op(rop, m);
}

void ECBackend::handle_recovery_read_complete(
  const hobject_t &hoid,
  boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
  std::optional<map<string, bufferlist, less<>> > attrs,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": returned " << hoid << " "
           << "(" << to_read.get<0>()
           << ", " << to_read.get<1>()
           << ", " << to_read.get<2>()
           << ")"
           << dendl;
  ceph_assert(recovery_ops.count(hoid));
  RecoveryOp &op = recovery_ops[hoid];
  ceph_assert(op.returned_data.empty());
  map<int, bufferlist*> target;
  for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
       i != op.missing_on_shards.end();
       ++i) {
    target[*i] = &(op.returned_data[*i]);
  }
  map<int, bufferlist> from;
  for (map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
       i != to_read.get<2>().end();
       ++i) {
    from[i->first.shard] = std::move(i->second);
  }
  dout(10) << __func__ << ": " << from << dendl;
  int r;
  r = ECUtil::decode(sinfo, ec_impl, from, target);
  ceph_assert(r == 0);
  if (attrs) {
    op.xattrs.swap(*attrs);

    if (!op.obc) {
      // The attrs only reference the original bufferlist (decoded from the
      // ECSubReadReply message), whose size is much greater than the attrs
      // themselves. If the obc caches them (get_obc may cache the attrs),
      // the whole original bufferlist cannot be freed until the obc is
      // evicted from the obc cache. So rebuild the bufferlists before
      // caching them.
      for (map<string, bufferlist>::iterator it = op.xattrs.begin();
           it != op.xattrs.end();
           ++it) {
        it->second.rebuild();
      }
      // Need to remove ECUtil::get_hinfo_key() since it should not leak out
      // of the backend (see bug #12983)
      map<string, bufferlist, less<>> sanitized_attrs(op.xattrs);
      sanitized_attrs.erase(ECUtil::get_hinfo_key());
      op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
      ceph_assert(op.obc);
      op.recovery_info.size = op.obc->obs.oi.size;
      op.recovery_info.oi = op.obc->obs.oi;
    }

    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    if (op.obc->obs.oi.size > 0) {
      ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
      auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
      decode(hinfo, bp);
    }
    op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  ceph_assert(op.xattrs.size());
  ceph_assert(op.obc);
  continue_recovery_op(op, m);
}

struct SendPushReplies : public Context {
  PGBackend::Listener *l;
  epoch_t epoch;
  map<int, MOSDPGPushReply*> replies;
  SendPushReplies(
    PGBackend::Listener *l,
    epoch_t epoch,
    map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
    replies.swap(in);
  }
  void finish(int) override {
    std::vector<std::pair<int, Message*>> messages;
    messages.reserve(replies.size());
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
         i != replies.end();
         ++i) {
      messages.push_back(std::make_pair(i->first, i->second));
    }
    if (!messages.empty()) {
      l->send_message_osd_cluster(messages, epoch);
    }
    replies.clear();
  }
  ~SendPushReplies() override {
    for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
         i != replies.end();
         ++i) {
      i->second->put();
    }
    replies.clear();
  }
};

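// Flush accumulated RecoveryMessages: send the queued PushOps and
// PushReplyOps to their target shards (replies are batched behind the
// transaction via SendPushReplies), queue the transaction itself, and
// finally start any reads gathered in m.reads.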
void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
{
  for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
       i != m.pushes.end();
       m.pushes.erase(i++)) {
    MOSDPGPush *msg = new MOSDPGPush();
    msg->set_priority(priority);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->pushes.swap(i->second);
    msg->compute_cost(cct);
    msg->is_repair = get_parent()->pg_is_repair();
    get_parent()->send_message(
      i->first.osd,
      msg);
  }
  map<int, MOSDPGPushReply*> replies;
  for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
         m.push_replies.begin();
       i != m.push_replies.end();
       m.push_replies.erase(i++)) {
    MOSDPGPushReply *msg = new MOSDPGPushReply();
    msg->set_priority(priority);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
    msg->from = get_parent()->whoami_shard();
    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
    msg->replies.swap(i->second);
    msg->compute_cost(cct);
    replies.insert(make_pair(i->first.osd, msg));
  }

  if (!replies.empty()) {
    (m.t).register_on_complete(
      get_parent()->bless_context(
        new SendPushReplies(
          get_parent(),
          get_osdmap_epoch(),
          replies)));
    get_parent()->queue_transaction(std::move(m.t));
  }

  if (m.reads.empty())
    return;
  start_read_op(
    priority,
    m.want_to_read,
    m.reads,
    OpRequestRef(),
    false, true);
}

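// Advance a RecoveryOp through its state machine:
//   IDLE    -> pick the next extent, issue reads for the shards we can
//              decode from, go to READING
//   READING -> decoded data is in returned_data; queue a PushOp for each
//              missing shard, go to WRITING
//   WRITING -> once all pushes are acked, either loop back to IDLE for the
//              next extent or finish the object (COMPLETE)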
void ECBackend::continue_recovery_op(
  RecoveryOp &op,
  RecoveryMessages *m)
{
  dout(10) << __func__ << ": continuing " << op << dendl;
  while (1) {
    switch (op.state) {
    case RecoveryOp::IDLE: {
      // start read
      op.state = RecoveryOp::READING;
      ceph_assert(!op.recovery_progress.data_complete);
      set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
      uint64_t from = op.recovery_progress.data_recovered_to;
      uint64_t amount = get_recovery_chunk_size();

      if (op.recovery_progress.first && op.obc) {
        /* We've got the attrs and the hinfo, might as well use them */
        op.hinfo = get_hash_info(op.hoid);
        if (!op.hinfo) {
          derr << __func__ << ": " << op.hoid << " has inconsistent hinfo"
               << dendl;
          ceph_assert(recovery_ops.count(op.hoid));
          eversion_t v = recovery_ops[op.hoid].v;
          recovery_ops.erase(op.hoid);
          get_parent()->on_failed_pull({get_parent()->whoami_shard()},
                                       op.hoid, v);
          return;
        }
        op.xattrs = op.obc->attr_cache;
        encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
      }

      map<pg_shard_t, vector<pair<int, int>>> to_read;
      int r = get_min_avail_to_read_shards(
        op.hoid, want, true, false, &to_read);
      if (r != 0) {
        // we must have lost a recovery source
        ceph_assert(!op.recovery_progress.first);
        dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
                 << dendl;
        get_parent()->cancel_pull(op.hoid);
        recovery_ops.erase(op.hoid);
        return;
      }
      m->read(
        this,
        op.hoid,
        op.recovery_progress.data_recovered_to,
        amount,
        std::move(want),
        to_read,
        op.recovery_progress.first && !op.obc);
      op.extent_requested = make_pair(
        from,
        amount);
      dout(10) << __func__ << ": IDLE return " << op << dendl;
      return;
    }
    case RecoveryOp::READING: {
      // read completed, start write
      ceph_assert(op.xattrs.size());
      ceph_assert(op.returned_data.size());
      op.state = RecoveryOp::WRITING;
      ObjectRecoveryProgress after_progress = op.recovery_progress;
      after_progress.data_recovered_to += op.extent_requested.second;
      after_progress.first = false;
      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
        after_progress.data_recovered_to =
          sinfo.logical_to_next_stripe_offset(
            op.obc->obs.oi.size);
        after_progress.data_complete = true;
      }
      for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
           mi != op.missing_on.end();
           ++mi) {
        ceph_assert(op.returned_data.count(mi->shard));
        m->pushes[*mi].push_back(PushOp());
        PushOp &pop = m->pushes[*mi].back();
        pop.soid = op.hoid;
        pop.version = op.v;
        pop.data = op.returned_data[mi->shard];
        dout(10) << __func__ << ": before_progress=" << op.recovery_progress
                 << ", after_progress=" << after_progress
                 << ", pop.data.length()=" << pop.data.length()
                 << ", size=" << op.obc->obs.oi.size << dendl;
        ceph_assert(
          pop.data.length() ==
          sinfo.aligned_logical_offset_to_chunk_offset(
            after_progress.data_recovered_to -
            op.recovery_progress.data_recovered_to)
          );
        if (pop.data.length())
          pop.data_included.insert(
            sinfo.aligned_logical_offset_to_chunk_offset(
              op.recovery_progress.data_recovered_to),
            pop.data.length()
            );
        if (op.recovery_progress.first) {
          pop.attrset = op.xattrs;
        }
        pop.recovery_info = op.recovery_info;
        pop.before_progress = op.recovery_progress;
        pop.after_progress = after_progress;
        if (*mi != get_parent()->primary_shard())
          get_parent()->begin_peer_recover(
            *mi,
            op.hoid);
      }
      op.returned_data.clear();
      op.waiting_on_pushes = op.missing_on;
      op.recovery_progress = after_progress;
      dout(10) << __func__ << ": READING return " << op << dendl;
      return;
    }
    case RecoveryOp::WRITING: {
      if (op.waiting_on_pushes.empty()) {
        if (op.recovery_progress.data_complete) {
          op.state = RecoveryOp::COMPLETE;
          for (set<pg_shard_t>::iterator i = op.missing_on.begin();
               i != op.missing_on.end();
               ++i) {
            if (*i != get_parent()->primary_shard()) {
              dout(10) << __func__ << ": on_peer_recover on " << *i
                       << ", obj " << op.hoid << dendl;
              get_parent()->on_peer_recover(
                *i,
                op.hoid,
                op.recovery_info);
            }
          }
          object_stat_sum_t stat;
          stat.num_bytes_recovered = op.recovery_info.size;
          stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
          stat.num_objects_recovered = 1;
          if (get_parent()->pg_is_repair())
            stat.num_objects_repaired = 1;
          get_parent()->on_global_recover(op.hoid, stat, false);
          dout(10) << __func__ << ": WRITING return " << op << dendl;
          recovery_ops.erase(op.hoid);
          return;
        } else {
          op.state = RecoveryOp::IDLE;
          dout(10) << __func__ << ": WRITING continue " << op << dendl;
          continue;
        }
      }
      return;
    }
    // should never be called once complete
    case RecoveryOp::COMPLETE:
    default: {
      ceph_abort();
    };
    }
  }
}

void ECBackend::run_recovery_op(
  RecoveryHandle *_h,
  int priority)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  RecoveryMessages m;
  for (list<RecoveryOp>::iterator i = h->ops.begin();
       i != h->ops.end();
       ++i) {
    dout(10) << __func__ << ": starting " << *i << dendl;
    ceph_assert(!recovery_ops.count(i->hoid));
    RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
    continue_recovery_op(op, &m);
  }

  dispatch_recovery_messages(m, priority);
  send_recovery_deletes(priority, h->deletes);
  delete _h;
}

int ECBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
  ObjectContextRef head,
  ObjectContextRef obc,
  RecoveryHandle *_h)
{
  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
  h->ops.push_back(RecoveryOp());
  h->ops.back().v = v;
  h->ops.back().hoid = hoid;
  h->ops.back().obc = obc;
  h->ops.back().recovery_info.soid = hoid;
  h->ops.back().recovery_info.version = v;
  if (obc) {
    h->ops.back().recovery_info.size = obc->obs.oi.size;
    h->ops.back().recovery_info.oi = obc->obs.oi;
  }
  if (hoid.is_snap()) {
    if (obc) {
      ceph_assert(obc->ssc);
      h->ops.back().recovery_info.ss = obc->ssc->snapset;
    } else if (head) {
      ceph_assert(head->ssc);
      h->ops.back().recovery_info.ss = head->ssc->snapset;
    } else {
      ceph_abort_msg("neither obc nor head set for a snap object");
    }
  }
  h->ops.back().recovery_progress.omap_complete = true;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    dout(10) << "checking " << *i << dendl;
    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
      h->ops.back().missing_on.insert(*i);
      h->ops.back().missing_on_shards.insert(i->shard);
    }
  }
  dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
  return 0;
}

bool ECBackend::can_handle_while_inactive(
  OpRequestRef _op)
{
  return false;
}

bool ECBackend::_handle_message(
  OpRequestRef _op)
{
  dout(10) << __func__ << ": " << *_op->get_req() << dendl;
  int priority = _op->get_req()->get_priority();
  switch (_op->get_req()->get_type()) {
  case MSG_OSD_EC_WRITE: {
    // NOTE: this is non-const because handle_sub_write modifies the embedded
    // ObjectStore::Transaction in place (and then std::move's it). It does
    // not conflict with ECSubWrite's operator<<.
    MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
      _op->get_nonconst_req());
    parent->maybe_preempt_replica_scrub(op->op.soid);
    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_WRITE_REPLY: {
    const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
      _op->get_req());
    handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
    return true;
  }
  case MSG_OSD_EC_READ: {
    auto op = _op->get_req<MOSDECSubOpRead>();
    MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
    reply->pgid = get_parent()->primary_spg_t();
    reply->map_epoch = get_osdmap_epoch();
    reply->min_epoch = get_parent()->get_interval_start_epoch();
    handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
    reply->trace = _op->pg_trace;
    get_parent()->send_message_osd_cluster(
      reply, _op->get_req()->get_connection());
    return true;
  }
  case MSG_OSD_EC_READ_REPLY: {
    // NOTE: this is non-const because handle_sub_read_reply steals resulting
    // buffers. It does not conflict with ECSubReadReply operator<<.
    MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
      _op->get_nonconst_req());
    RecoveryMessages rm;
    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH: {
    auto op = _op->get_req<MOSDPGPush>();
    RecoveryMessages rm;
    for (vector<PushOp>::const_iterator i = op->pushes.begin();
         i != op->pushes.end();
         ++i) {
      handle_recovery_push(*i, &rm, op->is_repair);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  case MSG_OSD_PG_PUSH_REPLY: {
    const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
      _op->get_req());
    RecoveryMessages rm;
    for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
         i != op->replies.end();
         ++i) {
      handle_recovery_push_reply(*i, op->from, &rm);
    }
    dispatch_recovery_messages(rm, priority);
    return true;
  }
  default:
    return false;
  }
  return false;
}

struct SubWriteCommitted : public Context {
  ECBackend *pg;
  OpRequestRef msg;
  ceph_tid_t tid;
  eversion_t version;
  eversion_t last_complete;
  const ZTracer::Trace trace;
  SubWriteCommitted(
    ECBackend *pg,
    OpRequestRef msg,
    ceph_tid_t tid,
    eversion_t version,
    eversion_t last_complete,
    const ZTracer::Trace &trace)
    : pg(pg), msg(msg), tid(tid),
      version(version), last_complete(last_complete), trace(trace) {}
  void finish(int) override {
    if (msg)
      msg->mark_event("sub_op_committed");
    pg->sub_write_committed(tid, version, last_complete, trace);
  }
};
void ECBackend::sub_write_committed(
  ceph_tid_t tid, eversion_t version, eversion_t last_complete,
  const ZTracer::Trace &trace) {
  if (get_parent()->pgb_is_primary()) {
    ECSubWriteReply reply;
    reply.tid = tid;
    reply.last_complete = last_complete;
    reply.committed = true;
    reply.applied = true;
    reply.from = get_parent()->whoami_shard();
    handle_sub_write_reply(
      get_parent()->whoami_shard(),
      reply, trace);
  } else {
    get_parent()->update_last_complete_ondisk(last_complete);
    MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
    r->pgid = get_parent()->primary_spg_t();
    r->map_epoch = get_osdmap_epoch();
    r->min_epoch = get_parent()->get_interval_start_epoch();
    r->op.tid = tid;
    r->op.last_complete = last_complete;
    r->op.committed = true;
    r->op.applied = true;
    r->op.from = get_parent()->whoami_shard();
    r->set_priority(CEPH_MSG_PRIO_HIGH);
    r->trace = trace;
    r->trace.event("sending sub op commit");
    get_parent()->send_message_osd_cluster(
      get_parent()->primary_shard().osd, r, get_osdmap_epoch());
  }
}

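// Apply an EC sub-write on this shard: record the log entries, queue the
// remote transaction together with a local one, and register a
// SubWriteCommitted callback that sends the commit reply (or feeds it
// straight back to handle_sub_write_reply when we are the primary).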
void ECBackend::handle_sub_write(
  pg_shard_t from,
  OpRequestRef msg,
  ECSubWrite &op,
  const ZTracer::Trace &trace)
{
  jspan span;
  if (msg) {
    msg->mark_event("sub_op_started");
    span = tracing::osd::tracer.add_span(__func__, msg->osd_parent_span);
  }
  trace.event("handle_sub_write");

  if (!get_parent()->pgb_is_primary())
    get_parent()->update_stats(op.stats);
  ObjectStore::Transaction localt;
  if (!op.temp_added.empty()) {
    add_temp_objs(op.temp_added);
  }
  if (op.backfill_or_async_recovery) {
    for (set<hobject_t>::iterator i = op.temp_removed.begin();
         i != op.temp_removed.end();
         ++i) {
      dout(10) << __func__ << ": removing object " << *i
               << " since we won't get the transaction" << dendl;
      localt.remove(
        coll,
        ghobject_t(
          *i,
          ghobject_t::NO_GEN,
          get_parent()->whoami_shard().shard));
    }
  }
  clear_temp_objs(op.temp_removed);
  dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
  // flag set to true during async recovery
  bool async = false;
  pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
  if (pmissing.is_missing(op.soid)) {
    async = true;
    dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
    for (auto &&e: op.log_entries) {
      dout(30) << " add_next_event entry " << e << dendl;
      get_parent()->add_local_next_event(e);
      dout(30) << " entry is_delete " << e.is_delete() << dendl;
    }
  }
  get_parent()->log_operation(
    std::move(op.log_entries),
    op.updated_hit_set_history,
    op.trim_to,
    op.roll_forward_to,
    op.roll_forward_to,
    !op.backfill_or_async_recovery,
    localt,
    async);

  if (!get_parent()->pg_is_undersized() &&
      (unsigned)get_parent()->whoami_shard().shard >=
      ec_impl->get_data_chunk_count())
    op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  localt.register_on_commit(
    get_parent()->bless_context(
      new SubWriteCommitted(
        this, msg, op.tid,
        op.at_version,
        get_parent()->get_info().last_complete, trace)));
  vector<ObjectStore::Transaction> tls;
  tls.reserve(2);
  tls.push_back(std::move(op.t));
  tls.push_back(std::move(localt));
  get_parent()->queue_transactions(tls, msg);
  dout(30) << __func__ << " missing after" << get_parent()->get_log().get_missing().get_items() << dendl;
  if (op.at_version != eversion_t()) {
    // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
    get_parent()->op_applied(op.at_version);
  }
}

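// Service a sub-read on this shard. Depending on the subchunk map this is
// either one contiguous read of the local chunk or a sequence of smaller
// subchunk reads. Unless ec overwrites are enabled, full-chunk reads from
// offset 0 are also verified against the stored chunk hash (hinfo).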
void ECBackend::handle_sub_read(
  pg_shard_t from,
  const ECSubRead &op,
  ECSubReadReply *reply,
  const ZTracer::Trace &trace)
{
  trace.event("handle sub read");
  shard_id_t shard = get_parent()->whoami_shard().shard;
  for (auto i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    int r = 0;
    for (auto j = i->second.begin(); j != i->second.end(); ++j) {
      bufferlist bl;
      if ((op.subchunks.find(i->first)->second.size() == 1) &&
          (op.subchunks.find(i->first)->second.front().second ==
           ec_impl->get_sub_chunk_count())) {
        dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
        r = store->read(
          ch,
          ghobject_t(i->first, ghobject_t::NO_GEN, shard),
          j->get<0>(),
          j->get<1>(),
          bl, j->get<2>()); // Allow EIO return
      } else {
        dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
        int subchunk_size =
          sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
        bool error = false;
        for (int m = 0; m < (int)j->get<1>() && !error;
             m += sinfo.get_chunk_size()) {
          for (auto &&k: op.subchunks.find(i->first)->second) {
            bufferlist bl0;
            r = store->read(
              ch,
              ghobject_t(i->first, ghobject_t::NO_GEN, shard),
              j->get<0>() + m + (k.first)*subchunk_size,
              (k.second)*subchunk_size,
              bl0, j->get<2>());
            if (r < 0) {
              error = true;
              break;
            }
            bl.claim_append(bl0);
          }
        }
      }

      if (r < 0) {
        // if we are doing fast reads, it's possible for one of the shard
        // reads to cross paths with another update and get a (harmless)
        // ENOENT. Suppress the message to the cluster log in that case.
        if (r == -ENOENT && get_parent()->get_pool().fast_read) {
          dout(5) << __func__ << ": Error " << r
                  << " reading " << i->first << ", fast read, probably ok"
                  << dendl;
        } else {
          get_parent()->clog_error() << "Error " << r
                                     << " reading object "
                                     << i->first;
          dout(5) << __func__ << ": Error " << r
                  << " reading " << i->first << dendl;
        }
        goto error;
      } else {
        dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
        reply->buffers_read[i->first].push_back(
          make_pair(
            j->get<0>(),
            bl)
          );
      }

      if (!get_parent()->get_pool().allows_ecoverwrites()) {
        // This shows that we still need deep scrub because large enough files
        // are read in sections, so the digest check cannot always be done here.
        // Do NOT check osd_read_eio_on_bad_digest here. We need to report
        // the state of our chunk in case other chunks could substitute.
        ECUtil::HashInfoRef hinfo;
        hinfo = get_hash_info(i->first);
        if (!hinfo) {
          r = -EIO;
          get_parent()->clog_error() << "Corruption detected: object "
                                     << i->first
                                     << " is missing hash_info";
          dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
          goto error;
        }
        ceph_assert(hinfo->has_chunk_hash());
        if ((bl.length() == hinfo->get_total_chunk_size()) &&
            (j->get<0>() == 0)) {
          dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
          bufferhash h(-1);
          h << bl;
          if (h.digest() != hinfo->get_chunk_hash(shard)) {
            get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
                                       << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
            dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
                    << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
            r = -EIO;
            goto error;
          }
        }
      }
    }
    continue;
error:
    // Do NOT check osd_read_eio_on_bad_digest here. We need to report
    // the state of our chunk in case other chunks could substitute.
    reply->buffers_read.erase(i->first);
    reply->errors[i->first] = r;
  }
  for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
       i != op.attrs_to_read.end();
       ++i) {
    dout(10) << __func__ << ": fulfilling attr request on "
             << *i << dendl;
    if (reply->errors.count(*i))
      continue;
    int r = store->getattrs(
      ch,
      ghobject_t(
        *i, ghobject_t::NO_GEN, shard),
      reply->attrs_read[*i]);
    if (r < 0) {
      // If we hit a read error, we should not return the attrs either.
      reply->attrs_read.erase(*i);
      reply->buffers_read.erase(*i);
      reply->errors[*i] = r;
    }
  }
  reply->from = get_parent()->whoami_shard();
  reply->tid = op.tid;
}

void ECBackend::handle_sub_write_reply(
  pg_shard_t from,
  const ECSubWriteReply &op,
  const ZTracer::Trace &trace)
{
  map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
  ceph_assert(i != tid_to_op_map.end());
  if (op.committed) {
    trace.event("sub write committed");
    ceph_assert(i->second.pending_commit.count(from));
    i->second.pending_commit.erase(from);
    if (from != get_parent()->whoami_shard()) {
      get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
    }
  }
  if (op.applied) {
    trace.event("sub write applied");
    ceph_assert(i->second.pending_apply.count(from));
    i->second.pending_apply.erase(from);
  }

  if (i->second.pending_commit.empty() &&
      i->second.on_all_commit &&
      // also wait for apply, to preserve ordering with luminous peers.
      i->second.pending_apply.empty()) {
    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
    i->second.on_all_commit->complete(0);
    i->second.on_all_commit = 0;
    i->second.trace.event("ec write all committed");
  }
  check_ops();
}

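// Merge one shard's read reply into the matching ReadOp: record returned
// buffers, attrs, and errors, then check per object whether enough shards
// have answered for minimum_to_decode() to succeed. If it fails we may
// re-issue reads to the remaining shards (need_resend) before reporting
// the op complete.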
void ECBackend::handle_sub_read_reply(
  pg_shard_t from,
  ECSubReadReply &op,
  RecoveryMessages *m,
  const ZTracer::Trace &trace)
{
  trace.event("ec sub read reply");
  dout(10) << __func__ << ": reply " << op << dendl;
  map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
  if (iter == tid_to_read_map.end()) {
    // canceled
    dout(20) << __func__ << ": dropped " << op << dendl;
    return;
  }
  ReadOp &rop = iter->second;
  for (auto i = op.buffers_read.begin();
       i != op.buffers_read.end();
       ++i) {
    ceph_assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
      rop.to_read.find(i->first)->second.to_read.begin();
    list<
      boost::tuple<
        uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
      rop.complete[i->first].returned.begin();
    for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
         j != i->second.end();
         ++j, ++req_iter, ++riter) {
      ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
      ceph_assert(riter != rop.complete[i->first].returned.end());
      pair<uint64_t, uint64_t> adjusted =
        sinfo.aligned_offset_len_to_chunk(
          make_pair(req_iter->get<0>(), req_iter->get<1>()));
      ceph_assert(adjusted.first == j->first);
      riter->get<2>()[from] = std::move(j->second);
    }
  }
  for (auto i = op.attrs_read.begin();
       i != op.attrs_read.end();
       ++i) {
    ceph_assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
    if (!rop.to_read.count(i->first)) {
      // We canceled this read! @see filter_read_op
      dout(20) << __func__ << " to_read skipping" << dendl;
      continue;
    }
    rop.complete[i->first].attrs.emplace();
    (*(rop.complete[i->first].attrs)).swap(i->second);
  }
  for (auto i = op.errors.begin();
       i != op.errors.end();
       ++i) {
    rop.complete[i->first].errors.insert(
      make_pair(
        from,
        i->second));
    dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
  }

  map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
    shard_to_read_map.find(from);
  ceph_assert(siter != shard_to_read_map.end());
  ceph_assert(siter->second.count(op.tid));
  siter->second.erase(op.tid);

  ceph_assert(rop.in_progress.count(from));
  rop.in_progress.erase(from);
  unsigned is_complete = 0;
  bool need_resend = false;
  // For redundant reads check for completion as each shard comes in,
  // or in a non-recovery read check for completion once all the shards read.
  if (rop.do_redundant_reads || rop.in_progress.empty()) {
    for (map<hobject_t, read_result_t>::const_iterator iter =
           rop.complete.begin();
         iter != rop.complete.end();
         ++iter) {
      set<int> have;
      for (map<pg_shard_t, bufferlist>::const_iterator j =
             iter->second.returned.front().get<2>().begin();
           j != iter->second.returned.front().get<2>().end();
           ++j) {
        have.insert(j->first.shard);
        dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
      }
      map<int, vector<pair<int, int>>> dummy_minimum;
      int err;
      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
        dout(20) << __func__ << " minimum_to_decode failed" << dendl;
        if (rop.in_progress.empty()) {
          // If we don't have enough copies, try other pg_shard_ts if available.
          // During recovery there may be multiple osds with copies of the same shard,
          // so getting EIO from one may result in multiple passes through this code path.
          if (!rop.do_redundant_reads) {
            int r = send_all_remaining_reads(iter->first, rop);
            if (r == 0) {
              // We changed the rop's to_read and are not incrementing is_complete
              need_resend = true;
              continue;
            }
            // Couldn't read any additional shards so handle as completed with errors
          }
          // We don't want to confuse clients / RBD with objectstore error
          // values in particular ENOENT. We may have different error returns
          // from different shards, so we'll return minimum_to_decode() error
          // (usually EIO) to reader. It is likely an error here is due to a
          // damaged pg.
          rop.complete[iter->first].r = err;
          ++is_complete;
        }
      } else {
        ceph_assert(rop.complete[iter->first].r == 0);
        if (!rop.complete[iter->first].errors.empty()) {
          if (cct->_conf->osd_read_ec_check_for_errors) {
            dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
            err = rop.complete[iter->first].errors.begin()->second;
            rop.complete[iter->first].r = err;
          } else {
            get_parent()->clog_warn() << "Error(s) ignored for "
                                      << iter->first << " enough copies available";
            dout(10) << __func__ << " Error(s) ignored for " << iter->first
                     << " enough copies available" << dendl;
            rop.complete[iter->first].errors.clear();
          }
        }
        // avoid re-read for completed object as we may send remaining reads for uncompleted objects
        rop.to_read.at(iter->first).need.clear();
        rop.to_read.at(iter->first).want_attrs = false;
        ++is_complete;
      }
    }
  }
  if (need_resend) {
    do_read_op(rop);
  } else if (rop.in_progress.empty() ||
             is_complete == rop.complete.size()) {
    dout(20) << __func__ << " Complete: " << rop << dendl;
    rop.trace.event("ec read complete");
    complete_read_op(rop, m);
  } else {
    dout(10) << __func__ << " readop not complete: " << rop << dendl;
  }
}

void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
{
  map<hobject_t, read_request_t>::iterator reqiter =
    rop.to_read.begin();
  map<hobject_t, read_result_t>::iterator resiter =
    rop.complete.begin();
  ceph_assert(rop.to_read.size() == rop.complete.size());
  for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
    if (reqiter->second.cb) {
      pair<RecoveryMessages *, read_result_t &> arg(
        m, resiter->second);
      reqiter->second.cb->complete(arg);
      reqiter->second.cb = nullptr;
    }
  }
  // if the read op is over, clean up all the data for this tid
  for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
       iter != rop.in_progress.end();
       iter++) {
    shard_to_read_map[*iter].erase(rop.tid);
  }
  rop.in_progress.clear();
  tid_to_read_map.erase(rop.tid);
}

struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
  ECBackend *ec;
  ceph_tid_t tid;
  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
  void finish(ThreadPool::TPHandle &handle) override {
    auto ropiter = ec->tid_to_read_map.find(tid);
    ceph_assert(ropiter != ec->tid_to_read_map.end());
    int priority = ropiter->second.priority;
    RecoveryMessages rm;
    ec->complete_read_op(ropiter->second, &rm);
    ec->dispatch_recovery_messages(rm, priority);
  }
};

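// Prune an in-flight ReadOp after an osdmap change: any objects whose
// source shard is now down get their reads (and recovery pulls) canceled;
// if nothing is left in flight, completion is driven by a FinishReadOp
// queued via schedule_recovery_work().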
void ECBackend::filter_read_op(
  const OSDMapRef& osdmap,
  ReadOp &op)
{
  set<hobject_t> to_cancel;
  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ++i) {
    if (osdmap->is_down(i->first.osd)) {
      to_cancel.insert(i->second.begin(), i->second.end());
      op.in_progress.erase(i->first);
      continue;
    }
  }

  if (to_cancel.empty())
    return;

  for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
       i != op.source_to_obj.end();
       ) {
    for (set<hobject_t>::iterator j = i->second.begin();
         j != i->second.end();
         ) {
      if (to_cancel.count(*j))
        i->second.erase(j++);
      else
        ++j;
    }
    if (i->second.empty()) {
      op.source_to_obj.erase(i++);
    } else {
      ceph_assert(!osdmap->is_down(i->first.osd));
      ++i;
    }
  }

  for (set<hobject_t>::iterator i = to_cancel.begin();
       i != to_cancel.end();
       ++i) {
    get_parent()->cancel_pull(*i);

    ceph_assert(op.to_read.count(*i));
    read_request_t &req = op.to_read.find(*i)->second;
    dout(10) << __func__ << ": canceling " << req
             << " for obj " << *i << dendl;
    ceph_assert(req.cb);
    delete req.cb;
    req.cb = nullptr;

    op.to_read.erase(*i);
    op.complete.erase(*i);
    recovery_ops.erase(*i);
  }

  if (op.in_progress.empty()) {
    get_parent()->schedule_recovery_work(
      get_parent()->bless_unlocked_gencontext(
        new FinishReadOp(this, op.tid)));
  }
}

void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
  set<ceph_tid_t> tids_to_filter;
  for (map<pg_shard_t, set<ceph_tid_t> >::iterator
         i = shard_to_read_map.begin();
       i != shard_to_read_map.end();
       ) {
    if (osdmap->is_down(i->first.osd)) {
      tids_to_filter.insert(i->second.begin(), i->second.end());
      shard_to_read_map.erase(i++);
    } else {
      ++i;
    }
  }
  for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
       i != tids_to_filter.end();
       ++i) {
    map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
    ceph_assert(j != tid_to_read_map.end());
    filter_read_op(osdmap, j->second);
  }
}

void ECBackend::on_change()
{
  dout(10) << __func__ << dendl;

  completed_to = eversion_t();
  committed_to = eversion_t();
  pipeline_state.clear();
  waiting_reads.clear();
  waiting_state.clear();
  waiting_commit.clear();
  for (auto &&op: tid_to_op_map) {
    cache.release_write_pin(op.second.pin);
  }
  tid_to_op_map.clear();

  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    dout(10) << __func__ << ": cancelling " << i->second << dendl;
    for (map<hobject_t, read_request_t>::iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      delete j->second.cb;
      j->second.cb = nullptr;
    }
  }
  tid_to_read_map.clear();
  in_progress_client_reads.clear();
  shard_to_read_map.clear();
  clear_recovery_state();
}

void ECBackend::clear_recovery_state()
{
  recovery_ops.clear();
}

void ECBackend::dump_recovery_info(Formatter *f) const
{
  f->open_array_section("recovery_ops");
  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
       i != recovery_ops.end();
       ++i) {
    f->open_object_section("op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
  f->open_array_section("read_ops");
  for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
       i != tid_to_read_map.end();
       ++i) {
    f->open_object_section("read_op");
    i->second.dump(f);
    f->close_section();
  }
  f->close_section();
}

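// Entry point for client writes: capture the transaction, log entries, and
// completion callback in an Op, then feed it into the read-modify-write
// pipeline via start_rmw().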
void ECBackend::submit_transaction(
  const hobject_t &hoid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&t,
  const eversion_t &trim_to,
  const eversion_t &min_last_complete_ondisk,
  vector<pg_log_entry_t>&& log_entries,
  std::optional<pg_hit_set_history_t> &hset_history,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef client_op
  )
{
  ceph_assert(!tid_to_op_map.count(tid));
  Op *op = &(tid_to_op_map[tid]);
  op->hoid = hoid;
  op->delta_stats = delta_stats;
  op->version = at_version;
  op->trim_to = trim_to;
  op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
  op->log_entries = log_entries;
  std::swap(op->updated_hit_set_history, hset_history);
  op->on_all_commit = on_all_commit;
  op->tid = tid;
  op->reqid = reqid;
  op->client_op = client_op;
  jspan span;
  if (client_op) {
    op->trace = client_op->pg_trace;
    span = tracing::osd::tracer.add_span("ECBackend::submit_transaction", client_op->osd_parent_span);
  }
  dout(10) << __func__ << ": op " << *op << " starting" << dendl;
  start_rmw(op, std::move(t));
}

void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
  if (!waiting_state.empty()) {
    waiting_state.back().on_write.emplace_back(std::move(cb));
  } else if (!waiting_reads.empty()) {
    waiting_reads.back().on_write.emplace_back(std::move(cb));
  } else {
    // Nothing earlier in the pipeline, just call it
    cb();
  }
}

void ECBackend::get_all_avail_shards(
  const hobject_t &hoid,
  const set<pg_shard_t> &error_shards,
  set<int> &have,
  map<shard_id_t, pg_shard_t> &shards,
  bool for_recovery)
{
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_shards().begin();
       i != get_parent()->get_acting_shards().end();
       ++i) {
    dout(10) << __func__ << ": checking acting " << *i << dendl;
    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
    if (error_shards.find(*i) != error_shards.end())
      continue;
    if (!missing.is_missing(hoid)) {
      ceph_assert(!have.count(i->shard));
      have.insert(i->shard);
      ceph_assert(!shards.count(i->shard));
      shards.insert(make_pair(i->shard, *i));
    }
  }

  if (for_recovery) {
    for (set<pg_shard_t>::const_iterator i =
           get_parent()->get_backfill_shards().begin();
         i != get_parent()->get_backfill_shards().end();
         ++i) {
      if (error_shards.find(*i) != error_shards.end())
        continue;
      if (have.count(i->shard)) {
        ceph_assert(shards.count(i->shard));
        continue;
      }
      dout(10) << __func__ << ": checking backfill " << *i << dendl;
      ceph_assert(!shards.count(i->shard));
      const pg_info_t &info = get_parent()->get_shard_info(*i);
      const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
      if (hoid < info.last_backfill &&
          !missing.is_missing(hoid)) {
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }

    map<hobject_t, set<pg_shard_t>>::const_iterator miter =
      get_parent()->get_missing_loc_shards().find(hoid);
    if (miter != get_parent()->get_missing_loc_shards().end()) {
      for (set<pg_shard_t>::iterator i = miter->second.begin();
           i != miter->second.end();
           ++i) {
        dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
        auto m = get_parent()->maybe_get_shard_missing(*i);
        if (m) {
          ceph_assert(!(*m).is_missing(hoid));
        }
        if (error_shards.find(*i) != error_shards.end())
          continue;
        have.insert(i->shard);
        shards.insert(make_pair(i->shard, *i));
      }
    }
  }
}

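// Ask the EC plugin for the cheapest shard set that can decode `want` from
// the currently available shards. With do_redundant_reads, all available
// shards are read instead, trading extra I/O for the ability to complete
// without waiting on any single shard.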
int ECBackend::get_min_avail_to_read_shards(
  const hobject_t &hoid,
  const set<int> &want,
  bool for_recovery,
  bool do_redundant_reads,
  map<pg_shard_t, vector<pair<int, int>>> *to_read)
{
  // Make sure we don't do redundant reads for recovery
  ceph_assert(!for_recovery || !do_redundant_reads);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  map<int, vector<pair<int, int>>> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0)
    return r;

  if (do_redundant_reads) {
    vector<pair<int, int>> subchunks_list;
    subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
    for (auto &&i: have) {
      need[i] = subchunks_list;
    }
  }

  if (!to_read)
    return 0;

  for (auto &&i: need) {
    ceph_assert(shards.count(shard_id_t(i.first)));
    to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
  }
  return 0;
}

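// Retry-path sketch (added commentary, hypothetical values): if the first
// read used shards {0,1} and shard 1 returned an error, the caller passes
// avail={0,1} and a result whose errors name shard 1; get_remaining_shards()
// then targets only still-unread shards such as shard 2, or fails with -EIO
// when no decodable set remains.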
int ECBackend::get_remaining_shards(
  const hobject_t &hoid,
  const set<int> &avail,
  const set<int> &want,
  const read_result_t &result,
  map<pg_shard_t, vector<pair<int, int>>> *to_read,
  bool for_recovery)
{
  ceph_assert(to_read);

  set<int> have;
  map<shard_id_t, pg_shard_t> shards;
  set<pg_shard_t> error_shards;
  for (auto &p : result.errors) {
    error_shards.insert(p.first);
  }

  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

  map<int, vector<pair<int, int>>> need;
  int r = ec_impl->minimum_to_decode(want, have, &need);
  if (r < 0) {
    dout(0) << __func__ << " not enough shards left to try for " << hoid
            << " read result was " << result << dendl;
    return -EIO;
  }

  set<int> shards_left;
  for (auto p : need) {
    if (avail.find(p.first) == avail.end()) {
      shards_left.insert(p.first);
    }
  }

  vector<pair<int, int>> subchunks;
  subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
  for (set<int>::iterator i = shards_left.begin();
       i != shards_left.end();
       ++i) {
    ceph_assert(shards.count(shard_id_t(*i)));
    ceph_assert(avail.find(*i) == avail.end());
    to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
  }
  return 0;
}

void ECBackend::start_read_op(
  int priority,
  map<hobject_t, set<int>> &want_to_read,
  map<hobject_t, read_request_t> &to_read,
  OpRequestRef _op,
  bool do_redundant_reads,
  bool for_recovery)
{
  ceph_tid_t tid = get_parent()->get_tid();
  ceph_assert(!tid_to_read_map.count(tid));
  auto &op = tid_to_read_map.emplace(
    tid,
    ReadOp(
      priority,
      tid,
      do_redundant_reads,
      for_recovery,
      _op,
      std::move(want_to_read),
      std::move(to_read))).first->second;
  dout(10) << __func__ << ": starting " << op << dendl;
  if (_op) {
    op.trace = _op->pg_trace;
    op.trace.event("start ec read");
  }
  do_read_op(op);
}

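// Worked example of the logical-to-chunk mapping performed below, under
// assumed geometry (not taken from the source): with k=4 data chunks of
// 4 KiB each, the stripe width is 16 KiB, so a stripe-aligned extent
// (off=32768, len=16384) maps via aligned_offset_len_to_chunk() to
// (off=8192, len=4096) on every shard, i.e. both fields shrink by k.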
void ECBackend::do_read_op(ReadOp &op)
{
  int priority = op.priority;
  ceph_tid_t tid = op.tid;

  dout(10) << __func__ << ": starting read " << op << dendl;

  map<pg_shard_t, ECSubRead> messages;
  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
       i != op.to_read.end();
       ++i) {
    bool need_attrs = i->second.want_attrs;

    for (auto j = i->second.need.begin();
         j != i->second.need.end();
         ++j) {
      if (need_attrs) {
        messages[j->first].attrs_to_read.insert(i->first);
        need_attrs = false;
      }
      messages[j->first].subchunks[i->first] = j->second;
      op.obj_to_source[i->first].insert(j->first);
      op.source_to_obj[j->first].insert(i->first);
    }
    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
      pair<uint64_t, uint64_t> chunk_off_len =
        sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
      for (auto k = i->second.need.begin();
           k != i->second.need.end();
           ++k) {
        messages[k->first].to_read[i->first].push_back(
          boost::make_tuple(
            chunk_off_len.first,
            chunk_off_len.second,
            j->get<2>()));
      }
      ceph_assert(!need_attrs);
    }
  }

  std::vector<std::pair<int, Message*>> m;
  m.reserve(messages.size());
  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
       i != messages.end();
       ++i) {
    op.in_progress.insert(i->first);
    shard_to_read_map[i->first].insert(op.tid);
    i->second.tid = tid;
    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
    msg->set_priority(priority);
    msg->pgid = spg_t(
      get_parent()->whoami_spg_t().pgid,
      i->first.shard);
    msg->map_epoch = get_osdmap_epoch();
    msg->min_epoch = get_parent()->get_interval_start_epoch();
    msg->op = i->second;
    msg->op.from = get_parent()->whoami_shard();
    msg->op.tid = tid;
    if (op.trace) {
      // initialize a child span for this shard
      msg->trace.init("ec sub read", nullptr, &op.trace);
      msg->trace.keyval("shard", i->first.shard.id);
    }
    m.push_back(std::make_pair(i->first.osd, msg));
  }
  if (!m.empty()) {
    get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
  }

  dout(10) << __func__ << ": started " << op << dendl;
}

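// Note (added commentary): the hash info lives in an object xattr keyed by
// ECUtil::get_hinfo_key() and is cached in unstable_hashinfo_registry, so
// the stat()/getattr() dance below is only the cache-miss path. Decoding
// follows the usual bufferlist pattern, roughly:
//
//   auto bp = bl.cbegin();
//   decode(hinfo, bp);  // throws on malformed input, hence the try/catch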
ECUtil::HashInfoRef ECBackend::get_hash_info(
  const hobject_t &hoid, bool create, const map<string,bufferptr,less<>> *attrs)
{
  dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
  ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
  if (!ref) {
    dout(10) << __func__ << ": not in cache " << hoid << dendl;
    struct stat st;
    int r = store->stat(
      ch,
      ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
      &st);
    ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
    if (r >= 0) {
      dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
      bufferlist bl;
      if (attrs) {
        map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
        if (k == attrs->end()) {
          dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
        } else {
          bl.push_back(k->second);
        }
      } else {
        r = store->getattr(
          ch,
          ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
          ECUtil::get_hinfo_key(),
          bl);
        if (r < 0) {
          dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
          bl.clear(); // just in case
        }
      }
      if (bl.length() > 0) {
        auto bp = bl.cbegin();
        try {
          decode(hinfo, bp);
        } catch(...) {
          dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
          return ECUtil::HashInfoRef();
        }
        if (hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
          dout(0) << __func__ << ": Mismatch of total_chunk_size "
                  << hinfo.get_total_chunk_size() << " and on-disk size "
                  << st.st_size << dendl;
          return ECUtil::HashInfoRef();
        }
      } else if (st.st_size > 0) {
        // a non-empty object with no hinfo is an error; only an empty
        // object may fall through and create one below
        return ECUtil::HashInfoRef();
      }
    } else if (r != -ENOENT || !create) {
      derr << __func__ << ": stat " << hoid << " failed: " << cpp_strerror(r)
           << dendl;
      return ECUtil::HashInfoRef();
    }
    ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
  }
  return ref;
}

void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
{
  ceph_assert(op);

  op->plan = ECTransaction::get_write_plan(
    sinfo,
    std::move(t),
    [&](const hobject_t &i) {
      ECUtil::HashInfoRef ref = get_hash_info(i, true);
      if (!ref) {
        derr << __func__ << ": get_hash_info(" << i << ")"
             << " returned a null pointer and there is no"
             << " way to recover from such an error in this"
             << " context" << dendl;
        ceph_abort();
      }
      return ref;
    },
    get_parent()->get_dpp());

  dout(10) << __func__ << ": " << *op << dendl;

  waiting_state.push_back(*op);
  check_ops();
}

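// Added overview of the write pipeline implemented by the three functions
// below (paraphrasing the code, not original commentary):
//
//   waiting_state  --try_state_to_reads()-->   waiting_reads
//   waiting_reads  --try_reads_to_commit()-->  waiting_commit
//   waiting_commit --try_finish_rmw()-->       retired
//
// check_ops() simply loops these transitions until none makes progress.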
bool ECBackend::try_state_to_reads()
{
  if (waiting_state.empty())
    return false;

  Op *op = &(waiting_state.front());
  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
    ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
    dout(20) << __func__ << ": blocking " << *op
             << " because it requires an rmw and the cache is invalid "
             << pipeline_state
             << dendl;
    return false;
  }

  if (!pipeline_state.caching_enabled()) {
    op->using_cache = false;
  } else if (op->invalidates_cache()) {
    dout(20) << __func__ << ": invalidating cache after this op"
             << dendl;
    pipeline_state.invalidate();
  }

  waiting_state.pop_front();
  waiting_reads.push_back(*op);

  if (op->using_cache) {
    cache.open_write_pin(op->pin);

    extent_set empty;
    for (auto &&hpair: op->plan.will_write) {
      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
      const extent_set &to_read_plan =
        to_read_plan_iter == op->plan.to_read.end() ?
        empty :
        to_read_plan_iter->second;

      extent_set remote_read = cache.reserve_extents_for_rmw(
        hpair.first,
        op->pin,
        hpair.second,
        to_read_plan);

      extent_set pending_read = to_read_plan;
      pending_read.subtract(remote_read);

      if (!remote_read.empty()) {
        op->remote_read[hpair.first] = std::move(remote_read);
      }
      if (!pending_read.empty()) {
        op->pending_read[hpair.first] = std::move(pending_read);
      }
    }
  } else {
    op->remote_read = op->plan.to_read;
  }

  dout(10) << __func__ << ": " << *op << dendl;

  if (!op->remote_read.empty()) {
    ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
    objects_read_async_no_cache(
      op->remote_read,
      [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
        for (auto &&i: results) {
          op->remote_read_result.emplace(i.first, i.second.second);
        }
        check_ops();
      });
  }

  return true;
}

bool ECBackend::try_reads_to_commit()
{
  if (waiting_reads.empty())
    return false;
  Op *op = &(waiting_reads.front());
  if (op->read_in_progress())
    return false;
  waiting_reads.pop_front();
  waiting_commit.push_back(*op);

  dout(10) << __func__ << ": starting commit on " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  get_parent()->apply_stats(
    op->hoid,
    op->delta_stats);

  if (op->using_cache) {
    for (auto &&hpair: op->pending_read) {
      op->remote_read_result[hpair.first].insert(
        cache.get_remaining_extents_for_rmw(
          hpair.first,
          op->pin,
          hpair.second));
    }
    op->pending_read.clear();
  } else {
    ceph_assert(op->pending_read.empty());
  }

  map<shard_id_t, ObjectStore::Transaction> trans;
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    trans[i->shard]; // ensure an (initially empty) transaction exists per shard
  }

  op->trace.event("start ec write");

  map<hobject_t,extent_map> written;
  if (op->plan.t) {
    ECTransaction::generate_transactions(
      op->plan,
      ec_impl,
      get_parent()->get_info().pgid.pgid,
      sinfo,
      op->remote_read_result,
      op->log_entries,
      &written,
      &trans,
      &(op->temp_added),
      &(op->temp_cleared),
      get_parent()->get_dpp(),
      get_osdmap()->require_osd_release);
  }

  dout(20) << __func__ << ": " << cache << dendl;
  dout(20) << __func__ << ": written: " << written << dendl;
  dout(20) << __func__ << ": op: " << *op << dendl;

  if (!get_parent()->get_pool().allows_ecoverwrites()) {
    for (auto &&i: op->log_entries) {
      if (i.requires_kraken()) {
        derr << __func__ << ": log entry " << i << " requires kraken"
             << " but overwrites are not enabled!" << dendl;
        ceph_abort();
      }
    }
  }

  map<hobject_t,extent_set> written_set;
  for (auto &&i: written) {
    written_set[i.first] = i.second.get_interval_set();
  }
  dout(20) << __func__ << ": written_set: " << written_set << dendl;
  ceph_assert(written_set == op->plan.will_write);

  if (op->using_cache) {
    for (auto &&hpair: written) {
      dout(20) << __func__ << ": " << hpair << dendl;
      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
    }
  }
  op->remote_read.clear();
  op->remote_read_result.clear();

  ObjectStore::Transaction empty;
  bool should_write_local = false;
  ECSubWrite local_write_op;
  std::vector<std::pair<int, Message*>> messages;
  messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
  set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
  for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_acting_recovery_backfill_shards().begin();
       i != get_parent()->get_acting_recovery_backfill_shards().end();
       ++i) {
    op->pending_apply.insert(*i);
    op->pending_commit.insert(*i);
    map<shard_id_t, ObjectStore::Transaction>::iterator iter =
      trans.find(i->shard);
    ceph_assert(iter != trans.end());
    bool should_send = get_parent()->should_send_op(*i, op->hoid);
    const pg_stat_t &stats =
      (should_send || !backfill_shards.count(*i)) ?
      get_info().stats :
      parent->get_shard_info().find(*i)->second.stats;

    ECSubWrite sop(
      get_parent()->whoami_shard(),
      op->tid,
      op->reqid,
      op->hoid,
      stats,
      should_send ? iter->second : empty,
      op->version,
      op->trim_to,
      op->roll_forward_to,
      op->log_entries,
      op->updated_hit_set_history,
      op->temp_added,
      op->temp_cleared,
      !should_send);

    ZTracer::Trace trace;
    if (op->trace) {
      // initialize a child span for this shard
      trace.init("ec sub write", nullptr, &op->trace);
      trace.keyval("shard", i->shard.id);
    }

    if (*i == get_parent()->whoami_shard()) {
      should_write_local = true;
      local_write_op.claim(sop);
    } else {
      MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
      r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
      r->map_epoch = get_osdmap_epoch();
      r->min_epoch = get_parent()->get_interval_start_epoch();
      r->trace = trace;
      messages.push_back(std::make_pair(i->osd, r));
    }
  }

  jspan span;
  if (op->client_op) {
    span = tracing::osd::tracer.add_span("EC sub write", op->client_op->osd_parent_span);
  }

  if (!messages.empty()) {
    get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
  }

  if (should_write_local) {
    handle_sub_write(
      get_parent()->whoami_shard(),
      op->client_op,
      local_write_op,
      op->trace);
  }

  for (auto i = op->on_write.begin();
       i != op->on_write.end();
       op->on_write.erase(i++)) {
    (*i)();
  }

  return true;
}

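// Reminder for the stage below (added commentary): an op is retired only
// once every shard has committed; on post-kraken maps a dummy no-op may be
// queued afterwards purely to roll the PG log forward when this was the
// last in-flight write.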
bool ECBackend::try_finish_rmw()
{
  if (waiting_commit.empty())
    return false;
  Op *op = &(waiting_commit.front());
  if (op->write_in_progress())
    return false;
  waiting_commit.pop_front();

  dout(10) << __func__ << ": " << *op << dendl;
  dout(20) << __func__ << ": " << cache << dendl;

  if (op->roll_forward_to > completed_to)
    completed_to = op->roll_forward_to;
  if (op->version > committed_to)
    committed_to = op->version;

  if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
        waiting_reads.empty() &&
        waiting_commit.empty()) {
      // submit a dummy transaction to kick the rollforward
      auto tid = get_parent()->get_tid();
      Op *nop = &(tid_to_op_map[tid]);
      nop->hoid = op->hoid;
      nop->trim_to = op->trim_to;
      nop->roll_forward_to = op->version;
      nop->tid = tid;
      nop->reqid = op->reqid;
      waiting_reads.push_back(*nop);
    }
  }

  if (op->using_cache) {
    cache.release_write_pin(op->pin);
  }
  tid_to_op_map.erase(op->tid);

  if (waiting_reads.empty() &&
      waiting_commit.empty()) {
    pipeline_state.clear();
    dout(20) << __func__ << ": clearing pipeline_state "
             << pipeline_state
             << dendl;
  }
  return true;
}

void ECBackend::check_ops()
{
  while (try_state_to_reads() ||
         try_reads_to_commit() ||
         try_finish_rmw());
}

int ECBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  uint32_t op_flags,
  bufferlist *bl)
{
  // synchronous reads are not supported on EC pools; callers must use the
  // async path, which can gather and decode shards from multiple OSDs
  return -EOPNOTSUPP;
}

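// Worked example of the stripe rounding done below, under assumed geometry
// (k=2 with 4 KiB chunks, i.e. an 8 KiB stripe): a client read of
// (off=4096, len=1024) is widened by offset_len_to_stripe_bounds() to the
// enclosing stripe (0, 8192); the requested 1 KiB is later cut back out of
// the decoded stripe by cb::operator() using substr_of().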
void ECBackend::objects_read_async(
  const hobject_t &hoid,
  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
  Context *on_complete,
  bool fast_read)
{
  map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
    reads;

  uint32_t flags = 0;
  extent_set es;
  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
         to_read.begin();
       i != to_read.end();
       ++i) {
    pair<uint64_t, uint64_t> tmp =
      sinfo.offset_len_to_stripe_bounds(
        make_pair(i->first.get<0>(), i->first.get<1>()));

    es.union_insert(tmp.first, tmp.second);
    flags |= i->first.get<2>();
  }

  if (!es.empty()) {
    auto &offsets = reads[hoid];
    for (auto j = es.begin();
         j != es.end();
         ++j) {
      offsets.push_back(
        boost::make_tuple(
          j.get_start(),
          j.get_len(),
          flags));
    }
  }

  struct cb {
    ECBackend *ec;
    hobject_t hoid;
    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
              pair<bufferlist*, Context*> > > to_read;
    unique_ptr<Context> on_complete;
    cb(const cb&) = delete;
    cb(cb &&) = default;
    cb(ECBackend *ec,
       const hobject_t &hoid,
       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                       pair<bufferlist*, Context*> > > &to_read,
       Context *on_complete)
      : ec(ec),
        hoid(hoid),
        to_read(to_read),
        on_complete(on_complete) {}
    void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
      auto dpp = ec->get_parent()->get_dpp();
      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
                         << dendl;
      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
                         << dendl;

      auto &got = results[hoid];

      int r = 0;
      for (auto &&read: to_read) {
        if (got.first < 0) {
          if (read.second.second) {
            read.second.second->complete(got.first);
          }
          if (r == 0)
            r = got.first;
        } else {
          ceph_assert(read.second.first);
          uint64_t offset = read.first.get<0>();
          uint64_t length = read.first.get<1>();
          auto range = got.second.get_containing_range(offset, length);
          ceph_assert(range.first != range.second);
          ceph_assert(range.first.get_off() <= offset);
          ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
          ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off() << dendl;
          ldpp_dout(dpp, 30) << "length: " << length << dendl;
          ldpp_dout(dpp, 30) << "range length: " << range.first.get_len() << dendl;
          ceph_assert(
            (offset + length) <=
            (range.first.get_off() + range.first.get_len()));
          read.second.first->substr_of(
            range.first.get_val(),
            offset - range.first.get_off(),
            length);
          if (read.second.second) {
            read.second.second->complete(length);
            read.second.second = nullptr;
          }
        }
      }
      to_read.clear();
      if (on_complete) {
        on_complete.release()->complete(r);
      }
    }
    ~cb() {
      for (auto &&i: to_read) {
        delete i.second.second;
      }
      to_read.clear();
    }
  };
  objects_read_and_reconstruct(
    reads,
    fast_read,
    make_gen_lambda_context<
      map<hobject_t,pair<int, extent_map> > &&, cb>(
        cb(this,
           hoid,
           to_read,
           on_complete)));
}

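// Reconstruction step sketch (added commentary): for each requested extent,
// res.returned carries a map from pg_shard_t to that shard's chunk bytes.
// CallClientContexts re-keys those by shard id into a map<int, bufferlist>
// and hands them to ECUtil::decode(), which rebuilds the full stripe before
// the caller's sub-range is trimmed out.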
struct CallClientContexts :
  public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
  hobject_t hoid;
  ECBackend *ec;
  ECBackend::ClientAsyncReadStatus *status;
  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
  CallClientContexts(
    hobject_t hoid,
    ECBackend *ec,
    ECBackend::ClientAsyncReadStatus *status,
    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
  void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
    ECBackend::read_result_t &res = in.second;
    extent_map result;
    if (res.r != 0)
      goto out;
    ceph_assert(res.returned.size() == to_read.size());
    ceph_assert(res.errors.empty());
    for (auto &&read: to_read) {
      pair<uint64_t, uint64_t> adjusted =
        ec->sinfo.offset_len_to_stripe_bounds(
          make_pair(read.get<0>(), read.get<1>()));
      ceph_assert(res.returned.front().get<0>() == adjusted.first &&
                  res.returned.front().get<1>() == adjusted.second);
      map<int, bufferlist> to_decode;
      bufferlist bl;
      for (map<pg_shard_t, bufferlist>::iterator j =
             res.returned.front().get<2>().begin();
           j != res.returned.front().get<2>().end();
           ++j) {
        to_decode[j->first.shard] = std::move(j->second);
      }
      int r = ECUtil::decode(
        ec->sinfo,
        ec->ec_impl,
        to_decode,
        &bl);
      if (r < 0) {
        res.r = r;
        goto out;
      }
      bufferlist trimmed;
      trimmed.substr_of(
        bl,
        read.get<0>() - adjusted.first,
        std::min(read.get<1>(),
                 bl.length() - (read.get<0>() - adjusted.first)));
      result.insert(
        read.get<0>(), trimmed.length(), std::move(trimmed));
      res.returned.pop_front();
    }
out:
    status->complete_object(hoid, res.r, std::move(result));
    ec->kick_reads();
  }
};

void ECBackend::objects_read_and_reconstruct(
  const map<hobject_t,
            std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
            > &reads,
  bool fast_read,
  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
{
  in_progress_client_reads.emplace_back(
    reads.size(), std::move(func));
  if (!reads.size()) {
    kick_reads();
    return;
  }

  map<hobject_t, set<int>> obj_want_to_read;
  set<int> want_to_read;
  get_want_to_read_shards(&want_to_read);

  map<hobject_t, read_request_t> for_read_op;
  for (auto &&to_read: reads) {
    map<pg_shard_t, vector<pair<int, int>>> shards;
    int r = get_min_avail_to_read_shards(
      to_read.first,
      want_to_read,
      false,
      fast_read,
      &shards);
    ceph_assert(r == 0);

    CallClientContexts *c = new CallClientContexts(
      to_read.first,
      this,
      &(in_progress_client_reads.back()),
      to_read.second);
    for_read_op.insert(
      make_pair(
        to_read.first,
        read_request_t(
          to_read.second,
          shards,
          false,
          c)));
    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
  }

  start_read_op(
    CEPH_MSG_PRIO_DEFAULT,
    obj_want_to_read,
    for_read_op,
    OpRequestRef(),
    fast_read, false);
  return;
}

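// send_all_remaining_reads() below is the second half of the retry path
// started by get_remaining_shards(): it re-queues the same logical extents
// and completion callback, but aimed at the shards not yet consulted.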
int ECBackend::send_all_remaining_reads(
  const hobject_t &hoid,
  ReadOp &rop)
{
  set<int> already_read;
  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
    already_read.insert(i->shard);
  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
  map<pg_shard_t, vector<pair<int, int>>> shards;
  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
                               rop.complete[hoid], &shards, rop.for_recovery);
  if (r)
    return r;

  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
    rop.to_read.find(hoid)->second.to_read;
  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
    rop.to_read.find(hoid)->second.cb;

  // (Note cuixf) If we need to read attrs and the previous read failed,
  // ask for them again.
  bool want_attrs =
    rop.to_read.find(hoid)->second.want_attrs &&
    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
  if (want_attrs) {
    dout(10) << __func__ << " want attrs again" << dendl;
  }

  rop.to_read.erase(hoid);
  rop.to_read.insert(make_pair(
    hoid,
    read_request_t(
      offsets,
      shards,
      want_attrs,
      c)));
  return 0;
}

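// objects_get_attrs() below mirrors the replicated backend but hides the
// EC-internal hash-info xattr. Illustrative effect (hypothetical keys): a
// stored attr set {"_", "snapset", hinfo} comes back to the caller as just
// {"_", "snapset"}.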
int ECBackend::objects_get_attrs(
  const hobject_t &hoid,
  map<string, bufferlist, less<>> *out)
{
  int r = store->getattrs(
    ch,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    *out);
  if (r < 0)
    return r;

  for (map<string, bufferlist>::iterator i = out->begin();
       i != out->end();
       ) {
    if (ECUtil::is_hinfo_key_string(i->first))
      out->erase(i++);
    else
      ++i;
  }
  return r;
}

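// Arithmetic assumed by rollback_append(), as a hedged example: with an
// 8 KiB stripe width and k=2, rolling back an append from old_size=16384
// truncates each shard object to
// aligned_logical_offset_to_chunk_offset(16384) = 8192, i.e. old_size / k.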
void ECBackend::rollback_append(
  const hobject_t &hoid,
  uint64_t old_size,
  ObjectStore::Transaction *t)
{
  ceph_assert(old_size % sinfo.get_stripe_width() == 0);
  t->truncate(
    coll,
    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    sinfo.aligned_logical_offset_to_chunk_offset(
      old_size));
}

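// Deep scrub below is incremental: each call reads one stride
// (osd_deep_scrub_stride rounded up to a multiple of the chunk size),
// folds it into pos.data_hash, and returns -EINPROGRESS until a short read
// signals end of object; only then is the running digest checked against
// the stored hash info.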
int ECBackend::be_deep_scrub(
  const hobject_t &poid,
  ScrubMap &map,
  ScrubMapBuilder &pos,
  ScrubMap::object &o)
{
  dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
  int r;

  uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
    CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

  utime_t sleeptime;
  sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
  if (sleeptime != utime_t()) {
    lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
    sleeptime.sleep();
  }

  if (pos.data_pos == 0) {
    pos.data_hash = bufferhash(-1);
  }

  uint64_t stride = cct->_conf->osd_deep_scrub_stride;
  if (stride % sinfo.get_chunk_size())
    stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());

  bufferlist bl;
  r = store->read(
    ch,
    ghobject_t(
      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
    pos.data_pos,
    stride, bl,
    fadvise_flags);
  if (r < 0) {
    dout(20) << __func__ << " " << poid << " got "
             << r << " on read, read_error" << dendl;
    o.read_error = true;
    return 0;
  }
  if (bl.length() % sinfo.get_chunk_size()) {
    dout(20) << __func__ << " " << poid << " got "
             << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
             << dendl;
    o.read_error = true;
    return 0;
  }
  if (r > 0) {
    pos.data_hash << bl;
  }
  pos.data_pos += r;
  if (r == (int)stride) {
    return -EINPROGRESS;
  }

  ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
  if (!hinfo) {
    dout(0) << __func__ << " " << poid << " could not retrieve hash info" << dendl;
    o.read_error = true;
    o.digest_present = false;
    return 0;
  } else {
    if (!get_parent()->get_pool().allows_ecoverwrites()) {
      if (!hinfo->has_chunk_hash()) {
        dout(0) << __func__ << " " << poid << " got invalid hash info" << dendl;
        o.ec_size_mismatch = true;
        return 0;
      }
      if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
        dout(0) << __func__ << " " << poid << " got incorrect size on read 0x"
                << std::hex << pos.data_pos
                << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
                << dendl;
        o.ec_size_mismatch = true;
        return 0;
      }

      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
          pos.data_hash.digest()) {
        dout(0) << __func__ << " " << poid << " got incorrect hash on read 0x"
                << std::hex << pos.data_hash.digest() << " != expected 0x"
                << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
                << std::dec << dendl;
        o.ec_hash_mismatch = true;
        return 0;
      }

      /* We checked above that we match our own stored hash. We cannot
       * send a hash of the actual object, so instead we simply send
       * our locally stored hash of shard 0 on the assumption that if
       * we match our chunk hash and our recollection of the hash for
       * chunk 0 matches that of our peers, there is likely no corruption.
       */
      o.digest = hinfo->get_chunk_hash(0);
      o.digest_present = true;
    } else {
      /* Hack! We must be using partial overwrites, and partial overwrites
       * don't support deep-scrub yet
       */
      o.digest = 0;
      o.digest_present = true;
    }
  }

  o.omap_digest = -1;
  o.omap_digest_present = true;
  return 0;
}