ceph/src/osd/ECBackend.cc (v12.2.3)
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <sstream>
17
18#include "ECBackend.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPushReply.h"
21#include "messages/MOSDECSubOpWrite.h"
22#include "messages/MOSDECSubOpWriteReply.h"
23#include "messages/MOSDECSubOpRead.h"
24#include "messages/MOSDECSubOpReadReply.h"
25#include "ECMsgTypes.h"
26
27#include "PrimaryLogPG.h"
28
29#define dout_context cct
30#define dout_subsys ceph_subsys_osd
31#define DOUT_PREFIX_ARGS this
32#undef dout_prefix
33#define dout_prefix _prefix(_dout, this)
34static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35 return *_dout << pgb->get_parent()->gen_dbg_prefix();
36}
37
38struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39 list<ECBackend::RecoveryOp> ops;
40};
41
42ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43 switch (rhs.pipeline_state) {
44 case ECBackend::pipeline_state_t::CACHE_VALID:
45 return lhs << "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID:
47 return lhs << "CACHE_INVALID";
48 default:
49 assert(0 == "invalid pipeline state");
50 }
51 return lhs; // unreachable
52}
53
54static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
55{
56 lhs << "[";
57 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
58 i != rhs.end();
59 ++i) {
60 if (i != rhs.begin())
61 lhs << ", ";
62 lhs << make_pair(i->first, i->second.length());
63 }
64 return lhs << "]";
65}
66
67static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
68{
69 lhs << "[";
70 for (map<int, bufferlist>::const_iterator i = rhs.begin();
71 i != rhs.end();
72 ++i) {
73 if (i != rhs.begin())
74 lhs << ", ";
75 lhs << make_pair(i->first, i->second.length());
76 }
77 return lhs << "]";
78}
79
80static ostream &operator<<(
81 ostream &lhs,
82 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
83{
84 return lhs << "(" << rhs.get<0>() << ", "
85 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
86}
87
88ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
89{
90 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91 << ", need=" << rhs.need
92 << ", want_attrs=" << rhs.want_attrs
93 << ")";
94}
95
96ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
97{
98 lhs << "read_result_t(r=" << rhs.r
99 << ", errors=" << rhs.errors;
100 if (rhs.attrs) {
101 lhs << ", attrs=" << rhs.attrs.get();
102 } else {
103 lhs << ", noattrs";
104 }
105 return lhs << ", returned=" << rhs.returned << ")";
106}
107
108ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
109{
110 lhs << "ReadOp(tid=" << rhs.tid;
111 if (rhs.op && rhs.op->get_req()) {
112 lhs << ", op=";
113 rhs.op->get_req()->print(lhs);
114 }
115 return lhs << ", to_read=" << rhs.to_read
116 << ", complete=" << rhs.complete
117 << ", priority=" << rhs.priority
118 << ", obj_to_source=" << rhs.obj_to_source
119 << ", source_to_obj=" << rhs.source_to_obj
120 << ", in_progress=" << rhs.in_progress << ")";
121}
122
123void ECBackend::ReadOp::dump(Formatter *f) const
124{
125 f->dump_unsigned("tid", tid);
126 if (op && op->get_req()) {
127 f->dump_stream("op") << *(op->get_req());
128 }
129 f->dump_stream("to_read") << to_read;
130 f->dump_stream("complete") << complete;
131 f->dump_int("priority", priority);
132 f->dump_stream("obj_to_source") << obj_to_source;
133 f->dump_stream("source_to_obj") << source_to_obj;
134 f->dump_stream("in_progress") << in_progress;
135}
136
137ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
138{
139 lhs << "Op(" << rhs.hoid
140 << " v=" << rhs.version
141 << " tt=" << rhs.trim_to
142 << " tid=" << rhs.tid
143 << " reqid=" << rhs.reqid;
144 if (rhs.client_op && rhs.client_op->get_req()) {
145 lhs << " client_op=";
146 rhs.client_op->get_req()->print(lhs);
147 }
148 lhs << " roll_forward_to=" << rhs.roll_forward_to
149 << " temp_added=" << rhs.temp_added
150 << " temp_cleared=" << rhs.temp_cleared
151 << " pending_read=" << rhs.pending_read
152 << " remote_read=" << rhs.remote_read
153 << " remote_read_result=" << rhs.remote_read_result
154 << " pending_apply=" << rhs.pending_apply
155 << " pending_commit=" << rhs.pending_commit
156 << " plan.to_read=" << rhs.plan.to_read
157 << " plan.will_write=" << rhs.plan.will_write
158 << ")";
159 return lhs;
160}
161
162ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
163{
164 return lhs << "RecoveryOp("
165 << "hoid=" << rhs.hoid
166 << " v=" << rhs.v
167 << " missing_on=" << rhs.missing_on
168 << " missing_on_shards=" << rhs.missing_on_shards
169 << " recovery_info=" << rhs.recovery_info
170 << " recovery_progress=" << rhs.recovery_progress
171 << " obc refcount=" << rhs.obc.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173 << " waiting_on_pushes=" << rhs.waiting_on_pushes
174 << " extent_requested=" << rhs.extent_requested
175 << ")";
176}
177
178void ECBackend::RecoveryOp::dump(Formatter *f) const
179{
180 f->dump_stream("hoid") << hoid;
181 f->dump_stream("v") << v;
182 f->dump_stream("missing_on") << missing_on;
183 f->dump_stream("missing_on_shards") << missing_on_shards;
184 f->dump_stream("recovery_info") << recovery_info;
185 f->dump_stream("recovery_progress") << recovery_progress;
186 f->dump_stream("state") << tostr(state);
187 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188 f->dump_stream("extent_requested") << extent_requested;
189}
190
191ECBackend::ECBackend(
192 PGBackend::Listener *pg,
193 coll_t coll,
194 ObjectStore::CollectionHandle &ch,
195 ObjectStore *store,
196 CephContext *cct,
197 ErasureCodeInterfaceRef ec_impl,
198 uint64_t stripe_width)
199 : PGBackend(cct, pg, store, coll, ch),
200 ec_impl(ec_impl),
201 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
202 assert((ec_impl->get_data_chunk_count() *
203 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
204}
205
206PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
207{
208 return new ECRecoveryHandle;
209}
210
211void ECBackend::_failed_push(const hobject_t &hoid,
212 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
213{
214 ECBackend::read_result_t &res = in.second;
215 dout(10) << __func__ << ": Read error " << hoid << " r="
216 << res.r << " errors=" << res.errors << dendl;
217 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
218 << dendl;
219 assert(recovery_ops.count(hoid));
 220 eversion_t v = recovery_ops[hoid].v;
221 recovery_ops.erase(hoid);
222
223 list<pg_shard_t> fl;
224 for (auto&& i : res.errors) {
225 fl.push_back(i.first);
226 }
227 get_parent()->failed_push(fl, hoid);
228 get_parent()->backfill_add_missing(hoid, v);
229 get_parent()->finish_degraded_object(hoid);
230}
231
232struct OnRecoveryReadComplete :
233 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
234 ECBackend *pg;
235 hobject_t hoid;
236 set<int> want;
237 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
238 : pg(pg), hoid(hoid) {}
239 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
240 ECBackend::read_result_t &res = in.second;
241 if (!(res.r == 0 && res.errors.empty())) {
242 pg->_failed_push(hoid, in);
243 return;
244 }
245 assert(res.returned.size() == 1);
246 pg->handle_recovery_read_complete(
247 hoid,
248 res.returned.back(),
249 res.attrs,
250 in.first);
251 }
252};
253
254struct RecoveryMessages {
255 map<hobject_t,
256 ECBackend::read_request_t> reads;
257 void read(
258 ECBackend *ec,
259 const hobject_t &hoid, uint64_t off, uint64_t len,
260 const set<pg_shard_t> &need,
261 bool attrs) {
262 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
263 to_read.push_back(boost::make_tuple(off, len, 0));
264 assert(!reads.count(hoid));
265 reads.insert(
266 make_pair(
267 hoid,
268 ECBackend::read_request_t(
269 to_read,
270 need,
271 attrs,
272 new OnRecoveryReadComplete(
273 ec,
274 hoid))));
275 }
276
277 map<pg_shard_t, vector<PushOp> > pushes;
278 map<pg_shard_t, vector<PushReplyOp> > push_replies;
279 ObjectStore::Transaction t;
280 RecoveryMessages() {}
281 ~RecoveryMessages(){}
282};
283
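// handle_recovery_push: apply a single PushOp received from the primary on
// this shard. Oneshot pushes (first && data_complete) write straight to the
// final object; otherwise data accumulates in a temp recovery object that is
// renamed into place once the push reports data_complete. A PushReplyOp for
// the primary is queued in either case.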
284void ECBackend::handle_recovery_push(
285 const PushOp &op,
286 RecoveryMessages *m)
287{
288 ostringstream ss;
289 if (get_parent()->check_failsafe_full(ss)) {
290 dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
291 ceph_abort();
292 }
293
294 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
295 ghobject_t tobj;
296 if (oneshot) {
297 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
298 get_parent()->whoami_shard().shard);
299 } else {
300 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
301 op.version),
302 ghobject_t::NO_GEN,
303 get_parent()->whoami_shard().shard);
304 if (op.before_progress.first) {
305 dout(10) << __func__ << ": Adding oid "
306 << tobj.hobj << " in the temp collection" << dendl;
307 add_temp_obj(tobj.hobj);
308 }
309 }
310
311 if (op.before_progress.first) {
312 m->t.remove(coll, tobj);
313 m->t.touch(coll, tobj);
314 }
315
316 if (!op.data_included.empty()) {
317 uint64_t start = op.data_included.range_start();
318 uint64_t end = op.data_included.range_end();
319 assert(op.data.length() == (end - start));
320
321 m->t.write(
322 coll,
323 tobj,
324 start,
325 op.data.length(),
326 op.data);
327 } else {
328 assert(op.data.length() == 0);
329 }
330
331 if (op.before_progress.first) {
332 assert(op.attrset.count(string("_")));
333 m->t.setattrs(
334 coll,
335 tobj,
336 op.attrset);
337 }
338
339 if (op.after_progress.data_complete && !oneshot) {
340 dout(10) << __func__ << ": Removing oid "
341 << tobj.hobj << " from the temp collection" << dendl;
342 clear_temp_obj(tobj.hobj);
343 m->t.remove(coll, ghobject_t(
344 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
345 m->t.collection_move_rename(
346 coll, tobj,
347 coll, ghobject_t(
348 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
349 }
350 if (op.after_progress.data_complete) {
351 if ((get_parent()->pgb_is_primary())) {
352 assert(recovery_ops.count(op.soid));
353 assert(recovery_ops[op.soid].obc);
354 get_parent()->on_local_recover(
355 op.soid,
356 op.recovery_info,
357 recovery_ops[op.soid].obc,
 358 false,
359 &m->t);
360 } else {
361 get_parent()->on_local_recover(
362 op.soid,
363 op.recovery_info,
364 ObjectContextRef(),
 365 false,
366 &m->t);
367 }
368 }
369 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
370 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
371}
372
373void ECBackend::handle_recovery_push_reply(
374 const PushReplyOp &op,
375 pg_shard_t from,
376 RecoveryMessages *m)
377{
378 if (!recovery_ops.count(op.soid))
379 return;
380 RecoveryOp &rop = recovery_ops[op.soid];
381 assert(rop.waiting_on_pushes.count(from));
382 rop.waiting_on_pushes.erase(from);
383 continue_recovery_op(rop, m);
384}
385
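// handle_recovery_read_complete: decode the chunks read from the surviving
// shards into buffers for each missing shard. On the first pass the object's
// xattrs are rebuilt (with the hinfo key stripped) to populate the obc, and
// the decoded hinfo is registered, before continuing the recovery op.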
386void ECBackend::handle_recovery_read_complete(
387 const hobject_t &hoid,
388 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
389 boost::optional<map<string, bufferlist> > attrs,
390 RecoveryMessages *m)
391{
392 dout(10) << __func__ << ": returned " << hoid << " "
393 << "(" << to_read.get<0>()
394 << ", " << to_read.get<1>()
395 << ", " << to_read.get<2>()
396 << ")"
397 << dendl;
398 assert(recovery_ops.count(hoid));
399 RecoveryOp &op = recovery_ops[hoid];
400 assert(op.returned_data.empty());
401 map<int, bufferlist*> target;
402 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
403 i != op.missing_on_shards.end();
404 ++i) {
405 target[*i] = &(op.returned_data[*i]);
406 }
407 map<int, bufferlist> from;
408 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
409 i != to_read.get<2>().end();
410 ++i) {
411 from[i->first.shard].claim(i->second);
412 }
413 dout(10) << __func__ << ": " << from << dendl;
414 int r = ECUtil::decode(sinfo, ec_impl, from, target);
415 assert(r == 0);
416 if (attrs) {
417 op.xattrs.swap(*attrs);
418
419 if (!op.obc) {
 420 // The attrs still reference the original bufferlist (decoded from
 421 // the ECSubReadReply message), which is much larger than the attrs
 422 // themselves. If the obc caches them (get_obc may cache the attrs),
 423 // the whole original bufferlist cannot be freed until the obc is
 424 // evicted from the obc cache. So rebuild the bufferlists before
 425 // caching them.
426 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
427 it != op.xattrs.end();
428 ++it) {
429 it->second.rebuild();
430 }
431 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
432 // of the backend (see bug #12983)
433 map<string, bufferlist> sanitized_attrs(op.xattrs);
434 sanitized_attrs.erase(ECUtil::get_hinfo_key());
435 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
436 assert(op.obc);
437 op.recovery_info.size = op.obc->obs.oi.size;
438 op.recovery_info.oi = op.obc->obs.oi;
439 }
440
441 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
442 if (op.obc->obs.oi.size > 0) {
443 assert(op.xattrs.count(ECUtil::get_hinfo_key()));
444 bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
445 ::decode(hinfo, bp);
446 }
447 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
448 }
449 assert(op.xattrs.size());
450 assert(op.obc);
451 continue_recovery_op(op, m);
452}
453
454struct SendPushReplies : public Context {
455 PGBackend::Listener *l;
456 epoch_t epoch;
457 map<int, MOSDPGPushReply*> replies;
458 SendPushReplies(
459 PGBackend::Listener *l,
460 epoch_t epoch,
461 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
462 replies.swap(in);
463 }
464 void finish(int) override {
465 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
466 i != replies.end();
467 ++i) {
468 l->send_message_osd_cluster(i->first, i->second, epoch);
469 }
470 replies.clear();
471 }
472 ~SendPushReplies() override {
473 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
474 i != replies.end();
475 ++i) {
476 i->second->put();
477 }
478 replies.clear();
479 }
480};
481
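// dispatch_recovery_messages: flush a RecoveryMessages batch. Pushes are sent
// immediately as MOSDPGPush messages; push replies are sent by SendPushReplies
// once the batched transaction commits; any queued recovery reads are started
// via start_read_op (for_recovery = true).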
482void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
483{
484 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
485 i != m.pushes.end();
486 m.pushes.erase(i++)) {
487 MOSDPGPush *msg = new MOSDPGPush();
488 msg->set_priority(priority);
489 msg->map_epoch = get_parent()->get_epoch();
490 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
491 msg->from = get_parent()->whoami_shard();
492 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
493 msg->pushes.swap(i->second);
494 msg->compute_cost(cct);
495 get_parent()->send_message(
496 i->first.osd,
497 msg);
498 }
499 map<int, MOSDPGPushReply*> replies;
500 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
501 m.push_replies.begin();
502 i != m.push_replies.end();
503 m.push_replies.erase(i++)) {
504 MOSDPGPushReply *msg = new MOSDPGPushReply();
505 msg->set_priority(priority);
506 msg->map_epoch = get_parent()->get_epoch();
507 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
508 msg->from = get_parent()->whoami_shard();
509 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
510 msg->replies.swap(i->second);
511 msg->compute_cost(cct);
512 replies.insert(make_pair(i->first.osd, msg));
513 }
514
515 if (!replies.empty()) {
516 (m.t).register_on_complete(
517 get_parent()->bless_context(
518 new SendPushReplies(
519 get_parent(),
520 get_parent()->get_epoch(),
521 replies)));
522 get_parent()->queue_transaction(std::move(m.t));
523 }
524
525 if (m.reads.empty())
526 return;
527 start_read_op(
528 priority,
529 m.reads,
530 OpRequestRef(),
531 false, true);
532}
533
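// continue_recovery_op: per-object recovery state machine.
//   IDLE    -> issue reads of the next recovery chunk from a minimal shard set
//   READING -> turn the decoded data into PushOps for every missing shard
//   WRITING -> wait for push replies; loop back to IDLE for the next chunk, or
//              finish (on_peer_recover / on_global_recover) when data_complete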
534void ECBackend::continue_recovery_op(
535 RecoveryOp &op,
536 RecoveryMessages *m)
537{
538 dout(10) << __func__ << ": continuing " << op << dendl;
539 while (1) {
540 switch (op.state) {
541 case RecoveryOp::IDLE: {
542 // start read
543 op.state = RecoveryOp::READING;
544 assert(!op.recovery_progress.data_complete);
545 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
546 uint64_t from = op.recovery_progress.data_recovered_to;
547 uint64_t amount = get_recovery_chunk_size();
548
549 if (op.recovery_progress.first && op.obc) {
550 /* We've got the attrs and the hinfo, might as well use them */
551 op.hinfo = get_hash_info(op.hoid);
552 assert(op.hinfo);
553 op.xattrs = op.obc->attr_cache;
554 ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
555 }
556
557 set<pg_shard_t> to_read;
558 int r = get_min_avail_to_read_shards(
559 op.hoid, want, true, false, &to_read);
560 if (r != 0) {
561 // we must have lost a recovery source
562 assert(!op.recovery_progress.first);
563 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
564 << dendl;
565 get_parent()->cancel_pull(op.hoid);
566 recovery_ops.erase(op.hoid);
567 return;
568 }
569 m->read(
570 this,
571 op.hoid,
572 op.recovery_progress.data_recovered_to,
573 amount,
574 to_read,
575 op.recovery_progress.first && !op.obc);
576 op.extent_requested = make_pair(
577 from,
578 amount);
579 dout(10) << __func__ << ": IDLE return " << op << dendl;
580 return;
581 }
582 case RecoveryOp::READING: {
583 // read completed, start write
584 assert(op.xattrs.size());
585 assert(op.returned_data.size());
586 op.state = RecoveryOp::WRITING;
587 ObjectRecoveryProgress after_progress = op.recovery_progress;
588 after_progress.data_recovered_to += op.extent_requested.second;
589 after_progress.first = false;
590 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
591 after_progress.data_recovered_to =
592 sinfo.logical_to_next_stripe_offset(
593 op.obc->obs.oi.size);
594 after_progress.data_complete = true;
595 }
596 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
597 mi != op.missing_on.end();
598 ++mi) {
599 assert(op.returned_data.count(mi->shard));
600 m->pushes[*mi].push_back(PushOp());
601 PushOp &pop = m->pushes[*mi].back();
602 pop.soid = op.hoid;
603 pop.version = op.v;
604 pop.data = op.returned_data[mi->shard];
605 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
606 << ", after_progress=" << after_progress
607 << ", pop.data.length()=" << pop.data.length()
608 << ", size=" << op.obc->obs.oi.size << dendl;
609 assert(
610 pop.data.length() ==
611 sinfo.aligned_logical_offset_to_chunk_offset(
612 after_progress.data_recovered_to -
613 op.recovery_progress.data_recovered_to)
614 );
615 if (pop.data.length())
616 pop.data_included.insert(
617 sinfo.aligned_logical_offset_to_chunk_offset(
618 op.recovery_progress.data_recovered_to),
619 pop.data.length()
620 );
621 if (op.recovery_progress.first) {
622 pop.attrset = op.xattrs;
623 }
624 pop.recovery_info = op.recovery_info;
625 pop.before_progress = op.recovery_progress;
626 pop.after_progress = after_progress;
627 if (*mi != get_parent()->primary_shard())
628 get_parent()->begin_peer_recover(
629 *mi,
630 op.hoid);
631 }
632 op.returned_data.clear();
633 op.waiting_on_pushes = op.missing_on;
634 op.recovery_progress = after_progress;
635 dout(10) << __func__ << ": READING return " << op << dendl;
636 return;
637 }
638 case RecoveryOp::WRITING: {
639 if (op.waiting_on_pushes.empty()) {
640 if (op.recovery_progress.data_complete) {
641 op.state = RecoveryOp::COMPLETE;
642 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
643 i != op.missing_on.end();
644 ++i) {
645 if (*i != get_parent()->primary_shard()) {
646 dout(10) << __func__ << ": on_peer_recover on " << *i
647 << ", obj " << op.hoid << dendl;
648 get_parent()->on_peer_recover(
649 *i,
650 op.hoid,
651 op.recovery_info);
652 }
653 }
654 object_stat_sum_t stat;
655 stat.num_bytes_recovered = op.recovery_info.size;
656 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
657 stat.num_objects_recovered = 1;
 658 get_parent()->on_global_recover(op.hoid, stat, false);
659 dout(10) << __func__ << ": WRITING return " << op << dendl;
660 recovery_ops.erase(op.hoid);
661 return;
662 } else {
663 op.state = RecoveryOp::IDLE;
664 dout(10) << __func__ << ": WRITING continue " << op << dendl;
665 continue;
666 }
667 }
668 return;
669 }
670 // should never be called once complete
671 case RecoveryOp::COMPLETE:
672 default: {
673 ceph_abort();
674 };
675 }
676 }
677}
678
679void ECBackend::run_recovery_op(
680 RecoveryHandle *_h,
681 int priority)
682{
683 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
684 RecoveryMessages m;
685 for (list<RecoveryOp>::iterator i = h->ops.begin();
686 i != h->ops.end();
687 ++i) {
688 dout(10) << __func__ << ": starting " << *i << dendl;
689 assert(!recovery_ops.count(i->hoid));
690 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
691 continue_recovery_op(op, &m);
692 }
 693
 694 dispatch_recovery_messages(m, priority);
 695 send_recovery_deletes(priority, h->deletes);
696 delete _h;
697}
698
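// recover_object: queue a RecoveryOp on the handle for hoid, recording which
// acting/backfill shards are missing it. omap is marked complete up front
// (EC objects carry no omap); the real work starts in run_recovery_op.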
699int ECBackend::recover_object(
700 const hobject_t &hoid,
701 eversion_t v,
702 ObjectContextRef head,
703 ObjectContextRef obc,
704 RecoveryHandle *_h)
705{
706 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
707 h->ops.push_back(RecoveryOp());
708 h->ops.back().v = v;
709 h->ops.back().hoid = hoid;
710 h->ops.back().obc = obc;
711 h->ops.back().recovery_info.soid = hoid;
712 h->ops.back().recovery_info.version = v;
713 if (obc) {
714 h->ops.back().recovery_info.size = obc->obs.oi.size;
715 h->ops.back().recovery_info.oi = obc->obs.oi;
716 }
717 if (hoid.is_snap()) {
718 if (obc) {
719 assert(obc->ssc);
720 h->ops.back().recovery_info.ss = obc->ssc->snapset;
721 } else if (head) {
722 assert(head->ssc);
723 h->ops.back().recovery_info.ss = head->ssc->snapset;
724 } else {
725 assert(0 == "neither obc nor head set for a snap object");
726 }
727 }
728 h->ops.back().recovery_progress.omap_complete = true;
729 for (set<pg_shard_t>::const_iterator i =
730 get_parent()->get_actingbackfill_shards().begin();
731 i != get_parent()->get_actingbackfill_shards().end();
732 ++i) {
733 dout(10) << "checking " << *i << dendl;
734 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
735 h->ops.back().missing_on.insert(*i);
736 h->ops.back().missing_on_shards.insert(i->shard);
737 }
738 }
739 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
 740 return 0;
741}
742
743bool ECBackend::can_handle_while_inactive(
744 OpRequestRef _op)
745{
746 return false;
747}
748
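// _handle_message: dispatch incoming EC sub-op reads/writes, their replies,
// and PG push / push-reply messages to the corresponding handlers.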
749bool ECBackend::_handle_message(
750 OpRequestRef _op)
751{
752 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
753 int priority = _op->get_req()->get_priority();
754 switch (_op->get_req()->get_type()) {
755 case MSG_OSD_EC_WRITE: {
756 // NOTE: this is non-const because handle_sub_write modifies the embedded
757 // ObjectStore::Transaction in place (and then std::move's it). It does
758 // not conflict with ECSubWrite's operator<<.
759 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
760 _op->get_nonconst_req());
761 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
762 return true;
763 }
764 case MSG_OSD_EC_WRITE_REPLY: {
765 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
766 _op->get_req());
767 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
768 return true;
769 }
770 case MSG_OSD_EC_READ: {
771 const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
772 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
773 reply->pgid = get_parent()->primary_spg_t();
774 reply->map_epoch = get_parent()->get_epoch();
775 reply->min_epoch = get_parent()->get_interval_start_epoch();
776 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
777 reply->trace = _op->pg_trace;
778 get_parent()->send_message_osd_cluster(
779 op->op.from.osd, reply, get_parent()->get_epoch());
780 return true;
781 }
782 case MSG_OSD_EC_READ_REPLY: {
783 // NOTE: this is non-const because handle_sub_read_reply steals resulting
784 // buffers. It does not conflict with ECSubReadReply operator<<.
785 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
786 _op->get_nonconst_req());
787 RecoveryMessages rm;
788 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
789 dispatch_recovery_messages(rm, priority);
790 return true;
791 }
792 case MSG_OSD_PG_PUSH: {
793 const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
794 RecoveryMessages rm;
795 for (vector<PushOp>::const_iterator i = op->pushes.begin();
796 i != op->pushes.end();
797 ++i) {
798 handle_recovery_push(*i, &rm);
799 }
800 dispatch_recovery_messages(rm, priority);
801 return true;
802 }
803 case MSG_OSD_PG_PUSH_REPLY: {
804 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
805 _op->get_req());
806 RecoveryMessages rm;
807 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
808 i != op->replies.end();
809 ++i) {
810 handle_recovery_push_reply(*i, op->from, &rm);
811 }
812 dispatch_recovery_messages(rm, priority);
813 return true;
814 }
815 default:
816 return false;
817 }
818 return false;
819}
820
821struct SubWriteCommitted : public Context {
822 ECBackend *pg;
823 OpRequestRef msg;
824 ceph_tid_t tid;
825 eversion_t version;
826 eversion_t last_complete;
827 const ZTracer::Trace trace;
828 SubWriteCommitted(
829 ECBackend *pg,
830 OpRequestRef msg,
831 ceph_tid_t tid,
832 eversion_t version,
833 eversion_t last_complete,
834 const ZTracer::Trace &trace)
835 : pg(pg), msg(msg), tid(tid),
836 version(version), last_complete(last_complete), trace(trace) {}
837 void finish(int) override {
838 if (msg)
839 msg->mark_event("sub_op_committed");
840 pg->sub_write_committed(tid, version, last_complete, trace);
841 }
842};
843void ECBackend::sub_write_committed(
844 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
845 const ZTracer::Trace &trace) {
846 if (get_parent()->pgb_is_primary()) {
847 ECSubWriteReply reply;
848 reply.tid = tid;
849 reply.last_complete = last_complete;
850 reply.committed = true;
851 reply.from = get_parent()->whoami_shard();
852 handle_sub_write_reply(
853 get_parent()->whoami_shard(),
854 reply, trace);
855 } else {
856 get_parent()->update_last_complete_ondisk(last_complete);
857 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
858 r->pgid = get_parent()->primary_spg_t();
859 r->map_epoch = get_parent()->get_epoch();
860 r->min_epoch = get_parent()->get_interval_start_epoch();
861 r->op.tid = tid;
862 r->op.last_complete = last_complete;
863 r->op.committed = true;
864 r->op.from = get_parent()->whoami_shard();
865 r->set_priority(CEPH_MSG_PRIO_HIGH);
866 r->trace = trace;
867 r->trace.event("sending sub op commit");
868 get_parent()->send_message_osd_cluster(
869 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
870 }
871}
872
873struct SubWriteApplied : public Context {
874 ECBackend *pg;
875 OpRequestRef msg;
876 ceph_tid_t tid;
877 eversion_t version;
878 const ZTracer::Trace trace;
879 SubWriteApplied(
880 ECBackend *pg,
881 OpRequestRef msg,
882 ceph_tid_t tid,
883 eversion_t version,
884 const ZTracer::Trace &trace)
885 : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
886 void finish(int) override {
887 if (msg)
888 msg->mark_event("sub_op_applied");
889 pg->sub_write_applied(tid, version, trace);
890 }
891};
892void ECBackend::sub_write_applied(
893 ceph_tid_t tid, eversion_t version,
894 const ZTracer::Trace &trace) {
895 parent->op_applied(version);
896 if (get_parent()->pgb_is_primary()) {
897 ECSubWriteReply reply;
898 reply.from = get_parent()->whoami_shard();
899 reply.tid = tid;
900 reply.applied = true;
901 handle_sub_write_reply(
902 get_parent()->whoami_shard(),
903 reply, trace);
904 } else {
905 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
906 r->pgid = get_parent()->primary_spg_t();
907 r->map_epoch = get_parent()->get_epoch();
908 r->min_epoch = get_parent()->get_interval_start_epoch();
909 r->op.from = get_parent()->whoami_shard();
910 r->op.tid = tid;
911 r->op.applied = true;
912 r->set_priority(CEPH_MSG_PRIO_HIGH);
913 r->trace = trace;
914 r->trace.event("sending sub op apply");
915 get_parent()->send_message_osd_cluster(
916 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
917 }
918}
919
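// handle_sub_write: apply the primary's ECSubWrite on this shard. The op's
// log entries are recorded, backfill temp objects are cleaned up, and the
// shard transaction is queued together with a local transaction carrying the
// SubWriteCommitted / SubWriteApplied callbacks that report back to the
// primary. Parity shards additionally flag the write FADVISE_DONTNEED when
// the PG is not undersized.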
920void ECBackend::handle_sub_write(
921 pg_shard_t from,
922 OpRequestRef msg,
923 ECSubWrite &op,
924 const ZTracer::Trace &trace,
925 Context *on_local_applied_sync)
926{
927 if (msg)
928 msg->mark_started();
929 trace.event("handle_sub_write");
930 assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
931 if (!get_parent()->pgb_is_primary())
932 get_parent()->update_stats(op.stats);
933 ObjectStore::Transaction localt;
934 if (!op.temp_added.empty()) {
935 add_temp_objs(op.temp_added);
936 }
937 if (op.backfill) {
938 for (set<hobject_t>::iterator i = op.temp_removed.begin();
939 i != op.temp_removed.end();
940 ++i) {
941 dout(10) << __func__ << ": removing object " << *i
942 << " since we won't get the transaction" << dendl;
943 localt.remove(
944 coll,
945 ghobject_t(
946 *i,
947 ghobject_t::NO_GEN,
948 get_parent()->whoami_shard().shard));
949 }
950 }
951 clear_temp_objs(op.temp_removed);
952 get_parent()->log_operation(
953 op.log_entries,
954 op.updated_hit_set_history,
955 op.trim_to,
956 op.roll_forward_to,
957 !op.backfill,
958 localt);
959
960 PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
961 if (_rPG && !_rPG->is_undersized() &&
962 (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
963 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
964
965 if (on_local_applied_sync) {
966 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
967 localt.register_on_applied_sync(on_local_applied_sync);
968 }
969 localt.register_on_commit(
970 get_parent()->bless_context(
971 new SubWriteCommitted(
972 this, msg, op.tid,
973 op.at_version,
974 get_parent()->get_info().last_complete, trace)));
975 localt.register_on_applied(
976 get_parent()->bless_context(
977 new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
978 vector<ObjectStore::Transaction> tls;
979 tls.reserve(2);
980 tls.push_back(std::move(op.t));
981 tls.push_back(std::move(localt));
982 get_parent()->queue_transactions(tls, msg);
983}
984
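// handle_sub_read: service a shard-local read. Each requested extent is read
// from the object store; when ec overwrites are disabled and the whole chunk
// was read from offset 0, its digest is checked against the stored hinfo.
// Errors are reported per object so other shards can substitute.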
985void ECBackend::handle_sub_read(
986 pg_shard_t from,
987 const ECSubRead &op,
988 ECSubReadReply *reply,
989 const ZTracer::Trace &trace)
990{
991 trace.event("handle sub read");
992 shard_id_t shard = get_parent()->whoami_shard().shard;
993 for(auto i = op.to_read.begin();
994 i != op.to_read.end();
995 ++i) {
996 int r = 0;
997 ECUtil::HashInfoRef hinfo;
998 if (!get_parent()->get_pool().allows_ecoverwrites()) {
999 hinfo = get_hash_info(i->first);
1000 if (!hinfo) {
1001 r = -EIO;
1002 get_parent()->clog_error() << "Corruption detected: object " << i->first
1003 << " is missing hash_info";
1004 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1005 goto error;
1006 }
1007 }
1008 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1009 bufferlist bl;
1010 r = store->read(
1011 ch,
1012 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1013 j->get<0>(),
1014 j->get<1>(),
 1015 bl, j->get<2>());
 1016 if (r < 0) {
1017 get_parent()->clog_error() << "Error " << r
1018 << " reading object "
1019 << i->first;
1020 dout(5) << __func__ << ": Error " << r
1021 << " reading " << i->first << dendl;
1022 goto error;
1023 } else {
1024 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1025 reply->buffers_read[i->first].push_back(
1026 make_pair(
1027 j->get<0>(),
1028 bl)
1029 );
1030 }
1031
1032 if (!get_parent()->get_pool().allows_ecoverwrites()) {
 1033 // Deep scrub is still needed because large objects are read in
 1034 // sections, so the digest check cannot always be done here.
1035 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1036 // the state of our chunk in case other chunks could substitute.
1037 assert(hinfo->has_chunk_hash());
1038 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1039 (j->get<0>() == 0)) {
1040 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1041 bufferhash h(-1);
1042 h << bl;
1043 if (h.digest() != hinfo->get_chunk_hash(shard)) {
 1044 get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1045 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1046 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1047 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1048 r = -EIO;
1049 goto error;
1050 }
1051 }
1052 }
1053 }
1054 continue;
1055error:
1056 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1057 // the state of our chunk in case other chunks could substitute.
1058 reply->buffers_read.erase(i->first);
1059 reply->errors[i->first] = r;
1060 }
1061 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1062 i != op.attrs_to_read.end();
1063 ++i) {
1064 dout(10) << __func__ << ": fulfilling attr request on "
1065 << *i << dendl;
1066 if (reply->errors.count(*i))
1067 continue;
1068 int r = store->getattrs(
1069 ch,
1070 ghobject_t(
1071 *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1072 reply->attrs_read[*i]);
1073 if (r < 0) {
1074 reply->buffers_read.erase(*i);
1075 reply->errors[*i] = r;
1076 }
1077 }
1078 reply->from = get_parent()->whoami_shard();
1079 reply->tid = op.tid;
1080}
1081
1082void ECBackend::handle_sub_write_reply(
1083 pg_shard_t from,
1084 const ECSubWriteReply &op,
1085 const ZTracer::Trace &trace)
1086{
1087 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1088 assert(i != tid_to_op_map.end());
1089 if (op.committed) {
1090 trace.event("sub write committed");
1091 assert(i->second.pending_commit.count(from));
1092 i->second.pending_commit.erase(from);
1093 if (from != get_parent()->whoami_shard()) {
1094 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1095 }
1096 }
1097 if (op.applied) {
1098 trace.event("sub write applied");
1099 assert(i->second.pending_apply.count(from));
1100 i->second.pending_apply.erase(from);
1101 }
1102
1103 if (i->second.pending_apply.empty() && i->second.on_all_applied) {
1104 dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
1105 i->second.on_all_applied->complete(0);
1106 i->second.on_all_applied = 0;
1107 i->second.trace.event("ec write all applied");
1108 }
1109 if (i->second.pending_commit.empty() && i->second.on_all_commit) {
1110 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1111 i->second.on_all_commit->complete(0);
1112 i->second.on_all_commit = 0;
1113 i->second.trace.event("ec write all committed");
1114 }
1115 check_ops();
1116}
1117
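// handle_sub_read_reply: fold one shard's buffers/attrs/errors into the
// outstanding ReadOp. Once enough shards have answered (every in-flight shard
// for a normal read, or each arrival for redundant reads), check per object
// whether minimum_to_decode() is satisfied; if not, try
// send_all_remaining_reads, otherwise record the error. complete_read_op runs
// once nothing is left in flight or every object is resolved.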
1118void ECBackend::handle_sub_read_reply(
1119 pg_shard_t from,
1120 ECSubReadReply &op,
1121 RecoveryMessages *m,
1122 const ZTracer::Trace &trace)
1123{
1124 trace.event("ec sub read reply");
1125 dout(10) << __func__ << ": reply " << op << dendl;
1126 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1127 if (iter == tid_to_read_map.end()) {
1128 //canceled
1129 dout(20) << __func__ << ": dropped " << op << dendl;
1130 return;
1131 }
1132 ReadOp &rop = iter->second;
1133 for (auto i = op.buffers_read.begin();
1134 i != op.buffers_read.end();
1135 ++i) {
1136 assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1137 if (!rop.to_read.count(i->first)) {
1138 // We canceled this read! @see filter_read_op
1139 dout(20) << __func__ << " to_read skipping" << dendl;
1140 continue;
1141 }
1142 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1143 rop.to_read.find(i->first)->second.to_read.begin();
1144 list<
1145 boost::tuple<
1146 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1147 rop.complete[i->first].returned.begin();
1148 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1149 j != i->second.end();
1150 ++j, ++req_iter, ++riter) {
1151 assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1152 assert(riter != rop.complete[i->first].returned.end());
1153 pair<uint64_t, uint64_t> adjusted =
1154 sinfo.aligned_offset_len_to_chunk(
1155 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1156 assert(adjusted.first == j->first);
1157 riter->get<2>()[from].claim(j->second);
1158 }
1159 }
1160 for (auto i = op.attrs_read.begin();
1161 i != op.attrs_read.end();
1162 ++i) {
1163 assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1164 if (!rop.to_read.count(i->first)) {
1165 // We canceled this read! @see filter_read_op
1166 dout(20) << __func__ << " to_read skipping" << dendl;
1167 continue;
1168 }
1169 rop.complete[i->first].attrs = map<string, bufferlist>();
1170 (*(rop.complete[i->first].attrs)).swap(i->second);
1171 }
1172 for (auto i = op.errors.begin();
1173 i != op.errors.end();
1174 ++i) {
1175 rop.complete[i->first].errors.insert(
1176 make_pair(
1177 from,
1178 i->second));
1179 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1180 }
1181
1182 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1183 shard_to_read_map.find(from);
1184 assert(siter != shard_to_read_map.end());
1185 assert(siter->second.count(op.tid));
1186 siter->second.erase(op.tid);
1187
1188 assert(rop.in_progress.count(from));
1189 rop.in_progress.erase(from);
1190 unsigned is_complete = 0;
1191 // For redundant reads check for completion as each shard comes in,
1192 // or in a non-recovery read check for completion once all the shards read.
 1193 if (rop.do_redundant_reads || rop.in_progress.empty()) {
1194 for (map<hobject_t, read_result_t>::const_iterator iter =
1195 rop.complete.begin();
1196 iter != rop.complete.end();
1197 ++iter) {
1198 set<int> have;
1199 for (map<pg_shard_t, bufferlist>::const_iterator j =
1200 iter->second.returned.front().get<2>().begin();
1201 j != iter->second.returned.front().get<2>().end();
1202 ++j) {
1203 have.insert(j->first.shard);
1204 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1205 }
1206 set<int> want_to_read, dummy_minimum;
1207 get_want_to_read_shards(&want_to_read);
1208 int err;
1209 if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
1210 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1211 if (rop.in_progress.empty()) {
 1212 // If we don't have enough copies and we haven't sent reads for all
 1213 // shards, we can send the rest of the reads, if any.
1214 if (!rop.do_redundant_reads) {
1215 int r = send_all_remaining_reads(iter->first, rop);
1216 if (r == 0) {
1217 // We added to in_progress and not incrementing is_complete
1218 continue;
1219 }
1220 // Couldn't read any additional shards so handle as completed with errors
1221 }
 1222 // We don't want to confuse clients / RBD with objectstore error
 1223 // values, in particular ENOENT. We may get different error returns
 1224 // from different shards, so return the minimum_to_decode() error
 1225 // (usually EIO) to the reader. An error here is likely due to a
 1226 // damaged pg.
1227 rop.complete[iter->first].r = err;
1228 ++is_complete;
1229 }
1230 } else {
1231 assert(rop.complete[iter->first].r == 0);
1232 if (!rop.complete[iter->first].errors.empty()) {
1233 if (cct->_conf->osd_read_ec_check_for_errors) {
1234 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1235 err = rop.complete[iter->first].errors.begin()->second;
1236 rop.complete[iter->first].r = err;
1237 } else {
 1238 get_parent()->clog_warn() << "Error(s) ignored for "
1239 << iter->first << " enough copies available";
1240 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1241 << " enough copies available" << dendl;
1242 rop.complete[iter->first].errors.clear();
1243 }
1244 }
1245 ++is_complete;
1246 }
1247 }
1248 }
1249 if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1250 dout(20) << __func__ << " Complete: " << rop << dendl;
1251 rop.trace.event("ec read complete");
1252 complete_read_op(rop, m);
1253 } else {
1254 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1255 }
1256}
1257
1258void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1259{
1260 map<hobject_t, read_request_t>::iterator reqiter =
1261 rop.to_read.begin();
1262 map<hobject_t, read_result_t>::iterator resiter =
1263 rop.complete.begin();
1264 assert(rop.to_read.size() == rop.complete.size());
1265 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1266 if (reqiter->second.cb) {
1267 pair<RecoveryMessages *, read_result_t &> arg(
1268 m, resiter->second);
1269 reqiter->second.cb->complete(arg);
1270 reqiter->second.cb = NULL;
1271 }
1272 }
1273 tid_to_read_map.erase(rop.tid);
1274}
1275
1276struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1277 ECBackend *ec;
1278 ceph_tid_t tid;
1279 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1280 void finish(ThreadPool::TPHandle &handle) override {
1281 auto ropiter = ec->tid_to_read_map.find(tid);
1282 assert(ropiter != ec->tid_to_read_map.end());
1283 int priority = ropiter->second.priority;
1284 RecoveryMessages rm;
1285 ec->complete_read_op(ropiter->second, &rm);
1286 ec->dispatch_recovery_messages(rm, priority);
1287 }
1288};
1289
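// filter_read_op: strip a ReadOp of sub-reads targeted at OSDs the new map
// shows as down, cancelling the affected objects' reads and recovery; if
// nothing remains in flight the op is completed from a FinishReadOp context.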
1290void ECBackend::filter_read_op(
1291 const OSDMapRef& osdmap,
1292 ReadOp &op)
1293{
1294 set<hobject_t> to_cancel;
1295 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1296 i != op.source_to_obj.end();
1297 ++i) {
1298 if (osdmap->is_down(i->first.osd)) {
1299 to_cancel.insert(i->second.begin(), i->second.end());
1300 op.in_progress.erase(i->first);
1301 continue;
1302 }
1303 }
1304
1305 if (to_cancel.empty())
1306 return;
1307
1308 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1309 i != op.source_to_obj.end();
1310 ) {
1311 for (set<hobject_t>::iterator j = i->second.begin();
1312 j != i->second.end();
1313 ) {
1314 if (to_cancel.count(*j))
1315 i->second.erase(j++);
1316 else
1317 ++j;
1318 }
1319 if (i->second.empty()) {
1320 op.source_to_obj.erase(i++);
1321 } else {
1322 assert(!osdmap->is_down(i->first.osd));
1323 ++i;
1324 }
1325 }
1326
1327 for (set<hobject_t>::iterator i = to_cancel.begin();
1328 i != to_cancel.end();
1329 ++i) {
1330 get_parent()->cancel_pull(*i);
1331
1332 assert(op.to_read.count(*i));
1333 read_request_t &req = op.to_read.find(*i)->second;
1334 dout(10) << __func__ << ": canceling " << req
1335 << " for obj " << *i << dendl;
1336 assert(req.cb);
1337 delete req.cb;
1338 req.cb = NULL;
1339
1340 op.to_read.erase(*i);
1341 op.complete.erase(*i);
1342 recovery_ops.erase(*i);
1343 }
1344
1345 if (op.in_progress.empty()) {
1346 get_parent()->schedule_recovery_work(
1347 get_parent()->bless_gencontext(
1348 new FinishReadOp(this, op.tid)));
1349 }
1350}
1351
1352void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1353{
1354 set<ceph_tid_t> tids_to_filter;
1355 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1356 i = shard_to_read_map.begin();
1357 i != shard_to_read_map.end();
1358 ) {
1359 if (osdmap->is_down(i->first.osd)) {
1360 tids_to_filter.insert(i->second.begin(), i->second.end());
1361 shard_to_read_map.erase(i++);
1362 } else {
1363 ++i;
1364 }
1365 }
1366 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1367 i != tids_to_filter.end();
1368 ++i) {
1369 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1370 assert(j != tid_to_read_map.end());
1371 filter_read_op(osdmap, j->second);
1372 }
1373}
1374
1375void ECBackend::on_change()
1376{
1377 dout(10) << __func__ << dendl;
1378
1379 completed_to = eversion_t();
1380 committed_to = eversion_t();
1381 pipeline_state.clear();
1382 waiting_reads.clear();
1383 waiting_state.clear();
1384 waiting_commit.clear();
1385 for (auto &&op: tid_to_op_map) {
1386 cache.release_write_pin(op.second.pin);
1387 }
1388 tid_to_op_map.clear();
1389
1390 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1391 i != tid_to_read_map.end();
1392 ++i) {
1393 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1394 for (map<hobject_t, read_request_t>::iterator j =
1395 i->second.to_read.begin();
1396 j != i->second.to_read.end();
1397 ++j) {
1398 delete j->second.cb;
1399 j->second.cb = 0;
1400 }
1401 }
1402 tid_to_read_map.clear();
1403 in_progress_client_reads.clear();
1404 shard_to_read_map.clear();
1405 clear_recovery_state();
1406}
1407
1408void ECBackend::clear_recovery_state()
1409{
1410 recovery_ops.clear();
1411}
1412
1413void ECBackend::on_flushed()
1414{
1415}
1416
1417void ECBackend::dump_recovery_info(Formatter *f) const
1418{
1419 f->open_array_section("recovery_ops");
1420 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1421 i != recovery_ops.end();
1422 ++i) {
1423 f->open_object_section("op");
1424 i->second.dump(f);
1425 f->close_section();
1426 }
1427 f->close_section();
1428 f->open_array_section("read_ops");
1429 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1430 i != tid_to_read_map.end();
1431 ++i) {
1432 f->open_object_section("read_op");
1433 i->second.dump(f);
1434 f->close_section();
1435 }
1436 f->close_section();
1437}
1438
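// submit_transaction: entry point for a write from the PG. Record the Op
// (version, log entries, completion contexts) and push it into the rmw
// pipeline via start_rmw.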
1439void ECBackend::submit_transaction(
1440 const hobject_t &hoid,
1441 const object_stat_sum_t &delta_stats,
1442 const eversion_t &at_version,
1443 PGTransactionUPtr &&t,
1444 const eversion_t &trim_to,
1445 const eversion_t &roll_forward_to,
1446 const vector<pg_log_entry_t> &log_entries,
1447 boost::optional<pg_hit_set_history_t> &hset_history,
1448 Context *on_local_applied_sync,
1449 Context *on_all_applied,
1450 Context *on_all_commit,
1451 ceph_tid_t tid,
1452 osd_reqid_t reqid,
1453 OpRequestRef client_op
1454 )
1455{
1456 assert(!tid_to_op_map.count(tid));
1457 Op *op = &(tid_to_op_map[tid]);
1458 op->hoid = hoid;
1459 op->delta_stats = delta_stats;
1460 op->version = at_version;
1461 op->trim_to = trim_to;
1462 op->roll_forward_to = MAX(roll_forward_to, committed_to);
1463 op->log_entries = log_entries;
1464 std::swap(op->updated_hit_set_history, hset_history);
1465 op->on_local_applied_sync = on_local_applied_sync;
1466 op->on_all_applied = on_all_applied;
1467 op->on_all_commit = on_all_commit;
1468 op->tid = tid;
1469 op->reqid = reqid;
1470 op->client_op = client_op;
1471 if (client_op)
1472 op->trace = client_op->pg_trace;
1473
1474 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1475 start_rmw(op, std::move(t));
1476 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1477}
1478
1479void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1480 if (!waiting_state.empty()) {
1481 waiting_state.back().on_write.emplace_back(std::move(cb));
1482 } else if (!waiting_reads.empty()) {
1483 waiting_reads.back().on_write.emplace_back(std::move(cb));
1484 } else {
1485 // Nothing earlier in the pipeline, just call it
1486 cb();
1487 }
1488}
1489
1490void ECBackend::get_all_avail_shards(
 1491 const hobject_t &hoid,
 1492 set<int> &have,
 1493 map<shard_id_t, pg_shard_t> &shards,
 1494 bool for_recovery)
1495{
1496 for (set<pg_shard_t>::const_iterator i =
1497 get_parent()->get_acting_shards().begin();
1498 i != get_parent()->get_acting_shards().end();
1499 ++i) {
1500 dout(10) << __func__ << ": checking acting " << *i << dendl;
1501 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1502 if (!missing.is_missing(hoid)) {
1503 assert(!have.count(i->shard));
1504 have.insert(i->shard);
1505 assert(!shards.count(i->shard));
1506 shards.insert(make_pair(i->shard, *i));
1507 }
1508 }
1509
1510 if (for_recovery) {
1511 for (set<pg_shard_t>::const_iterator i =
1512 get_parent()->get_backfill_shards().begin();
1513 i != get_parent()->get_backfill_shards().end();
1514 ++i) {
1515 if (have.count(i->shard)) {
1516 assert(shards.count(i->shard));
1517 continue;
1518 }
1519 dout(10) << __func__ << ": checking backfill " << *i << dendl;
1520 assert(!shards.count(i->shard));
1521 const pg_info_t &info = get_parent()->get_shard_info(*i);
1522 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1523 if (hoid < info.last_backfill &&
1524 !missing.is_missing(hoid)) {
1525 have.insert(i->shard);
1526 shards.insert(make_pair(i->shard, *i));
1527 }
1528 }
1529
1530 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1531 get_parent()->get_missing_loc_shards().find(hoid);
1532 if (miter != get_parent()->get_missing_loc_shards().end()) {
1533 for (set<pg_shard_t>::iterator i = miter->second.begin();
1534 i != miter->second.end();
1535 ++i) {
1536 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1537 auto m = get_parent()->maybe_get_shard_missing(*i);
1538 if (m) {
1539 assert(!(*m).is_missing(hoid));
1540 }
1541 have.insert(i->shard);
1542 shards.insert(make_pair(i->shard, *i));
1543 }
1544 }
1545 }
1546}
1547
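// get_min_avail_to_read_shards: choose the shards to read for 'want'. Collect
// every available shard, ask the plugin for a minimal decodable subset (or
// take all available shards when do_redundant_reads), and map the chosen
// shard ids back to pg_shard_t.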
1548int ECBackend::get_min_avail_to_read_shards(
1549 const hobject_t &hoid,
1550 const set<int> &want,
1551 bool for_recovery,
1552 bool do_redundant_reads,
1553 set<pg_shard_t> *to_read)
1554{
1555 // Make sure we don't do redundant reads for recovery
1556 assert(!for_recovery || !do_redundant_reads);
1557
1558 set<int> have;
1559 map<shard_id_t, pg_shard_t> shards;
1560
1561 get_all_avail_shards(hoid, have, shards, for_recovery);
1562
1563 set<int> need;
1564 int r = ec_impl->minimum_to_decode(want, have, &need);
1565 if (r < 0)
1566 return r;
1567
1568 if (do_redundant_reads) {
1569 need.swap(have);
1570 }
1571
1572 if (!to_read)
1573 return 0;
1574
1575 for (set<int>::iterator i = need.begin();
1576 i != need.end();
1577 ++i) {
1578 assert(shards.count(shard_id_t(*i)));
1579 to_read->insert(shards[shard_id_t(*i)]);
1580 }
1581 return 0;
1582}
1583
1584int ECBackend::get_remaining_shards(
1585 const hobject_t &hoid,
1586 const set<int> &avail,
1587 set<pg_shard_t> *to_read,
1588 bool for_recovery)
1589{
 1590 assert(to_read);
 1591
1592 set<int> have;
1593 map<shard_id_t, pg_shard_t> shards;
 1594
 1595 get_all_avail_shards(hoid, have, shards, for_recovery);
 1596
1597 for (set<int>::iterator i = have.begin();
1598 i != have.end();
1599 ++i) {
1600 assert(shards.count(shard_id_t(*i)));
1601 if (avail.find(*i) == avail.end())
1602 to_read->insert(shards[shard_id_t(*i)]);
1603 }
1604 return 0;
1605}
1606
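// start_read_op / do_read_op: register a ReadOp under a fresh tid and send a
// MOSDECSubOpRead to every shard in each object's 'need' set, converting
// logical extents to chunk offsets; attrs are requested from one shard only.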
1607void ECBackend::start_read_op(
1608 int priority,
1609 map<hobject_t, read_request_t> &to_read,
1610 OpRequestRef _op,
1611 bool do_redundant_reads,
1612 bool for_recovery)
1613{
1614 ceph_tid_t tid = get_parent()->get_tid();
1615 assert(!tid_to_read_map.count(tid));
1616 auto &op = tid_to_read_map.emplace(
1617 tid,
1618 ReadOp(
1619 priority,
1620 tid,
1621 do_redundant_reads,
1622 for_recovery,
1623 _op,
1624 std::move(to_read))).first->second;
1625 dout(10) << __func__ << ": starting " << op << dendl;
1626 if (_op) {
1627 op.trace = _op->pg_trace;
1628 op.trace.event("start ec read");
1629 }
1630 do_read_op(op);
1631}
1632
1633void ECBackend::do_read_op(ReadOp &op)
1634{
1635 int priority = op.priority;
1636 ceph_tid_t tid = op.tid;
1637
1638 dout(10) << __func__ << ": starting read " << op << dendl;
1639
1640 map<pg_shard_t, ECSubRead> messages;
1641 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1642 i != op.to_read.end();
1643 ++i) {
1644 bool need_attrs = i->second.want_attrs;
1645 for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
1646 j != i->second.need.end();
1647 ++j) {
1648 if (need_attrs) {
1649 messages[*j].attrs_to_read.insert(i->first);
1650 need_attrs = false;
1651 }
1652 op.obj_to_source[i->first].insert(*j);
1653 op.source_to_obj[*j].insert(i->first);
1654 }
1655 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1656 i->second.to_read.begin();
1657 j != i->second.to_read.end();
1658 ++j) {
1659 pair<uint64_t, uint64_t> chunk_off_len =
1660 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1661 for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
1662 k != i->second.need.end();
1663 ++k) {
1664 messages[*k].to_read[i->first].push_back(
1665 boost::make_tuple(
1666 chunk_off_len.first,
1667 chunk_off_len.second,
1668 j->get<2>()));
1669 }
1670 assert(!need_attrs);
1671 }
1672 }
1673
1674 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1675 i != messages.end();
1676 ++i) {
1677 op.in_progress.insert(i->first);
1678 shard_to_read_map[i->first].insert(op.tid);
1679 i->second.tid = tid;
1680 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1681 msg->set_priority(priority);
1682 msg->pgid = spg_t(
1683 get_parent()->whoami_spg_t().pgid,
1684 i->first.shard);
1685 msg->map_epoch = get_parent()->get_epoch();
1686 msg->min_epoch = get_parent()->get_interval_start_epoch();
1687 msg->op = i->second;
1688 msg->op.from = get_parent()->whoami_shard();
1689 msg->op.tid = tid;
1690 if (op.trace) {
1691 // initialize a child span for this shard
1692 msg->trace.init("ec sub read", nullptr, &op.trace);
1693 msg->trace.keyval("shard", i->first.shard.id);
1694 }
1695 get_parent()->send_message_osd_cluster(
1696 i->first.osd,
1697 msg,
1698 get_parent()->get_epoch());
1699 }
1700 dout(10) << __func__ << ": started " << op << dendl;
1701}
1702
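// get_hash_info: return the cached HashInfo for hoid, loading and decoding
// the hinfo xattr on a cache miss. A null ref is returned when the recorded
// total chunk size disagrees with the on-disk size, or the attr is missing
// on a non-empty object.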
1703ECUtil::HashInfoRef ECBackend::get_hash_info(
1704 const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1705{
1706 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1707 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1708 if (!ref) {
1709 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1710 struct stat st;
1711 int r = store->stat(
1712 ch,
1713 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1714 &st);
1715 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1716 // XXX: What does it mean if there is no object on disk?
1717 if (r >= 0) {
1718 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1719 bufferlist bl;
1720 if (attrs) {
1721 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1722 if (k == attrs->end()) {
1723 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1724 } else {
1725 bl.push_back(k->second);
1726 }
1727 } else {
1728 r = store->getattr(
1729 ch,
1730 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1731 ECUtil::get_hinfo_key(),
1732 bl);
1733 if (r < 0) {
1734 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1735 bl.clear(); // just in case
1736 }
1737 }
1738 if (bl.length() > 0) {
1739 bufferlist::iterator bp = bl.begin();
1740 ::decode(hinfo, bp);
1741 if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1742 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1743 << hinfo.get_total_chunk_size() << dendl;
1744 return ECUtil::HashInfoRef();
1745 }
 1746 } else if (st.st_size > 0) { // missing hinfo on a non-empty object is an error; empty objects get a fresh hinfo below
1747 return ECUtil::HashInfoRef();
1748 }
1749 }
1750 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1751 }
1752 return ref;
1753}
1754
1755void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1756{
1757 assert(op);
1758
1759 op->plan = ECTransaction::get_write_plan(
1760 sinfo,
1761 std::move(t),
1762 [&](const hobject_t &i) {
1763 ECUtil::HashInfoRef ref = get_hash_info(i, false);
1764 if (!ref) {
1765 derr << __func__ << ": get_hash_info(" << i << ")"
1766 << " returned a null pointer and there is no "
1767 << "way to recover from such an error in this "
1768 << "context" << dendl;
1769 ceph_abort();
1770 }
1771 return ref;
1772 },
1773 get_parent()->get_dpp());
1774
1775 dout(10) << __func__ << ": " << *op << dendl;
1776
1777 waiting_state.push_back(*op);
1778 check_ops();
1779}
1780
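// Pipeline stage 1: move the op at the head of waiting_state to
// waiting_reads. An op that needs a read-modify-write is held back while the
// extent cache is invalid. With caching enabled, extents already pinned in
// the cache are deferred to commit time (pending_read) and only the remainder
// (remote_read) is fetched from the shards via objects_read_async_no_cache.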
1781bool ECBackend::try_state_to_reads()
1782{
1783 if (waiting_state.empty())
1784 return false;
1785
1786 Op *op = &(waiting_state.front());
1787 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1788 assert(get_parent()->get_pool().allows_ecoverwrites());
1789 dout(20) << __func__ << ": blocking " << *op
1790 << " because it requires an rmw and the cache is invalid "
1791 << pipeline_state
1792 << dendl;
1793 return false;
1794 }
1795
1796 if (op->invalidates_cache()) {
1797 dout(20) << __func__ << ": invalidating cache after this op"
1798 << dendl;
1799 pipeline_state.invalidate();
1800 op->using_cache = false;
1801 } else {
1802 op->using_cache = pipeline_state.caching_enabled();
1803 }
1804
1805 waiting_state.pop_front();
1806 waiting_reads.push_back(*op);
1807
1808 if (op->using_cache) {
1809 cache.open_write_pin(op->pin);
1810
1811 extent_set empty;
1812 for (auto &&hpair: op->plan.will_write) {
1813 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1814 const extent_set &to_read_plan =
1815 to_read_plan_iter == op->plan.to_read.end() ?
1816 empty :
1817 to_read_plan_iter->second;
1818
1819 extent_set remote_read = cache.reserve_extents_for_rmw(
1820 hpair.first,
1821 op->pin,
1822 hpair.second,
1823 to_read_plan);
1824
1825 extent_set pending_read = to_read_plan;
1826 pending_read.subtract(remote_read);
1827
1828 if (!remote_read.empty()) {
1829 op->remote_read[hpair.first] = std::move(remote_read);
1830 }
1831 if (!pending_read.empty()) {
1832 op->pending_read[hpair.first] = std::move(pending_read);
1833 }
1834 }
1835 } else {
1836 op->remote_read = op->plan.to_read;
1837 }
1838
1839 dout(10) << __func__ << ": " << *op << dendl;
1840
1841 if (!op->remote_read.empty()) {
1842 assert(get_parent()->get_pool().allows_ecoverwrites());
1843 objects_read_async_no_cache(
1844 op->remote_read,
1845 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1846 for (auto &&i: results) {
1847 op->remote_read_result.emplace(i.first, i.second.second);
1848 }
1849 check_ops();
1850 });
1851 }
1852
1853 return true;
1854}
1855
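// Pipeline stage 2: once the op at the head of waiting_reads has no read
// outstanding, fold any cached extents into remote_read_result, generate one
// ObjectStore::Transaction per acting/backfill shard, and send an ECSubWrite
// to each remote shard; the local shard's write is applied directly through
// handle_sub_write.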
1856bool ECBackend::try_reads_to_commit()
1857{
1858 if (waiting_reads.empty())
1859 return false;
1860 Op *op = &(waiting_reads.front());
1861 if (op->read_in_progress())
1862 return false;
1863 waiting_reads.pop_front();
1864 waiting_commit.push_back(*op);
1865
1866 dout(10) << __func__ << ": starting commit on " << *op << dendl;
1867 dout(20) << __func__ << ": " << cache << dendl;
1868
1869 get_parent()->apply_stats(
1870 op->hoid,
1871 op->delta_stats);
1872
1873 if (op->using_cache) {
1874 for (auto &&hpair: op->pending_read) {
1875 op->remote_read_result[hpair.first].insert(
1876 cache.get_remaining_extents_for_rmw(
1877 hpair.first,
1878 op->pin,
1879 hpair.second));
1880 }
1881 op->pending_read.clear();
1882 } else {
1883 assert(op->pending_read.empty());
1884 }
1885
1886 map<shard_id_t, ObjectStore::Transaction> trans;
1887 for (set<pg_shard_t>::const_iterator i =
1888 get_parent()->get_actingbackfill_shards().begin();
1889 i != get_parent()->get_actingbackfill_shards().end();
1890 ++i) {
1891 trans[i->shard];
1892 }
1893
1894 op->trace.event("start ec write");
1895
1896 map<hobject_t,extent_map> written;
1897 if (op->plan.t) {
1898 ECTransaction::generate_transactions(
1899 op->plan,
1900 ec_impl,
1901 get_parent()->get_info().pgid.pgid,
1902 (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
1903 sinfo,
1904 op->remote_read_result,
1905 op->log_entries,
1906 &written,
1907 &trans,
1908 &(op->temp_added),
1909 &(op->temp_cleared),
1910 get_parent()->get_dpp());
1911 }
1912
1913 dout(20) << __func__ << ": " << cache << dendl;
1914 dout(20) << __func__ << ": written: " << written << dendl;
1915 dout(20) << __func__ << ": op: " << *op << dendl;
1916
1917 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1918 for (auto &&i: op->log_entries) {
1919 if (i.requires_kraken()) {
1920 derr << __func__ << ": log entry " << i << " requires kraken"
1921 << " but overwrites are not enabled!" << dendl;
1922 ceph_abort();
1923 }
1924 }
1925 }
1926
1927 map<hobject_t,extent_set> written_set;
1928 for (auto &&i: written) {
1929 written_set[i.first] = i.second.get_interval_set();
1930 }
1931 dout(20) << __func__ << ": written_set: " << written_set << dendl;
1932 assert(written_set == op->plan.will_write);
1933
1934 if (op->using_cache) {
1935 for (auto &&hpair: written) {
1936 dout(20) << __func__ << ": " << hpair << dendl;
1937 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
1938 }
1939 }
1940 op->remote_read.clear();
1941 op->remote_read_result.clear();
1942
1943 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1944 ObjectStore::Transaction empty;
1945 bool should_write_local = false;
1946 ECSubWrite local_write_op;
1947 for (set<pg_shard_t>::const_iterator i =
1948 get_parent()->get_actingbackfill_shards().begin();
1949 i != get_parent()->get_actingbackfill_shards().end();
1950 ++i) {
1951 op->pending_apply.insert(*i);
1952 op->pending_commit.insert(*i);
1953 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
1954 trans.find(i->shard);
1955 assert(iter != trans.end());
1956 bool should_send = get_parent()->should_send_op(*i, op->hoid);
1957 const pg_stat_t &stats =
1958 should_send ?
1959 get_info().stats :
1960 parent->get_shard_info().find(*i)->second.stats;
1961
1962 ECSubWrite sop(
1963 get_parent()->whoami_shard(),
1964 op->tid,
1965 op->reqid,
1966 op->hoid,
1967 stats,
1968 should_send ? iter->second : empty,
1969 op->version,
1970 op->trim_to,
1971 op->roll_forward_to,
1972 op->log_entries,
1973 op->updated_hit_set_history,
1974 op->temp_added,
1975 op->temp_cleared,
1976 !should_send);
1977
1978 ZTracer::Trace trace;
1979 if (op->trace) {
1980 // initialize a child span for this shard
1981 trace.init("ec sub write", nullptr, &op->trace);
1982 trace.keyval("shard", i->shard.id);
1983 }
1984
1985 if (*i == get_parent()->whoami_shard()) {
1986 should_write_local = true;
1987 local_write_op.claim(sop);
1988 } else {
1989 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
1990 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
1991 r->map_epoch = get_parent()->get_epoch();
1992 r->min_epoch = get_parent()->get_interval_start_epoch();
1993 r->trace = trace;
1994 get_parent()->send_message_osd_cluster(
1995 i->osd, r, get_parent()->get_epoch());
1996 }
1997 }
1998 if (should_write_local) {
1999 handle_sub_write(
2000 get_parent()->whoami_shard(),
2001 op->client_op,
2002 local_write_op,
2003 op->trace,
2004 op->on_local_applied_sync);
2005 op->on_local_applied_sync = 0;
2006 }
2007
2008 for (auto i = op->on_write.begin();
2009 i != op->on_write.end();
2010 op->on_write.erase(i++)) {
2011 (*i)();
2012 }
2013
2014 return true;
2015}
2016
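// Pipeline stage 3: once the op at the head of waiting_commit has been
// applied and committed everywhere, advance completed_to/committed_to. On
// kraken-or-later maps, if the pipeline is otherwise empty, a dummy op is
// queued so the log can be rolled forward past this write; the cache pin is
// then released and the op removed from tid_to_op_map.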
2017bool ECBackend::try_finish_rmw()
2018{
2019 if (waiting_commit.empty())
2020 return false;
2021 Op *op = &(waiting_commit.front());
2022 if (op->write_in_progress())
2023 return false;
2024 waiting_commit.pop_front();
2025
2026 dout(10) << __func__ << ": " << *op << dendl;
2027 dout(20) << __func__ << ": " << cache << dendl;
2028
2029 if (op->roll_forward_to > completed_to)
2030 completed_to = op->roll_forward_to;
2031 if (op->version > committed_to)
2032 committed_to = op->version;
2033
2034 if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
2035 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2036 waiting_reads.empty() &&
2037 waiting_commit.empty()) {
2038 // submit a dummy transaction to kick the rollforward
2039 auto tid = get_parent()->get_tid();
2040 Op *nop = &(tid_to_op_map[tid]);
2041 nop->hoid = op->hoid;
2042 nop->trim_to = op->trim_to;
2043 nop->roll_forward_to = op->version;
2044 nop->tid = tid;
2045 nop->reqid = op->reqid;
2046 waiting_reads.push_back(*nop);
2047 }
2048 }
2049
2050 if (op->using_cache) {
2051 cache.release_write_pin(op->pin);
2052 }
2053 tid_to_op_map.erase(op->tid);
2054
2055 if (waiting_reads.empty() &&
2056 waiting_commit.empty()) {
2057 pipeline_state.clear();
2058 dout(20) << __func__ << ": clearing pipeline_state "
2059 << pipeline_state
2060 << dendl;
2061 }
2062 return true;
2063}
2064
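// check_ops: drive the three pipeline stages above until none of them can
// make further progress.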
2065void ECBackend::check_ops()
2066{
2067 while (try_state_to_reads() ||
2068 try_reads_to_commit() ||
2069 try_finish_rmw());
2070}
2071
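// Synchronous reads are not supported by the EC backend; this always returns
// -EOPNOTSUPP and reads are served through the asynchronous paths below.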
2072int ECBackend::objects_read_sync(
2073 const hobject_t &hoid,
2074 uint64_t off,
2075 uint64_t len,
2076 uint32_t op_flags,
2077 bufferlist *bl)
2078{
2079 return -EOPNOTSUPP;
2080}
2081
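// objects_read_async: widen each requested (offset, length) to stripe bounds,
// merge the resulting extents, and issue them through
// objects_read_and_reconstruct. The cb functor below slices the reconstructed
// extent_map back into the caller's per-range buffers and completes both the
// per-range Contexts and the overall on_complete.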
2082void ECBackend::objects_read_async(
2083 const hobject_t &hoid,
2084 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2085 pair<bufferlist*, Context*> > > &to_read,
2086 Context *on_complete,
2087 bool fast_read)
2088{
2089 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2090 reads;
2091
2092 uint32_t flags = 0;
2093 extent_set es;
2094 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2095 pair<bufferlist*, Context*> > >::const_iterator i =
2096 to_read.begin();
2097 i != to_read.end();
2098 ++i) {
2099 pair<uint64_t, uint64_t> tmp =
2100 sinfo.offset_len_to_stripe_bounds(
2101 make_pair(i->first.get<0>(), i->first.get<1>()));
2102
2103 extent_set esnew;
2104 esnew.insert(tmp.first, tmp.second);
2105 es.union_of(esnew);
2106 flags |= i->first.get<2>();
2107 }
2108
2109 if (!es.empty()) {
2110 auto &offsets = reads[hoid];
2111 for (auto j = es.begin();
2112 j != es.end();
2113 ++j) {
2114 offsets.push_back(
2115 boost::make_tuple(
2116 j.get_start(),
2117 j.get_len(),
2118 flags));
2119 }
2120 }
2121
2122 struct cb {
2123 ECBackend *ec;
2124 hobject_t hoid;
2125 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2126 pair<bufferlist*, Context*> > > to_read;
2127 unique_ptr<Context> on_complete;
2128 cb(const cb&) = delete;
2129 cb(cb &&) = default;
2130 cb(ECBackend *ec,
2131 const hobject_t &hoid,
2132 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2133 pair<bufferlist*, Context*> > > &to_read,
2134 Context *on_complete)
2135 : ec(ec),
2136 hoid(hoid),
2137 to_read(to_read),
2138 on_complete(on_complete) {}
2139 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2140 auto dpp = ec->get_parent()->get_dpp();
2141 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2142 << dendl;
2143 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2144 << dendl;
2145
2146 auto &got = results[hoid];
2147
2148 int r = 0;
2149 for (auto &&read: to_read) {
2150 if (got.first < 0) {
2151 if (read.second.second) {
2152 read.second.second->complete(got.first);
2153 }
2154 if (r == 0)
2155 r = got.first;
2156 } else {
2157 assert(read.second.first);
2158 uint64_t offset = read.first.get<0>();
2159 uint64_t length = read.first.get<1>();
2160 auto range = got.second.get_containing_range(offset, length);
2161 assert(range.first != range.second);
2162 assert(range.first.get_off() <= offset);
2163 assert(
2164 (offset + length) <=
2165 (range.first.get_off() + range.first.get_len()));
2166 read.second.first->substr_of(
2167 range.first.get_val(),
2168 offset - range.first.get_off(),
2169 length);
2170 if (read.second.second) {
2171 read.second.second->complete(length);
2172 read.second.second = nullptr;
2173 }
2174 }
2175 }
2176 to_read.clear();
2177 if (on_complete) {
2178 on_complete.release()->complete(r);
2179 }
2180 }
2181 ~cb() {
2182 for (auto &&i: to_read) {
2183 delete i.second.second;
2184 }
2185 to_read.clear();
2186 }
2187 };
2188 objects_read_and_reconstruct(
2189 reads,
2190 fast_read,
2191 make_gen_lambda_context<
2192 map<hobject_t,pair<int, extent_map> > &&, cb>(
2193 cb(this,
2194 hoid,
2195 to_read,
2196 on_complete)));
2197}
2198
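// Per-object completion for a client read: for each requested range, collect
// the chunks returned by the shards, decode them back into object data with
// ECUtil::decode, trim the decoded stripe to the originally requested
// offset/length, and hand the assembled extent_map to the waiting
// ClientAsyncReadStatus.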
2199struct CallClientContexts :
2200 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2201 hobject_t hoid;
2202 ECBackend *ec;
2203 ECBackend::ClientAsyncReadStatus *status;
2204 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2205 CallClientContexts(
2206 hobject_t hoid,
2207 ECBackend *ec,
2208 ECBackend::ClientAsyncReadStatus *status,
2209 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2210 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2211 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2212 ECBackend::read_result_t &res = in.second;
2213 extent_map result;
2214 if (res.r != 0)
2215 goto out;
2216 assert(res.returned.size() == to_read.size());
2217 assert(res.r == 0);
2218 assert(res.errors.empty());
2219 for (auto &&read: to_read) {
2220 pair<uint64_t, uint64_t> adjusted =
2221 ec->sinfo.offset_len_to_stripe_bounds(
2222 make_pair(read.get<0>(), read.get<1>()));
2223 assert(res.returned.front().get<0>() == adjusted.first &&
2224 res.returned.front().get<1>() == adjusted.second);
2225 map<int, bufferlist> to_decode;
2226 bufferlist bl;
2227 for (map<pg_shard_t, bufferlist>::iterator j =
2228 res.returned.front().get<2>().begin();
2229 j != res.returned.front().get<2>().end();
2230 ++j) {
2231 to_decode[j->first.shard].claim(j->second);
2232 }
2233 int r = ECUtil::decode(
2234 ec->sinfo,
2235 ec->ec_impl,
2236 to_decode,
2237 &bl);
2238 if (r < 0) {
2239 res.r = r;
2240 goto out;
2241 }
2242 bufferlist trimmed;
2243 trimmed.substr_of(
2244 bl,
2245 read.get<0>() - adjusted.first,
2246 MIN(read.get<1>(),
2247 bl.length() - (read.get<0>() - adjusted.first)));
2248 result.insert(
2249 read.get<0>(), trimmed.length(), std::move(trimmed));
2250 res.returned.pop_front();
2251 }
2252out:
2253 status->complete_object(hoid, res.r, std::move(result));
2254 ec->kick_reads();
2255 }
2256};
2257
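// objects_read_and_reconstruct: for each object pick a set of available
// shards sufficient to decode the wanted data (with fast_read, redundant
// shards are presumably requested so the read need not wait for the slowest
// one), attach a CallClientContexts completion, and dispatch everything as a
// single ReadOp.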
2258void ECBackend::objects_read_and_reconstruct(
2259 const map<hobject_t,
2260 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2261 > &reads,
2262 bool fast_read,
2263 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2264{
2265 in_progress_client_reads.emplace_back(
2266 reads.size(), std::move(func));
2267 if (!reads.size()) {
2268 kick_reads();
2269 return;
2270 }
2271
2272 set<int> want_to_read;
2273 get_want_to_read_shards(&want_to_read);
2274
2275 map<hobject_t, read_request_t> for_read_op;
2276 for (auto &&to_read: reads) {
2277 set<pg_shard_t> shards;
2278 int r = get_min_avail_to_read_shards(
2279 to_read.first,
2280 want_to_read,
2281 false,
2282 fast_read,
2283 &shards);
2284 assert(r == 0);
2285
2286 CallClientContexts *c = new CallClientContexts(
2287 to_read.first,
2288 this,
2289 &(in_progress_client_reads.back()),
2290 to_read.second);
2291 for_read_op.insert(
2292 make_pair(
2293 to_read.first,
2294 read_request_t(
2295 to_read.second,
2296 shards,
2297 false,
2298 c)));
2299 }
2300
2301 start_read_op(
2302 CEPH_MSG_PRIO_DEFAULT,
2303 for_read_op,
2304 OpRequestRef(),
2305 fast_read, false);
2306 return;
2307}
2308
2309
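// send_all_remaining_reads: after a shard read has failed, retry the object's
// read against the shards that have not yet been asked, reusing the original
// offsets and completion callback; returns -EIO when no further shards are
// available.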
2310int ECBackend::send_all_remaining_reads(
2311 const hobject_t &hoid,
2312 ReadOp &rop)
2313{
2314 set<int> already_read;
2315 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2316 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2317 already_read.insert(i->shard);
2318 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2319 set<pg_shard_t> shards;
2320 int r = get_remaining_shards(hoid, already_read, &shards, rop.for_recovery);
2321 if (r)
2322 return r;
2323 if (shards.empty())
2324 return -EIO;
2325
2326 dout(10) << __func__ << " Read remaining shards " << shards << dendl;
2327
2328 // TODOSAM: this doesn't seem right
2329 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2330 rop.to_read.find(hoid)->second.to_read;
2331 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2332 rop.to_read.find(hoid)->second.cb;
2333
2334 map<hobject_t, read_request_t> for_read_op;
2335 for_read_op.insert(
2336 make_pair(
2337 hoid,
2338 read_request_t(
2339 offsets,
2340 shards,
2341 false,
2342 c)));
2343
2344 rop.to_read.swap(for_read_op);
2345 do_read_op(rop);
2346 return 0;
2347}
2348
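// objects_get_attrs: fetch all xattrs of the local shard object and strip the
// internal hash-info keys before returning them to the caller.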
2349int ECBackend::objects_get_attrs(
2350 const hobject_t &hoid,
2351 map<string, bufferlist> *out)
2352{
2353 int r = store->getattrs(
2354 ch,
2355 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2356 *out);
2357 if (r < 0)
2358 return r;
2359
2360 for (map<string, bufferlist>::iterator i = out->begin();
2361 i != out->end();
2362 ) {
2363 if (ECUtil::is_hinfo_key_string(i->first))
2364 out->erase(i++);
2365 else
2366 ++i;
2367 }
2368 return r;
2369}
2370
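// rollback_append: undo an append by truncating the shard object back to the
// chunk-space offset corresponding to the old, stripe-aligned logical size.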
2371void ECBackend::rollback_append(
2372 const hobject_t &hoid,
2373 uint64_t old_size,
2374 ObjectStore::Transaction *t)
2375{
2376 assert(old_size % sinfo.get_stripe_width() == 0);
2377 t->truncate(
2378 coll,
2379 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2380 sinfo.aligned_logical_offset_to_chunk_offset(
2381 old_size));
2382}
2383
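// be_deep_scrub: read the shard object in chunk-aligned strides, hashing the
// data as it goes. Without EC overwrites, the running hash must match the
// chunk hash stored in the object's hinfo and the total size must match; the
// stored hash of chunk 0 is then reported as the object digest so replicas
// can be compared. With overwrites enabled, deep scrub of the data is not yet
// supported and a fixed digest of 0 is reported.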
2384void ECBackend::be_deep_scrub(
2385 const hobject_t &poid,
2386 uint32_t seed,
2387 ScrubMap::object &o,
2388 ThreadPool::TPHandle &handle) {
2389 bufferhash h(-1); // we always used -1
2390 int r;
2391 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2392 if (stride % sinfo.get_chunk_size())
2393 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
2394 uint64_t pos = 0;
2395
2396 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2397
2398 while (true) {
2399 bufferlist bl;
2400 handle.reset_tp_timeout();
2401 r = store->read(
2402 ch,
2403 ghobject_t(
2404 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2405 pos,
2406 stride, bl,
2407 fadvise_flags);
2408 if (r < 0)
2409 break;
2410 if (bl.length() % sinfo.get_chunk_size()) {
2411 r = -EIO;
2412 break;
2413 }
2414 pos += r;
2415 h << bl;
2416 if ((unsigned)r < stride)
2417 break;
2418 }
2419
2420 if (r == -EIO) {
2421 dout(0) << "_scan_list " << poid << " got "
2422 << r << " on read, read_error" << dendl;
2423 o.read_error = true;
2424 return;
2425 }
2426
2427 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2428 if (!hinfo) {
2429 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2430 o.read_error = true;
2431 o.digest_present = false;
2432 return;
2433 } else {
2434 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2435 assert(hinfo->has_chunk_hash());
2436 if (hinfo->get_total_chunk_size() != pos) {
2437 dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
2438 o.ec_size_mismatch = true;
2439 return;
2440 }
2441
2442 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
2443 dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
2444 o.ec_hash_mismatch = true;
2445 return;
2446 }
2447
2448 /* We checked above that we match our own stored hash. We cannot
2449 * send a hash of the actual object, so instead we simply send
2450 * our locally stored hash of shard 0 on the assumption that if
2451 * we match our chunk hash and our recollection of the hash for
2452 * chunk 0 matches that of our peers, there is likely no corruption.
2453 */
2454 o.digest = hinfo->get_chunk_hash(0);
2455 o.digest_present = true;
2456 } else {
2457 /* Hack! We must be using partial overwrites, and partial overwrites
2458 * don't support deep-scrub yet
2459 */
2460 o.digest = 0;
2461 o.digest_present = true;
2462 }
2463 }
2464
2465 o.omap_digest = seed;
2466 o.omap_digest_present = true;
2467}