1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <sstream>
17
18#include "ECBackend.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPushReply.h"
21#include "messages/MOSDECSubOpWrite.h"
22#include "messages/MOSDECSubOpWriteReply.h"
23#include "messages/MOSDECSubOpRead.h"
24#include "messages/MOSDECSubOpReadReply.h"
25#include "ECMsgTypes.h"
26
27#include "PrimaryLogPG.h"
28
29#define dout_context cct
30#define dout_subsys ceph_subsys_osd
31#define DOUT_PREFIX_ARGS this
32#undef dout_prefix
33#define dout_prefix _prefix(_dout, this)
34static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35 return *_dout << pgb->get_parent()->gen_dbg_prefix();
36}
37
38struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39 list<ECBackend::RecoveryOp> ops;
40};
41
42ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43 switch (rhs.pipeline_state) {
44 case ECBackend::pipeline_state_t::CACHE_VALID:
45 return lhs << "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID:
47 return lhs << "CACHE_INVALID";
48 default:
49 assert(0 == "invalid pipeline state");
50 }
51 return lhs; // unreachable
52}
53
54static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
55{
56 lhs << "[";
57 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
58 i != rhs.end();
59 ++i) {
60 if (i != rhs.begin())
61 lhs << ", ";
62 lhs << make_pair(i->first, i->second.length());
63 }
64 return lhs << "]";
65}
66
67static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
68{
69 lhs << "[";
70 for (map<int, bufferlist>::const_iterator i = rhs.begin();
71 i != rhs.end();
72 ++i) {
73 if (i != rhs.begin())
74 lhs << ", ";
75 lhs << make_pair(i->first, i->second.length());
76 }
77 return lhs << "]";
78}
79
80static ostream &operator<<(
81 ostream &lhs,
82 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
83{
84 return lhs << "(" << rhs.get<0>() << ", "
85 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
86}
87
88ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
89{
90 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91 << ", need=" << rhs.need
92 << ", want_attrs=" << rhs.want_attrs
93 << ")";
94}
95
96ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
97{
98 lhs << "read_result_t(r=" << rhs.r
99 << ", errors=" << rhs.errors;
100 if (rhs.attrs) {
101 lhs << ", attrs=" << rhs.attrs.get();
102 } else {
103 lhs << ", noattrs";
104 }
105 return lhs << ", returned=" << rhs.returned << ")";
106}
107
108ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
109{
110 lhs << "ReadOp(tid=" << rhs.tid;
111 if (rhs.op && rhs.op->get_req()) {
112 lhs << ", op=";
113 rhs.op->get_req()->print(lhs);
114 }
115 return lhs << ", to_read=" << rhs.to_read
116 << ", complete=" << rhs.complete
117 << ", priority=" << rhs.priority
118 << ", obj_to_source=" << rhs.obj_to_source
119 << ", source_to_obj=" << rhs.source_to_obj
120 << ", in_progress=" << rhs.in_progress << ")";
121}
122
123void ECBackend::ReadOp::dump(Formatter *f) const
124{
125 f->dump_unsigned("tid", tid);
126 if (op && op->get_req()) {
127 f->dump_stream("op") << *(op->get_req());
128 }
129 f->dump_stream("to_read") << to_read;
130 f->dump_stream("complete") << complete;
131 f->dump_int("priority", priority);
132 f->dump_stream("obj_to_source") << obj_to_source;
133 f->dump_stream("source_to_obj") << source_to_obj;
134 f->dump_stream("in_progress") << in_progress;
135}
136
137ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
138{
139 lhs << "Op(" << rhs.hoid
140 << " v=" << rhs.version
141 << " tt=" << rhs.trim_to
142 << " tid=" << rhs.tid
143 << " reqid=" << rhs.reqid;
144 if (rhs.client_op && rhs.client_op->get_req()) {
145 lhs << " client_op=";
146 rhs.client_op->get_req()->print(lhs);
147 }
148 lhs << " roll_forward_to=" << rhs.roll_forward_to
149 << " temp_added=" << rhs.temp_added
150 << " temp_cleared=" << rhs.temp_cleared
151 << " pending_read=" << rhs.pending_read
152 << " remote_read=" << rhs.remote_read
153 << " remote_read_result=" << rhs.remote_read_result
154 << " pending_apply=" << rhs.pending_apply
155 << " pending_commit=" << rhs.pending_commit
156 << " plan.to_read=" << rhs.plan.to_read
157 << " plan.will_write=" << rhs.plan.will_write
158 << ")";
159 return lhs;
160}
161
162ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
163{
164 return lhs << "RecoveryOp("
165 << "hoid=" << rhs.hoid
166 << " v=" << rhs.v
167 << " missing_on=" << rhs.missing_on
168 << " missing_on_shards=" << rhs.missing_on_shards
169 << " recovery_info=" << rhs.recovery_info
170 << " recovery_progress=" << rhs.recovery_progress
171 << " obc refcount=" << rhs.obc.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173 << " waiting_on_pushes=" << rhs.waiting_on_pushes
174 << " extent_requested=" << rhs.extent_requested
175 << ")";
176}
177
178void ECBackend::RecoveryOp::dump(Formatter *f) const
179{
180 f->dump_stream("hoid") << hoid;
181 f->dump_stream("v") << v;
182 f->dump_stream("missing_on") << missing_on;
183 f->dump_stream("missing_on_shards") << missing_on_shards;
184 f->dump_stream("recovery_info") << recovery_info;
185 f->dump_stream("recovery_progress") << recovery_progress;
186 f->dump_stream("state") << tostr(state);
187 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188 f->dump_stream("extent_requested") << extent_requested;
189}
190
191ECBackend::ECBackend(
192 PGBackend::Listener *pg,
193 coll_t coll,
194 ObjectStore::CollectionHandle &ch,
195 ObjectStore *store,
196 CephContext *cct,
197 ErasureCodeInterfaceRef ec_impl,
198 uint64_t stripe_width)
199 : PGBackend(cct, pg, store, coll, ch),
200 ec_impl(ec_impl),
201 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
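  // The stripe layout requires stripe_width to split evenly into the
  // erasure code's k data chunks; assert that invariant up front.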
202 assert((ec_impl->get_data_chunk_count() *
203 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
204}
205
206PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
207{
208 return new ECRecoveryHandle;
209}
210
211void ECBackend::_failed_push(const hobject_t &hoid,
212 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
213{
214 ECBackend::read_result_t &res = in.second;
215 dout(10) << __func__ << ": Read error " << hoid << " r="
216 << res.r << " errors=" << res.errors << dendl;
217 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
218 << dendl;
219 assert(recovery_ops.count(hoid));
220  eversion_t v = recovery_ops[hoid].v;
221 recovery_ops.erase(hoid);
222
223 list<pg_shard_t> fl;
224 for (auto&& i : res.errors) {
225 fl.push_back(i.first);
226 }
227 get_parent()->failed_push(fl, hoid);
228 get_parent()->backfill_add_missing(hoid, v);
229 get_parent()->finish_degraded_object(hoid);
230}
231
232struct OnRecoveryReadComplete :
233 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
234 ECBackend *pg;
235 hobject_t hoid;
236 set<int> want;
237 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
238 : pg(pg), hoid(hoid) {}
239 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
240 ECBackend::read_result_t &res = in.second;
241 if (!(res.r == 0 && res.errors.empty())) {
242 pg->_failed_push(hoid, in);
243 return;
244 }
245 assert(res.returned.size() == 1);
246 pg->handle_recovery_read_complete(
247 hoid,
248 res.returned.back(),
249 res.attrs,
250 in.first);
251 }
252};
253
254struct RecoveryMessages {
255 map<hobject_t,
256 ECBackend::read_request_t> reads;
257  map<hobject_t, set<int>> want_to_read;
258 void read(
259 ECBackend *ec,
260 const hobject_t &hoid, uint64_t off, uint64_t len,
261    set<int> &&_want_to_read,
262 const set<pg_shard_t> &need,
263 bool attrs) {
264 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
265 to_read.push_back(boost::make_tuple(off, len, 0));
266 assert(!reads.count(hoid));
267    want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
268 reads.insert(
269 make_pair(
270 hoid,
271 ECBackend::read_request_t(
272 to_read,
273 need,
274 attrs,
275 new OnRecoveryReadComplete(
276 ec,
277 hoid))));
278 }
279
280 map<pg_shard_t, vector<PushOp> > pushes;
281 map<pg_shard_t, vector<PushReplyOp> > push_replies;
282 ObjectStore::Transaction t;
283 RecoveryMessages() {}
284 ~RecoveryMessages(){}
285};
286
287void ECBackend::handle_recovery_push(
288 const PushOp &op,
289 RecoveryMessages *m)
290{
291 ostringstream ss;
292 if (get_parent()->check_failsafe_full(ss)) {
293 dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
294 ceph_abort();
295 }
296
297 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
298 ghobject_t tobj;
299 if (oneshot) {
300 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
301 get_parent()->whoami_shard().shard);
302 } else {
303 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
304 op.version),
305 ghobject_t::NO_GEN,
306 get_parent()->whoami_shard().shard);
307 if (op.before_progress.first) {
308 dout(10) << __func__ << ": Adding oid "
309 << tobj.hobj << " in the temp collection" << dendl;
310 add_temp_obj(tobj.hobj);
311 }
312 }
313
314 if (op.before_progress.first) {
315 m->t.remove(coll, tobj);
316 m->t.touch(coll, tobj);
317 }
318
319 if (!op.data_included.empty()) {
320 uint64_t start = op.data_included.range_start();
321 uint64_t end = op.data_included.range_end();
322 assert(op.data.length() == (end - start));
323
324 m->t.write(
325 coll,
326 tobj,
327 start,
328 op.data.length(),
329 op.data);
330 } else {
331 assert(op.data.length() == 0);
332 }
333
334 if (op.before_progress.first) {
335 assert(op.attrset.count(string("_")));
336 m->t.setattrs(
337 coll,
338 tobj,
339 op.attrset);
340 }
341
342 if (op.after_progress.data_complete && !oneshot) {
343 dout(10) << __func__ << ": Removing oid "
344 << tobj.hobj << " from the temp collection" << dendl;
345 clear_temp_obj(tobj.hobj);
346 m->t.remove(coll, ghobject_t(
347 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
348 m->t.collection_move_rename(
349 coll, tobj,
350 coll, ghobject_t(
351 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
352 }
353 if (op.after_progress.data_complete) {
354 if ((get_parent()->pgb_is_primary())) {
355 assert(recovery_ops.count(op.soid));
356 assert(recovery_ops[op.soid].obc);
357 get_parent()->on_local_recover(
358 op.soid,
359 op.recovery_info,
360 recovery_ops[op.soid].obc,
361        false,
362 &m->t);
363 } else {
364 get_parent()->on_local_recover(
365 op.soid,
366 op.recovery_info,
367 ObjectContextRef(),
368        false,
369 &m->t);
370 }
371 }
372 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
373 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
374}
375
376void ECBackend::handle_recovery_push_reply(
377 const PushReplyOp &op,
378 pg_shard_t from,
379 RecoveryMessages *m)
380{
381 if (!recovery_ops.count(op.soid))
382 return;
383 RecoveryOp &rop = recovery_ops[op.soid];
384 assert(rop.waiting_on_pushes.count(from));
385 rop.waiting_on_pushes.erase(from);
386 continue_recovery_op(rop, m);
387}
388
389void ECBackend::handle_recovery_read_complete(
390 const hobject_t &hoid,
391 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
392 boost::optional<map<string, bufferlist> > attrs,
393 RecoveryMessages *m)
394{
395 dout(10) << __func__ << ": returned " << hoid << " "
396 << "(" << to_read.get<0>()
397 << ", " << to_read.get<1>()
398 << ", " << to_read.get<2>()
399 << ")"
400 << dendl;
401 assert(recovery_ops.count(hoid));
402 RecoveryOp &op = recovery_ops[hoid];
403 assert(op.returned_data.empty());
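  // Reconstruct the chunks for every shard missing this object: 'from'
  // collects the buffers we read back (keyed by shard id) and 'target'
  // points at op.returned_data slots for the missing shards, which
  // ECUtil::decode fills in below.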
404 map<int, bufferlist*> target;
405 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
406 i != op.missing_on_shards.end();
407 ++i) {
408 target[*i] = &(op.returned_data[*i]);
409 }
410 map<int, bufferlist> from;
411 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
412 i != to_read.get<2>().end();
413 ++i) {
414 from[i->first.shard].claim(i->second);
415 }
416 dout(10) << __func__ << ": " << from << dendl;
417 int r = ECUtil::decode(sinfo, ec_impl, from, target);
418 assert(r == 0);
419 if (attrs) {
420 op.xattrs.swap(*attrs);
421
422 if (!op.obc) {
423      // attrs only reference the origin bufferlist (decoded from the
424      // ECSubReadReply message), which is much larger than the attrs
425      // themselves. If the obc caches them (get_obc may cache the attrs),
426      // the whole origin bufferlist cannot be freed until the obc is
427      // evicted from the obc cache, so rebuild the bufferlists before
428      // caching them.
429 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
430 it != op.xattrs.end();
431 ++it) {
432 it->second.rebuild();
433 }
434 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
435 // of the backend (see bug #12983)
436 map<string, bufferlist> sanitized_attrs(op.xattrs);
437 sanitized_attrs.erase(ECUtil::get_hinfo_key());
438 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
439 assert(op.obc);
440 op.recovery_info.size = op.obc->obs.oi.size;
441 op.recovery_info.oi = op.obc->obs.oi;
442 }
443
444 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
445 if (op.obc->obs.oi.size > 0) {
446 assert(op.xattrs.count(ECUtil::get_hinfo_key()));
447 bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
448 ::decode(hinfo, bp);
449 }
450 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
451 }
452 assert(op.xattrs.size());
453 assert(op.obc);
454 continue_recovery_op(op, m);
455}
456
457struct SendPushReplies : public Context {
458 PGBackend::Listener *l;
459 epoch_t epoch;
460 map<int, MOSDPGPushReply*> replies;
461 SendPushReplies(
462 PGBackend::Listener *l,
463 epoch_t epoch,
464 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
465 replies.swap(in);
466 }
467 void finish(int) override {
468 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
469 i != replies.end();
470 ++i) {
471 l->send_message_osd_cluster(i->first, i->second, epoch);
472 }
473 replies.clear();
474 }
475 ~SendPushReplies() override {
476 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
477 i != replies.end();
478 ++i) {
479 i->second->put();
480 }
481 replies.clear();
482 }
483};
484
485void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
486{
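  // Batch the queued PushOps/PushReplyOps into one MOSDPGPush(-Reply) per
  // target shard, queue the accumulated transaction (push replies are sent
  // once it completes), and finally kick off any reads gathered in m.reads.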
487 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
488 i != m.pushes.end();
489 m.pushes.erase(i++)) {
490 MOSDPGPush *msg = new MOSDPGPush();
491 msg->set_priority(priority);
492 msg->map_epoch = get_parent()->get_epoch();
493 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
494 msg->from = get_parent()->whoami_shard();
495 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
496 msg->pushes.swap(i->second);
497 msg->compute_cost(cct);
498 get_parent()->send_message(
499 i->first.osd,
500 msg);
501 }
502 map<int, MOSDPGPushReply*> replies;
503 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
504 m.push_replies.begin();
505 i != m.push_replies.end();
506 m.push_replies.erase(i++)) {
507 MOSDPGPushReply *msg = new MOSDPGPushReply();
508 msg->set_priority(priority);
509 msg->map_epoch = get_parent()->get_epoch();
510 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
511 msg->from = get_parent()->whoami_shard();
512 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
513 msg->replies.swap(i->second);
514 msg->compute_cost(cct);
515 replies.insert(make_pair(i->first.osd, msg));
516 }
517
518 if (!replies.empty()) {
519 (m.t).register_on_complete(
520 get_parent()->bless_context(
521 new SendPushReplies(
522 get_parent(),
523 get_parent()->get_epoch(),
524 replies)));
525 get_parent()->queue_transaction(std::move(m.t));
526 }
527
528 if (m.reads.empty())
529 return;
530 start_read_op(
531 priority,
532    m.want_to_read,
533 m.reads,
534 OpRequestRef(),
535 false, true);
536}
537
538void ECBackend::continue_recovery_op(
539 RecoveryOp &op,
540 RecoveryMessages *m)
541{
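  // Recovery state machine: IDLE issues reads for the next chunk-sized
  // extent from a sufficient set of surviving shards; READING turns the
  // decoded data into per-shard PushOps; WRITING waits for push replies and
  // either loops back to IDLE for the next extent or completes the object.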
542 dout(10) << __func__ << ": continuing " << op << dendl;
543 while (1) {
544 switch (op.state) {
545 case RecoveryOp::IDLE: {
546 // start read
547 op.state = RecoveryOp::READING;
548 assert(!op.recovery_progress.data_complete);
549 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
550 uint64_t from = op.recovery_progress.data_recovered_to;
551 uint64_t amount = get_recovery_chunk_size();
552
553 if (op.recovery_progress.first && op.obc) {
554 /* We've got the attrs and the hinfo, might as well use them */
555 op.hinfo = get_hash_info(op.hoid);
556 assert(op.hinfo);
557 op.xattrs = op.obc->attr_cache;
558 ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
559 }
560
561 set<pg_shard_t> to_read;
562 int r = get_min_avail_to_read_shards(
563 op.hoid, want, true, false, &to_read);
564 if (r != 0) {
565 // we must have lost a recovery source
566 assert(!op.recovery_progress.first);
567 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
568 << dendl;
569 get_parent()->cancel_pull(op.hoid);
570 recovery_ops.erase(op.hoid);
571 return;
572 }
573 m->read(
574 this,
575 op.hoid,
576 op.recovery_progress.data_recovered_to,
577 amount,
578      std::move(want),
579 to_read,
580 op.recovery_progress.first && !op.obc);
581 op.extent_requested = make_pair(
582 from,
583 amount);
584 dout(10) << __func__ << ": IDLE return " << op << dendl;
585 return;
586 }
587 case RecoveryOp::READING: {
588 // read completed, start write
589 assert(op.xattrs.size());
590 assert(op.returned_data.size());
591 op.state = RecoveryOp::WRITING;
592 ObjectRecoveryProgress after_progress = op.recovery_progress;
593 after_progress.data_recovered_to += op.extent_requested.second;
594 after_progress.first = false;
595 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
596 after_progress.data_recovered_to =
597 sinfo.logical_to_next_stripe_offset(
598 op.obc->obs.oi.size);
599 after_progress.data_complete = true;
600 }
601 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
602 mi != op.missing_on.end();
603 ++mi) {
604 assert(op.returned_data.count(mi->shard));
605 m->pushes[*mi].push_back(PushOp());
606 PushOp &pop = m->pushes[*mi].back();
607 pop.soid = op.hoid;
608 pop.version = op.v;
609 pop.data = op.returned_data[mi->shard];
610 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
611 << ", after_progress=" << after_progress
612 << ", pop.data.length()=" << pop.data.length()
613 << ", size=" << op.obc->obs.oi.size << dendl;
614 assert(
615 pop.data.length() ==
616 sinfo.aligned_logical_offset_to_chunk_offset(
617 after_progress.data_recovered_to -
618 op.recovery_progress.data_recovered_to)
619 );
620 if (pop.data.length())
621 pop.data_included.insert(
622 sinfo.aligned_logical_offset_to_chunk_offset(
623 op.recovery_progress.data_recovered_to),
624 pop.data.length()
625 );
626 if (op.recovery_progress.first) {
627 pop.attrset = op.xattrs;
628 }
629 pop.recovery_info = op.recovery_info;
630 pop.before_progress = op.recovery_progress;
631 pop.after_progress = after_progress;
632 if (*mi != get_parent()->primary_shard())
633 get_parent()->begin_peer_recover(
634 *mi,
635 op.hoid);
636 }
637 op.returned_data.clear();
638 op.waiting_on_pushes = op.missing_on;
639 op.recovery_progress = after_progress;
640 dout(10) << __func__ << ": READING return " << op << dendl;
641 return;
642 }
643 case RecoveryOp::WRITING: {
644 if (op.waiting_on_pushes.empty()) {
645 if (op.recovery_progress.data_complete) {
646 op.state = RecoveryOp::COMPLETE;
647 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
648 i != op.missing_on.end();
649 ++i) {
650 if (*i != get_parent()->primary_shard()) {
651 dout(10) << __func__ << ": on_peer_recover on " << *i
652 << ", obj " << op.hoid << dendl;
653 get_parent()->on_peer_recover(
654 *i,
655 op.hoid,
656 op.recovery_info);
657 }
658 }
659 object_stat_sum_t stat;
660 stat.num_bytes_recovered = op.recovery_info.size;
661 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
662 stat.num_objects_recovered = 1;
663          get_parent()->on_global_recover(op.hoid, stat, false);
664 dout(10) << __func__ << ": WRITING return " << op << dendl;
665 recovery_ops.erase(op.hoid);
666 return;
667 } else {
668 op.state = RecoveryOp::IDLE;
669 dout(10) << __func__ << ": WRITING continue " << op << dendl;
670 continue;
671 }
672 }
673 return;
674 }
675 // should never be called once complete
676 case RecoveryOp::COMPLETE:
677 default: {
678 ceph_abort();
679 };
680 }
681 }
682}
683
684void ECBackend::run_recovery_op(
685 RecoveryHandle *_h,
686 int priority)
687{
688 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
689 RecoveryMessages m;
690 for (list<RecoveryOp>::iterator i = h->ops.begin();
691 i != h->ops.end();
692 ++i) {
693 dout(10) << __func__ << ": starting " << *i << dendl;
694 assert(!recovery_ops.count(i->hoid));
695 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
696 continue_recovery_op(op, &m);
697 }
698
699  dispatch_recovery_messages(m, priority);
700  send_recovery_deletes(priority, h->deletes);
701 delete _h;
702}
703
704int ECBackend::recover_object(
705 const hobject_t &hoid,
706 eversion_t v,
707 ObjectContextRef head,
708 ObjectContextRef obc,
709 RecoveryHandle *_h)
710{
711 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
712 h->ops.push_back(RecoveryOp());
713 h->ops.back().v = v;
714 h->ops.back().hoid = hoid;
715 h->ops.back().obc = obc;
716 h->ops.back().recovery_info.soid = hoid;
717 h->ops.back().recovery_info.version = v;
718 if (obc) {
719 h->ops.back().recovery_info.size = obc->obs.oi.size;
720 h->ops.back().recovery_info.oi = obc->obs.oi;
721 }
722 if (hoid.is_snap()) {
723 if (obc) {
724 assert(obc->ssc);
725 h->ops.back().recovery_info.ss = obc->ssc->snapset;
726 } else if (head) {
727 assert(head->ssc);
728 h->ops.back().recovery_info.ss = head->ssc->snapset;
729 } else {
730 assert(0 == "neither obc nor head set for a snap object");
731 }
732 }
733 h->ops.back().recovery_progress.omap_complete = true;
734 for (set<pg_shard_t>::const_iterator i =
735 get_parent()->get_actingbackfill_shards().begin();
736 i != get_parent()->get_actingbackfill_shards().end();
737 ++i) {
738 dout(10) << "checking " << *i << dendl;
739 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
740 h->ops.back().missing_on.insert(*i);
741 h->ops.back().missing_on_shards.insert(i->shard);
742 }
743 }
744 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
745  return 0;
746}
747
748bool ECBackend::can_handle_while_inactive(
749 OpRequestRef _op)
750{
751 return false;
752}
753
754bool ECBackend::_handle_message(
755 OpRequestRef _op)
756{
757 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
758 int priority = _op->get_req()->get_priority();
759 switch (_op->get_req()->get_type()) {
760 case MSG_OSD_EC_WRITE: {
761 // NOTE: this is non-const because handle_sub_write modifies the embedded
762 // ObjectStore::Transaction in place (and then std::move's it). It does
763 // not conflict with ECSubWrite's operator<<.
764 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
765 _op->get_nonconst_req());
766    parent->maybe_preempt_replica_scrub(op->op.soid);
767 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
768 return true;
769 }
770 case MSG_OSD_EC_WRITE_REPLY: {
771 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
772 _op->get_req());
773 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
774 return true;
775 }
776 case MSG_OSD_EC_READ: {
777 const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
778 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
779 reply->pgid = get_parent()->primary_spg_t();
780 reply->map_epoch = get_parent()->get_epoch();
781 reply->min_epoch = get_parent()->get_interval_start_epoch();
782 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
783 reply->trace = _op->pg_trace;
784 get_parent()->send_message_osd_cluster(
785 op->op.from.osd, reply, get_parent()->get_epoch());
786 return true;
787 }
788 case MSG_OSD_EC_READ_REPLY: {
789 // NOTE: this is non-const because handle_sub_read_reply steals resulting
790 // buffers. It does not conflict with ECSubReadReply operator<<.
791 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
792 _op->get_nonconst_req());
793 RecoveryMessages rm;
794 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
795 dispatch_recovery_messages(rm, priority);
796 return true;
797 }
798 case MSG_OSD_PG_PUSH: {
799 const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
800 RecoveryMessages rm;
801 for (vector<PushOp>::const_iterator i = op->pushes.begin();
802 i != op->pushes.end();
803 ++i) {
804 handle_recovery_push(*i, &rm);
805 }
806 dispatch_recovery_messages(rm, priority);
807 return true;
808 }
809 case MSG_OSD_PG_PUSH_REPLY: {
810 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
811 _op->get_req());
812 RecoveryMessages rm;
813 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
814 i != op->replies.end();
815 ++i) {
816 handle_recovery_push_reply(*i, op->from, &rm);
817 }
818 dispatch_recovery_messages(rm, priority);
819 return true;
820 }
821 default:
822 return false;
823 }
824 return false;
825}
826
827struct SubWriteCommitted : public Context {
828 ECBackend *pg;
829 OpRequestRef msg;
830 ceph_tid_t tid;
831 eversion_t version;
832 eversion_t last_complete;
833 const ZTracer::Trace trace;
834 SubWriteCommitted(
835 ECBackend *pg,
836 OpRequestRef msg,
837 ceph_tid_t tid,
838 eversion_t version,
839 eversion_t last_complete,
840 const ZTracer::Trace &trace)
841 : pg(pg), msg(msg), tid(tid),
842 version(version), last_complete(last_complete), trace(trace) {}
843 void finish(int) override {
844 if (msg)
845 msg->mark_event("sub_op_committed");
846 pg->sub_write_committed(tid, version, last_complete, trace);
847 }
848};
849void ECBackend::sub_write_committed(
850 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
851 const ZTracer::Trace &trace) {
852 if (get_parent()->pgb_is_primary()) {
853 ECSubWriteReply reply;
854 reply.tid = tid;
855 reply.last_complete = last_complete;
856 reply.committed = true;
857 reply.from = get_parent()->whoami_shard();
858 handle_sub_write_reply(
859 get_parent()->whoami_shard(),
860 reply, trace);
861 } else {
862 get_parent()->update_last_complete_ondisk(last_complete);
863 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
864 r->pgid = get_parent()->primary_spg_t();
865 r->map_epoch = get_parent()->get_epoch();
866 r->min_epoch = get_parent()->get_interval_start_epoch();
867 r->op.tid = tid;
868 r->op.last_complete = last_complete;
869 r->op.committed = true;
870 r->op.from = get_parent()->whoami_shard();
871 r->set_priority(CEPH_MSG_PRIO_HIGH);
872 r->trace = trace;
873 r->trace.event("sending sub op commit");
874 get_parent()->send_message_osd_cluster(
875 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
876 }
877}
878
879struct SubWriteApplied : public Context {
880 ECBackend *pg;
881 OpRequestRef msg;
882 ceph_tid_t tid;
883 eversion_t version;
884 const ZTracer::Trace trace;
885 SubWriteApplied(
886 ECBackend *pg,
887 OpRequestRef msg,
888 ceph_tid_t tid,
889 eversion_t version,
890 const ZTracer::Trace &trace)
891 : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
892 void finish(int) override {
893 if (msg)
894 msg->mark_event("sub_op_applied");
895 pg->sub_write_applied(tid, version, trace);
896 }
897};
898void ECBackend::sub_write_applied(
899 ceph_tid_t tid, eversion_t version,
900 const ZTracer::Trace &trace) {
901 parent->op_applied(version);
902 if (get_parent()->pgb_is_primary()) {
903 ECSubWriteReply reply;
904 reply.from = get_parent()->whoami_shard();
905 reply.tid = tid;
906 reply.applied = true;
907 handle_sub_write_reply(
908 get_parent()->whoami_shard(),
909 reply, trace);
910 } else {
911 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
912 r->pgid = get_parent()->primary_spg_t();
913 r->map_epoch = get_parent()->get_epoch();
914 r->min_epoch = get_parent()->get_interval_start_epoch();
915 r->op.from = get_parent()->whoami_shard();
916 r->op.tid = tid;
917 r->op.applied = true;
918 r->set_priority(CEPH_MSG_PRIO_HIGH);
919 r->trace = trace;
920 r->trace.event("sending sub op apply");
921 get_parent()->send_message_osd_cluster(
922 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
923 }
924}
925
926void ECBackend::handle_sub_write(
927 pg_shard_t from,
928 OpRequestRef msg,
929 ECSubWrite &op,
930 const ZTracer::Trace &trace,
931 Context *on_local_applied_sync)
932{
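  // Apply a sub-write on this shard: record the log entries and temp-object
  // bookkeeping in a local transaction, then queue the primary-provided
  // transaction together with that local one, whose commit/apply callbacks
  // send (or locally deliver) the write reply.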
933 if (msg)
934 msg->mark_started();
935 trace.event("handle_sub_write");
936 assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
937 if (!get_parent()->pgb_is_primary())
938 get_parent()->update_stats(op.stats);
939 ObjectStore::Transaction localt;
940 if (!op.temp_added.empty()) {
941 add_temp_objs(op.temp_added);
942 }
943 if (op.backfill) {
944 for (set<hobject_t>::iterator i = op.temp_removed.begin();
945 i != op.temp_removed.end();
946 ++i) {
947 dout(10) << __func__ << ": removing object " << *i
948 << " since we won't get the transaction" << dendl;
949 localt.remove(
950 coll,
951 ghobject_t(
952 *i,
953 ghobject_t::NO_GEN,
954 get_parent()->whoami_shard().shard));
955 }
956 }
957 clear_temp_objs(op.temp_removed);
958 get_parent()->log_operation(
959 op.log_entries,
960 op.updated_hit_set_history,
961 op.trim_to,
962 op.roll_forward_to,
963 !op.backfill,
964 localt);
965
966 PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
967 if (_rPG && !_rPG->is_undersized() &&
968 (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
969 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
970
971 if (on_local_applied_sync) {
972 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
973 localt.register_on_applied_sync(on_local_applied_sync);
974 }
975 localt.register_on_commit(
976 get_parent()->bless_context(
977 new SubWriteCommitted(
978 this, msg, op.tid,
979 op.at_version,
980 get_parent()->get_info().last_complete, trace)));
981 localt.register_on_applied(
982 get_parent()->bless_context(
983 new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
984 vector<ObjectStore::Transaction> tls;
985 tls.reserve(2);
986 tls.push_back(std::move(op.t));
987 tls.push_back(std::move(localt));
988 get_parent()->queue_transactions(tls, msg);
989}
990
991void ECBackend::handle_sub_read(
992 pg_shard_t from,
993 const ECSubRead &op,
994 ECSubReadReply *reply,
995 const ZTracer::Trace &trace)
996{
997 trace.event("handle sub read");
998 shard_id_t shard = get_parent()->whoami_shard().shard;
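  // For each requested extent, read the chunk stored on this shard; when the
  // pool does not allow EC overwrites, also verify the chunk CRC against the
  // stored hash info whenever a full chunk was read from offset 0.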
999 for(auto i = op.to_read.begin();
1000 i != op.to_read.end();
1001 ++i) {
1002 int r = 0;
1003 ECUtil::HashInfoRef hinfo;
1004 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1005 hinfo = get_hash_info(i->first);
1006 if (!hinfo) {
1007 r = -EIO;
1008 get_parent()->clog_error() << "Corruption detected: object " << i->first
1009 << " is missing hash_info";
1010 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1011 goto error;
1012 }
1013 }
1014 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1015 bufferlist bl;
1016 r = store->read(
1017 ch,
1018 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1019 j->get<0>(),
1020 j->get<1>(),
1021        bl, j->get<2>());
1022      if (r < 0) {
1023 get_parent()->clog_error() << "Error " << r
1024 << " reading object "
1025 << i->first;
1026 dout(5) << __func__ << ": Error " << r
1027 << " reading " << i->first << dendl;
1028 goto error;
1029 } else {
1030 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1031 reply->buffers_read[i->first].push_back(
1032 make_pair(
1033 j->get<0>(),
1034 bl)
1035 );
1036 }
1037
1038 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1039	// This shows that we still need deep scrub because large enough files
1040	// are read in sections, so the digest check here won't cover them.
1041	// Do NOT check osd_read_eio_on_bad_digest here. We need to report
1042	// the state of our chunk in case other chunks could substitute.
1043 assert(hinfo->has_chunk_hash());
1044 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1045 (j->get<0>() == 0)) {
1046 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1047 bufferhash h(-1);
1048 h << bl;
1049 if (h.digest() != hinfo->get_chunk_hash(shard)) {
1050	  get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1051 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1052 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1053 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1054 r = -EIO;
1055 goto error;
1056 }
1057 }
1058 }
1059 }
1060 continue;
1061error:
1062 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1063 // the state of our chunk in case other chunks could substitute.
1064 reply->buffers_read.erase(i->first);
1065 reply->errors[i->first] = r;
1066 }
1067 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1068 i != op.attrs_to_read.end();
1069 ++i) {
1070 dout(10) << __func__ << ": fulfilling attr request on "
1071 << *i << dendl;
1072 if (reply->errors.count(*i))
1073 continue;
1074 int r = store->getattrs(
1075 ch,
1076 ghobject_t(
1077 *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1078 reply->attrs_read[*i]);
1079 if (r < 0) {
1080 reply->buffers_read.erase(*i);
1081 reply->errors[*i] = r;
1082 }
1083 }
1084 reply->from = get_parent()->whoami_shard();
1085 reply->tid = op.tid;
1086}
1087
1088void ECBackend::handle_sub_write_reply(
1089 pg_shard_t from,
1090 const ECSubWriteReply &op,
1091 const ZTracer::Trace &trace)
1092{
1093 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1094 assert(i != tid_to_op_map.end());
1095 if (op.committed) {
1096 trace.event("sub write committed");
1097 assert(i->second.pending_commit.count(from));
1098 i->second.pending_commit.erase(from);
1099 if (from != get_parent()->whoami_shard()) {
1100 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1101 }
1102 }
1103 if (op.applied) {
1104 trace.event("sub write applied");
1105 assert(i->second.pending_apply.count(from));
1106 i->second.pending_apply.erase(from);
1107 }
1108
1109 if (i->second.pending_apply.empty() && i->second.on_all_applied) {
1110 dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
1111 i->second.on_all_applied->complete(0);
1112 i->second.on_all_applied = 0;
1113 i->second.trace.event("ec write all applied");
1114 }
1115 if (i->second.pending_commit.empty() && i->second.on_all_commit) {
1116 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1117 i->second.on_all_commit->complete(0);
1118 i->second.on_all_commit = 0;
1119 i->second.trace.event("ec write all committed");
1120 }
1121 check_ops();
1122}
1123
1124void ECBackend::handle_sub_read_reply(
1125 pg_shard_t from,
1126 ECSubReadReply &op,
1127 RecoveryMessages *m,
1128 const ZTracer::Trace &trace)
1129{
1130 trace.event("ec sub read reply");
1131 dout(10) << __func__ << ": reply " << op << dendl;
1132 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1133 if (iter == tid_to_read_map.end()) {
1134 //canceled
1135 dout(20) << __func__ << ": dropped " << op << dendl;
1136 return;
1137 }
1138 ReadOp &rop = iter->second;
1139 for (auto i = op.buffers_read.begin();
1140 i != op.buffers_read.end();
1141 ++i) {
1142 assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1143 if (!rop.to_read.count(i->first)) {
1144 // We canceled this read! @see filter_read_op
1145 dout(20) << __func__ << " to_read skipping" << dendl;
1146 continue;
1147 }
1148 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1149 rop.to_read.find(i->first)->second.to_read.begin();
1150 list<
1151 boost::tuple<
1152 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1153 rop.complete[i->first].returned.begin();
1154 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1155 j != i->second.end();
1156 ++j, ++req_iter, ++riter) {
1157 assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1158 assert(riter != rop.complete[i->first].returned.end());
1159 pair<uint64_t, uint64_t> adjusted =
1160 sinfo.aligned_offset_len_to_chunk(
1161 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1162 assert(adjusted.first == j->first);
1163 riter->get<2>()[from].claim(j->second);
1164 }
1165 }
1166 for (auto i = op.attrs_read.begin();
1167 i != op.attrs_read.end();
1168 ++i) {
1169 assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1170 if (!rop.to_read.count(i->first)) {
1171 // We canceled this read! @see filter_read_op
1172 dout(20) << __func__ << " to_read skipping" << dendl;
1173 continue;
1174 }
1175 rop.complete[i->first].attrs = map<string, bufferlist>();
1176 (*(rop.complete[i->first].attrs)).swap(i->second);
1177 }
1178 for (auto i = op.errors.begin();
1179 i != op.errors.end();
1180 ++i) {
1181 rop.complete[i->first].errors.insert(
1182 make_pair(
1183 from,
1184 i->second));
1185 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1186 }
1187
1188 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1189 shard_to_read_map.find(from);
1190 assert(siter != shard_to_read_map.end());
1191 assert(siter->second.count(op.tid));
1192 siter->second.erase(op.tid);
1193
1194 assert(rop.in_progress.count(from));
1195 rop.in_progress.erase(from);
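  // Decide whether the ReadOp can finish: for each object, check that the
  // shards returned so far are decodable; if not, try the remaining shards,
  // otherwise record the error. is_complete counts objects that are finished
  // either way.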
1196 unsigned is_complete = 0;
1197 // For redundant reads check for completion as each shard comes in,
1198 // or in a non-recovery read check for completion once all the shards read.
1199  if (rop.do_redundant_reads || rop.in_progress.empty()) {
1200 for (map<hobject_t, read_result_t>::const_iterator iter =
1201 rop.complete.begin();
1202 iter != rop.complete.end();
1203 ++iter) {
1204 set<int> have;
1205 for (map<pg_shard_t, bufferlist>::const_iterator j =
1206 iter->second.returned.front().get<2>().begin();
1207 j != iter->second.returned.front().get<2>().end();
1208 ++j) {
1209 have.insert(j->first.shard);
1210 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1211 }
1212      set<int> dummy_minimum;
1213      int err;
1214      if ((err = ec_impl->minimum_to_decode(rop.want_to_read[iter->first], have, &dummy_minimum)) < 0) {
1215 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1216 if (rop.in_progress.empty()) {
1217 // If we don't have enough copies and we haven't sent reads for all shards
1218 // we can send the rest of the reads, if any.
1219 if (!rop.do_redundant_reads) {
1220 int r = send_all_remaining_reads(iter->first, rop);
1221 if (r == 0) {
1222 // We added to in_progress and not incrementing is_complete
1223 continue;
1224 }
1225 // Couldn't read any additional shards so handle as completed with errors
1226 }
1227 // We don't want to confuse clients / RBD with objectstore error
1228 // values in particular ENOENT. We may have different error returns
1229 // from different shards, so we'll return minimum_to_decode() error
1230 // (usually EIO) to reader. It is likely an error here is due to a
1231 // damaged pg.
1232 rop.complete[iter->first].r = err;
1233 ++is_complete;
1234 }
1235 } else {
1236 assert(rop.complete[iter->first].r == 0);
1237 if (!rop.complete[iter->first].errors.empty()) {
1238 if (cct->_conf->osd_read_ec_check_for_errors) {
1239 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1240 err = rop.complete[iter->first].errors.begin()->second;
1241 rop.complete[iter->first].r = err;
1242 } else {
1243	  get_parent()->clog_warn() << "Error(s) ignored for "
1244 << iter->first << " enough copies available";
1245 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1246 << " enough copies available" << dendl;
1247 rop.complete[iter->first].errors.clear();
1248 }
1249 }
1250 ++is_complete;
1251 }
1252 }
1253 }
1254 if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1255 dout(20) << __func__ << " Complete: " << rop << dendl;
1256 rop.trace.event("ec read complete");
1257 complete_read_op(rop, m);
1258 } else {
1259 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1260 }
1261}
1262
1263void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1264{
1265 map<hobject_t, read_request_t>::iterator reqiter =
1266 rop.to_read.begin();
1267 map<hobject_t, read_result_t>::iterator resiter =
1268 rop.complete.begin();
1269 assert(rop.to_read.size() == rop.complete.size());
1270 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1271 if (reqiter->second.cb) {
1272 pair<RecoveryMessages *, read_result_t &> arg(
1273 m, resiter->second);
1274 reqiter->second.cb->complete(arg);
1275 reqiter->second.cb = NULL;
1276 }
1277 }
1278 tid_to_read_map.erase(rop.tid);
1279}
1280
1281struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1282 ECBackend *ec;
1283 ceph_tid_t tid;
1284 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1285 void finish(ThreadPool::TPHandle &handle) override {
1286 auto ropiter = ec->tid_to_read_map.find(tid);
1287 assert(ropiter != ec->tid_to_read_map.end());
1288 int priority = ropiter->second.priority;
1289 RecoveryMessages rm;
1290 ec->complete_read_op(ropiter->second, &rm);
1291 ec->dispatch_recovery_messages(rm, priority);
1292 }
1293};
1294
1295void ECBackend::filter_read_op(
1296 const OSDMapRef& osdmap,
1297 ReadOp &op)
1298{
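  // Drop the parts of this ReadOp that targeted OSDs which have gone down:
  // cancel the affected objects' pulls and, if nothing is left in flight,
  // schedule FinishReadOp so the op completes with whatever was read.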
1299 set<hobject_t> to_cancel;
1300 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1301 i != op.source_to_obj.end();
1302 ++i) {
1303 if (osdmap->is_down(i->first.osd)) {
1304 to_cancel.insert(i->second.begin(), i->second.end());
1305 op.in_progress.erase(i->first);
1306 continue;
1307 }
1308 }
1309
1310 if (to_cancel.empty())
1311 return;
1312
1313 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1314 i != op.source_to_obj.end();
1315 ) {
1316 for (set<hobject_t>::iterator j = i->second.begin();
1317 j != i->second.end();
1318 ) {
1319 if (to_cancel.count(*j))
1320 i->second.erase(j++);
1321 else
1322 ++j;
1323 }
1324 if (i->second.empty()) {
1325 op.source_to_obj.erase(i++);
1326 } else {
1327 assert(!osdmap->is_down(i->first.osd));
1328 ++i;
1329 }
1330 }
1331
1332 for (set<hobject_t>::iterator i = to_cancel.begin();
1333 i != to_cancel.end();
1334 ++i) {
1335 get_parent()->cancel_pull(*i);
1336
1337 assert(op.to_read.count(*i));
1338 read_request_t &req = op.to_read.find(*i)->second;
1339 dout(10) << __func__ << ": canceling " << req
1340 << " for obj " << *i << dendl;
1341 assert(req.cb);
1342 delete req.cb;
1343 req.cb = NULL;
1344
1345 op.to_read.erase(*i);
1346 op.complete.erase(*i);
1347 recovery_ops.erase(*i);
1348 }
1349
1350 if (op.in_progress.empty()) {
1351 get_parent()->schedule_recovery_work(
1352 get_parent()->bless_gencontext(
1353 new FinishReadOp(this, op.tid)));
1354 }
1355}
1356
1357void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1358{
1359 set<ceph_tid_t> tids_to_filter;
1360 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1361 i = shard_to_read_map.begin();
1362 i != shard_to_read_map.end();
1363 ) {
1364 if (osdmap->is_down(i->first.osd)) {
1365 tids_to_filter.insert(i->second.begin(), i->second.end());
1366 shard_to_read_map.erase(i++);
1367 } else {
1368 ++i;
1369 }
1370 }
1371 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1372 i != tids_to_filter.end();
1373 ++i) {
1374 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1375 assert(j != tid_to_read_map.end());
1376 filter_read_op(osdmap, j->second);
1377 }
1378}
1379
1380void ECBackend::on_change()
1381{
1382 dout(10) << __func__ << dendl;
1383
1384 completed_to = eversion_t();
1385 committed_to = eversion_t();
1386 pipeline_state.clear();
1387 waiting_reads.clear();
1388 waiting_state.clear();
1389 waiting_commit.clear();
1390 for (auto &&op: tid_to_op_map) {
1391 cache.release_write_pin(op.second.pin);
1392 }
1393 tid_to_op_map.clear();
1394
1395 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1396 i != tid_to_read_map.end();
1397 ++i) {
1398 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1399 for (map<hobject_t, read_request_t>::iterator j =
1400 i->second.to_read.begin();
1401 j != i->second.to_read.end();
1402 ++j) {
1403 delete j->second.cb;
1404 j->second.cb = 0;
1405 }
1406 }
1407 tid_to_read_map.clear();
1408 in_progress_client_reads.clear();
1409 shard_to_read_map.clear();
1410 clear_recovery_state();
1411}
1412
1413void ECBackend::clear_recovery_state()
1414{
1415 recovery_ops.clear();
1416}
1417
1418void ECBackend::on_flushed()
1419{
1420}
1421
1422void ECBackend::dump_recovery_info(Formatter *f) const
1423{
1424 f->open_array_section("recovery_ops");
1425 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1426 i != recovery_ops.end();
1427 ++i) {
1428 f->open_object_section("op");
1429 i->second.dump(f);
1430 f->close_section();
1431 }
1432 f->close_section();
1433 f->open_array_section("read_ops");
1434 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1435 i != tid_to_read_map.end();
1436 ++i) {
1437 f->open_object_section("read_op");
1438 i->second.dump(f);
1439 f->close_section();
1440 }
1441 f->close_section();
1442}
1443
1444void ECBackend::submit_transaction(
1445 const hobject_t &hoid,
1446 const object_stat_sum_t &delta_stats,
1447 const eversion_t &at_version,
1448 PGTransactionUPtr &&t,
1449 const eversion_t &trim_to,
1450 const eversion_t &roll_forward_to,
1451 const vector<pg_log_entry_t> &log_entries,
1452 boost::optional<pg_hit_set_history_t> &hset_history,
1453 Context *on_local_applied_sync,
1454 Context *on_all_applied,
1455 Context *on_all_commit,
1456 ceph_tid_t tid,
1457 osd_reqid_t reqid,
1458 OpRequestRef client_op
1459 )
1460{
1461 assert(!tid_to_op_map.count(tid));
1462 Op *op = &(tid_to_op_map[tid]);
1463 op->hoid = hoid;
1464 op->delta_stats = delta_stats;
1465 op->version = at_version;
1466 op->trim_to = trim_to;
1467 op->roll_forward_to = MAX(roll_forward_to, committed_to);
1468 op->log_entries = log_entries;
1469 std::swap(op->updated_hit_set_history, hset_history);
1470 op->on_local_applied_sync = on_local_applied_sync;
1471 op->on_all_applied = on_all_applied;
1472 op->on_all_commit = on_all_commit;
1473 op->tid = tid;
1474 op->reqid = reqid;
1475 op->client_op = client_op;
1476 if (client_op)
1477 op->trace = client_op->pg_trace;
1478
1479 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1480 start_rmw(op, std::move(t));
1481 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1482}
1483
1484void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1485 if (!waiting_state.empty()) {
1486 waiting_state.back().on_write.emplace_back(std::move(cb));
1487 } else if (!waiting_reads.empty()) {
1488 waiting_reads.back().on_write.emplace_back(std::move(cb));
1489 } else {
1490 // Nothing earlier in the pipeline, just call it
1491 cb();
1492 }
1493}
1494
1495void ECBackend::get_all_avail_shards(
1496  const hobject_t &hoid,
1497  const set<pg_shard_t> &error_shards,
1498 set<int> &have,
1499 map<shard_id_t, pg_shard_t> &shards,
1500 bool for_recovery)
1501{
1502 for (set<pg_shard_t>::const_iterator i =
1503 get_parent()->get_acting_shards().begin();
1504 i != get_parent()->get_acting_shards().end();
1505 ++i) {
1506 dout(10) << __func__ << ": checking acting " << *i << dendl;
1507 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1508 if (error_shards.find(*i) != error_shards.end())
1509 continue;
1510 if (!missing.is_missing(hoid)) {
1511 assert(!have.count(i->shard));
1512 have.insert(i->shard);
1513 assert(!shards.count(i->shard));
1514 shards.insert(make_pair(i->shard, *i));
1515 }
1516 }
1517
1518 if (for_recovery) {
1519 for (set<pg_shard_t>::const_iterator i =
1520 get_parent()->get_backfill_shards().begin();
1521 i != get_parent()->get_backfill_shards().end();
1522 ++i) {
1523 if (error_shards.find(*i) != error_shards.end())
1524 continue;
1525 if (have.count(i->shard)) {
1526 assert(shards.count(i->shard));
1527 continue;
1528 }
1529 dout(10) << __func__ << ": checking backfill " << *i << dendl;
1530 assert(!shards.count(i->shard));
1531 const pg_info_t &info = get_parent()->get_shard_info(*i);
1532 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1533 if (hoid < info.last_backfill &&
1534 !missing.is_missing(hoid)) {
1535 have.insert(i->shard);
1536 shards.insert(make_pair(i->shard, *i));
1537 }
1538 }
1539
1540 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1541 get_parent()->get_missing_loc_shards().find(hoid);
1542 if (miter != get_parent()->get_missing_loc_shards().end()) {
1543 for (set<pg_shard_t>::iterator i = miter->second.begin();
1544 i != miter->second.end();
1545 ++i) {
1546 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1547 auto m = get_parent()->maybe_get_shard_missing(*i);
1548 if (m) {
1549 assert(!(*m).is_missing(hoid));
1550 }
1551 if (error_shards.find(*i) != error_shards.end())
1552 continue;
1553 have.insert(i->shard);
1554 shards.insert(make_pair(i->shard, *i));
1555 }
1556 }
1557 }
1558}
1559
1560int ECBackend::get_min_avail_to_read_shards(
1561 const hobject_t &hoid,
1562 const set<int> &want,
1563 bool for_recovery,
1564 bool do_redundant_reads,
1565 set<pg_shard_t> *to_read)
1566{
1567 // Make sure we don't do redundant reads for recovery
1568 assert(!for_recovery || !do_redundant_reads);
1569
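  // 'have' collects every shard currently readable; minimum_to_decode() then
  // picks a sufficient subset to reconstruct 'want'. With do_redundant_reads
  // we read from all available shards instead.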
1570 set<int> have;
1571 map<shard_id_t, pg_shard_t> shards;
1572  set<pg_shard_t> error_shards;
1573
1574  get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1575
1576 set<int> need;
1577 int r = ec_impl->minimum_to_decode(want, have, &need);
1578 if (r < 0)
1579 return r;
1580
1581 if (do_redundant_reads) {
1582 need.swap(have);
1583 }
1584
1585 if (!to_read)
1586 return 0;
1587
1588 for (set<int>::iterator i = need.begin();
1589 i != need.end();
1590 ++i) {
1591 assert(shards.count(shard_id_t(*i)));
1592 to_read->insert(shards[shard_id_t(*i)]);
1593 }
1594 return 0;
1595}
1596
1597int ECBackend::get_remaining_shards(
1598 const hobject_t &hoid,
1599 const set<int> &avail,
1600 const set<int> &want,
1601 const read_result_t &result,
1602 set<pg_shard_t> *to_read,
1603 bool for_recovery)
1604{
1605  assert(to_read);
1606
1607 set<int> have;
1608 map<shard_id_t, pg_shard_t> shards;
1609 set<pg_shard_t> error_shards;
1610 for (auto &p : result.errors) {
1611 error_shards.insert(p.first);
1612 }
1613
1614 get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
1615
1616 set<int> need;
1617 int r = ec_impl->minimum_to_decode(want, have, &need);
1618 if (r < 0) {
1619 dout(0) << __func__ << " not enough shards left to try for " << hoid
1620 << " read result was " << result << dendl;
1621 return -EIO;
1622 }
1623
1624 set<int> shards_left;
1625 for (auto p : need) {
1626 if (avail.find(p) == avail.end()) {
1627 shards_left.insert(p);
1628 }
1629 }
1630
1631 for (set<int>::iterator i = shards_left.begin();
1632 i != shards_left.end();
1633 ++i) {
1634 assert(shards.count(shard_id_t(*i)));
1635 assert(avail.find(*i) == avail.end());
1636 to_read->insert(shards[shard_id_t(*i)]);
1637 }
1638 return 0;
1639}
1640
1641void ECBackend::start_read_op(
1642 int priority,
1643  map<hobject_t, set<int>> &want_to_read,
1644 map<hobject_t, read_request_t> &to_read,
1645 OpRequestRef _op,
1646 bool do_redundant_reads,
1647 bool for_recovery)
1648{
1649 ceph_tid_t tid = get_parent()->get_tid();
1650 assert(!tid_to_read_map.count(tid));
1651 auto &op = tid_to_read_map.emplace(
1652 tid,
1653 ReadOp(
1654 priority,
1655 tid,
1656 do_redundant_reads,
1657 for_recovery,
1658 _op,
1659      std::move(want_to_read),
1660 std::move(to_read))).first->second;
1661 dout(10) << __func__ << ": starting " << op << dendl;
1662 if (_op) {
1663 op.trace = _op->pg_trace;
1664 op.trace.event("start ec read");
1665 }
1666 do_read_op(op);
1667}
1668
1669void ECBackend::do_read_op(ReadOp &op)
1670{
1671 int priority = op.priority;
1672 ceph_tid_t tid = op.tid;
1673
1674 dout(10) << __func__ << ": starting read " << op << dendl;
1675
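  // Build one ECSubRead per shard: each logical (offset, len) is translated
  // into a per-shard chunk extent via aligned_offset_len_to_chunk, and the
  // object's attrs are requested from just one of the shards being read.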
1676 map<pg_shard_t, ECSubRead> messages;
1677 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1678 i != op.to_read.end();
1679 ++i) {
1680 bool need_attrs = i->second.want_attrs;
1681 for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
1682 j != i->second.need.end();
1683 ++j) {
1684 if (need_attrs) {
1685 messages[*j].attrs_to_read.insert(i->first);
1686 need_attrs = false;
1687 }
1688 op.obj_to_source[i->first].insert(*j);
1689 op.source_to_obj[*j].insert(i->first);
1690 }
1691 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1692 i->second.to_read.begin();
1693 j != i->second.to_read.end();
1694 ++j) {
1695 pair<uint64_t, uint64_t> chunk_off_len =
1696 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1697 for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
1698 k != i->second.need.end();
1699 ++k) {
1700 messages[*k].to_read[i->first].push_back(
1701 boost::make_tuple(
1702 chunk_off_len.first,
1703 chunk_off_len.second,
1704 j->get<2>()));
1705 }
1706 assert(!need_attrs);
1707 }
1708 }
1709
1710 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1711 i != messages.end();
1712 ++i) {
1713 op.in_progress.insert(i->first);
1714 shard_to_read_map[i->first].insert(op.tid);
1715 i->second.tid = tid;
1716 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1717 msg->set_priority(priority);
1718 msg->pgid = spg_t(
1719 get_parent()->whoami_spg_t().pgid,
1720 i->first.shard);
1721 msg->map_epoch = get_parent()->get_epoch();
1722 msg->min_epoch = get_parent()->get_interval_start_epoch();
1723 msg->op = i->second;
1724 msg->op.from = get_parent()->whoami_shard();
1725 msg->op.tid = tid;
1726 if (op.trace) {
1727 // initialize a child span for this shard
1728 msg->trace.init("ec sub read", nullptr, &op.trace);
1729 msg->trace.keyval("shard", i->first.shard.id);
1730 }
1731 get_parent()->send_message_osd_cluster(
1732 i->first.osd,
1733 msg,
1734 get_parent()->get_epoch());
1735 }
1736 dout(10) << __func__ << ": started " << op << dendl;
1737}
1738
1739ECUtil::HashInfoRef ECBackend::get_hash_info(
1740 const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1741{
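  // Return the cached HashInfo for this object if present; otherwise stat the
  // on-disk shard and decode the hinfo xattr (from the caller-supplied attrs
  // or from the store), sanity-checking total_chunk_size against the on-disk
  // size when 'checks' is set.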
1742 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1743 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1744 if (!ref) {
1745 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1746 struct stat st;
1747 int r = store->stat(
1748 ch,
1749 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1750 &st);
1751 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1752 // XXX: What does it mean if there is no object on disk?
1753 if (r >= 0) {
1754 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1755 bufferlist bl;
1756 if (attrs) {
1757 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1758 if (k == attrs->end()) {
1759 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1760 } else {
1761 bl.push_back(k->second);
1762 }
1763 } else {
1764 r = store->getattr(
1765 ch,
1766 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1767 ECUtil::get_hinfo_key(),
1768 bl);
1769 if (r < 0) {
1770 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1771 bl.clear(); // just in case
1772 }
1773 }
1774 if (bl.length() > 0) {
1775 bufferlist::iterator bp = bl.begin();
94b18763
FG
1776 try {
1777 ::decode(hinfo, bp);
1778 } catch(...) {
1779 dout(0) << __func__ << ": Can't decode hinfo for " << hoid << dendl;
1780 return ECUtil::HashInfoRef();
1781 }
7c673cae
FG
1782 if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1783 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1784 << hinfo.get_total_chunk_size() << dendl;
1785 return ECUtil::HashInfoRef();
1786 }
 1787 } else if (st.st_size > 0) { // non-empty object but no hinfo attr: error; an empty object falls through and a fresh hinfo is created below
1788 return ECUtil::HashInfoRef();
1789 }
1790 }
1791 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1792 }
1793 return ref;
1794}
1795
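// Entry point for erasure-coded writes: compute the write plan (which
// stripes must be read back for a read-modify-write), queue the op on
// waiting_state, and let check_ops() drive it through the pipeline.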
1796void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1797{
1798 assert(op);
1799
1800 op->plan = ECTransaction::get_write_plan(
1801 sinfo,
1802 std::move(t),
1803 [&](const hobject_t &i) {
1804 ECUtil::HashInfoRef ref = get_hash_info(i, false);
1805 if (!ref) {
1806 derr << __func__ << ": get_hash_info(" << i << ")"
1807 << " returned a null pointer and there is no "
1808 << " way to recover from such an error in this "
1809 << " context" << dendl;
1810 ceph_abort();
1811 }
1812 return ref;
1813 },
1814 get_parent()->get_dpp());
1815
1816 dout(10) << __func__ << ": " << *op << dendl;
1817
1818 waiting_state.push_back(*op);
1819 check_ops();
1820}
1821
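// Pipeline stage 1: move the front op from waiting_state to waiting_reads.
// Decide whether the extent cache can be used; extents the cache cannot
// supply for the RMW are issued as a remote (reconstructing) read.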
1822bool ECBackend::try_state_to_reads()
1823{
1824 if (waiting_state.empty())
1825 return false;
1826
1827 Op *op = &(waiting_state.front());
1828 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1829 assert(get_parent()->get_pool().allows_ecoverwrites());
1830 dout(20) << __func__ << ": blocking " << *op
1831 << " because it requires an rmw and the cache is invalid "
1832 << pipeline_state
1833 << dendl;
1834 return false;
1835 }
1836
1837 if (op->invalidates_cache()) {
1838 dout(20) << __func__ << ": invalidating cache after this op"
1839 << dendl;
1840 pipeline_state.invalidate();
1841 op->using_cache = false;
1842 } else {
1843 op->using_cache = pipeline_state.caching_enabled();
1844 }
1845
1846 waiting_state.pop_front();
1847 waiting_reads.push_back(*op);
1848
1849 if (op->using_cache) {
1850 cache.open_write_pin(op->pin);
1851
1852 extent_set empty;
1853 for (auto &&hpair: op->plan.will_write) {
1854 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1855 const extent_set &to_read_plan =
1856 to_read_plan_iter == op->plan.to_read.end() ?
1857 empty :
1858 to_read_plan_iter->second;
1859
1860 extent_set remote_read = cache.reserve_extents_for_rmw(
1861 hpair.first,
1862 op->pin,
1863 hpair.second,
1864 to_read_plan);
1865
1866 extent_set pending_read = to_read_plan;
1867 pending_read.subtract(remote_read);
1868
1869 if (!remote_read.empty()) {
1870 op->remote_read[hpair.first] = std::move(remote_read);
1871 }
1872 if (!pending_read.empty()) {
1873 op->pending_read[hpair.first] = std::move(pending_read);
1874 }
1875 }
1876 } else {
1877 op->remote_read = op->plan.to_read;
1878 }
1879
1880 dout(10) << __func__ << ": " << *op << dendl;
1881
1882 if (!op->remote_read.empty()) {
1883 assert(get_parent()->get_pool().allows_ecoverwrites());
1884 objects_read_async_no_cache(
1885 op->remote_read,
1886 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1887 for (auto &&i: results) {
1888 op->remote_read_result.emplace(i.first, i.second.second);
1889 }
1890 check_ops();
1891 });
1892 }
1893
1894 return true;
1895}
1896
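// Pipeline stage 2: once the op's reads have completed, generate the
// per-shard transactions and send an ECSubWrite to every acting/backfill
// shard; the local shard's write is applied directly via handle_sub_write.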
1897bool ECBackend::try_reads_to_commit()
1898{
1899 if (waiting_reads.empty())
1900 return false;
1901 Op *op = &(waiting_reads.front());
1902 if (op->read_in_progress())
1903 return false;
1904 waiting_reads.pop_front();
1905 waiting_commit.push_back(*op);
1906
1907 dout(10) << __func__ << ": starting commit on " << *op << dendl;
1908 dout(20) << __func__ << ": " << cache << dendl;
1909
1910 get_parent()->apply_stats(
1911 op->hoid,
1912 op->delta_stats);
1913
1914 if (op->using_cache) {
1915 for (auto &&hpair: op->pending_read) {
1916 op->remote_read_result[hpair.first].insert(
1917 cache.get_remaining_extents_for_rmw(
1918 hpair.first,
1919 op->pin,
1920 hpair.second));
1921 }
1922 op->pending_read.clear();
1923 } else {
1924 assert(op->pending_read.empty());
1925 }
1926
1927 map<shard_id_t, ObjectStore::Transaction> trans;
1928 for (set<pg_shard_t>::const_iterator i =
1929 get_parent()->get_actingbackfill_shards().begin();
1930 i != get_parent()->get_actingbackfill_shards().end();
1931 ++i) {
1932 trans[i->shard];
1933 }
1934
1935 op->trace.event("start ec write");
1936
1937 map<hobject_t,extent_map> written;
1938 if (op->plan.t) {
1939 ECTransaction::generate_transactions(
1940 op->plan,
1941 ec_impl,
1942 get_parent()->get_info().pgid.pgid,
31f18b77 1943 (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
7c673cae
FG
1944 sinfo,
1945 op->remote_read_result,
1946 op->log_entries,
1947 &written,
1948 &trans,
1949 &(op->temp_added),
1950 &(op->temp_cleared),
1951 get_parent()->get_dpp());
1952 }
1953
1954 dout(20) << __func__ << ": " << cache << dendl;
1955 dout(20) << __func__ << ": written: " << written << dendl;
1956 dout(20) << __func__ << ": op: " << *op << dendl;
1957
1958 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1959 for (auto &&i: op->log_entries) {
1960 if (i.requires_kraken()) {
1961 derr << __func__ << ": log entry " << i << " requires kraken"
1962 << " but overwrites are not enabled!" << dendl;
1963 ceph_abort();
1964 }
1965 }
1966 }
1967
1968 map<hobject_t,extent_set> written_set;
1969 for (auto &&i: written) {
1970 written_set[i.first] = i.second.get_interval_set();
1971 }
1972 dout(20) << __func__ << ": written_set: " << written_set << dendl;
1973 assert(written_set == op->plan.will_write);
1974
1975 if (op->using_cache) {
1976 for (auto &&hpair: written) {
1977 dout(20) << __func__ << ": " << hpair << dendl;
1978 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
1979 }
1980 }
1981 op->remote_read.clear();
1982 op->remote_read_result.clear();
1983
1984 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1985 ObjectStore::Transaction empty;
1986 bool should_write_local = false;
1987 ECSubWrite local_write_op;
1988 for (set<pg_shard_t>::const_iterator i =
1989 get_parent()->get_actingbackfill_shards().begin();
1990 i != get_parent()->get_actingbackfill_shards().end();
1991 ++i) {
1992 op->pending_apply.insert(*i);
1993 op->pending_commit.insert(*i);
1994 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
1995 trans.find(i->shard);
1996 assert(iter != trans.end());
1997 bool should_send = get_parent()->should_send_op(*i, op->hoid);
1998 const pg_stat_t &stats =
1999 should_send ?
2000 get_info().stats :
2001 parent->get_shard_info().find(*i)->second.stats;
2002
2003 ECSubWrite sop(
2004 get_parent()->whoami_shard(),
2005 op->tid,
2006 op->reqid,
2007 op->hoid,
2008 stats,
2009 should_send ? iter->second : empty,
2010 op->version,
2011 op->trim_to,
2012 op->roll_forward_to,
2013 op->log_entries,
2014 op->updated_hit_set_history,
2015 op->temp_added,
2016 op->temp_cleared,
2017 !should_send);
2018
2019 ZTracer::Trace trace;
2020 if (op->trace) {
2021 // initialize a child span for this shard
2022 trace.init("ec sub write", nullptr, &op->trace);
2023 trace.keyval("shard", i->shard.id);
2024 }
2025
2026 if (*i == get_parent()->whoami_shard()) {
2027 should_write_local = true;
2028 local_write_op.claim(sop);
2029 } else {
2030 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
2031 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
2032 r->map_epoch = get_parent()->get_epoch();
2033 r->min_epoch = get_parent()->get_interval_start_epoch();
2034 r->trace = trace;
2035 get_parent()->send_message_osd_cluster(
2036 i->osd, r, get_parent()->get_epoch());
2037 }
2038 }
2039 if (should_write_local) {
2040 handle_sub_write(
2041 get_parent()->whoami_shard(),
2042 op->client_op,
2043 local_write_op,
2044 op->trace,
2045 op->on_local_applied_sync);
2046 op->on_local_applied_sync = 0;
2047 }
2048
2049 for (auto i = op->on_write.begin();
2050 i != op->on_write.end();
2051 op->on_write.erase(i++)) {
2052 (*i)();
2053 }
2054
2055 return true;
2056}
2057
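// Pipeline stage 3: retire the front op from waiting_commit once all shards
// have applied and committed, advance completed_to/committed_to, queue a
// dummy op to roll the log forward when needed, and drop any cache pins.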
2058bool ECBackend::try_finish_rmw()
2059{
2060 if (waiting_commit.empty())
2061 return false;
2062 Op *op = &(waiting_commit.front());
2063 if (op->write_in_progress())
2064 return false;
2065 waiting_commit.pop_front();
2066
2067 dout(10) << __func__ << ": " << *op << dendl;
2068 dout(20) << __func__ << ": " << cache << dendl;
2069
2070 if (op->roll_forward_to > completed_to)
2071 completed_to = op->roll_forward_to;
2072 if (op->version > committed_to)
2073 committed_to = op->version;
2074
31f18b77 2075 if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
7c673cae
FG
2076 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2077 waiting_reads.empty() &&
2078 waiting_commit.empty()) {
2079 // submit a dummy transaction to kick the rollforward
2080 auto tid = get_parent()->get_tid();
2081 Op *nop = &(tid_to_op_map[tid]);
2082 nop->hoid = op->hoid;
2083 nop->trim_to = op->trim_to;
2084 nop->roll_forward_to = op->version;
2085 nop->tid = tid;
2086 nop->reqid = op->reqid;
2087 waiting_reads.push_back(*nop);
2088 }
2089 }
2090
2091 if (op->using_cache) {
2092 cache.release_write_pin(op->pin);
2093 }
2094 tid_to_op_map.erase(op->tid);
2095
2096 if (waiting_reads.empty() &&
2097 waiting_commit.empty()) {
2098 pipeline_state.clear();
2099 dout(20) << __func__ << ": clearing pipeline_state "
2100 << pipeline_state
2101 << dendl;
2102 }
2103 return true;
2104}
2105
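// Run the three pipeline stages above until none of them makes progress.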
2106void ECBackend::check_ops()
2107{
2108 while (try_state_to_reads() ||
2109 try_reads_to_commit() ||
2110 try_finish_rmw());
2111}
2112
2113int ECBackend::objects_read_sync(
2114 const hobject_t &hoid,
2115 uint64_t off,
2116 uint64_t len,
2117 uint32_t op_flags,
2118 bufferlist *bl)
2119{
2120 return -EOPNOTSUPP;
2121}
2122
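// Client-facing async read: round each requested extent out to stripe
// bounds, issue a single reconstructing read for the union, and let the
// 'cb' functor below slice the stripe-aligned result back into the
// caller's buffers and completion contexts.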
2123void ECBackend::objects_read_async(
2124 const hobject_t &hoid,
2125 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2126 pair<bufferlist*, Context*> > > &to_read,
2127 Context *on_complete,
2128 bool fast_read)
2129{
2130 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2131 reads;
2132
2133 uint32_t flags = 0;
2134 extent_set es;
2135 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2136 pair<bufferlist*, Context*> > >::const_iterator i =
2137 to_read.begin();
2138 i != to_read.end();
2139 ++i) {
2140 pair<uint64_t, uint64_t> tmp =
2141 sinfo.offset_len_to_stripe_bounds(
2142 make_pair(i->first.get<0>(), i->first.get<1>()));
2143
2144 extent_set esnew;
2145 esnew.insert(tmp.first, tmp.second);
2146 es.union_of(esnew);
2147 flags |= i->first.get<2>();
2148 }
2149
2150 if (!es.empty()) {
2151 auto &offsets = reads[hoid];
2152 for (auto j = es.begin();
2153 j != es.end();
2154 ++j) {
2155 offsets.push_back(
2156 boost::make_tuple(
2157 j.get_start(),
2158 j.get_len(),
2159 flags));
2160 }
2161 }
2162
2163 struct cb {
2164 ECBackend *ec;
2165 hobject_t hoid;
2166 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2167 pair<bufferlist*, Context*> > > to_read;
2168 unique_ptr<Context> on_complete;
2169 cb(const cb&) = delete;
2170 cb(cb &&) = default;
2171 cb(ECBackend *ec,
2172 const hobject_t &hoid,
2173 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2174 pair<bufferlist*, Context*> > > &to_read,
2175 Context *on_complete)
2176 : ec(ec),
2177 hoid(hoid),
2178 to_read(to_read),
2179 on_complete(on_complete) {}
2180 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2181 auto dpp = ec->get_parent()->get_dpp();
2182 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2183 << dendl;
2184 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2185 << dendl;
2186
2187 auto &got = results[hoid];
2188
2189 int r = 0;
2190 for (auto &&read: to_read) {
2191 if (got.first < 0) {
2192 if (read.second.second) {
2193 read.second.second->complete(got.first);
2194 }
2195 if (r == 0)
2196 r = got.first;
2197 } else {
2198 assert(read.second.first);
2199 uint64_t offset = read.first.get<0>();
2200 uint64_t length = read.first.get<1>();
2201 auto range = got.second.get_containing_range(offset, length);
2202 assert(range.first != range.second);
2203 assert(range.first.get_off() <= offset);
2204 assert(
2205 (offset + length) <=
2206 (range.first.get_off() + range.first.get_len()));
2207 read.second.first->substr_of(
2208 range.first.get_val(),
2209 offset - range.first.get_off(),
2210 length);
2211 if (read.second.second) {
2212 read.second.second->complete(length);
2213 read.second.second = nullptr;
2214 }
2215 }
2216 }
2217 to_read.clear();
2218 if (on_complete) {
2219 on_complete.release()->complete(r);
2220 }
2221 }
2222 ~cb() {
2223 for (auto &&i: to_read) {
2224 delete i.second.second;
2225 }
2226 to_read.clear();
2227 }
2228 };
2229 objects_read_and_reconstruct(
2230 reads,
2231 fast_read,
2232 make_gen_lambda_context<
2233 map<hobject_t,pair<int, extent_map> > &&, cb>(
2234 cb(this,
2235 hoid,
2236 to_read,
2237 on_complete)));
2238}
2239
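// Completion context for a reconstructing read: decode the chunks returned
// by each shard into the logical byte range, trim it to the originally
// requested extent, and hand the result to the ClientAsyncReadStatus.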
2240struct CallClientContexts :
2241 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2242 hobject_t hoid;
2243 ECBackend *ec;
2244 ECBackend::ClientAsyncReadStatus *status;
2245 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2246 CallClientContexts(
2247 hobject_t hoid,
2248 ECBackend *ec,
2249 ECBackend::ClientAsyncReadStatus *status,
2250 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2251 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2252 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2253 ECBackend::read_result_t &res = in.second;
2254 extent_map result;
2255 if (res.r != 0)
2256 goto out;
2257 assert(res.returned.size() == to_read.size());
2258 assert(res.r == 0);
2259 assert(res.errors.empty());
2260 for (auto &&read: to_read) {
2261 pair<uint64_t, uint64_t> adjusted =
2262 ec->sinfo.offset_len_to_stripe_bounds(
2263 make_pair(read.get<0>(), read.get<1>()));
2264 assert(res.returned.front().get<0>() == adjusted.first &&
2265 res.returned.front().get<1>() == adjusted.second);
2266 map<int, bufferlist> to_decode;
2267 bufferlist bl;
2268 for (map<pg_shard_t, bufferlist>::iterator j =
2269 res.returned.front().get<2>().begin();
2270 j != res.returned.front().get<2>().end();
2271 ++j) {
2272 to_decode[j->first.shard].claim(j->second);
2273 }
2274 int r = ECUtil::decode(
2275 ec->sinfo,
2276 ec->ec_impl,
2277 to_decode,
2278 &bl);
2279 if (r < 0) {
2280 res.r = r;
2281 goto out;
2282 }
2283 bufferlist trimmed;
2284 trimmed.substr_of(
2285 bl,
2286 read.get<0>() - adjusted.first,
2287 MIN(read.get<1>(),
2288 bl.length() - (read.get<0>() - adjusted.first)));
2289 result.insert(
2290 read.get<0>(), trimmed.length(), std::move(trimmed));
2291 res.returned.pop_front();
2292 }
2293out:
2294 status->complete_object(hoid, res.r, std::move(result));
2295 ec->kick_reads();
2296 }
2297};
2298
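// For each object, pick the minimal set of shards needed to decode the
// requested extents and start a ReadOp; CallClientContexts reassembles the
// data once the shard replies arrive.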
2299void ECBackend::objects_read_and_reconstruct(
2300 const map<hobject_t,
2301 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2302 > &reads,
2303 bool fast_read,
2304 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2305{
2306 in_progress_client_reads.emplace_back(
2307 reads.size(), std::move(func));
2308 if (!reads.size()) {
2309 kick_reads();
2310 return;
2311 }
2312
28e407b8 2313 map<hobject_t, set<int>> obj_want_to_read;
7c673cae
FG
2314 set<int> want_to_read;
2315 get_want_to_read_shards(&want_to_read);
2316
2317 map<hobject_t, read_request_t> for_read_op;
2318 for (auto &&to_read: reads) {
2319 set<pg_shard_t> shards;
2320 int r = get_min_avail_to_read_shards(
2321 to_read.first,
2322 want_to_read,
2323 false,
2324 fast_read,
2325 &shards);
2326 assert(r == 0);
2327
2328 CallClientContexts *c = new CallClientContexts(
2329 to_read.first,
2330 this,
2331 &(in_progress_client_reads.back()),
2332 to_read.second);
2333 for_read_op.insert(
2334 make_pair(
2335 to_read.first,
2336 read_request_t(
2337 to_read.second,
2338 shards,
2339 false,
2340 c)));
28e407b8 2341 obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
7c673cae
FG
2342 }
2343
2344 start_read_op(
2345 CEPH_MSG_PRIO_DEFAULT,
28e407b8 2346 obj_want_to_read,
7c673cae
FG
2347 for_read_op,
2348 OpRequestRef(),
2349 fast_read, false);
2350 return;
2351}
2352
2353
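// Called after a shard returned an error or bad data: compute the shards
// not yet tried for hoid and re-issue the read against them, keeping the
// original extent list and completion context.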
2354int ECBackend::send_all_remaining_reads(
2355 const hobject_t &hoid,
2356 ReadOp &rop)
2357{
2358 set<int> already_read;
2359 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2360 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2361 already_read.insert(i->shard);
2362 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2363 set<pg_shard_t> shards;
28e407b8
AA
2364 int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
2365 rop.complete[hoid], &shards, rop.for_recovery);
7c673cae
FG
2366 if (r)
2367 return r;
7c673cae 2368
7c673cae
FG
2369 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2370 rop.to_read.find(hoid)->second.to_read;
2371 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2372 rop.to_read.find(hoid)->second.cb;
2373
28e407b8
AA
2374 rop.to_read.erase(hoid);
2375 rop.to_read.insert(make_pair(
7c673cae
FG
2376 hoid,
2377 read_request_t(
2378 offsets,
2379 shards,
2380 false,
2381 c)));
7c673cae
FG
2382 do_read_op(rop);
2383 return 0;
2384}
2385
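// Read the object's xattrs from the local shard and strip the internal
// hash-info keys before returning them to the caller.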
2386int ECBackend::objects_get_attrs(
2387 const hobject_t &hoid,
2388 map<string, bufferlist> *out)
2389{
2390 int r = store->getattrs(
2391 ch,
2392 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2393 *out);
2394 if (r < 0)
2395 return r;
2396
2397 for (map<string, bufferlist>::iterator i = out->begin();
2398 i != out->end();
2399 ) {
2400 if (ECUtil::is_hinfo_key_string(i->first))
2401 out->erase(i++);
2402 else
2403 ++i;
2404 }
2405 return r;
2406}
2407
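// Undo an append by truncating the local shard back to the chunk offset
// corresponding to the old (stripe-aligned) logical size.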
2408void ECBackend::rollback_append(
2409 const hobject_t &hoid,
2410 uint64_t old_size,
2411 ObjectStore::Transaction *t)
2412{
2413 assert(old_size % sinfo.get_stripe_width() == 0);
2414 t->truncate(
2415 coll,
2416 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2417 sinfo.aligned_logical_offset_to_chunk_offset(
2418 old_size));
2419}
2420
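// Deep scrub of one shard object: read it in chunk-aligned strides,
// folding the data into pos.data_hash and returning -EINPROGRESS until the
// whole shard has been read, then compare the size and CRC against the
// stored hinfo.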
28e407b8 2421int ECBackend::be_deep_scrub(
7c673cae 2422 const hobject_t &poid,
28e407b8
AA
2423 ScrubMap &map,
2424 ScrubMapBuilder &pos,
2425 ScrubMap::object &o)
2426{
2427 dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
7c673cae 2428 int r;
28e407b8
AA
2429
2430 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2431 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2432
2433 utime_t sleeptime;
2434 sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
2435 if (sleeptime != utime_t()) {
2436 lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
2437 sleeptime.sleep();
2438 }
2439
2440 if (pos.data_pos == 0) {
2441 pos.data_hash = bufferhash(-1);
2442 }
2443
7c673cae
FG
2444 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2445 if (stride % sinfo.get_chunk_size())
2446 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
7c673cae 2447
28e407b8
AA
2448 bufferlist bl;
2449 r = store->read(
2450 ch,
2451 ghobject_t(
2452 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2453 pos.data_pos,
2454 stride, bl,
2455 fadvise_flags);
2456 if (r < 0) {
2457 dout(20) << __func__ << " " << poid << " got "
2458 << r << " on read, read_error" << dendl;
2459 o.read_error = true;
2460 return 0;
7c673cae 2461 }
28e407b8
AA
2462 if (bl.length() % sinfo.get_chunk_size()) {
2463 dout(20) << __func__ << " " << poid << " got "
2464 << r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
2465 << dendl;
7c673cae 2466 o.read_error = true;
28e407b8
AA
2467 return 0;
2468 }
2469 if (r > 0) {
2470 pos.data_hash << bl;
2471 }
2472 pos.data_pos += r;
2473 if (r == (int)stride) {
2474 return -EINPROGRESS;
7c673cae
FG
2475 }
2476
2477 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2478 if (!hinfo) {
2479 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2480 o.read_error = true;
2481 o.digest_present = false;
28e407b8 2482 return 0;
7c673cae
FG
2483 } else {
2484 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2485 assert(hinfo->has_chunk_hash());
28e407b8
AA
2486 if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
2487 dout(0) << "_scan_list " << poid << " got incorrect size on read 0x"
2488 << std::hex << pos
2489 << " expected 0x" << hinfo->get_total_chunk_size() << std::dec
2490 << dendl;
7c673cae 2491 o.ec_size_mismatch = true;
28e407b8 2492 return 0;
7c673cae
FG
2493 }
2494
28e407b8
AA
2495 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
2496 pos.data_hash.digest()) {
2497 dout(0) << "_scan_list " << poid << " got incorrect hash on read 0x"
2498 << std::hex << pos.data_hash.digest() << " != expected 0x"
2499 << hinfo->get_chunk_hash(get_parent()->whoami_shard().shard)
2500 << std::dec << dendl;
7c673cae 2501 o.ec_hash_mismatch = true;
28e407b8 2502 return 0;
7c673cae
FG
2503 }
2504
2505 /* We checked above that we match our own stored hash. We cannot
2506 * send a hash of the actual object, so instead we simply send
2507 * our locally stored hash of shard 0 on the assumption that if
2508 * we match our chunk hash and our recollection of the hash for
2509 * chunk 0 matches that of our peers, there is likely no corruption.
2510 */
2511 o.digest = hinfo->get_chunk_hash(0);
2512 o.digest_present = true;
2513 } else {
2514 /* Hack! We must be using partial overwrites, and partial overwrites
2515 * don't support deep-scrub yet
2516 */
2517 o.digest = 0;
2518 o.digest_present = true;
2519 }
2520 }
2521
28e407b8 2522 o.omap_digest = -1;
7c673cae 2523 o.omap_digest_present = true;
28e407b8 2524 return 0;
7c673cae 2525}