]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/ECBackend.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / osd / ECBackend.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <sstream>
17
18#include "ECBackend.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPushReply.h"
21#include "messages/MOSDECSubOpWrite.h"
22#include "messages/MOSDECSubOpWriteReply.h"
23#include "messages/MOSDECSubOpRead.h"
24#include "messages/MOSDECSubOpReadReply.h"
25#include "ECMsgTypes.h"
26
27#include "PrimaryLogPG.h"
20effc67 28#include "osd_tracer.h"
7c673cae
FG
29
30#define dout_context cct
31#define dout_subsys ceph_subsys_osd
32#define DOUT_PREFIX_ARGS this
33#undef dout_prefix
34#define dout_prefix _prefix(_dout, this)
f67539c2
TL
35
36using std::dec;
37using std::hex;
20effc67 38using std::less;
f67539c2
TL
39using std::list;
40using std::make_pair;
41using std::map;
42using std::pair;
43using std::ostream;
44using std::set;
45using std::string;
46using std::unique_ptr;
47using std::vector;
48
49using ceph::bufferhash;
50using ceph::bufferlist;
51using ceph::bufferptr;
52using ceph::ErasureCodeInterfaceRef;
53using ceph::Formatter;
54
7c673cae 55static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
11fdf7f2 56 return pgb->get_parent()->gen_dbg_prefix(*_dout);
7c673cae
FG
57}
58
59struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
60 list<ECBackend::RecoveryOp> ops;
61};
62
63ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
64 switch (rhs.pipeline_state) {
65 case ECBackend::pipeline_state_t::CACHE_VALID:
66 return lhs << "CACHE_VALID";
67 case ECBackend::pipeline_state_t::CACHE_INVALID:
68 return lhs << "CACHE_INVALID";
69 default:
11fdf7f2 70 ceph_abort_msg("invalid pipeline state");
7c673cae
FG
71 }
72 return lhs; // unreachable
73}
74
75static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
76{
77 lhs << "[";
78 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
79 i != rhs.end();
80 ++i) {
81 if (i != rhs.begin())
82 lhs << ", ";
83 lhs << make_pair(i->first, i->second.length());
84 }
85 return lhs << "]";
86}
87
88static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
89{
90 lhs << "[";
91 for (map<int, bufferlist>::const_iterator i = rhs.begin();
92 i != rhs.end();
93 ++i) {
94 if (i != rhs.begin())
95 lhs << ", ";
96 lhs << make_pair(i->first, i->second.length());
97 }
98 return lhs << "]";
99}
100
101static ostream &operator<<(
102 ostream &lhs,
103 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
104{
105 return lhs << "(" << rhs.get<0>() << ", "
106 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
107}
108
109ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
110{
111 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
112 << ", need=" << rhs.need
113 << ", want_attrs=" << rhs.want_attrs
114 << ")";
115}
116
117ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
118{
119 lhs << "read_result_t(r=" << rhs.r
120 << ", errors=" << rhs.errors;
121 if (rhs.attrs) {
9f95a23c 122 lhs << ", attrs=" << *(rhs.attrs);
7c673cae
FG
123 } else {
124 lhs << ", noattrs";
125 }
126 return lhs << ", returned=" << rhs.returned << ")";
127}
128
129ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
130{
131 lhs << "ReadOp(tid=" << rhs.tid;
132 if (rhs.op && rhs.op->get_req()) {
133 lhs << ", op=";
134 rhs.op->get_req()->print(lhs);
135 }
136 return lhs << ", to_read=" << rhs.to_read
137 << ", complete=" << rhs.complete
138 << ", priority=" << rhs.priority
139 << ", obj_to_source=" << rhs.obj_to_source
140 << ", source_to_obj=" << rhs.source_to_obj
141 << ", in_progress=" << rhs.in_progress << ")";
142}
143
144void ECBackend::ReadOp::dump(Formatter *f) const
145{
146 f->dump_unsigned("tid", tid);
147 if (op && op->get_req()) {
148 f->dump_stream("op") << *(op->get_req());
149 }
150 f->dump_stream("to_read") << to_read;
151 f->dump_stream("complete") << complete;
152 f->dump_int("priority", priority);
153 f->dump_stream("obj_to_source") << obj_to_source;
154 f->dump_stream("source_to_obj") << source_to_obj;
155 f->dump_stream("in_progress") << in_progress;
156}
157
158ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
159{
160 lhs << "Op(" << rhs.hoid
161 << " v=" << rhs.version
162 << " tt=" << rhs.trim_to
163 << " tid=" << rhs.tid
164 << " reqid=" << rhs.reqid;
165 if (rhs.client_op && rhs.client_op->get_req()) {
166 lhs << " client_op=";
167 rhs.client_op->get_req()->print(lhs);
168 }
169 lhs << " roll_forward_to=" << rhs.roll_forward_to
170 << " temp_added=" << rhs.temp_added
171 << " temp_cleared=" << rhs.temp_cleared
172 << " pending_read=" << rhs.pending_read
173 << " remote_read=" << rhs.remote_read
174 << " remote_read_result=" << rhs.remote_read_result
175 << " pending_apply=" << rhs.pending_apply
176 << " pending_commit=" << rhs.pending_commit
177 << " plan.to_read=" << rhs.plan.to_read
178 << " plan.will_write=" << rhs.plan.will_write
179 << ")";
180 return lhs;
181}
182
183ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
184{
185 return lhs << "RecoveryOp("
186 << "hoid=" << rhs.hoid
187 << " v=" << rhs.v
188 << " missing_on=" << rhs.missing_on
189 << " missing_on_shards=" << rhs.missing_on_shards
190 << " recovery_info=" << rhs.recovery_info
191 << " recovery_progress=" << rhs.recovery_progress
192 << " obc refcount=" << rhs.obc.use_count()
193 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
194 << " waiting_on_pushes=" << rhs.waiting_on_pushes
195 << " extent_requested=" << rhs.extent_requested
196 << ")";
197}
198
199void ECBackend::RecoveryOp::dump(Formatter *f) const
200{
201 f->dump_stream("hoid") << hoid;
202 f->dump_stream("v") << v;
203 f->dump_stream("missing_on") << missing_on;
204 f->dump_stream("missing_on_shards") << missing_on_shards;
205 f->dump_stream("recovery_info") << recovery_info;
206 f->dump_stream("recovery_progress") << recovery_progress;
207 f->dump_stream("state") << tostr(state);
208 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
209 f->dump_stream("extent_requested") << extent_requested;
210}
211
212ECBackend::ECBackend(
213 PGBackend::Listener *pg,
11fdf7f2 214 const coll_t &coll,
7c673cae
FG
215 ObjectStore::CollectionHandle &ch,
216 ObjectStore *store,
217 CephContext *cct,
218 ErasureCodeInterfaceRef ec_impl,
219 uint64_t stripe_width)
220 : PGBackend(cct, pg, store, coll, ch),
221 ec_impl(ec_impl),
222 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
11fdf7f2 223 ceph_assert((ec_impl->get_data_chunk_count() *
7c673cae
FG
224 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
225}
226
227PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
228{
229 return new ECRecoveryHandle;
230}
231
232void ECBackend::_failed_push(const hobject_t &hoid,
233 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
234{
235 ECBackend::read_result_t &res = in.second;
236 dout(10) << __func__ << ": Read error " << hoid << " r="
237 << res.r << " errors=" << res.errors << dendl;
238 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
239 << dendl;
11fdf7f2 240 ceph_assert(recovery_ops.count(hoid));
b32b8144 241 eversion_t v = recovery_ops[hoid].v;
7c673cae
FG
242 recovery_ops.erase(hoid);
243
9f95a23c 244 set<pg_shard_t> fl;
7c673cae 245 for (auto&& i : res.errors) {
9f95a23c 246 fl.insert(i.first);
7c673cae 247 }
9f95a23c 248 get_parent()->on_failed_pull(fl, hoid, v);
7c673cae
FG
249}
250
251struct OnRecoveryReadComplete :
252 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
253 ECBackend *pg;
254 hobject_t hoid;
7c673cae
FG
255 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
256 : pg(pg), hoid(hoid) {}
257 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
258 ECBackend::read_result_t &res = in.second;
259 if (!(res.r == 0 && res.errors.empty())) {
260 pg->_failed_push(hoid, in);
261 return;
262 }
11fdf7f2 263 ceph_assert(res.returned.size() == 1);
7c673cae
FG
264 pg->handle_recovery_read_complete(
265 hoid,
266 res.returned.back(),
267 res.attrs,
268 in.first);
269 }
270};
271
272struct RecoveryMessages {
273 map<hobject_t,
274 ECBackend::read_request_t> reads;
28e407b8 275 map<hobject_t, set<int>> want_to_read;
7c673cae
FG
276 void read(
277 ECBackend *ec,
278 const hobject_t &hoid, uint64_t off, uint64_t len,
28e407b8 279 set<int> &&_want_to_read,
11fdf7f2 280 const map<pg_shard_t, vector<pair<int, int>>> &need,
7c673cae
FG
281 bool attrs) {
282 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
283 to_read.push_back(boost::make_tuple(off, len, 0));
11fdf7f2 284 ceph_assert(!reads.count(hoid));
28e407b8 285 want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
7c673cae
FG
286 reads.insert(
287 make_pair(
288 hoid,
289 ECBackend::read_request_t(
290 to_read,
291 need,
292 attrs,
293 new OnRecoveryReadComplete(
294 ec,
295 hoid))));
296 }
297
298 map<pg_shard_t, vector<PushOp> > pushes;
299 map<pg_shard_t, vector<PushReplyOp> > push_replies;
300 ObjectStore::Transaction t;
301 RecoveryMessages() {}
f67539c2 302 ~RecoveryMessages() {}
7c673cae
FG
303};
304
305void ECBackend::handle_recovery_push(
306 const PushOp &op,
11fdf7f2
TL
307 RecoveryMessages *m,
308 bool is_repair)
7c673cae 309{
11fdf7f2
TL
310 if (get_parent()->check_failsafe_full()) {
311 dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
7c673cae
FG
312 ceph_abort();
313 }
314
315 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
316 ghobject_t tobj;
317 if (oneshot) {
318 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
319 get_parent()->whoami_shard().shard);
320 } else {
321 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
322 op.version),
323 ghobject_t::NO_GEN,
324 get_parent()->whoami_shard().shard);
325 if (op.before_progress.first) {
326 dout(10) << __func__ << ": Adding oid "
327 << tobj.hobj << " in the temp collection" << dendl;
328 add_temp_obj(tobj.hobj);
329 }
330 }
331
332 if (op.before_progress.first) {
333 m->t.remove(coll, tobj);
334 m->t.touch(coll, tobj);
335 }
336
337 if (!op.data_included.empty()) {
338 uint64_t start = op.data_included.range_start();
339 uint64_t end = op.data_included.range_end();
11fdf7f2 340 ceph_assert(op.data.length() == (end - start));
7c673cae
FG
341
342 m->t.write(
343 coll,
344 tobj,
345 start,
346 op.data.length(),
347 op.data);
348 } else {
11fdf7f2
TL
349 ceph_assert(op.data.length() == 0);
350 }
351
352 if (get_parent()->pg_is_remote_backfilling()) {
353 get_parent()->pg_add_local_num_bytes(op.data.length());
354 get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
355 dout(10) << __func__ << " " << op.soid
356 << " add new actual data by " << op.data.length()
357 << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
358 << dendl;
7c673cae
FG
359 }
360
361 if (op.before_progress.first) {
11fdf7f2 362 ceph_assert(op.attrset.count(string("_")));
7c673cae
FG
363 m->t.setattrs(
364 coll,
365 tobj,
366 op.attrset);
367 }
368
369 if (op.after_progress.data_complete && !oneshot) {
370 dout(10) << __func__ << ": Removing oid "
371 << tobj.hobj << " from the temp collection" << dendl;
372 clear_temp_obj(tobj.hobj);
373 m->t.remove(coll, ghobject_t(
374 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
375 m->t.collection_move_rename(
376 coll, tobj,
377 coll, ghobject_t(
378 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
379 }
380 if (op.after_progress.data_complete) {
381 if ((get_parent()->pgb_is_primary())) {
11fdf7f2
TL
382 ceph_assert(recovery_ops.count(op.soid));
383 ceph_assert(recovery_ops[op.soid].obc);
1e59de90 384 if (get_parent()->pg_is_repair() || is_repair)
11fdf7f2 385 get_parent()->inc_osd_stat_repaired();
7c673cae
FG
386 get_parent()->on_local_recover(
387 op.soid,
388 op.recovery_info,
389 recovery_ops[op.soid].obc,
c07f9fc5 390 false,
7c673cae
FG
391 &m->t);
392 } else {
11fdf7f2
TL
393 // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
394 if (is_repair)
395 get_parent()->inc_osd_stat_repaired();
7c673cae
FG
396 get_parent()->on_local_recover(
397 op.soid,
398 op.recovery_info,
399 ObjectContextRef(),
c07f9fc5 400 false,
7c673cae 401 &m->t);
11fdf7f2
TL
402 if (get_parent()->pg_is_remote_backfilling()) {
403 struct stat st;
404 int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
405 get_parent()->whoami_shard().shard), &st);
406 if (r == 0) {
407 get_parent()->pg_sub_local_num_bytes(st.st_size);
408 // XXX: This can be way overestimated for small objects
409 get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
410 dout(10) << __func__ << " " << op.soid
411 << " sub actual data by " << st.st_size
412 << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
413 << dendl;
414 }
415 }
7c673cae
FG
416 }
417 }
418 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
419 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
420}
421
422void ECBackend::handle_recovery_push_reply(
423 const PushReplyOp &op,
424 pg_shard_t from,
425 RecoveryMessages *m)
426{
427 if (!recovery_ops.count(op.soid))
428 return;
429 RecoveryOp &rop = recovery_ops[op.soid];
11fdf7f2 430 ceph_assert(rop.waiting_on_pushes.count(from));
7c673cae
FG
431 rop.waiting_on_pushes.erase(from);
432 continue_recovery_op(rop, m);
433}
434
435void ECBackend::handle_recovery_read_complete(
436 const hobject_t &hoid,
437 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
20effc67 438 std::optional<map<string, bufferlist, less<>> > attrs,
7c673cae
FG
439 RecoveryMessages *m)
440{
441 dout(10) << __func__ << ": returned " << hoid << " "
442 << "(" << to_read.get<0>()
443 << ", " << to_read.get<1>()
444 << ", " << to_read.get<2>()
445 << ")"
446 << dendl;
11fdf7f2 447 ceph_assert(recovery_ops.count(hoid));
7c673cae 448 RecoveryOp &op = recovery_ops[hoid];
11fdf7f2 449 ceph_assert(op.returned_data.empty());
7c673cae
FG
450 map<int, bufferlist*> target;
451 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
452 i != op.missing_on_shards.end();
453 ++i) {
454 target[*i] = &(op.returned_data[*i]);
455 }
456 map<int, bufferlist> from;
457 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
458 i != to_read.get<2>().end();
459 ++i) {
f67539c2 460 from[i->first.shard] = std::move(i->second);
7c673cae
FG
461 }
462 dout(10) << __func__ << ": " << from << dendl;
11fdf7f2
TL
463 int r;
464 r = ECUtil::decode(sinfo, ec_impl, from, target);
465 ceph_assert(r == 0);
7c673cae
FG
466 if (attrs) {
467 op.xattrs.swap(*attrs);
468
469 if (!op.obc) {
470 // attrs only reference the origin bufferlist (decode from
471 // ECSubReadReply message) whose size is much greater than attrs
472 // in recovery. If obc cache it (get_obc maybe cache the attr),
473 // this causes the whole origin bufferlist would not be free
474 // until obc is evicted from obc cache. So rebuild the
475 // bufferlist before cache it.
476 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
477 it != op.xattrs.end();
478 ++it) {
479 it->second.rebuild();
480 }
481 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
482 // of the backend (see bug #12983)
20effc67 483 map<string, bufferlist, less<>> sanitized_attrs(op.xattrs);
7c673cae
FG
484 sanitized_attrs.erase(ECUtil::get_hinfo_key());
485 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
11fdf7f2 486 ceph_assert(op.obc);
7c673cae
FG
487 op.recovery_info.size = op.obc->obs.oi.size;
488 op.recovery_info.oi = op.obc->obs.oi;
489 }
490
491 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
492 if (op.obc->obs.oi.size > 0) {
11fdf7f2
TL
493 ceph_assert(op.xattrs.count(ECUtil::get_hinfo_key()));
494 auto bp = op.xattrs[ECUtil::get_hinfo_key()].cbegin();
495 decode(hinfo, bp);
7c673cae
FG
496 }
497 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
498 }
11fdf7f2
TL
499 ceph_assert(op.xattrs.size());
500 ceph_assert(op.obc);
7c673cae
FG
501 continue_recovery_op(op, m);
502}
503
504struct SendPushReplies : public Context {
505 PGBackend::Listener *l;
506 epoch_t epoch;
507 map<int, MOSDPGPushReply*> replies;
508 SendPushReplies(
509 PGBackend::Listener *l,
510 epoch_t epoch,
511 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
512 replies.swap(in);
513 }
514 void finish(int) override {
9f95a23c
TL
515 std::vector<std::pair<int, Message*>> messages;
516 messages.reserve(replies.size());
7c673cae
FG
517 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
518 i != replies.end();
519 ++i) {
9f95a23c
TL
520 messages.push_back(std::make_pair(i->first, i->second));
521 }
522 if (!messages.empty()) {
523 l->send_message_osd_cluster(messages, epoch);
7c673cae
FG
524 }
525 replies.clear();
526 }
527 ~SendPushReplies() override {
528 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
529 i != replies.end();
530 ++i) {
531 i->second->put();
532 }
533 replies.clear();
534 }
535};
536
537void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
538{
539 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
540 i != m.pushes.end();
541 m.pushes.erase(i++)) {
542 MOSDPGPush *msg = new MOSDPGPush();
543 msg->set_priority(priority);
11fdf7f2 544 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
545 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
546 msg->from = get_parent()->whoami_shard();
547 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
548 msg->pushes.swap(i->second);
549 msg->compute_cost(cct);
11fdf7f2 550 msg->is_repair = get_parent()->pg_is_repair();
7c673cae
FG
551 get_parent()->send_message(
552 i->first.osd,
553 msg);
554 }
555 map<int, MOSDPGPushReply*> replies;
556 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
557 m.push_replies.begin();
558 i != m.push_replies.end();
559 m.push_replies.erase(i++)) {
560 MOSDPGPushReply *msg = new MOSDPGPushReply();
561 msg->set_priority(priority);
11fdf7f2 562 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
563 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
564 msg->from = get_parent()->whoami_shard();
565 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
566 msg->replies.swap(i->second);
567 msg->compute_cost(cct);
568 replies.insert(make_pair(i->first.osd, msg));
569 }
570
571 if (!replies.empty()) {
572 (m.t).register_on_complete(
573 get_parent()->bless_context(
574 new SendPushReplies(
575 get_parent(),
11fdf7f2 576 get_osdmap_epoch(),
7c673cae
FG
577 replies)));
578 get_parent()->queue_transaction(std::move(m.t));
579 }
580
581 if (m.reads.empty())
582 return;
583 start_read_op(
584 priority,
28e407b8 585 m.want_to_read,
7c673cae
FG
586 m.reads,
587 OpRequestRef(),
588 false, true);
589}
590
591void ECBackend::continue_recovery_op(
592 RecoveryOp &op,
593 RecoveryMessages *m)
594{
595 dout(10) << __func__ << ": continuing " << op << dendl;
596 while (1) {
597 switch (op.state) {
598 case RecoveryOp::IDLE: {
599 // start read
600 op.state = RecoveryOp::READING;
11fdf7f2 601 ceph_assert(!op.recovery_progress.data_complete);
7c673cae
FG
602 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
603 uint64_t from = op.recovery_progress.data_recovered_to;
604 uint64_t amount = get_recovery_chunk_size();
605
606 if (op.recovery_progress.first && op.obc) {
607 /* We've got the attrs and the hinfo, might as well use them */
608 op.hinfo = get_hash_info(op.hoid);
a4b75251
TL
609 if (!op.hinfo) {
610 derr << __func__ << ": " << op.hoid << " has inconsistent hinfo"
611 << dendl;
612 ceph_assert(recovery_ops.count(op.hoid));
613 eversion_t v = recovery_ops[op.hoid].v;
614 recovery_ops.erase(op.hoid);
615 get_parent()->on_failed_pull({get_parent()->whoami_shard()},
616 op.hoid, v);
617 return;
618 }
7c673cae 619 op.xattrs = op.obc->attr_cache;
11fdf7f2 620 encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
7c673cae
FG
621 }
622
11fdf7f2 623 map<pg_shard_t, vector<pair<int, int>>> to_read;
7c673cae
FG
624 int r = get_min_avail_to_read_shards(
625 op.hoid, want, true, false, &to_read);
626 if (r != 0) {
627 // we must have lost a recovery source
11fdf7f2 628 ceph_assert(!op.recovery_progress.first);
7c673cae
FG
629 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
630 << dendl;
631 get_parent()->cancel_pull(op.hoid);
632 recovery_ops.erase(op.hoid);
633 return;
634 }
635 m->read(
636 this,
637 op.hoid,
638 op.recovery_progress.data_recovered_to,
639 amount,
28e407b8 640 std::move(want),
7c673cae
FG
641 to_read,
642 op.recovery_progress.first && !op.obc);
643 op.extent_requested = make_pair(
644 from,
645 amount);
646 dout(10) << __func__ << ": IDLE return " << op << dendl;
647 return;
648 }
649 case RecoveryOp::READING: {
650 // read completed, start write
11fdf7f2
TL
651 ceph_assert(op.xattrs.size());
652 ceph_assert(op.returned_data.size());
7c673cae
FG
653 op.state = RecoveryOp::WRITING;
654 ObjectRecoveryProgress after_progress = op.recovery_progress;
655 after_progress.data_recovered_to += op.extent_requested.second;
656 after_progress.first = false;
657 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
658 after_progress.data_recovered_to =
659 sinfo.logical_to_next_stripe_offset(
660 op.obc->obs.oi.size);
661 after_progress.data_complete = true;
662 }
663 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
664 mi != op.missing_on.end();
665 ++mi) {
11fdf7f2 666 ceph_assert(op.returned_data.count(mi->shard));
7c673cae
FG
667 m->pushes[*mi].push_back(PushOp());
668 PushOp &pop = m->pushes[*mi].back();
669 pop.soid = op.hoid;
670 pop.version = op.v;
671 pop.data = op.returned_data[mi->shard];
672 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
673 << ", after_progress=" << after_progress
674 << ", pop.data.length()=" << pop.data.length()
675 << ", size=" << op.obc->obs.oi.size << dendl;
11fdf7f2 676 ceph_assert(
7c673cae
FG
677 pop.data.length() ==
678 sinfo.aligned_logical_offset_to_chunk_offset(
679 after_progress.data_recovered_to -
680 op.recovery_progress.data_recovered_to)
681 );
682 if (pop.data.length())
683 pop.data_included.insert(
684 sinfo.aligned_logical_offset_to_chunk_offset(
685 op.recovery_progress.data_recovered_to),
686 pop.data.length()
687 );
688 if (op.recovery_progress.first) {
689 pop.attrset = op.xattrs;
690 }
691 pop.recovery_info = op.recovery_info;
692 pop.before_progress = op.recovery_progress;
693 pop.after_progress = after_progress;
694 if (*mi != get_parent()->primary_shard())
695 get_parent()->begin_peer_recover(
696 *mi,
697 op.hoid);
698 }
699 op.returned_data.clear();
700 op.waiting_on_pushes = op.missing_on;
701 op.recovery_progress = after_progress;
702 dout(10) << __func__ << ": READING return " << op << dendl;
703 return;
704 }
705 case RecoveryOp::WRITING: {
706 if (op.waiting_on_pushes.empty()) {
707 if (op.recovery_progress.data_complete) {
708 op.state = RecoveryOp::COMPLETE;
709 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
710 i != op.missing_on.end();
711 ++i) {
712 if (*i != get_parent()->primary_shard()) {
713 dout(10) << __func__ << ": on_peer_recover on " << *i
714 << ", obj " << op.hoid << dendl;
715 get_parent()->on_peer_recover(
716 *i,
717 op.hoid,
718 op.recovery_info);
719 }
720 }
721 object_stat_sum_t stat;
722 stat.num_bytes_recovered = op.recovery_info.size;
723 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
724 stat.num_objects_recovered = 1;
11fdf7f2
TL
725 if (get_parent()->pg_is_repair())
726 stat.num_objects_repaired = 1;
c07f9fc5 727 get_parent()->on_global_recover(op.hoid, stat, false);
7c673cae
FG
728 dout(10) << __func__ << ": WRITING return " << op << dendl;
729 recovery_ops.erase(op.hoid);
730 return;
731 } else {
732 op.state = RecoveryOp::IDLE;
733 dout(10) << __func__ << ": WRITING continue " << op << dendl;
734 continue;
735 }
736 }
737 return;
738 }
739 // should never be called once complete
740 case RecoveryOp::COMPLETE:
741 default: {
742 ceph_abort();
743 };
744 }
745 }
746}
747
748void ECBackend::run_recovery_op(
749 RecoveryHandle *_h,
750 int priority)
751{
752 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
753 RecoveryMessages m;
754 for (list<RecoveryOp>::iterator i = h->ops.begin();
755 i != h->ops.end();
756 ++i) {
757 dout(10) << __func__ << ": starting " << *i << dendl;
11fdf7f2 758 ceph_assert(!recovery_ops.count(i->hoid));
7c673cae
FG
759 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
760 continue_recovery_op(op, &m);
761 }
c07f9fc5 762
7c673cae 763 dispatch_recovery_messages(m, priority);
c07f9fc5 764 send_recovery_deletes(priority, h->deletes);
7c673cae
FG
765 delete _h;
766}
767
224ce89b 768int ECBackend::recover_object(
7c673cae
FG
769 const hobject_t &hoid,
770 eversion_t v,
771 ObjectContextRef head,
772 ObjectContextRef obc,
773 RecoveryHandle *_h)
774{
775 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
776 h->ops.push_back(RecoveryOp());
777 h->ops.back().v = v;
778 h->ops.back().hoid = hoid;
779 h->ops.back().obc = obc;
780 h->ops.back().recovery_info.soid = hoid;
781 h->ops.back().recovery_info.version = v;
782 if (obc) {
783 h->ops.back().recovery_info.size = obc->obs.oi.size;
784 h->ops.back().recovery_info.oi = obc->obs.oi;
785 }
786 if (hoid.is_snap()) {
787 if (obc) {
11fdf7f2 788 ceph_assert(obc->ssc);
7c673cae
FG
789 h->ops.back().recovery_info.ss = obc->ssc->snapset;
790 } else if (head) {
11fdf7f2 791 ceph_assert(head->ssc);
7c673cae
FG
792 h->ops.back().recovery_info.ss = head->ssc->snapset;
793 } else {
11fdf7f2 794 ceph_abort_msg("neither obc nor head set for a snap object");
7c673cae
FG
795 }
796 }
797 h->ops.back().recovery_progress.omap_complete = true;
798 for (set<pg_shard_t>::const_iterator i =
11fdf7f2
TL
799 get_parent()->get_acting_recovery_backfill_shards().begin();
800 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
801 ++i) {
802 dout(10) << "checking " << *i << dendl;
803 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
804 h->ops.back().missing_on.insert(*i);
805 h->ops.back().missing_on_shards.insert(i->shard);
806 }
807 }
808 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
224ce89b 809 return 0;
7c673cae
FG
810}
811
812bool ECBackend::can_handle_while_inactive(
813 OpRequestRef _op)
814{
815 return false;
816}
817
c07f9fc5 818bool ECBackend::_handle_message(
7c673cae
FG
819 OpRequestRef _op)
820{
821 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
822 int priority = _op->get_req()->get_priority();
823 switch (_op->get_req()->get_type()) {
824 case MSG_OSD_EC_WRITE: {
825 // NOTE: this is non-const because handle_sub_write modifies the embedded
826 // ObjectStore::Transaction in place (and then std::move's it). It does
827 // not conflict with ECSubWrite's operator<<.
828 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
829 _op->get_nonconst_req());
28e407b8 830 parent->maybe_preempt_replica_scrub(op->op.soid);
7c673cae
FG
831 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
832 return true;
833 }
834 case MSG_OSD_EC_WRITE_REPLY: {
835 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
836 _op->get_req());
837 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
838 return true;
839 }
840 case MSG_OSD_EC_READ: {
9f95a23c 841 auto op = _op->get_req<MOSDECSubOpRead>();
7c673cae
FG
842 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
843 reply->pgid = get_parent()->primary_spg_t();
11fdf7f2 844 reply->map_epoch = get_osdmap_epoch();
7c673cae
FG
845 reply->min_epoch = get_parent()->get_interval_start_epoch();
846 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
847 reply->trace = _op->pg_trace;
848 get_parent()->send_message_osd_cluster(
9f95a23c 849 reply, _op->get_req()->get_connection());
7c673cae
FG
850 return true;
851 }
852 case MSG_OSD_EC_READ_REPLY: {
853 // NOTE: this is non-const because handle_sub_read_reply steals resulting
854 // buffers. It does not conflict with ECSubReadReply operator<<.
855 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
856 _op->get_nonconst_req());
857 RecoveryMessages rm;
858 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
859 dispatch_recovery_messages(rm, priority);
860 return true;
861 }
862 case MSG_OSD_PG_PUSH: {
9f95a23c 863 auto op = _op->get_req<MOSDPGPush>();
7c673cae
FG
864 RecoveryMessages rm;
865 for (vector<PushOp>::const_iterator i = op->pushes.begin();
866 i != op->pushes.end();
867 ++i) {
11fdf7f2 868 handle_recovery_push(*i, &rm, op->is_repair);
7c673cae
FG
869 }
870 dispatch_recovery_messages(rm, priority);
871 return true;
872 }
873 case MSG_OSD_PG_PUSH_REPLY: {
874 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
875 _op->get_req());
876 RecoveryMessages rm;
877 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
878 i != op->replies.end();
879 ++i) {
880 handle_recovery_push_reply(*i, op->from, &rm);
881 }
882 dispatch_recovery_messages(rm, priority);
883 return true;
884 }
885 default:
886 return false;
887 }
888 return false;
889}
890
891struct SubWriteCommitted : public Context {
892 ECBackend *pg;
893 OpRequestRef msg;
894 ceph_tid_t tid;
895 eversion_t version;
896 eversion_t last_complete;
897 const ZTracer::Trace trace;
898 SubWriteCommitted(
899 ECBackend *pg,
900 OpRequestRef msg,
901 ceph_tid_t tid,
902 eversion_t version,
903 eversion_t last_complete,
904 const ZTracer::Trace &trace)
905 : pg(pg), msg(msg), tid(tid),
906 version(version), last_complete(last_complete), trace(trace) {}
907 void finish(int) override {
908 if (msg)
909 msg->mark_event("sub_op_committed");
910 pg->sub_write_committed(tid, version, last_complete, trace);
911 }
912};
913void ECBackend::sub_write_committed(
914 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
915 const ZTracer::Trace &trace) {
916 if (get_parent()->pgb_is_primary()) {
917 ECSubWriteReply reply;
918 reply.tid = tid;
919 reply.last_complete = last_complete;
920 reply.committed = true;
11fdf7f2 921 reply.applied = true;
7c673cae
FG
922 reply.from = get_parent()->whoami_shard();
923 handle_sub_write_reply(
924 get_parent()->whoami_shard(),
925 reply, trace);
926 } else {
927 get_parent()->update_last_complete_ondisk(last_complete);
928 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
929 r->pgid = get_parent()->primary_spg_t();
11fdf7f2 930 r->map_epoch = get_osdmap_epoch();
7c673cae
FG
931 r->min_epoch = get_parent()->get_interval_start_epoch();
932 r->op.tid = tid;
933 r->op.last_complete = last_complete;
934 r->op.committed = true;
11fdf7f2 935 r->op.applied = true;
7c673cae
FG
936 r->op.from = get_parent()->whoami_shard();
937 r->set_priority(CEPH_MSG_PRIO_HIGH);
938 r->trace = trace;
939 r->trace.event("sending sub op commit");
940 get_parent()->send_message_osd_cluster(
11fdf7f2 941 get_parent()->primary_shard().osd, r, get_osdmap_epoch());
7c673cae
FG
942 }
943}
944
945void ECBackend::handle_sub_write(
946 pg_shard_t from,
947 OpRequestRef msg,
948 ECSubWrite &op,
11fdf7f2 949 const ZTracer::Trace &trace)
7c673cae 950{
20effc67 951 if (msg) {
11fdf7f2 952 msg->mark_event("sub_op_started");
f67539c2 953 }
20effc67
TL
954 trace.event("handle_sub_write");
955
7c673cae
FG
956 if (!get_parent()->pgb_is_primary())
957 get_parent()->update_stats(op.stats);
958 ObjectStore::Transaction localt;
959 if (!op.temp_added.empty()) {
960 add_temp_objs(op.temp_added);
961 }
11fdf7f2 962 if (op.backfill_or_async_recovery) {
7c673cae
FG
963 for (set<hobject_t>::iterator i = op.temp_removed.begin();
964 i != op.temp_removed.end();
965 ++i) {
966 dout(10) << __func__ << ": removing object " << *i
967 << " since we won't get the transaction" << dendl;
968 localt.remove(
969 coll,
970 ghobject_t(
971 *i,
972 ghobject_t::NO_GEN,
973 get_parent()->whoami_shard().shard));
974 }
975 }
976 clear_temp_objs(op.temp_removed);
11fdf7f2
TL
977 dout(30) << __func__ << " missing before " << get_parent()->get_log().get_missing().get_items() << dendl;
978 // flag set to true during async recovery
979 bool async = false;
980 pg_missing_tracker_t pmissing = get_parent()->get_local_missing();
981 if (pmissing.is_missing(op.soid)) {
982 async = true;
983 dout(30) << __func__ << " is_missing " << pmissing.is_missing(op.soid) << dendl;
984 for (auto &&e: op.log_entries) {
985 dout(30) << " add_next_event entry " << e << dendl;
986 get_parent()->add_local_next_event(e);
987 dout(30) << " entry is_delete " << e.is_delete() << dendl;
988 }
989 }
7c673cae 990 get_parent()->log_operation(
f67539c2 991 std::move(op.log_entries),
7c673cae
FG
992 op.updated_hit_set_history,
993 op.trim_to,
994 op.roll_forward_to,
9f95a23c 995 op.roll_forward_to,
11fdf7f2
TL
996 !op.backfill_or_async_recovery,
997 localt,
998 async);
7c673cae 999
11fdf7f2
TL
1000 if (!get_parent()->pg_is_undersized() &&
1001 (unsigned)get_parent()->whoami_shard().shard >=
1002 ec_impl->get_data_chunk_count())
7c673cae
FG
1003 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1004
7c673cae
FG
1005 localt.register_on_commit(
1006 get_parent()->bless_context(
1007 new SubWriteCommitted(
1008 this, msg, op.tid,
1009 op.at_version,
1010 get_parent()->get_info().last_complete, trace)));
7c673cae
FG
1011 vector<ObjectStore::Transaction> tls;
1012 tls.reserve(2);
1013 tls.push_back(std::move(op.t));
1014 tls.push_back(std::move(localt));
1015 get_parent()->queue_transactions(tls, msg);
11fdf7f2
TL
1016 dout(30) << __func__ << " missing after" << get_parent()->get_log().get_missing().get_items() << dendl;
1017 if (op.at_version != eversion_t()) {
1018 // dummy rollforward transaction doesn't get at_version (and doesn't advance it)
1019 get_parent()->op_applied(op.at_version);
1020 }
7c673cae
FG
1021}
1022
1023void ECBackend::handle_sub_read(
1024 pg_shard_t from,
1025 const ECSubRead &op,
1026 ECSubReadReply *reply,
1027 const ZTracer::Trace &trace)
1028{
1029 trace.event("handle sub read");
1030 shard_id_t shard = get_parent()->whoami_shard().shard;
1031 for(auto i = op.to_read.begin();
1032 i != op.to_read.end();
1033 ++i) {
1034 int r = 0;
7c673cae
FG
1035 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1036 bufferlist bl;
11fdf7f2
TL
1037 if ((op.subchunks.find(i->first)->second.size() == 1) &&
1038 (op.subchunks.find(i->first)->second.front().second ==
1039 ec_impl->get_sub_chunk_count())) {
1040 dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
1041 r = store->read(
1042 ch,
1043 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1044 j->get<0>(),
1045 j->get<1>(),
1046 bl, j->get<2>()); // Allow EIO return
1047 } else {
1048 dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
1049 int subchunk_size =
1050 sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
1051 bool error = false;
1052 for (int m = 0; m < (int)j->get<1>() && !error;
1053 m += sinfo.get_chunk_size()) {
1054 for (auto &&k:op.subchunks.find(i->first)->second) {
1055 bufferlist bl0;
1056 r = store->read(
1057 ch,
1058 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1059 j->get<0>() + m + (k.first)*subchunk_size,
1060 (k.second)*subchunk_size,
1061 bl0, j->get<2>());
1062 if (r < 0) {
1063 error = true;
1064 break;
1065 }
1066 bl.claim_append(bl0);
1067 }
1068 }
1069 }
1070
7c673cae 1071 if (r < 0) {
11fdf7f2
TL
1072 // if we are doing fast reads, it's possible for one of the shard
1073 // reads to cross paths with another update and get a (harmless)
1074 // ENOENT. Suppress the message to the cluster log in that case.
1075 if (r == -ENOENT && get_parent()->get_pool().fast_read) {
1076 dout(5) << __func__ << ": Error " << r
1077 << " reading " << i->first << ", fast read, probably ok"
1078 << dendl;
1079 } else {
1080 get_parent()->clog_error() << "Error " << r
1081 << " reading object "
1082 << i->first;
1083 dout(5) << __func__ << ": Error " << r
1084 << " reading " << i->first << dendl;
1085 }
7c673cae
FG
1086 goto error;
1087 } else {
1088 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1089 reply->buffers_read[i->first].push_back(
1090 make_pair(
1091 j->get<0>(),
1092 bl)
1093 );
1094 }
1095
1096 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1097 // This shows that we still need deep scrub because large enough files
1098 // are read in sections, so the digest check here won't be done here.
1099 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1100 // the state of our chunk in case other chunks could substitute.
11fdf7f2
TL
1101 ECUtil::HashInfoRef hinfo;
1102 hinfo = get_hash_info(i->first);
1103 if (!hinfo) {
1104 r = -EIO;
1105 get_parent()->clog_error() << "Corruption detected: object "
1106 << i->first
1107 << " is missing hash_info";
1108 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1109 goto error;
1110 }
1111 ceph_assert(hinfo->has_chunk_hash());
7c673cae
FG
1112 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1113 (j->get<0>() == 0)) {
1114 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1115 bufferhash h(-1);
1116 h << bl;
1117 if (h.digest() != hinfo->get_chunk_hash(shard)) {
c07f9fc5 1118 get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
7c673cae
FG
1119 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1120 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1121 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1122 r = -EIO;
1123 goto error;
1124 }
1125 }
1126 }
1127 }
1128 continue;
1129error:
1130 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1131 // the state of our chunk in case other chunks could substitute.
1132 reply->buffers_read.erase(i->first);
1133 reply->errors[i->first] = r;
1134 }
1135 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1136 i != op.attrs_to_read.end();
1137 ++i) {
1138 dout(10) << __func__ << ": fulfilling attr request on "
1139 << *i << dendl;
1140 if (reply->errors.count(*i))
1141 continue;
1142 int r = store->getattrs(
1143 ch,
1144 ghobject_t(
11fdf7f2 1145 *i, ghobject_t::NO_GEN, shard),
7c673cae
FG
1146 reply->attrs_read[*i]);
1147 if (r < 0) {
91327a77
AA
1148 // If we read error, we should not return the attrs too.
1149 reply->attrs_read.erase(*i);
7c673cae
FG
1150 reply->buffers_read.erase(*i);
1151 reply->errors[*i] = r;
1152 }
1153 }
1154 reply->from = get_parent()->whoami_shard();
1155 reply->tid = op.tid;
1156}
1157
1158void ECBackend::handle_sub_write_reply(
1159 pg_shard_t from,
1160 const ECSubWriteReply &op,
1161 const ZTracer::Trace &trace)
1162{
1163 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
11fdf7f2 1164 ceph_assert(i != tid_to_op_map.end());
7c673cae
FG
1165 if (op.committed) {
1166 trace.event("sub write committed");
11fdf7f2 1167 ceph_assert(i->second.pending_commit.count(from));
7c673cae
FG
1168 i->second.pending_commit.erase(from);
1169 if (from != get_parent()->whoami_shard()) {
1170 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1171 }
1172 }
1173 if (op.applied) {
1174 trace.event("sub write applied");
11fdf7f2 1175 ceph_assert(i->second.pending_apply.count(from));
7c673cae
FG
1176 i->second.pending_apply.erase(from);
1177 }
1178
11fdf7f2
TL
1179 if (i->second.pending_commit.empty() &&
1180 i->second.on_all_commit &&
1181 // also wait for apply, to preserve ordering with luminous peers.
1182 i->second.pending_apply.empty()) {
7c673cae
FG
1183 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1184 i->second.on_all_commit->complete(0);
1185 i->second.on_all_commit = 0;
1186 i->second.trace.event("ec write all committed");
1187 }
1188 check_ops();
1189}
1190
1191void ECBackend::handle_sub_read_reply(
1192 pg_shard_t from,
1193 ECSubReadReply &op,
1194 RecoveryMessages *m,
1195 const ZTracer::Trace &trace)
1196{
1197 trace.event("ec sub read reply");
1198 dout(10) << __func__ << ": reply " << op << dendl;
1199 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1200 if (iter == tid_to_read_map.end()) {
1201 //canceled
1202 dout(20) << __func__ << ": dropped " << op << dendl;
1203 return;
1204 }
1205 ReadOp &rop = iter->second;
1206 for (auto i = op.buffers_read.begin();
1207 i != op.buffers_read.end();
1208 ++i) {
11fdf7f2 1209 ceph_assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
7c673cae
FG
1210 if (!rop.to_read.count(i->first)) {
1211 // We canceled this read! @see filter_read_op
1212 dout(20) << __func__ << " to_read skipping" << dendl;
1213 continue;
1214 }
1215 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1216 rop.to_read.find(i->first)->second.to_read.begin();
1217 list<
1218 boost::tuple<
1219 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1220 rop.complete[i->first].returned.begin();
1221 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1222 j != i->second.end();
1223 ++j, ++req_iter, ++riter) {
11fdf7f2
TL
1224 ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1225 ceph_assert(riter != rop.complete[i->first].returned.end());
7c673cae
FG
1226 pair<uint64_t, uint64_t> adjusted =
1227 sinfo.aligned_offset_len_to_chunk(
1228 make_pair(req_iter->get<0>(), req_iter->get<1>()));
11fdf7f2 1229 ceph_assert(adjusted.first == j->first);
f67539c2 1230 riter->get<2>()[from] = std::move(j->second);
7c673cae
FG
1231 }
1232 }
1233 for (auto i = op.attrs_read.begin();
1234 i != op.attrs_read.end();
1235 ++i) {
11fdf7f2 1236 ceph_assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
7c673cae
FG
1237 if (!rop.to_read.count(i->first)) {
1238 // We canceled this read! @see filter_read_op
1239 dout(20) << __func__ << " to_read skipping" << dendl;
1240 continue;
1241 }
20effc67 1242 rop.complete[i->first].attrs.emplace();
7c673cae
FG
1243 (*(rop.complete[i->first].attrs)).swap(i->second);
1244 }
1245 for (auto i = op.errors.begin();
1246 i != op.errors.end();
1247 ++i) {
1248 rop.complete[i->first].errors.insert(
1249 make_pair(
1250 from,
1251 i->second));
1252 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1253 }
1254
1255 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1256 shard_to_read_map.find(from);
11fdf7f2
TL
1257 ceph_assert(siter != shard_to_read_map.end());
1258 ceph_assert(siter->second.count(op.tid));
7c673cae
FG
1259 siter->second.erase(op.tid);
1260
11fdf7f2 1261 ceph_assert(rop.in_progress.count(from));
7c673cae
FG
1262 rop.in_progress.erase(from);
1263 unsigned is_complete = 0;
f67539c2 1264 bool need_resend = false;
7c673cae
FG
1265 // For redundant reads check for completion as each shard comes in,
1266 // or in a non-recovery read check for completion once all the shards read.
b32b8144 1267 if (rop.do_redundant_reads || rop.in_progress.empty()) {
7c673cae
FG
1268 for (map<hobject_t, read_result_t>::const_iterator iter =
1269 rop.complete.begin();
1270 iter != rop.complete.end();
1271 ++iter) {
1272 set<int> have;
1273 for (map<pg_shard_t, bufferlist>::const_iterator j =
1274 iter->second.returned.front().get<2>().begin();
1275 j != iter->second.returned.front().get<2>().end();
1276 ++j) {
1277 have.insert(j->first.shard);
1278 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1279 }
11fdf7f2 1280 map<int, vector<pair<int, int>>> dummy_minimum;
7c673cae 1281 int err;
28e407b8 1282 if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
7c673cae
FG
1283 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1284 if (rop.in_progress.empty()) {
11fdf7f2
TL
1285 // If we don't have enough copies, try other pg_shard_ts if available.
1286 // During recovery there may be multiple osds with copies of the same shard,
1287 // so getting EIO from one may result in multiple passes through this code path.
7c673cae
FG
1288 if (!rop.do_redundant_reads) {
1289 int r = send_all_remaining_reads(iter->first, rop);
1290 if (r == 0) {
f67539c2
TL
1291 // We changed the rop's to_read and not incrementing is_complete
1292 need_resend = true;
7c673cae
FG
1293 continue;
1294 }
1295 // Couldn't read any additional shards so handle as completed with errors
1296 }
c07f9fc5
FG
1297 // We don't want to confuse clients / RBD with objectstore error
1298 // values in particular ENOENT. We may have different error returns
1299 // from different shards, so we'll return minimum_to_decode() error
1300 // (usually EIO) to reader. It is likely an error here is due to a
1301 // damaged pg.
7c673cae
FG
1302 rop.complete[iter->first].r = err;
1303 ++is_complete;
1304 }
1305 } else {
11fdf7f2 1306 ceph_assert(rop.complete[iter->first].r == 0);
7c673cae
FG
1307 if (!rop.complete[iter->first].errors.empty()) {
1308 if (cct->_conf->osd_read_ec_check_for_errors) {
1309 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1310 err = rop.complete[iter->first].errors.begin()->second;
1311 rop.complete[iter->first].r = err;
1312 } else {
c07f9fc5 1313 get_parent()->clog_warn() << "Error(s) ignored for "
7c673cae
FG
1314 << iter->first << " enough copies available";
1315 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1316 << " enough copies available" << dendl;
1317 rop.complete[iter->first].errors.clear();
1318 }
1319 }
f67539c2
TL
1320 // avoid re-read for completed object as we may send remaining reads for uncopmpleted objects
1321 rop.to_read.at(iter->first).need.clear();
1322 rop.to_read.at(iter->first).want_attrs = false;
7c673cae
FG
1323 ++is_complete;
1324 }
1325 }
1326 }
f67539c2
TL
1327 if (need_resend) {
1328 do_read_op(rop);
1329 } else if (rop.in_progress.empty() ||
1330 is_complete == rop.complete.size()) {
7c673cae
FG
1331 dout(20) << __func__ << " Complete: " << rop << dendl;
1332 rop.trace.event("ec read complete");
1333 complete_read_op(rop, m);
1334 } else {
1335 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1336 }
1337}
1338
1339void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1340{
1341 map<hobject_t, read_request_t>::iterator reqiter =
1342 rop.to_read.begin();
1343 map<hobject_t, read_result_t>::iterator resiter =
1344 rop.complete.begin();
11fdf7f2 1345 ceph_assert(rop.to_read.size() == rop.complete.size());
7c673cae
FG
1346 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1347 if (reqiter->second.cb) {
1348 pair<RecoveryMessages *, read_result_t &> arg(
1349 m, resiter->second);
1350 reqiter->second.cb->complete(arg);
11fdf7f2 1351 reqiter->second.cb = nullptr;
7c673cae
FG
1352 }
1353 }
11fdf7f2
TL
1354 // if the read op is over. clean all the data of this tid.
1355 for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
1356 iter != rop.in_progress.end();
1357 iter++) {
1358 shard_to_read_map[*iter].erase(rop.tid);
1359 }
1360 rop.in_progress.clear();
7c673cae
FG
1361 tid_to_read_map.erase(rop.tid);
1362}
1363
1364struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1365 ECBackend *ec;
1366 ceph_tid_t tid;
1367 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1368 void finish(ThreadPool::TPHandle &handle) override {
1369 auto ropiter = ec->tid_to_read_map.find(tid);
11fdf7f2 1370 ceph_assert(ropiter != ec->tid_to_read_map.end());
7c673cae
FG
1371 int priority = ropiter->second.priority;
1372 RecoveryMessages rm;
1373 ec->complete_read_op(ropiter->second, &rm);
1374 ec->dispatch_recovery_messages(rm, priority);
1375 }
1376};
1377
1378void ECBackend::filter_read_op(
1379 const OSDMapRef& osdmap,
1380 ReadOp &op)
1381{
1382 set<hobject_t> to_cancel;
1383 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1384 i != op.source_to_obj.end();
1385 ++i) {
1386 if (osdmap->is_down(i->first.osd)) {
1387 to_cancel.insert(i->second.begin(), i->second.end());
1388 op.in_progress.erase(i->first);
1389 continue;
1390 }
1391 }
1392
1393 if (to_cancel.empty())
1394 return;
1395
1396 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1397 i != op.source_to_obj.end();
1398 ) {
1399 for (set<hobject_t>::iterator j = i->second.begin();
1400 j != i->second.end();
1401 ) {
1402 if (to_cancel.count(*j))
1403 i->second.erase(j++);
1404 else
1405 ++j;
1406 }
1407 if (i->second.empty()) {
1408 op.source_to_obj.erase(i++);
1409 } else {
11fdf7f2 1410 ceph_assert(!osdmap->is_down(i->first.osd));
7c673cae
FG
1411 ++i;
1412 }
1413 }
1414
1415 for (set<hobject_t>::iterator i = to_cancel.begin();
1416 i != to_cancel.end();
1417 ++i) {
1418 get_parent()->cancel_pull(*i);
1419
11fdf7f2 1420 ceph_assert(op.to_read.count(*i));
7c673cae
FG
1421 read_request_t &req = op.to_read.find(*i)->second;
1422 dout(10) << __func__ << ": canceling " << req
1423 << " for obj " << *i << dendl;
11fdf7f2 1424 ceph_assert(req.cb);
7c673cae 1425 delete req.cb;
11fdf7f2 1426 req.cb = nullptr;
7c673cae
FG
1427
1428 op.to_read.erase(*i);
1429 op.complete.erase(*i);
1430 recovery_ops.erase(*i);
1431 }
1432
1433 if (op.in_progress.empty()) {
1e59de90
TL
1434 /* This case is odd. filter_read_op gets called while processing
1435 * an OSDMap. Normal, non-recovery reads only happen from acting
1436 * set osds. For this op to have had a read source go down and
1437 * there not be an interval change, it must be part of a pull during
1438 * log-based recovery.
1439 *
1440 * This callback delays calling complete_read_op until later to avoid
1441 * dealing with recovery while handling an OSDMap. We assign a
1442 * cost here of 1 because:
1443 * 1) This should be very rare, and the operation itself was already
1444 * throttled.
1445 * 2) It shouldn't result in IO, rather it should result in restarting
1446 * the pull on the affected objects and pushes from in-memory buffers
1447 * on any now complete unaffected objects.
1448 */
7c673cae 1449 get_parent()->schedule_recovery_work(
11fdf7f2 1450 get_parent()->bless_unlocked_gencontext(
1e59de90
TL
1451 new FinishReadOp(this, op.tid)),
1452 1);
7c673cae
FG
1453 }
1454}
1455
1456void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1457{
1458 set<ceph_tid_t> tids_to_filter;
1459 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1460 i = shard_to_read_map.begin();
1461 i != shard_to_read_map.end();
1462 ) {
1463 if (osdmap->is_down(i->first.osd)) {
1464 tids_to_filter.insert(i->second.begin(), i->second.end());
1465 shard_to_read_map.erase(i++);
1466 } else {
1467 ++i;
1468 }
1469 }
1470 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1471 i != tids_to_filter.end();
1472 ++i) {
1473 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
11fdf7f2 1474 ceph_assert(j != tid_to_read_map.end());
7c673cae
FG
1475 filter_read_op(osdmap, j->second);
1476 }
1477}
1478
1479void ECBackend::on_change()
1480{
1481 dout(10) << __func__ << dendl;
1482
1483 completed_to = eversion_t();
1484 committed_to = eversion_t();
1485 pipeline_state.clear();
1486 waiting_reads.clear();
1487 waiting_state.clear();
1488 waiting_commit.clear();
1489 for (auto &&op: tid_to_op_map) {
1490 cache.release_write_pin(op.second.pin);
1491 }
1492 tid_to_op_map.clear();
1493
1494 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1495 i != tid_to_read_map.end();
1496 ++i) {
1497 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1498 for (map<hobject_t, read_request_t>::iterator j =
1499 i->second.to_read.begin();
1500 j != i->second.to_read.end();
1501 ++j) {
1502 delete j->second.cb;
11fdf7f2 1503 j->second.cb = nullptr;
7c673cae
FG
1504 }
1505 }
1506 tid_to_read_map.clear();
1507 in_progress_client_reads.clear();
1508 shard_to_read_map.clear();
1509 clear_recovery_state();
1510}
1511
1512void ECBackend::clear_recovery_state()
1513{
1514 recovery_ops.clear();
1515}
1516
7c673cae
FG
1517void ECBackend::dump_recovery_info(Formatter *f) const
1518{
1519 f->open_array_section("recovery_ops");
1520 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1521 i != recovery_ops.end();
1522 ++i) {
1523 f->open_object_section("op");
1524 i->second.dump(f);
1525 f->close_section();
1526 }
1527 f->close_section();
1528 f->open_array_section("read_ops");
1529 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1530 i != tid_to_read_map.end();
1531 ++i) {
1532 f->open_object_section("read_op");
1533 i->second.dump(f);
1534 f->close_section();
1535 }
1536 f->close_section();
1537}
1538
1539void ECBackend::submit_transaction(
1540 const hobject_t &hoid,
1541 const object_stat_sum_t &delta_stats,
1542 const eversion_t &at_version,
1543 PGTransactionUPtr &&t,
1544 const eversion_t &trim_to,
9f95a23c 1545 const eversion_t &min_last_complete_ondisk,
f67539c2 1546 vector<pg_log_entry_t>&& log_entries,
9f95a23c 1547 std::optional<pg_hit_set_history_t> &hset_history,
7c673cae
FG
1548 Context *on_all_commit,
1549 ceph_tid_t tid,
1550 osd_reqid_t reqid,
1551 OpRequestRef client_op
1552 )
1553{
11fdf7f2 1554 ceph_assert(!tid_to_op_map.count(tid));
7c673cae
FG
1555 Op *op = &(tid_to_op_map[tid]);
1556 op->hoid = hoid;
1557 op->delta_stats = delta_stats;
1558 op->version = at_version;
1559 op->trim_to = trim_to;
9f95a23c 1560 op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
7c673cae
FG
1561 op->log_entries = log_entries;
1562 std::swap(op->updated_hit_set_history, hset_history);
7c673cae
FG
1563 op->on_all_commit = on_all_commit;
1564 op->tid = tid;
1565 op->reqid = reqid;
1566 op->client_op = client_op;
20effc67 1567 if (client_op) {
7c673cae 1568 op->trace = client_op->pg_trace;
f67539c2 1569 }
7c673cae
FG
1570 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1571 start_rmw(op, std::move(t));
7c673cae
FG
1572}
1573
1574void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1575 if (!waiting_state.empty()) {
1576 waiting_state.back().on_write.emplace_back(std::move(cb));
1577 } else if (!waiting_reads.empty()) {
1578 waiting_reads.back().on_write.emplace_back(std::move(cb));
1579 } else {
1580 // Nothing earlier in the pipeline, just call it
1581 cb();
1582 }
1583}
1584
b32b8144 1585void ECBackend::get_all_avail_shards(
7c673cae 1586 const hobject_t &hoid,
28e407b8 1587 const set<pg_shard_t> &error_shards,
b32b8144
FG
1588 set<int> &have,
1589 map<shard_id_t, pg_shard_t> &shards,
1590 bool for_recovery)
7c673cae 1591{
7c673cae
FG
1592 for (set<pg_shard_t>::const_iterator i =
1593 get_parent()->get_acting_shards().begin();
1594 i != get_parent()->get_acting_shards().end();
1595 ++i) {
1596 dout(10) << __func__ << ": checking acting " << *i << dendl;
1597 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
28e407b8
AA
1598 if (error_shards.find(*i) != error_shards.end())
1599 continue;
7c673cae 1600 if (!missing.is_missing(hoid)) {
11fdf7f2 1601 ceph_assert(!have.count(i->shard));
7c673cae 1602 have.insert(i->shard);
11fdf7f2 1603 ceph_assert(!shards.count(i->shard));
7c673cae
FG
1604 shards.insert(make_pair(i->shard, *i));
1605 }
1606 }
1607
1608 if (for_recovery) {
1609 for (set<pg_shard_t>::const_iterator i =
1610 get_parent()->get_backfill_shards().begin();
1611 i != get_parent()->get_backfill_shards().end();
1612 ++i) {
28e407b8
AA
1613 if (error_shards.find(*i) != error_shards.end())
1614 continue;
7c673cae 1615 if (have.count(i->shard)) {
11fdf7f2 1616 ceph_assert(shards.count(i->shard));
7c673cae
FG
1617 continue;
1618 }
1619 dout(10) << __func__ << ": checking backfill " << *i << dendl;
11fdf7f2 1620 ceph_assert(!shards.count(i->shard));
7c673cae
FG
1621 const pg_info_t &info = get_parent()->get_shard_info(*i);
1622 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1623 if (hoid < info.last_backfill &&
1624 !missing.is_missing(hoid)) {
1625 have.insert(i->shard);
1626 shards.insert(make_pair(i->shard, *i));
1627 }
1628 }
1629
1630 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1631 get_parent()->get_missing_loc_shards().find(hoid);
1632 if (miter != get_parent()->get_missing_loc_shards().end()) {
1633 for (set<pg_shard_t>::iterator i = miter->second.begin();
1634 i != miter->second.end();
1635 ++i) {
1636 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1637 auto m = get_parent()->maybe_get_shard_missing(*i);
1638 if (m) {
11fdf7f2 1639 ceph_assert(!(*m).is_missing(hoid));
7c673cae 1640 }
28e407b8
AA
1641 if (error_shards.find(*i) != error_shards.end())
1642 continue;
7c673cae
FG
1643 have.insert(i->shard);
1644 shards.insert(make_pair(i->shard, *i));
1645 }
1646 }
1647 }
b32b8144
FG
1648}
1649
1650int ECBackend::get_min_avail_to_read_shards(
1651 const hobject_t &hoid,
1652 const set<int> &want,
1653 bool for_recovery,
1654 bool do_redundant_reads,
11fdf7f2 1655 map<pg_shard_t, vector<pair<int, int>>> *to_read)
b32b8144
FG
1656{
1657 // Make sure we don't do redundant reads for recovery
11fdf7f2 1658 ceph_assert(!for_recovery || !do_redundant_reads);
b32b8144
FG
1659
1660 set<int> have;
1661 map<shard_id_t, pg_shard_t> shards;
28e407b8 1662 set<pg_shard_t> error_shards;
b32b8144 1663
28e407b8 1664 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
7c673cae 1665
11fdf7f2 1666 map<int, vector<pair<int, int>>> need;
7c673cae
FG
1667 int r = ec_impl->minimum_to_decode(want, have, &need);
1668 if (r < 0)
1669 return r;
1670
1671 if (do_redundant_reads) {
11fdf7f2
TL
1672 vector<pair<int, int>> subchunks_list;
1673 subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
1674 for (auto &&i: have) {
1675 need[i] = subchunks_list;
1676 }
7c673cae
FG
1677 }
1678
1679 if (!to_read)
1680 return 0;
1681
11fdf7f2
TL
1682 for (auto &&i:need) {
1683 ceph_assert(shards.count(shard_id_t(i.first)));
1684 to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
7c673cae
FG
1685 }
1686 return 0;
1687}
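// The function above follows a want/have/need pattern: collect the shards we
// would like to read (want) and the shards that are actually readable (have),
// then ask the erasure-code plugin which subset is needed to decode.  A
// self-contained sketch with plain ints for shard ids and a simplified
// stand-in for ErasureCodeInterface::minimum_to_decode (here: any k available
// shards suffice; k and the shard ids are illustrative assumptions):
#include <cerrno>
#include <iostream>
#include <map>
#include <set>
#include <utility>
#include <vector>

static int pick_min_to_decode(unsigned k,
                              const std::set<int> &want,
                              const std::set<int> &have,
                              std::map<int, std::vector<std::pair<int,int>>> *need)
{
  std::set<int> chosen;
  for (int s : want) {                       // prefer the shards we actually want
    if (chosen.size() >= k) break;
    if (have.count(s)) chosen.insert(s);
  }
  for (int s : have) {                       // fill up to k with anything else available
    if (chosen.size() >= k) break;
    chosen.insert(s);
  }
  if (chosen.size() < k)
    return -EIO;                             // not enough shards left to decode
  for (int s : chosen)
    (*need)[s] = {{0, 1}};                   // read the single (sub)chunk of each shard
  return 0;
}

int main()
{
  const unsigned k = 2;                      // assumed k=2, m=1 profile
  std::set<int> want = {0, 1};               // data shards holding the requested range
  std::set<int> have = {1, 2};               // shard 0 is unavailable, parity 2 is up
  std::map<int, std::vector<std::pair<int,int>>> need;
  if (pick_min_to_decode(k, want, have, &need) == 0)
    for (auto &n : need)
      std::cout << "read shard " << n.first << "\n";   // shards 1 and 2 -> reconstruct
}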
1688
1689int ECBackend::get_remaining_shards(
1690 const hobject_t &hoid,
1691 const set<int> &avail,
28e407b8
AA
1692 const set<int> &want,
1693 const read_result_t &result,
11fdf7f2 1694 map<pg_shard_t, vector<pair<int, int>>> *to_read,
b32b8144 1695 bool for_recovery)
7c673cae 1696{
11fdf7f2 1697 ceph_assert(to_read);
7c673cae 1698
b32b8144
FG
1699 set<int> have;
1700 map<shard_id_t, pg_shard_t> shards;
28e407b8
AA
1701 set<pg_shard_t> error_shards;
1702 for (auto &p : result.errors) {
1703 error_shards.insert(p.first);
1704 }
1705
1706 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1707
11fdf7f2 1708 map<int, vector<pair<int, int>>> need;
28e407b8
AA
1709 int r = ec_impl->minimum_to_decode(want, have, &need);
1710 if (r < 0) {
1711 dout(0) << __func__ << " not enough shards left to try for " << hoid
1712 << " read result was " << result << dendl;
1713 return -EIO;
1714 }
7c673cae 1715
28e407b8
AA
1716 set<int> shards_left;
1717 for (auto p : need) {
11fdf7f2
TL
1718 if (avail.find(p.first) == avail.end()) {
1719 shards_left.insert(p.first);
28e407b8
AA
1720 }
1721 }
7c673cae 1722
11fdf7f2
TL
1723 vector<pair<int, int>> subchunks;
1724 subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
28e407b8
AA
1725 for (set<int>::iterator i = shards_left.begin();
1726 i != shards_left.end();
7c673cae 1727 ++i) {
11fdf7f2
TL
1728 ceph_assert(shards.count(shard_id_t(*i)));
1729 ceph_assert(avail.find(*i) == avail.end());
1730 to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
7c673cae
FG
1731 }
1732 return 0;
1733}
1734
1735void ECBackend::start_read_op(
1736 int priority,
28e407b8 1737 map<hobject_t, set<int>> &want_to_read,
7c673cae
FG
1738 map<hobject_t, read_request_t> &to_read,
1739 OpRequestRef _op,
1740 bool do_redundant_reads,
1741 bool for_recovery)
1742{
1743 ceph_tid_t tid = get_parent()->get_tid();
11fdf7f2 1744 ceph_assert(!tid_to_read_map.count(tid));
7c673cae
FG
1745 auto &op = tid_to_read_map.emplace(
1746 tid,
1747 ReadOp(
1748 priority,
1749 tid,
1750 do_redundant_reads,
1751 for_recovery,
1752 _op,
28e407b8 1753 std::move(want_to_read),
7c673cae
FG
1754 std::move(to_read))).first->second;
1755 dout(10) << __func__ << ": starting " << op << dendl;
1756 if (_op) {
1757 op.trace = _op->pg_trace;
1758 op.trace.event("start ec read");
1759 }
1760 do_read_op(op);
1761}
1762
1763void ECBackend::do_read_op(ReadOp &op)
1764{
1765 int priority = op.priority;
1766 ceph_tid_t tid = op.tid;
1767
1768 dout(10) << __func__ << ": starting read " << op << dendl;
1769
1770 map<pg_shard_t, ECSubRead> messages;
1771 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1772 i != op.to_read.end();
1773 ++i) {
1774 bool need_attrs = i->second.want_attrs;
11fdf7f2
TL
1775
1776 for (auto j = i->second.need.begin();
7c673cae
FG
1777 j != i->second.need.end();
1778 ++j) {
1779 if (need_attrs) {
11fdf7f2 1780 messages[j->first].attrs_to_read.insert(i->first);
7c673cae
FG
1781 need_attrs = false;
1782 }
11fdf7f2
TL
1783 messages[j->first].subchunks[i->first] = j->second;
1784 op.obj_to_source[i->first].insert(j->first);
1785 op.source_to_obj[j->first].insert(i->first);
7c673cae
FG
1786 }
1787 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1788 i->second.to_read.begin();
1789 j != i->second.to_read.end();
1790 ++j) {
1791 pair<uint64_t, uint64_t> chunk_off_len =
1792 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
11fdf7f2 1793 for (auto k = i->second.need.begin();
7c673cae
FG
1794 k != i->second.need.end();
1795 ++k) {
11fdf7f2 1796 messages[k->first].to_read[i->first].push_back(
7c673cae
FG
1797 boost::make_tuple(
1798 chunk_off_len.first,
1799 chunk_off_len.second,
1800 j->get<2>()));
1801 }
11fdf7f2 1802 ceph_assert(!need_attrs);
7c673cae
FG
1803 }
1804 }
1805
9f95a23c
TL
1806 std::vector<std::pair<int, Message*>> m;
1807 m.reserve(messages.size());
7c673cae
FG
1808 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1809 i != messages.end();
1810 ++i) {
1811 op.in_progress.insert(i->first);
1812 shard_to_read_map[i->first].insert(op.tid);
1813 i->second.tid = tid;
1814 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1815 msg->set_priority(priority);
1816 msg->pgid = spg_t(
1817 get_parent()->whoami_spg_t().pgid,
1818 i->first.shard);
11fdf7f2 1819 msg->map_epoch = get_osdmap_epoch();
7c673cae
FG
1820 msg->min_epoch = get_parent()->get_interval_start_epoch();
1821 msg->op = i->second;
1822 msg->op.from = get_parent()->whoami_shard();
1823 msg->op.tid = tid;
1824 if (op.trace) {
1825 // initialize a child span for this shard
1826 msg->trace.init("ec sub read", nullptr, &op.trace);
1827 msg->trace.keyval("shard", i->first.shard.id);
1828 }
9f95a23c 1829 m.push_back(std::make_pair(i->first.osd, msg));
7c673cae 1830 }
9f95a23c
TL
1831 if (!m.empty()) {
1832 get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
1833 }
1834
7c673cae
FG
1835 dout(10) << __func__ << ": started " << op << dendl;
1836}
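// do_read_op() converts each logical (object) range into a per-shard chunk
// range via sinfo.aligned_offset_len_to_chunk().  Sketch of that mapping,
// assuming a plain layout of k data chunks of chunk_size bytes per stripe
// (stripe_width = k * chunk_size) and stripe-aligned input; the constants
// below are illustrative, not taken from a real pool profile:
#include <cassert>
#include <cstdint>
#include <iostream>
#include <utility>

static std::pair<uint64_t, uint64_t>
aligned_offset_len_to_chunk(uint64_t off, uint64_t len,
                            uint64_t chunk_size, unsigned k)
{
  const uint64_t stripe_width = chunk_size * k;
  assert(off % stripe_width == 0 && len % stripe_width == 0);
  // Every full stripe contributes exactly chunk_size bytes to each shard,
  // so shard-local offsets and lengths shrink by a factor of k.
  return {off / k, len / k};
}

int main()
{
  const uint64_t chunk_size = 4096;          // assumed chunk size
  const unsigned k = 4;                      // assumed 4 data shards
  auto [coff, clen] = aligned_offset_len_to_chunk(32768, 16384, chunk_size, k);
  std::cout << "chunk off=" << coff << " len=" << clen << "\n";  // 8192, 4096
}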
1837
1838ECUtil::HashInfoRef ECBackend::get_hash_info(
20effc67 1839 const hobject_t &hoid, bool create, const map<string,bufferptr,less<>> *attrs)
7c673cae
FG
1840{
1841 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1842 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1843 if (!ref) {
1844 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1845 struct stat st;
1846 int r = store->stat(
1847 ch,
1848 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1849 &st);
1850 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
7c673cae
FG
1851 if (r >= 0) {
1852 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1853 bufferlist bl;
1854 if (attrs) {
1855 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1856 if (k == attrs->end()) {
1857 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1858 } else {
1859 bl.push_back(k->second);
1860 }
1861 } else {
1862 r = store->getattr(
1863 ch,
1864 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1865 ECUtil::get_hinfo_key(),
1866 bl);
1867 if (r < 0) {
1868 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1869 bl.clear(); // just in case
1870 }
1871 }
1872 if (bl.length() > 0) {
11fdf7f2 1873 auto bp = bl.cbegin();
94b18763 1874 try {
11fdf7f2 1875 decode(hinfo, bp);
94b18763
FG
1876 } catch(...) {
1877 dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
1878 return ECUtil::HashInfoRef();
1879 }
a4b75251 1880 if (hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
7c673cae
FG
1881 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1882 << hinfo.get_total_chunk_size() << dendl;
1883 return ECUtil::HashInfoRef();
1884 }
 1885 } else if (st.st_size > 0) { // no hinfo on a non-empty object: error; an empty object with no hinfo gets one created below
1886 return ECUtil::HashInfoRef();
1887 }
a4b75251
TL
1888 } else if (r != -ENOENT || !create) {
1889 derr << __func__ << ": stat " << hoid << " failed: " << cpp_strerror(r)
1890 << dendl;
1891 return ECUtil::HashInfoRef();
7c673cae
FG
1892 }
1893 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1894 }
1895 return ref;
1896}
1897
1898void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1899{
11fdf7f2 1900 ceph_assert(op);
7c673cae
FG
1901
1902 op->plan = ECTransaction::get_write_plan(
1903 sinfo,
1904 std::move(t),
1905 [&](const hobject_t &i) {
a4b75251 1906 ECUtil::HashInfoRef ref = get_hash_info(i, true);
7c673cae
FG
1907 if (!ref) {
1908 derr << __func__ << ": get_hash_info(" << i << ")"
1909 << " returned a null pointer and there is no "
1910 << " way to recover from such an error in this "
1911 << " context" << dendl;
1912 ceph_abort();
1913 }
1914 return ref;
1915 },
1916 get_parent()->get_dpp());
1917
1918 dout(10) << __func__ << ": " << *op << dendl;
1919
1920 waiting_state.push_back(*op);
1921 check_ops();
1922}
1923
1924bool ECBackend::try_state_to_reads()
1925{
1926 if (waiting_state.empty())
1927 return false;
1928
1929 Op *op = &(waiting_state.front());
1930 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
11fdf7f2 1931 ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
7c673cae
FG
1932 dout(20) << __func__ << ": blocking " << *op
1933 << " because it requires an rmw and the cache is invalid "
1934 << pipeline_state
1935 << dendl;
1936 return false;
1937 }
1938
11fdf7f2
TL
1939 if (!pipeline_state.caching_enabled()) {
1940 op->using_cache = false;
1941 } else if (op->invalidates_cache()) {
7c673cae
FG
1942 dout(20) << __func__ << ": invalidating cache after this op"
1943 << dendl;
1944 pipeline_state.invalidate();
7c673cae
FG
1945 }
1946
1947 waiting_state.pop_front();
1948 waiting_reads.push_back(*op);
1949
1950 if (op->using_cache) {
1951 cache.open_write_pin(op->pin);
1952
1953 extent_set empty;
1954 for (auto &&hpair: op->plan.will_write) {
1955 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1956 const extent_set &to_read_plan =
1957 to_read_plan_iter == op->plan.to_read.end() ?
1958 empty :
1959 to_read_plan_iter->second;
1960
1961 extent_set remote_read = cache.reserve_extents_for_rmw(
1962 hpair.first,
1963 op->pin,
1964 hpair.second,
1965 to_read_plan);
1966
1967 extent_set pending_read = to_read_plan;
1968 pending_read.subtract(remote_read);
1969
1970 if (!remote_read.empty()) {
1971 op->remote_read[hpair.first] = std::move(remote_read);
1972 }
1973 if (!pending_read.empty()) {
1974 op->pending_read[hpair.first] = std::move(pending_read);
1975 }
1976 }
1977 } else {
1978 op->remote_read = op->plan.to_read;
1979 }
1980
1981 dout(10) << __func__ << ": " << *op << dendl;
1982
1983 if (!op->remote_read.empty()) {
11fdf7f2 1984 ceph_assert(get_parent()->get_pool().allows_ecoverwrites());
7c673cae
FG
1985 objects_read_async_no_cache(
1986 op->remote_read,
1987 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1988 for (auto &&i: results) {
1989 op->remote_read_result.emplace(i.first, i.second.second);
1990 }
1991 check_ops();
1992 });
1993 }
1994
1995 return true;
1996}
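// When the cache is in use, the planned read set for each object is split
// above into remote_read (extents the cache cannot supply, as returned by
// reserve_extents_for_rmw) and pending_read (the remainder, served from the
// cache at commit time).  Tiny model of that split using block indices in
// place of extent_sets; the concrete blocks are made up for illustration:
#include <algorithm>
#include <iostream>
#include <iterator>
#include <set>

int main()
{
  std::set<int> to_read_plan = {0, 1, 2, 3};   // blocks the RMW needs to read
  std::set<int> remote_read  = {2, 3};         // blocks the cache cannot supply
  std::set<int> pending_read;                  // blocks the cache will supply
  std::set_difference(to_read_plan.begin(), to_read_plan.end(),
                      remote_read.begin(), remote_read.end(),
                      std::inserter(pending_read, pending_read.end()));
  for (int b : pending_read)
    std::cout << "from cache: block " << b << "\n";   // blocks 0 and 1
}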
1997
1998bool ECBackend::try_reads_to_commit()
1999{
2000 if (waiting_reads.empty())
2001 return false;
2002 Op *op = &(waiting_reads.front());
2003 if (op->read_in_progress())
2004 return false;
2005 waiting_reads.pop_front();
2006 waiting_commit.push_back(*op);
2007
2008 dout(10) << __func__ << ": starting commit on " << *op << dendl;
2009 dout(20) << __func__ << ": " << cache << dendl;
2010
2011 get_parent()->apply_stats(
2012 op->hoid,
2013 op->delta_stats);
2014
2015 if (op->using_cache) {
2016 for (auto &&hpair: op->pending_read) {
2017 op->remote_read_result[hpair.first].insert(
2018 cache.get_remaining_extents_for_rmw(
2019 hpair.first,
2020 op->pin,
2021 hpair.second));
2022 }
2023 op->pending_read.clear();
2024 } else {
11fdf7f2 2025 ceph_assert(op->pending_read.empty());
7c673cae
FG
2026 }
2027
2028 map<shard_id_t, ObjectStore::Transaction> trans;
2029 for (set<pg_shard_t>::const_iterator i =
11fdf7f2
TL
2030 get_parent()->get_acting_recovery_backfill_shards().begin();
2031 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2032 ++i) {
2033 trans[i->shard];
2034 }
2035
2036 op->trace.event("start ec write");
2037
2038 map<hobject_t,extent_map> written;
2039 if (op->plan.t) {
2040 ECTransaction::generate_transactions(
2041 op->plan,
2042 ec_impl,
2043 get_parent()->get_info().pgid.pgid,
7c673cae
FG
2044 sinfo,
2045 op->remote_read_result,
2046 op->log_entries,
2047 &written,
2048 &trans,
2049 &(op->temp_added),
2050 &(op->temp_cleared),
9f95a23c
TL
2051 get_parent()->get_dpp(),
2052 get_osdmap()->require_osd_release);
7c673cae
FG
2053 }
2054
2055 dout(20) << __func__ << ": " << cache << dendl;
2056 dout(20) << __func__ << ": written: " << written << dendl;
2057 dout(20) << __func__ << ": op: " << *op << dendl;
2058
2059 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2060 for (auto &&i: op->log_entries) {
2061 if (i.requires_kraken()) {
2062 derr << __func__ << ": log entry " << i << " requires kraken"
2063 << " but overwrites are not enabled!" << dendl;
2064 ceph_abort();
2065 }
2066 }
2067 }
2068
2069 map<hobject_t,extent_set> written_set;
2070 for (auto &&i: written) {
2071 written_set[i.first] = i.second.get_interval_set();
2072 }
2073 dout(20) << __func__ << ": written_set: " << written_set << dendl;
11fdf7f2 2074 ceph_assert(written_set == op->plan.will_write);
7c673cae
FG
2075
2076 if (op->using_cache) {
2077 for (auto &&hpair: written) {
2078 dout(20) << __func__ << ": " << hpair << dendl;
2079 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
2080 }
2081 }
2082 op->remote_read.clear();
2083 op->remote_read_result.clear();
2084
7c673cae
FG
2085 ObjectStore::Transaction empty;
2086 bool should_write_local = false;
2087 ECSubWrite local_write_op;
9f95a23c
TL
2088 std::vector<std::pair<int, Message*>> messages;
2089 messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size());
11fdf7f2 2090 set<pg_shard_t> backfill_shards = get_parent()->get_backfill_shards();
7c673cae 2091 for (set<pg_shard_t>::const_iterator i =
11fdf7f2
TL
2092 get_parent()->get_acting_recovery_backfill_shards().begin();
2093 i != get_parent()->get_acting_recovery_backfill_shards().end();
7c673cae
FG
2094 ++i) {
2095 op->pending_apply.insert(*i);
2096 op->pending_commit.insert(*i);
2097 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
2098 trans.find(i->shard);
11fdf7f2 2099 ceph_assert(iter != trans.end());
7c673cae
FG
2100 bool should_send = get_parent()->should_send_op(*i, op->hoid);
2101 const pg_stat_t &stats =
11fdf7f2 2102 (should_send || !backfill_shards.count(*i)) ?
7c673cae
FG
2103 get_info().stats :
2104 parent->get_shard_info().find(*i)->second.stats;
2105
2106 ECSubWrite sop(
2107 get_parent()->whoami_shard(),
2108 op->tid,
2109 op->reqid,
2110 op->hoid,
2111 stats,
2112 should_send ? iter->second : empty,
2113 op->version,
2114 op->trim_to,
2115 op->roll_forward_to,
2116 op->log_entries,
2117 op->updated_hit_set_history,
2118 op->temp_added,
2119 op->temp_cleared,
2120 !should_send);
2121
2122 ZTracer::Trace trace;
2123 if (op->trace) {
2124 // initialize a child span for this shard
2125 trace.init("ec sub write", nullptr, &op->trace);
2126 trace.keyval("shard", i->shard.id);
2127 }
2128
2129 if (*i == get_parent()->whoami_shard()) {
2130 should_write_local = true;
2131 local_write_op.claim(sop);
2132 } else {
2133 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
2134 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
11fdf7f2 2135 r->map_epoch = get_osdmap_epoch();
7c673cae
FG
2136 r->min_epoch = get_parent()->get_interval_start_epoch();
2137 r->trace = trace;
9f95a23c 2138 messages.push_back(std::make_pair(i->osd, r));
7c673cae
FG
2139 }
2140 }
f67539c2 2141
9f95a23c
TL
2142 if (!messages.empty()) {
2143 get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch());
2144 }
2145
7c673cae 2146 if (should_write_local) {
11fdf7f2
TL
2147 handle_sub_write(
2148 get_parent()->whoami_shard(),
2149 op->client_op,
2150 local_write_op,
2151 op->trace);
7c673cae
FG
2152 }
2153
2154 for (auto i = op->on_write.begin();
2155 i != op->on_write.end();
2156 op->on_write.erase(i++)) {
2157 (*i)();
2158 }
2159
2160 return true;
2161}
2162
2163bool ECBackend::try_finish_rmw()
2164{
2165 if (waiting_commit.empty())
2166 return false;
2167 Op *op = &(waiting_commit.front());
2168 if (op->write_in_progress())
2169 return false;
2170 waiting_commit.pop_front();
2171
2172 dout(10) << __func__ << ": " << *op << dendl;
2173 dout(20) << __func__ << ": " << cache << dendl;
2174
2175 if (op->roll_forward_to > completed_to)
2176 completed_to = op->roll_forward_to;
2177 if (op->version > committed_to)
2178 committed_to = op->version;
2179
9f95a23c 2180 if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) {
7c673cae
FG
2181 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2182 waiting_reads.empty() &&
2183 waiting_commit.empty()) {
2184 // submit a dummy transaction to kick the rollforward
2185 auto tid = get_parent()->get_tid();
2186 Op *nop = &(tid_to_op_map[tid]);
2187 nop->hoid = op->hoid;
2188 nop->trim_to = op->trim_to;
2189 nop->roll_forward_to = op->version;
2190 nop->tid = tid;
2191 nop->reqid = op->reqid;
2192 waiting_reads.push_back(*nop);
2193 }
2194 }
2195
2196 if (op->using_cache) {
2197 cache.release_write_pin(op->pin);
2198 }
2199 tid_to_op_map.erase(op->tid);
2200
2201 if (waiting_reads.empty() &&
2202 waiting_commit.empty()) {
2203 pipeline_state.clear();
2204 dout(20) << __func__ << ": clearing pipeline_state "
2205 << pipeline_state
2206 << dendl;
2207 }
2208 return true;
2209}
2210
2211void ECBackend::check_ops()
2212{
2213 while (try_state_to_reads() ||
2214 try_reads_to_commit() ||
2215 try_finish_rmw());
2216}
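// check_ops() drives the write pipeline until no stage can advance.  A
// standalone model of that drive-until-quiescent loop, with three dummy
// stages standing in for try_state_to_reads(), try_reads_to_commit() and
// try_finish_rmw(); the real stages also gate on reads/writes still being in
// flight, which is omitted here:
#include <deque>
#include <iostream>

struct MiniPipeline {
  std::deque<int> state, reads, commit;        // ops waiting in each stage

  bool state_to_reads() {
    if (state.empty()) return false;
    reads.push_back(state.front()); state.pop_front(); return true;
  }
  bool reads_to_commit() {
    if (reads.empty()) return false;
    commit.push_back(reads.front()); reads.pop_front(); return true;
  }
  bool finish_rmw() {
    if (commit.empty()) return false;
    std::cout << "op " << commit.front() << " done\n"; commit.pop_front(); return true;
  }
  void check_ops() {
    while (state_to_reads() || reads_to_commit() || finish_rmw())
      ;                                        // keep looping until nothing moved
  }
};

int main()
{
  MiniPipeline p;
  p.state = {1, 2, 3};
  p.check_ops();                               // drains all three stages in order
}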
2217
2218int ECBackend::objects_read_sync(
2219 const hobject_t &hoid,
2220 uint64_t off,
2221 uint64_t len,
2222 uint32_t op_flags,
2223 bufferlist *bl)
2224{
2225 return -EOPNOTSUPP;
2226}
2227
2228void ECBackend::objects_read_async(
2229 const hobject_t &hoid,
2230 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2231 pair<bufferlist*, Context*> > > &to_read,
2232 Context *on_complete,
2233 bool fast_read)
2234{
2235 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2236 reads;
2237
2238 uint32_t flags = 0;
2239 extent_set es;
2240 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2241 pair<bufferlist*, Context*> > >::const_iterator i =
2242 to_read.begin();
2243 i != to_read.end();
2244 ++i) {
2245 pair<uint64_t, uint64_t> tmp =
2246 sinfo.offset_len_to_stripe_bounds(
2247 make_pair(i->first.get<0>(), i->first.get<1>()));
2248
11fdf7f2 2249 es.union_insert(tmp.first, tmp.second);
7c673cae
FG
2250 flags |= i->first.get<2>();
2251 }
2252
2253 if (!es.empty()) {
2254 auto &offsets = reads[hoid];
2255 for (auto j = es.begin();
2256 j != es.end();
2257 ++j) {
2258 offsets.push_back(
2259 boost::make_tuple(
2260 j.get_start(),
2261 j.get_len(),
2262 flags));
2263 }
2264 }
2265
2266 struct cb {
2267 ECBackend *ec;
2268 hobject_t hoid;
2269 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2270 pair<bufferlist*, Context*> > > to_read;
2271 unique_ptr<Context> on_complete;
2272 cb(const cb&) = delete;
2273 cb(cb &&) = default;
2274 cb(ECBackend *ec,
2275 const hobject_t &hoid,
2276 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2277 pair<bufferlist*, Context*> > > &to_read,
2278 Context *on_complete)
2279 : ec(ec),
2280 hoid(hoid),
2281 to_read(to_read),
2282 on_complete(on_complete) {}
2283 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2284 auto dpp = ec->get_parent()->get_dpp();
2285 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2286 << dendl;
2287 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2288 << dendl;
2289
2290 auto &got = results[hoid];
2291
2292 int r = 0;
2293 for (auto &&read: to_read) {
2294 if (got.first < 0) {
2295 if (read.second.second) {
2296 read.second.second->complete(got.first);
2297 }
2298 if (r == 0)
2299 r = got.first;
2300 } else {
11fdf7f2 2301 ceph_assert(read.second.first);
7c673cae
FG
2302 uint64_t offset = read.first.get<0>();
2303 uint64_t length = read.first.get<1>();
2304 auto range = got.second.get_containing_range(offset, length);
11fdf7f2
TL
2305 ceph_assert(range.first != range.second);
2306 ceph_assert(range.first.get_off() <= offset);
91327a77
AA
2307 ldpp_dout(dpp, 30) << "offset: " << offset << dendl;
2308 ldpp_dout(dpp, 30) << "range offset: " << range.first.get_off() << dendl;
2309 ldpp_dout(dpp, 30) << "length: " << length << dendl;
2310 ldpp_dout(dpp, 30) << "range length: " << range.first.get_len() << dendl;
11fdf7f2 2311 ceph_assert(
7c673cae
FG
2312 (offset + length) <=
2313 (range.first.get_off() + range.first.get_len()));
2314 read.second.first->substr_of(
2315 range.first.get_val(),
2316 offset - range.first.get_off(),
2317 length);
2318 if (read.second.second) {
2319 read.second.second->complete(length);
2320 read.second.second = nullptr;
2321 }
2322 }
2323 }
2324 to_read.clear();
2325 if (on_complete) {
2326 on_complete.release()->complete(r);
2327 }
2328 }
2329 ~cb() {
2330 for (auto &&i: to_read) {
2331 delete i.second.second;
2332 }
2333 to_read.clear();
2334 }
2335 };
2336 objects_read_and_reconstruct(
2337 reads,
2338 fast_read,
2339 make_gen_lambda_context<
2340 map<hobject_t,pair<int, extent_map> > &&, cb>(
2341 cb(this,
2342 hoid,
2343 to_read,
2344 on_complete)));
2345}
2346
2347struct CallClientContexts :
2348 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2349 hobject_t hoid;
2350 ECBackend *ec;
2351 ECBackend::ClientAsyncReadStatus *status;
2352 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2353 CallClientContexts(
2354 hobject_t hoid,
2355 ECBackend *ec,
2356 ECBackend::ClientAsyncReadStatus *status,
2357 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2358 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2359 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2360 ECBackend::read_result_t &res = in.second;
2361 extent_map result;
2362 if (res.r != 0)
2363 goto out;
11fdf7f2
TL
2364 ceph_assert(res.returned.size() == to_read.size());
2365 ceph_assert(res.errors.empty());
7c673cae
FG
2366 for (auto &&read: to_read) {
2367 pair<uint64_t, uint64_t> adjusted =
2368 ec->sinfo.offset_len_to_stripe_bounds(
2369 make_pair(read.get<0>(), read.get<1>()));
1e59de90
TL
2370 ceph_assert(res.returned.front().get<0>() == adjusted.first);
2371 ceph_assert(res.returned.front().get<1>() == adjusted.second);
7c673cae
FG
2372 map<int, bufferlist> to_decode;
2373 bufferlist bl;
2374 for (map<pg_shard_t, bufferlist>::iterator j =
2375 res.returned.front().get<2>().begin();
2376 j != res.returned.front().get<2>().end();
2377 ++j) {
f67539c2 2378 to_decode[j->first.shard] = std::move(j->second);
7c673cae
FG
2379 }
2380 int r = ECUtil::decode(
2381 ec->sinfo,
2382 ec->ec_impl,
2383 to_decode,
2384 &bl);
2385 if (r < 0) {
2386 res.r = r;
2387 goto out;
2388 }
2389 bufferlist trimmed;
2390 trimmed.substr_of(
2391 bl,
2392 read.get<0>() - adjusted.first,
11fdf7f2 2393 std::min(read.get<1>(),
7c673cae
FG
2394 bl.length() - (read.get<0>() - adjusted.first)));
2395 result.insert(
2396 read.get<0>(), trimmed.length(), std::move(trimmed));
2397 res.returned.pop_front();
2398 }
2399out:
2400 status->complete_object(hoid, res.r, std::move(result));
2401 ec->kick_reads();
2402 }
2403};
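// Client reads are widened to stripe boundaries before they hit the shards;
// after decode, the stripe-aligned buffer is trimmed back to the range the
// caller asked for (the substr_of above).  Sketch of that trim arithmetic
// with made-up numbers and a std::string standing in for the bufferlist:
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>

int main()
{
  const uint64_t stripe_width = 8;                 // assumed stripe width
  const uint64_t req_off = 10, req_len = 5;        // caller asked for [10, 15)
  const uint64_t aligned_off = (req_off / stripe_width) * stripe_width;   // 8
  std::string decoded = "ABCDEFGHIJKLMNOP";        // decoded bytes for [8, 24)
  const uint64_t skip = req_off - aligned_off;     // 2 leading bytes to drop
  const uint64_t take = std::min<uint64_t>(req_len, decoded.size() - skip);
  std::cout << decoded.substr(skip, take) << "\n"; // "CDEFG" == bytes [10, 15)
}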
2404
2405void ECBackend::objects_read_and_reconstruct(
2406 const map<hobject_t,
2407 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2408 > &reads,
2409 bool fast_read,
2410 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2411{
2412 in_progress_client_reads.emplace_back(
2413 reads.size(), std::move(func));
2414 if (!reads.size()) {
2415 kick_reads();
2416 return;
2417 }
2418
28e407b8 2419 map<hobject_t, set<int>> obj_want_to_read;
7c673cae
FG
2420 set<int> want_to_read;
2421 get_want_to_read_shards(&want_to_read);
2422
2423 map<hobject_t, read_request_t> for_read_op;
2424 for (auto &&to_read: reads) {
11fdf7f2 2425 map<pg_shard_t, vector<pair<int, int>>> shards;
7c673cae
FG
2426 int r = get_min_avail_to_read_shards(
2427 to_read.first,
2428 want_to_read,
2429 false,
2430 fast_read,
2431 &shards);
11fdf7f2 2432 ceph_assert(r == 0);
7c673cae
FG
2433
2434 CallClientContexts *c = new CallClientContexts(
2435 to_read.first,
2436 this,
2437 &(in_progress_client_reads.back()),
2438 to_read.second);
2439 for_read_op.insert(
2440 make_pair(
2441 to_read.first,
2442 read_request_t(
2443 to_read.second,
2444 shards,
2445 false,
2446 c)));
28e407b8 2447 obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
7c673cae
FG
2448 }
2449
2450 start_read_op(
2451 CEPH_MSG_PRIO_DEFAULT,
28e407b8 2452 obj_want_to_read,
7c673cae
FG
2453 for_read_op,
2454 OpRequestRef(),
2455 fast_read, false);
2456 return;
2457}
2458
2459
2460int ECBackend::send_all_remaining_reads(
2461 const hobject_t &hoid,
2462 ReadOp &rop)
2463{
2464 set<int> already_read;
2465 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2466 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2467 already_read.insert(i->shard);
2468 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
11fdf7f2 2469 map<pg_shard_t, vector<pair<int, int>>> shards;
28e407b8
AA
2470 int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
2471 rop.complete[hoid], &shards, rop.for_recovery);
7c673cae
FG
2472 if (r)
2473 return r;
7c673cae 2474
7c673cae
FG
2475 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2476 rop.to_read.find(hoid)->second.to_read;
2477 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2478 rop.to_read.find(hoid)->second.cb;
2479
91327a77
AA
 2480 // (Note cuixf) If we needed to read attrs and that read failed, try to read them again.
2481 bool want_attrs =
2482 rop.to_read.find(hoid)->second.want_attrs &&
2483 (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
2484 if (want_attrs) {
2485 dout(10) << __func__ << " want attrs again" << dendl;
2486 }
2487
28e407b8
AA
2488 rop.to_read.erase(hoid);
2489 rop.to_read.insert(make_pair(
7c673cae
FG
2490 hoid,
2491 read_request_t(
2492 offsets,
2493 shards,
91327a77 2494 want_attrs,
7c673cae 2495 c)));
7c673cae
FG
2496 return 0;
2497}
2498
2499int ECBackend::objects_get_attrs(
2500 const hobject_t &hoid,
20effc67 2501 map<string, bufferlist, less<>> *out)
7c673cae
FG
2502{
2503 int r = store->getattrs(
2504 ch,
2505 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2506 *out);
2507 if (r < 0)
2508 return r;
2509
2510 for (map<string, bufferlist>::iterator i = out->begin();
2511 i != out->end();
2512 ) {
2513 if (ECUtil::is_hinfo_key_string(i->first))
2514 out->erase(i++);
2515 else
2516 ++i;
2517 }
2518 return r;
2519}
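// objects_get_attrs() strips the internal hinfo key out of the attr map with
// the erase(it++) idiom so user-visible xattrs don't leak EC metadata.  Small
// standalone version of the same filter over a std::map; the literal key name
// here is an assumption, the real check goes through ECUtil::is_hinfo_key_string():
#include <iostream>
#include <map>
#include <string>

int main()
{
  std::map<std::string, std::string> attrs = {
    {"_", "object_info"}, {"hinfo_key", "ec hash metadata"}, {"snapset", "..."}};
  for (auto i = attrs.begin(); i != attrs.end(); ) {
    if (i->first == "hinfo_key")          // stand-in for the hinfo-key test
      i = attrs.erase(i);                 // erase returns the next iterator
    else
      ++i;
  }
  for (auto &kv : attrs)
    std::cout << kv.first << "\n";        // "_" and "snapset" remain
}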
2520
2521void ECBackend::rollback_append(
2522 const hobject_t &hoid,
2523 uint64_t old_size,
2524 ObjectStore::Transaction *t)
2525{
11fdf7f2 2526 ceph_assert(old_size % sinfo.get_stripe_width() == 0);
7c673cae
FG
2527 t->truncate(
2528 coll,
2529 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2530 sinfo.aligned_logical_offset_to_chunk_offset(
2531 old_size));
2532}
2533
28e407b8 2534int ECBackend::be_deep_scrub(
7c673cae 2535 const hobject_t &poid,
28e407b8
AA
2536 ScrubMap &map,
2537 ScrubMapBuilder &pos,
2538 ScrubMap::object &o)
2539{
2540 dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
7c673cae 2541 int r;
28e407b8
AA
2542
2543 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2544 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2545
2546 utime_t sleeptime;
2547 sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
2548 if (sleeptime != utime_t()) {
2549 lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
2550 sleeptime.sleep();
2551 }
2552
2553 if (pos.data_pos == 0) {
2554 pos.data_hash = bufferhash(-1);
2555 }
2556
7c673cae
FG
2557 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2558 if (stride % sinfo.get_chunk_size())
2559 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
7c673cae 2560
28e407b8
AA
2561 bufferlist bl;
2562 r = store->read(
2563 ch,
2564 ghobject_t(
2565 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2566 pos.data_pos,
2567 stride, bl,
2568 fadvise_flags);
2569 if (r < 0) {
2570 dout(20) << __func__ << " " << poid << " got "
2571 << r << " on read, read_error" << dendl;
2572 o.read_error = true;
2573 return 0;
7c673cae 2574 }
28e407b8
AA
2575 if (bl.length() % sinfo.get_chunk_size()) {
2576 dout(20) << __func__ << " " << poid << " got "
2577 << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
2578 << dendl;
7c673cae 2579 o.read_error = true;
28e407b8
AA
2580 return 0;
2581 }
2582 if (r > 0) {
2583 pos.data_hash << bl;
2584 }
2585 pos.data_pos += r;
2586 if (r == (int)stride) {
2587 return -EINPROGRESS;
7c673cae
FG
2588 }
2589
2590 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2591 if (!hinfo) {
2592 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2593 o.read_error = true;
2594 o.digest_present = false;
28e407b8 2595 return 0;
7c673cae
FG
2596 } else {
2597 if (!get_parent()->get_pool().allows_ecoverwrites()) {
a4b75251
TL
2598 if (!hinfo->has_chunk_hash()) {
2599 dout(0) << "_scan_list " << poid << " got invalid hash info" << dendl;
2600 o.ec_size_mismatch = true;
2601 return 0;
2602 }
28e407b8
AA
2603 if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
2604 dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
2605 << std::hex << pos
2606 << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
2607 << dendl;
7c673cae 2608 o.ec_size_mismatch = true;
28e407b8 2609 return 0;
7c673cae
FG
2610 }
2611
28e407b8
AA
2612 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
2613 pos.data_hash.digest()) {
2614 dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
2615 << std::hex << pos.data_hash.digest() << " != expected 0x"
2616 << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
2617 << std::dec << dendl;
7c673cae 2618 o.ec_hash_mismatch = true;
28e407b8 2619 return 0;
7c673cae
FG
2620 }
2621
2622 /* We checked above that we match our own stored hash. We cannot
2623 * send a hash of the actual object, so instead we simply send
2624 * our locally stored hash of shard 0 on the assumption that if
2625 * we match our chunk hash and our recollection of the hash for
2626 * chunk 0 matches that of our peers, there is likely no corruption.
2627 */
2628 o.digest = hinfo->get_chunk_hash(0);
2629 o.digest_present = true;
2630 } else {
2631 /* Hack! We must be using partial overwrites, and partial overwrites
2632 * don't support deep-scrub yet
2633 */
2634 o.digest = 0;
2635 o.digest_present = true;
2636 }
2637 }
2638
28e407b8 2639 o.omap_digest = -1;
7c673cae 2640 o.omap_digest_present = true;
28e407b8 2641 return 0;
7c673cae 2642}
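// Two details of the deep-scrub loop above, shown with illustrative values:
// the configured osd_deep_scrub_stride is rounded up to a multiple of the EC
// chunk size, and the scrub returns -EINPROGRESS for every full stride read
// so it can resume at pos.data_pos on the next call.
#include <cstdint>
#include <iostream>

static uint64_t round_up_to_chunk(uint64_t stride, uint64_t chunk_size)
{
  if (stride % chunk_size)
    stride += chunk_size - (stride % chunk_size);
  return stride;
}

int main()
{
  const uint64_t chunk_size = 65536;                           // assumed chunk size
  std::cout << round_up_to_chunk(524288, chunk_size) << "\n";  // already aligned: 524288
  std::cout << round_up_to_chunk(524289, chunk_size) << "\n";  // rounded up: 589824
}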