1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <sstream>
17
18#include "ECBackend.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPushReply.h"
21#include "messages/MOSDECSubOpWrite.h"
22#include "messages/MOSDECSubOpWriteReply.h"
23#include "messages/MOSDECSubOpRead.h"
24#include "messages/MOSDECSubOpReadReply.h"
25#include "ECMsgTypes.h"
26
27#include "PrimaryLogPG.h"
28
29#define dout_context cct
30#define dout_subsys ceph_subsys_osd
31#define DOUT_PREFIX_ARGS this
32#undef dout_prefix
33#define dout_prefix _prefix(_dout, this)
34static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35 return *_dout << pgb->get_parent()->gen_dbg_prefix();
36}
37
38struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39 list<ECBackend::RecoveryOp> ops;
40};
41
42ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43 switch (rhs.pipeline_state) {
44 case ECBackend::pipeline_state_t::CACHE_VALID:
45 return lhs << "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID:
47 return lhs << "CACHE_INVALID";
48 default:
49 assert(0 == "invalid pipeline state");
50 }
51 return lhs; // unreachable
52}
53
54static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
55{
56 lhs << "[";
57 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
58 i != rhs.end();
59 ++i) {
60 if (i != rhs.begin())
61 lhs << ", ";
62 lhs << make_pair(i->first, i->second.length());
63 }
64 return lhs << "]";
65}
66
67static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
68{
69 lhs << "[";
70 for (map<int, bufferlist>::const_iterator i = rhs.begin();
71 i != rhs.end();
72 ++i) {
73 if (i != rhs.begin())
74 lhs << ", ";
75 lhs << make_pair(i->first, i->second.length());
76 }
77 return lhs << "]";
78}
79
80static ostream &operator<<(
81 ostream &lhs,
82 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
83{
84 return lhs << "(" << rhs.get<0>() << ", "
85 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
86}
87
88ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
89{
90 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91 << ", need=" << rhs.need
92 << ", want_attrs=" << rhs.want_attrs
93 << ")";
94}
95
96ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
97{
98 lhs << "read_result_t(r=" << rhs.r
99 << ", errors=" << rhs.errors;
100 if (rhs.attrs) {
101 lhs << ", attrs=" << rhs.attrs.get();
102 } else {
103 lhs << ", noattrs";
104 }
105 return lhs << ", returned=" << rhs.returned << ")";
106}
107
108ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
109{
110 lhs << "ReadOp(tid=" << rhs.tid;
111 if (rhs.op && rhs.op->get_req()) {
112 lhs << ", op=";
113 rhs.op->get_req()->print(lhs);
114 }
115 return lhs << ", to_read=" << rhs.to_read
116 << ", complete=" << rhs.complete
117 << ", priority=" << rhs.priority
118 << ", obj_to_source=" << rhs.obj_to_source
119 << ", source_to_obj=" << rhs.source_to_obj
120 << ", in_progress=" << rhs.in_progress << ")";
121}
122
123void ECBackend::ReadOp::dump(Formatter *f) const
124{
125 f->dump_unsigned("tid", tid);
126 if (op && op->get_req()) {
127 f->dump_stream("op") << *(op->get_req());
128 }
129 f->dump_stream("to_read") << to_read;
130 f->dump_stream("complete") << complete;
131 f->dump_int("priority", priority);
132 f->dump_stream("obj_to_source") << obj_to_source;
133 f->dump_stream("source_to_obj") << source_to_obj;
134 f->dump_stream("in_progress") << in_progress;
135}
136
137ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
138{
139 lhs << "Op(" << rhs.hoid
140 << " v=" << rhs.version
141 << " tt=" << rhs.trim_to
142 << " tid=" << rhs.tid
143 << " reqid=" << rhs.reqid;
144 if (rhs.client_op && rhs.client_op->get_req()) {
145 lhs << " client_op=";
146 rhs.client_op->get_req()->print(lhs);
147 }
148 lhs << " roll_forward_to=" << rhs.roll_forward_to
149 << " temp_added=" << rhs.temp_added
150 << " temp_cleared=" << rhs.temp_cleared
151 << " pending_read=" << rhs.pending_read
152 << " remote_read=" << rhs.remote_read
153 << " remote_read_result=" << rhs.remote_read_result
154 << " pending_apply=" << rhs.pending_apply
155 << " pending_commit=" << rhs.pending_commit
156 << " plan.to_read=" << rhs.plan.to_read
157 << " plan.will_write=" << rhs.plan.will_write
158 << ")";
159 return lhs;
160}
161
162ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
163{
164 return lhs << "RecoveryOp("
165 << "hoid=" << rhs.hoid
166 << " v=" << rhs.v
167 << " missing_on=" << rhs.missing_on
168 << " missing_on_shards=" << rhs.missing_on_shards
169 << " recovery_info=" << rhs.recovery_info
170 << " recovery_progress=" << rhs.recovery_progress
171 << " obc refcount=" << rhs.obc.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173 << " waiting_on_pushes=" << rhs.waiting_on_pushes
174 << " extent_requested=" << rhs.extent_requested
175 << ")";
176}
177
178void ECBackend::RecoveryOp::dump(Formatter *f) const
179{
180 f->dump_stream("hoid") << hoid;
181 f->dump_stream("v") << v;
182 f->dump_stream("missing_on") << missing_on;
183 f->dump_stream("missing_on_shards") << missing_on_shards;
184 f->dump_stream("recovery_info") << recovery_info;
185 f->dump_stream("recovery_progress") << recovery_progress;
186 f->dump_stream("state") << tostr(state);
187 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188 f->dump_stream("extent_requested") << extent_requested;
189}
190
191ECBackend::ECBackend(
192 PGBackend::Listener *pg,
193 coll_t coll,
194 ObjectStore::CollectionHandle &ch,
195 ObjectStore *store,
196 CephContext *cct,
197 ErasureCodeInterfaceRef ec_impl,
198 uint64_t stripe_width)
199 : PGBackend(cct, pg, store, coll, ch),
200 ec_impl(ec_impl),
201 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
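  // Stripe geometry invariant: stripe_width must equal the number of data
  // chunks times the per-chunk size reported by the erasure code plugin.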
202 assert((ec_impl->get_data_chunk_count() *
203 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
204}
205
206PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
207{
208 return new ECRecoveryHandle;
209}
210
211void ECBackend::_failed_push(const hobject_t &hoid,
212 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
213{
214 ECBackend::read_result_t &res = in.second;
215 dout(10) << __func__ << ": Read error " << hoid << " r="
216 << res.r << " errors=" << res.errors << dendl;
217 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
218 << dendl;
219 assert(recovery_ops.count(hoid));
220 recovery_ops.erase(hoid);
221
222 list<pg_shard_t> fl;
223 for (auto&& i : res.errors) {
224 fl.push_back(i.first);
225 }
226 get_parent()->failed_push(fl, hoid);
227}
228
229struct OnRecoveryReadComplete :
230 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
231 ECBackend *pg;
232 hobject_t hoid;
233 set<int> want;
234 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
235 : pg(pg), hoid(hoid) {}
236 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
237 ECBackend::read_result_t &res = in.second;
238 if (!(res.r == 0 && res.errors.empty())) {
239 pg->_failed_push(hoid, in);
240 return;
241 }
242 assert(res.returned.size() == 1);
243 pg->handle_recovery_read_complete(
244 hoid,
245 res.returned.back(),
246 res.attrs,
247 in.first);
248 }
249};
250
251struct RecoveryMessages {
252 map<hobject_t,
253 ECBackend::read_request_t> reads;
254 void read(
255 ECBackend *ec,
256 const hobject_t &hoid, uint64_t off, uint64_t len,
257 const set<pg_shard_t> &need,
258 bool attrs) {
259 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
260 to_read.push_back(boost::make_tuple(off, len, 0));
261 assert(!reads.count(hoid));
262 reads.insert(
263 make_pair(
264 hoid,
265 ECBackend::read_request_t(
266 to_read,
267 need,
268 attrs,
269 new OnRecoveryReadComplete(
270 ec,
271 hoid))));
272 }
273
274 map<pg_shard_t, vector<PushOp> > pushes;
275 map<pg_shard_t, vector<PushReplyOp> > push_replies;
276 ObjectStore::Transaction t;
277 RecoveryMessages() {}
278 ~RecoveryMessages(){}
279};
280
281void ECBackend::handle_recovery_push(
282 const PushOp &op,
283 RecoveryMessages *m)
284{
285 ostringstream ss;
286 if (get_parent()->check_failsafe_full(ss)) {
287 dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
288 ceph_abort();
289 }
290
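 // A push that both starts at the beginning of the object and carries the
 // last of its data ("oneshot") is written directly to the final shard
 // object; otherwise the data is staged in a temp recovery object and
 // renamed into place once the final push arrives.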
291 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
292 ghobject_t tobj;
293 if (oneshot) {
294 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
295 get_parent()->whoami_shard().shard);
296 } else {
297 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
298 op.version),
299 ghobject_t::NO_GEN,
300 get_parent()->whoami_shard().shard);
301 if (op.before_progress.first) {
302 dout(10) << __func__ << ": Adding oid "
303 << tobj.hobj << " in the temp collection" << dendl;
304 add_temp_obj(tobj.hobj);
305 }
306 }
307
308 if (op.before_progress.first) {
309 m->t.remove(coll, tobj);
310 m->t.touch(coll, tobj);
311 }
312
313 if (!op.data_included.empty()) {
314 uint64_t start = op.data_included.range_start();
315 uint64_t end = op.data_included.range_end();
316 assert(op.data.length() == (end - start));
317
318 m->t.write(
319 coll,
320 tobj,
321 start,
322 op.data.length(),
323 op.data);
324 } else {
325 assert(op.data.length() == 0);
326 }
327
328 if (op.before_progress.first) {
329 assert(op.attrset.count(string("_")));
330 m->t.setattrs(
331 coll,
332 tobj,
333 op.attrset);
334 }
335
336 if (op.after_progress.data_complete && !oneshot) {
337 dout(10) << __func__ << ": Removing oid "
338 << tobj.hobj << " from the temp collection" << dendl;
339 clear_temp_obj(tobj.hobj);
340 m->t.remove(coll, ghobject_t(
341 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
342 m->t.collection_move_rename(
343 coll, tobj,
344 coll, ghobject_t(
345 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
346 }
347 if (op.after_progress.data_complete) {
348 if ((get_parent()->pgb_is_primary())) {
349 assert(recovery_ops.count(op.soid));
350 assert(recovery_ops[op.soid].obc);
351 get_parent()->on_local_recover(
352 op.soid,
353 op.recovery_info,
354 recovery_ops[op.soid].obc,
355 false,
356 &m->t);
357 } else {
358 get_parent()->on_local_recover(
359 op.soid,
360 op.recovery_info,
361 ObjectContextRef(),
362 false,
363 &m->t);
364 }
365 }
366 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
367 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
368}
369
370void ECBackend::handle_recovery_push_reply(
371 const PushReplyOp &op,
372 pg_shard_t from,
373 RecoveryMessages *m)
374{
375 if (!recovery_ops.count(op.soid))
376 return;
377 RecoveryOp &rop = recovery_ops[op.soid];
378 assert(rop.waiting_on_pushes.count(from));
379 rop.waiting_on_pushes.erase(from);
380 continue_recovery_op(rop, m);
381}
382
383void ECBackend::handle_recovery_read_complete(
384 const hobject_t &hoid,
385 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
386 boost::optional<map<string, bufferlist> > attrs,
387 RecoveryMessages *m)
388{
389 dout(10) << __func__ << ": returned " << hoid << " "
390 << "(" << to_read.get<0>()
391 << ", " << to_read.get<1>()
392 << ", " << to_read.get<2>()
393 << ")"
394 << dendl;
395 assert(recovery_ops.count(hoid));
396 RecoveryOp &op = recovery_ops[hoid];
397 assert(op.returned_data.empty());
398 map<int, bufferlist*> target;
399 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
400 i != op.missing_on_shards.end();
401 ++i) {
402 target[*i] = &(op.returned_data[*i]);
403 }
404 map<int, bufferlist> from;
405 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
406 i != to_read.get<2>().end();
407 ++i) {
408 from[i->first.shard].claim(i->second);
409 }
410 dout(10) << __func__ << ": " << from << dendl;
411 int r = ECUtil::decode(sinfo, ec_impl, from, target);
412 assert(r == 0);
413 if (attrs) {
414 op.xattrs.swap(*attrs);
415
416 if (!op.obc) {
417 // attrs only reference the origin bufferlist (decoded from the
418 // ECSubReadReply message), whose size is much greater than the
419 // attrs needed in recovery. If the obc caches them (get_obc may
420 // cache the attrs), the whole origin bufferlist cannot be freed
421 // until the obc is evicted from the obc cache. So rebuild the
422 // bufferlists before caching them.
423 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
424 it != op.xattrs.end();
425 ++it) {
426 it->second.rebuild();
427 }
428 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
429 // of the backend (see bug #12983)
430 map<string, bufferlist> sanitized_attrs(op.xattrs);
431 sanitized_attrs.erase(ECUtil::get_hinfo_key());
432 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
433 assert(op.obc);
434 op.recovery_info.size = op.obc->obs.oi.size;
435 op.recovery_info.oi = op.obc->obs.oi;
436 }
437
438 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
439 if (op.obc->obs.oi.size > 0) {
440 assert(op.xattrs.count(ECUtil::get_hinfo_key()));
441 bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
442 ::decode(hinfo, bp);
443 }
444 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
445 }
446 assert(op.xattrs.size());
447 assert(op.obc);
448 continue_recovery_op(op, m);
449}
450
451struct SendPushReplies : public Context {
452 PGBackend::Listener *l;
453 epoch_t epoch;
454 map<int, MOSDPGPushReply*> replies;
455 SendPushReplies(
456 PGBackend::Listener *l,
457 epoch_t epoch,
458 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
459 replies.swap(in);
460 }
461 void finish(int) override {
462 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
463 i != replies.end();
464 ++i) {
465 l->send_message_osd_cluster(i->first, i->second, epoch);
466 }
467 replies.clear();
468 }
469 ~SendPushReplies() override {
470 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
471 i != replies.end();
472 ++i) {
473 i->second->put();
474 }
475 replies.clear();
476 }
477};
478
479void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
480{
481 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
482 i != m.pushes.end();
483 m.pushes.erase(i++)) {
484 MOSDPGPush *msg = new MOSDPGPush();
485 msg->set_priority(priority);
486 msg->map_epoch = get_parent()->get_epoch();
487 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
488 msg->from = get_parent()->whoami_shard();
489 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
490 msg->pushes.swap(i->second);
491 msg->compute_cost(cct);
492 get_parent()->send_message(
493 i->first.osd,
494 msg);
495 }
496 map<int, MOSDPGPushReply*> replies;
497 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
498 m.push_replies.begin();
499 i != m.push_replies.end();
500 m.push_replies.erase(i++)) {
501 MOSDPGPushReply *msg = new MOSDPGPushReply();
502 msg->set_priority(priority);
503 msg->map_epoch = get_parent()->get_epoch();
504 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
505 msg->from = get_parent()->whoami_shard();
506 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
507 msg->replies.swap(i->second);
508 msg->compute_cost(cct);
509 replies.insert(make_pair(i->first.osd, msg));
510 }
511
512 if (!replies.empty()) {
513 (m.t).register_on_complete(
514 get_parent()->bless_context(
515 new SendPushReplies(
516 get_parent(),
517 get_parent()->get_epoch(),
518 replies)));
519 get_parent()->queue_transaction(std::move(m.t));
520 }
521
522 if (m.reads.empty())
523 return;
524 start_read_op(
525 priority,
526 m.reads,
527 OpRequestRef(),
528 false, true);
529}
530
531void ECBackend::continue_recovery_op(
532 RecoveryOp &op,
533 RecoveryMessages *m)
534{
535 dout(10) << __func__ << ": continuing " << op << dendl;
536 while (1) {
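    // Recovery state machine: IDLE issues a chunk-sized read of the next
    // extent, READING turns the decoded data into a PushOp per missing
    // shard, and WRITING either loops back to IDLE for the next chunk or,
    // once data_complete and all pushes are acked, finishes the op.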
537 switch (op.state) {
538 case RecoveryOp::IDLE: {
539 // start read
540 op.state = RecoveryOp::READING;
541 assert(!op.recovery_progress.data_complete);
542 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
543 uint64_t from = op.recovery_progress.data_recovered_to;
544 uint64_t amount = get_recovery_chunk_size();
545
546 if (op.recovery_progress.first && op.obc) {
547 /* We've got the attrs and the hinfo, might as well use them */
548 op.hinfo = get_hash_info(op.hoid);
549 assert(op.hinfo);
550 op.xattrs = op.obc->attr_cache;
551 ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
552 }
553
554 set<pg_shard_t> to_read;
555 int r = get_min_avail_to_read_shards(
556 op.hoid, want, true, false, &to_read);
557 if (r != 0) {
558 // we must have lost a recovery source
559 assert(!op.recovery_progress.first);
560 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
561 << dendl;
562 get_parent()->cancel_pull(op.hoid);
563 recovery_ops.erase(op.hoid);
564 return;
565 }
566 m->read(
567 this,
568 op.hoid,
569 op.recovery_progress.data_recovered_to,
570 amount,
571 to_read,
572 op.recovery_progress.first && !op.obc);
573 op.extent_requested = make_pair(
574 from,
575 amount);
576 dout(10) << __func__ << ": IDLE return " << op << dendl;
577 return;
578 }
579 case RecoveryOp::READING: {
580 // read completed, start write
581 assert(op.xattrs.size());
582 assert(op.returned_data.size());
583 op.state = RecoveryOp::WRITING;
584 ObjectRecoveryProgress after_progress = op.recovery_progress;
585 after_progress.data_recovered_to += op.extent_requested.second;
586 after_progress.first = false;
587 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
588 after_progress.data_recovered_to =
589 sinfo.logical_to_next_stripe_offset(
590 op.obc->obs.oi.size);
591 after_progress.data_complete = true;
592 }
593 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
594 mi != op.missing_on.end();
595 ++mi) {
596 assert(op.returned_data.count(mi->shard));
597 m->pushes[*mi].push_back(PushOp());
598 PushOp &pop = m->pushes[*mi].back();
599 pop.soid = op.hoid;
600 pop.version = op.v;
601 pop.data = op.returned_data[mi->shard];
602 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
603 << ", after_progress=" << after_progress
604 << ", pop.data.length()=" << pop.data.length()
605 << ", size=" << op.obc->obs.oi.size << dendl;
606 assert(
607 pop.data.length() ==
608 sinfo.aligned_logical_offset_to_chunk_offset(
609 after_progress.data_recovered_to -
610 op.recovery_progress.data_recovered_to)
611 );
612 if (pop.data.length())
613 pop.data_included.insert(
614 sinfo.aligned_logical_offset_to_chunk_offset(
615 op.recovery_progress.data_recovered_to),
616 pop.data.length()
617 );
618 if (op.recovery_progress.first) {
619 pop.attrset = op.xattrs;
620 }
621 pop.recovery_info = op.recovery_info;
622 pop.before_progress = op.recovery_progress;
623 pop.after_progress = after_progress;
624 if (*mi != get_parent()->primary_shard())
625 get_parent()->begin_peer_recover(
626 *mi,
627 op.hoid);
628 }
629 op.returned_data.clear();
630 op.waiting_on_pushes = op.missing_on;
631 op.recovery_progress = after_progress;
632 dout(10) << __func__ << ": READING return " << op << dendl;
633 return;
634 }
635 case RecoveryOp::WRITING: {
636 if (op.waiting_on_pushes.empty()) {
637 if (op.recovery_progress.data_complete) {
638 op.state = RecoveryOp::COMPLETE;
639 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
640 i != op.missing_on.end();
641 ++i) {
642 if (*i != get_parent()->primary_shard()) {
643 dout(10) << __func__ << ": on_peer_recover on " << *i
644 << ", obj " << op.hoid << dendl;
645 get_parent()->on_peer_recover(
646 *i,
647 op.hoid,
648 op.recovery_info);
649 }
650 }
651 object_stat_sum_t stat;
652 stat.num_bytes_recovered = op.recovery_info.size;
653 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
654 stat.num_objects_recovered = 1;
655 get_parent()->on_global_recover(op.hoid, stat, false);
656 dout(10) << __func__ << ": WRITING return " << op << dendl;
657 recovery_ops.erase(op.hoid);
658 return;
659 } else {
660 op.state = RecoveryOp::IDLE;
661 dout(10) << __func__ << ": WRITING continue " << op << dendl;
662 continue;
663 }
664 }
665 return;
666 }
667 // should never be called once complete
668 case RecoveryOp::COMPLETE:
669 default: {
670 ceph_abort();
671 };
672 }
673 }
674}
675
676void ECBackend::run_recovery_op(
677 RecoveryHandle *_h,
678 int priority)
679{
680 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
681 RecoveryMessages m;
682 for (list<RecoveryOp>::iterator i = h->ops.begin();
683 i != h->ops.end();
684 ++i) {
685 dout(10) << __func__ << ": starting " << *i << dendl;
686 assert(!recovery_ops.count(i->hoid));
687 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
688 continue_recovery_op(op, &m);
689 }
690
691 dispatch_recovery_messages(m, priority);
692 send_recovery_deletes(priority, h->deletes);
693 delete _h;
694}
695
696int ECBackend::recover_object(
697 const hobject_t &hoid,
698 eversion_t v,
699 ObjectContextRef head,
700 ObjectContextRef obc,
701 RecoveryHandle *_h)
702{
703 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
704 h->ops.push_back(RecoveryOp());
705 h->ops.back().v = v;
706 h->ops.back().hoid = hoid;
707 h->ops.back().obc = obc;
708 h->ops.back().recovery_info.soid = hoid;
709 h->ops.back().recovery_info.version = v;
710 if (obc) {
711 h->ops.back().recovery_info.size = obc->obs.oi.size;
712 h->ops.back().recovery_info.oi = obc->obs.oi;
713 }
714 if (hoid.is_snap()) {
715 if (obc) {
716 assert(obc->ssc);
717 h->ops.back().recovery_info.ss = obc->ssc->snapset;
718 } else if (head) {
719 assert(head->ssc);
720 h->ops.back().recovery_info.ss = head->ssc->snapset;
721 } else {
722 assert(0 == "neither obc nor head set for a snap object");
723 }
724 }
725 h->ops.back().recovery_progress.omap_complete = true;
726 for (set<pg_shard_t>::const_iterator i =
727 get_parent()->get_actingbackfill_shards().begin();
728 i != get_parent()->get_actingbackfill_shards().end();
729 ++i) {
730 dout(10) << "checking " << *i << dendl;
731 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
732 h->ops.back().missing_on.insert(*i);
733 h->ops.back().missing_on_shards.insert(i->shard);
734 }
735 }
736 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
737 return 0;
738}
739
740bool ECBackend::can_handle_while_inactive(
741 OpRequestRef _op)
742{
743 return false;
744}
745
746bool ECBackend::_handle_message(
747 OpRequestRef _op)
748{
749 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
750 int priority = _op->get_req()->get_priority();
751 switch (_op->get_req()->get_type()) {
752 case MSG_OSD_EC_WRITE: {
753 // NOTE: this is non-const because handle_sub_write modifies the embedded
754 // ObjectStore::Transaction in place (and then std::move's it). It does
755 // not conflict with ECSubWrite's operator<<.
756 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
757 _op->get_nonconst_req());
758 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
759 return true;
760 }
761 case MSG_OSD_EC_WRITE_REPLY: {
762 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
763 _op->get_req());
764 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
765 return true;
766 }
767 case MSG_OSD_EC_READ: {
768 const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
769 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
770 reply->pgid = get_parent()->primary_spg_t();
771 reply->map_epoch = get_parent()->get_epoch();
772 reply->min_epoch = get_parent()->get_interval_start_epoch();
773 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
774 reply->trace = _op->pg_trace;
775 get_parent()->send_message_osd_cluster(
776 op->op.from.osd, reply, get_parent()->get_epoch());
777 return true;
778 }
779 case MSG_OSD_EC_READ_REPLY: {
780 // NOTE: this is non-const because handle_sub_read_reply steals resulting
781 // buffers. It does not conflict with ECSubReadReply operator<<.
782 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
783 _op->get_nonconst_req());
784 RecoveryMessages rm;
785 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
786 dispatch_recovery_messages(rm, priority);
787 return true;
788 }
789 case MSG_OSD_PG_PUSH: {
790 const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
791 RecoveryMessages rm;
792 for (vector<PushOp>::const_iterator i = op->pushes.begin();
793 i != op->pushes.end();
794 ++i) {
795 handle_recovery_push(*i, &rm);
796 }
797 dispatch_recovery_messages(rm, priority);
798 return true;
799 }
800 case MSG_OSD_PG_PUSH_REPLY: {
801 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
802 _op->get_req());
803 RecoveryMessages rm;
804 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
805 i != op->replies.end();
806 ++i) {
807 handle_recovery_push_reply(*i, op->from, &rm);
808 }
809 dispatch_recovery_messages(rm, priority);
810 return true;
811 }
812 default:
813 return false;
814 }
815 return false;
816}
817
818struct SubWriteCommitted : public Context {
819 ECBackend *pg;
820 OpRequestRef msg;
821 ceph_tid_t tid;
822 eversion_t version;
823 eversion_t last_complete;
824 const ZTracer::Trace trace;
825 SubWriteCommitted(
826 ECBackend *pg,
827 OpRequestRef msg,
828 ceph_tid_t tid,
829 eversion_t version,
830 eversion_t last_complete,
831 const ZTracer::Trace &trace)
832 : pg(pg), msg(msg), tid(tid),
833 version(version), last_complete(last_complete), trace(trace) {}
834 void finish(int) override {
835 if (msg)
836 msg->mark_event("sub_op_committed");
837 pg->sub_write_committed(tid, version, last_complete, trace);
838 }
839};
840void ECBackend::sub_write_committed(
841 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
842 const ZTracer::Trace &trace) {
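  // On the primary the local commit is fed straight back into
  // handle_sub_write_reply as a self-addressed reply; replica shards send
  // an MOSDECSubOpWriteReply to the primary instead.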
843 if (get_parent()->pgb_is_primary()) {
844 ECSubWriteReply reply;
845 reply.tid = tid;
846 reply.last_complete = last_complete;
847 reply.committed = true;
848 reply.from = get_parent()->whoami_shard();
849 handle_sub_write_reply(
850 get_parent()->whoami_shard(),
851 reply, trace);
852 } else {
853 get_parent()->update_last_complete_ondisk(last_complete);
854 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
855 r->pgid = get_parent()->primary_spg_t();
856 r->map_epoch = get_parent()->get_epoch();
857 r->min_epoch = get_parent()->get_interval_start_epoch();
858 r->op.tid = tid;
859 r->op.last_complete = last_complete;
860 r->op.committed = true;
861 r->op.from = get_parent()->whoami_shard();
862 r->set_priority(CEPH_MSG_PRIO_HIGH);
863 r->trace = trace;
864 r->trace.event("sending sub op commit");
865 get_parent()->send_message_osd_cluster(
866 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
867 }
868}
869
870struct SubWriteApplied : public Context {
871 ECBackend *pg;
872 OpRequestRef msg;
873 ceph_tid_t tid;
874 eversion_t version;
875 const ZTracer::Trace trace;
876 SubWriteApplied(
877 ECBackend *pg,
878 OpRequestRef msg,
879 ceph_tid_t tid,
880 eversion_t version,
881 const ZTracer::Trace &trace)
882 : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
883 void finish(int) override {
884 if (msg)
885 msg->mark_event("sub_op_applied");
886 pg->sub_write_applied(tid, version, trace);
887 }
888};
889void ECBackend::sub_write_applied(
890 ceph_tid_t tid, eversion_t version,
891 const ZTracer::Trace &trace) {
892 parent->op_applied(version);
893 if (get_parent()->pgb_is_primary()) {
894 ECSubWriteReply reply;
895 reply.from = get_parent()->whoami_shard();
896 reply.tid = tid;
897 reply.applied = true;
898 handle_sub_write_reply(
899 get_parent()->whoami_shard(),
900 reply, trace);
901 } else {
902 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
903 r->pgid = get_parent()->primary_spg_t();
904 r->map_epoch = get_parent()->get_epoch();
905 r->min_epoch = get_parent()->get_interval_start_epoch();
906 r->op.from = get_parent()->whoami_shard();
907 r->op.tid = tid;
908 r->op.applied = true;
909 r->set_priority(CEPH_MSG_PRIO_HIGH);
910 r->trace = trace;
911 r->trace.event("sending sub op apply");
912 get_parent()->send_message_osd_cluster(
913 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
914 }
915}
916
917void ECBackend::handle_sub_write(
918 pg_shard_t from,
919 OpRequestRef msg,
920 ECSubWrite &op,
921 const ZTracer::Trace &trace,
922 Context *on_local_applied_sync)
923{
924 if (msg)
925 msg->mark_started();
926 trace.event("handle_sub_write");
927 assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
928 if (!get_parent()->pgb_is_primary())
929 get_parent()->update_stats(op.stats);
930 ObjectStore::Transaction localt;
931 if (!op.temp_added.empty()) {
932 add_temp_objs(op.temp_added);
933 }
934 if (op.backfill) {
935 for (set<hobject_t>::iterator i = op.temp_removed.begin();
936 i != op.temp_removed.end();
937 ++i) {
938 dout(10) << __func__ << ": removing object " << *i
939 << " since we won't get the transaction" << dendl;
940 localt.remove(
941 coll,
942 ghobject_t(
943 *i,
944 ghobject_t::NO_GEN,
945 get_parent()->whoami_shard().shard));
946 }
947 }
948 clear_temp_objs(op.temp_removed);
949 get_parent()->log_operation(
950 op.log_entries,
951 op.updated_hit_set_history,
952 op.trim_to,
953 op.roll_forward_to,
954 !op.backfill,
955 localt);
956
957 PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
958 if (_rPG && !_rPG->is_undersized() &&
959 (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
960 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
961
962 if (on_local_applied_sync) {
963 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
964 localt.register_on_applied_sync(on_local_applied_sync);
965 }
966 localt.register_on_commit(
967 get_parent()->bless_context(
968 new SubWriteCommitted(
969 this, msg, op.tid,
970 op.at_version,
971 get_parent()->get_info().last_complete, trace)));
972 localt.register_on_applied(
973 get_parent()->bless_context(
974 new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
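// Both transactions are submitted as one batch: op.t carries the shard's
// data writes, localt carries the log update plus the commit/apply
// callbacks registered above.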
975 vector<ObjectStore::Transaction> tls;
976 tls.reserve(2);
977 tls.push_back(std::move(op.t));
978 tls.push_back(std::move(localt));
979 get_parent()->queue_transactions(tls, msg);
980}
981
982void ECBackend::handle_sub_read(
983 pg_shard_t from,
984 const ECSubRead &op,
985 ECSubReadReply *reply,
986 const ZTracer::Trace &trace)
987{
988 trace.event("handle sub read");
989 shard_id_t shard = get_parent()->whoami_shard().shard;
990 for(auto i = op.to_read.begin();
991 i != op.to_read.end();
992 ++i) {
993 int r = 0;
994 ECUtil::HashInfoRef hinfo;
995 if (!get_parent()->get_pool().allows_ecoverwrites()) {
996 hinfo = get_hash_info(i->first);
997 if (!hinfo) {
998 r = -EIO;
999 get_parent()->clog_error() << "Corruption detected: object " << i->first
1000 << " is missing hash_info";
1001 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1002 goto error;
1003 }
1004 }
1005 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1006 bufferlist bl;
1007 r = store->read(
1008 ch,
1009 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1010 j->get<0>(),
1011 j->get<1>(),
1012 bl, j->get<2>());
1013 if (r < 0) {
1014 get_parent()->clog_error() << "Error " << r
1015 << " reading object "
1016 << i->first;
1017 dout(5) << __func__ << ": Error " << r
1018 << " reading " << i->first << dendl;
1019 goto error;
1020 } else {
1021 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1022 reply->buffers_read[i->first].push_back(
1023 make_pair(
1024 j->get<0>(),
1025 bl)
1026 );
1027 }
1028
1029 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1030 // This shows that we still need deep scrub because large enough files
1031 // are read in sections, so the digest check cannot be done here.
1032 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1033 // the state of our chunk in case other chunks could substitute.
1034 assert(hinfo->has_chunk_hash());
1035 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1036 (j->get<0>() == 0)) {
1037 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1038 bufferhash h(-1);
1039 h << bl;
1040 if (h.digest() != hinfo->get_chunk_hash(shard)) {
1041 get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1042 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1043 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1044 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1045 r = -EIO;
1046 goto error;
1047 }
1048 }
1049 }
1050 }
1051 continue;
1052error:
1053 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1054 // the state of our chunk in case other chunks could substitute.
1055 reply->buffers_read.erase(i->first);
1056 reply->errors[i->first] = r;
1057 }
1058 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1059 i != op.attrs_to_read.end();
1060 ++i) {
1061 dout(10) << __func__ << ": fulfilling attr request on "
1062 << *i << dendl;
1063 if (reply->errors.count(*i))
1064 continue;
1065 int r = store->getattrs(
1066 ch,
1067 ghobject_t(
1068 *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1069 reply->attrs_read[*i]);
1070 if (r < 0) {
1071 reply->buffers_read.erase(*i);
1072 reply->errors[*i] = r;
1073 }
1074 }
1075 reply->from = get_parent()->whoami_shard();
1076 reply->tid = op.tid;
1077}
1078
1079void ECBackend::handle_sub_write_reply(
1080 pg_shard_t from,
1081 const ECSubWriteReply &op,
1082 const ZTracer::Trace &trace)
1083{
1084 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1085 assert(i != tid_to_op_map.end());
1086 if (op.committed) {
1087 trace.event("sub write committed");
1088 assert(i->second.pending_commit.count(from));
1089 i->second.pending_commit.erase(from);
1090 if (from != get_parent()->whoami_shard()) {
1091 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1092 }
1093 }
1094 if (op.applied) {
1095 trace.event("sub write applied");
1096 assert(i->second.pending_apply.count(from));
1097 i->second.pending_apply.erase(from);
1098 }
1099
1100 if (i->second.pending_apply.empty() && i->second.on_all_applied) {
1101 dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
1102 i->second.on_all_applied->complete(0);
1103 i->second.on_all_applied = 0;
1104 i->second.trace.event("ec write all applied");
1105 }
1106 if (i->second.pending_commit.empty() && i->second.on_all_commit) {
1107 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1108 i->second.on_all_commit->complete(0);
1109 i->second.on_all_commit = 0;
1110 i->second.trace.event("ec write all committed");
1111 }
1112 check_ops();
1113}
1114
1115void ECBackend::handle_sub_read_reply(
1116 pg_shard_t from,
1117 ECSubReadReply &op,
1118 RecoveryMessages *m,
1119 const ZTracer::Trace &trace)
1120{
1121 trace.event("ec sub read reply");
1122 dout(10) << __func__ << ": reply " << op << dendl;
1123 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1124 if (iter == tid_to_read_map.end()) {
1125 //canceled
1126 dout(20) << __func__ << ": dropped " << op << dendl;
1127 return;
1128 }
1129 ReadOp &rop = iter->second;
1130 for (auto i = op.buffers_read.begin();
1131 i != op.buffers_read.end();
1132 ++i) {
1133 assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1134 if (!rop.to_read.count(i->first)) {
1135 // We canceled this read! @see filter_read_op
1136 dout(20) << __func__ << " to_read skipping" << dendl;
1137 continue;
1138 }
1139 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1140 rop.to_read.find(i->first)->second.to_read.begin();
1141 list<
1142 boost::tuple<
1143 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1144 rop.complete[i->first].returned.begin();
1145 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1146 j != i->second.end();
1147 ++j, ++req_iter, ++riter) {
1148 assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1149 assert(riter != rop.complete[i->first].returned.end());
1150 pair<uint64_t, uint64_t> adjusted =
1151 sinfo.aligned_offset_len_to_chunk(
1152 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1153 assert(adjusted.first == j->first);
1154 riter->get<2>()[from].claim(j->second);
1155 }
1156 }
1157 for (auto i = op.attrs_read.begin();
1158 i != op.attrs_read.end();
1159 ++i) {
1160 assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1161 if (!rop.to_read.count(i->first)) {
1162 // We canceled this read! @see filter_read_op
1163 dout(20) << __func__ << " to_read skipping" << dendl;
1164 continue;
1165 }
1166 rop.complete[i->first].attrs = map<string, bufferlist>();
1167 (*(rop.complete[i->first].attrs)).swap(i->second);
1168 }
1169 for (auto i = op.errors.begin();
1170 i != op.errors.end();
1171 ++i) {
1172 rop.complete[i->first].errors.insert(
1173 make_pair(
1174 from,
1175 i->second));
1176 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1177 }
1178
1179 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1180 shard_to_read_map.find(from);
1181 assert(siter != shard_to_read_map.end());
1182 assert(siter->second.count(op.tid));
1183 siter->second.erase(op.tid);
1184
1185 assert(rop.in_progress.count(from));
1186 rop.in_progress.erase(from);
1187 unsigned is_complete = 0;
1188 // For redundant reads check for completion as each shard comes in,
1189 // or in a non-recovery read check for completion once all the shards read.
1190 // TODO: It would be nice if recovery could send more reads too
1191 if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
1192 for (map<hobject_t, read_result_t>::const_iterator iter =
1193 rop.complete.begin();
1194 iter != rop.complete.end();
1195 ++iter) {
1196 set<int> have;
1197 for (map<pg_shard_t, bufferlist>::const_iterator j =
1198 iter->second.returned.front().get<2>().begin();
1199 j != iter->second.returned.front().get<2>().end();
1200 ++j) {
1201 have.insert(j->first.shard);
1202 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1203 }
1204 set<int> want_to_read, dummy_minimum;
1205 get_want_to_read_shards(&want_to_read);
1206 int err;
1207 if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
1208 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1209 if (rop.in_progress.empty()) {
1210 // If we don't have enough copies and we haven't sent reads for all shards
1211 // we can send the rest of the reads, if any.
1212 if (!rop.do_redundant_reads) {
1213 int r = send_all_remaining_reads(iter->first, rop);
1214 if (r == 0) {
1215 // We added to in_progress and not incrementing is_complete
1216 continue;
1217 }
1218 // Couldn't read any additional shards so handle as completed with errors
1219 }
1220 // We don't want to confuse clients / RBD with objectstore error
1221 // values in particular ENOENT. We may have different error returns
1222 // from different shards, so we'll return minimum_to_decode() error
1223 // (usually EIO) to reader. It is likely an error here is due to a
1224 // damaged pg.
1225 rop.complete[iter->first].r = err;
1226 ++is_complete;
1227 }
1228 } else {
1229 assert(rop.complete[iter->first].r == 0);
1230 if (!rop.complete[iter->first].errors.empty()) {
1231 if (cct->_conf->osd_read_ec_check_for_errors) {
1232 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1233 err = rop.complete[iter->first].errors.begin()->second;
1234 rop.complete[iter->first].r = err;
1235 } else {
1236 get_parent()->clog_warn() << "Error(s) ignored for "
1237 << iter->first << " enough copies available";
1238 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1239 << " enough copies available" << dendl;
1240 rop.complete[iter->first].errors.clear();
1241 }
1242 }
1243 ++is_complete;
1244 }
1245 }
1246 }
1247 if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1248 dout(20) << __func__ << " Complete: " << rop << dendl;
1249 rop.trace.event("ec read complete");
1250 complete_read_op(rop, m);
1251 } else {
1252 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1253 }
1254}
1255
1256void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1257{
1258 map<hobject_t, read_request_t>::iterator reqiter =
1259 rop.to_read.begin();
1260 map<hobject_t, read_result_t>::iterator resiter =
1261 rop.complete.begin();
1262 assert(rop.to_read.size() == rop.complete.size());
1263 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1264 if (reqiter->second.cb) {
1265 pair<RecoveryMessages *, read_result_t &> arg(
1266 m, resiter->second);
1267 reqiter->second.cb->complete(arg);
1268 reqiter->second.cb = NULL;
1269 }
1270 }
1271 tid_to_read_map.erase(rop.tid);
1272}
1273
1274struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1275 ECBackend *ec;
1276 ceph_tid_t tid;
1277 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1278 void finish(ThreadPool::TPHandle &handle) override {
1279 auto ropiter = ec->tid_to_read_map.find(tid);
1280 assert(ropiter != ec->tid_to_read_map.end());
1281 int priority = ropiter->second.priority;
1282 RecoveryMessages rm;
1283 ec->complete_read_op(ropiter->second, &rm);
1284 ec->dispatch_recovery_messages(rm, priority);
1285 }
1286};
1287
1288void ECBackend::filter_read_op(
1289 const OSDMapRef& osdmap,
1290 ReadOp &op)
1291{
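  // Drop sub-reads whose source OSD has gone down: cancel the affected
  // objects' pulls and callbacks, and if nothing is left in progress,
  // finish the ReadOp asynchronously via FinishReadOp.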
1292 set<hobject_t> to_cancel;
1293 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1294 i != op.source_to_obj.end();
1295 ++i) {
1296 if (osdmap->is_down(i->first.osd)) {
1297 to_cancel.insert(i->second.begin(), i->second.end());
1298 op.in_progress.erase(i->first);
1299 continue;
1300 }
1301 }
1302
1303 if (to_cancel.empty())
1304 return;
1305
1306 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1307 i != op.source_to_obj.end();
1308 ) {
1309 for (set<hobject_t>::iterator j = i->second.begin();
1310 j != i->second.end();
1311 ) {
1312 if (to_cancel.count(*j))
1313 i->second.erase(j++);
1314 else
1315 ++j;
1316 }
1317 if (i->second.empty()) {
1318 op.source_to_obj.erase(i++);
1319 } else {
1320 assert(!osdmap->is_down(i->first.osd));
1321 ++i;
1322 }
1323 }
1324
1325 for (set<hobject_t>::iterator i = to_cancel.begin();
1326 i != to_cancel.end();
1327 ++i) {
1328 get_parent()->cancel_pull(*i);
1329
1330 assert(op.to_read.count(*i));
1331 read_request_t &req = op.to_read.find(*i)->second;
1332 dout(10) << __func__ << ": canceling " << req
1333 << " for obj " << *i << dendl;
1334 assert(req.cb);
1335 delete req.cb;
1336 req.cb = NULL;
1337
1338 op.to_read.erase(*i);
1339 op.complete.erase(*i);
1340 recovery_ops.erase(*i);
1341 }
1342
1343 if (op.in_progress.empty()) {
1344 get_parent()->schedule_recovery_work(
1345 get_parent()->bless_gencontext(
1346 new FinishReadOp(this, op.tid)));
1347 }
1348}
1349
1350void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1351{
1352 set<ceph_tid_t> tids_to_filter;
1353 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1354 i = shard_to_read_map.begin();
1355 i != shard_to_read_map.end();
1356 ) {
1357 if (osdmap->is_down(i->first.osd)) {
1358 tids_to_filter.insert(i->second.begin(), i->second.end());
1359 shard_to_read_map.erase(i++);
1360 } else {
1361 ++i;
1362 }
1363 }
1364 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1365 i != tids_to_filter.end();
1366 ++i) {
1367 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1368 assert(j != tid_to_read_map.end());
1369 filter_read_op(osdmap, j->second);
1370 }
1371}
1372
1373void ECBackend::on_change()
1374{
1375 dout(10) << __func__ << dendl;
1376
1377 completed_to = eversion_t();
1378 committed_to = eversion_t();
1379 pipeline_state.clear();
1380 waiting_reads.clear();
1381 waiting_state.clear();
1382 waiting_commit.clear();
1383 for (auto &&op: tid_to_op_map) {
1384 cache.release_write_pin(op.second.pin);
1385 }
1386 tid_to_op_map.clear();
1387
1388 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1389 i != tid_to_read_map.end();
1390 ++i) {
1391 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1392 for (map<hobject_t, read_request_t>::iterator j =
1393 i->second.to_read.begin();
1394 j != i->second.to_read.end();
1395 ++j) {
1396 delete j->second.cb;
1397 j->second.cb = 0;
1398 }
1399 }
1400 tid_to_read_map.clear();
1401 in_progress_client_reads.clear();
1402 shard_to_read_map.clear();
1403 clear_recovery_state();
1404}
1405
1406void ECBackend::clear_recovery_state()
1407{
1408 recovery_ops.clear();
1409}
1410
1411void ECBackend::on_flushed()
1412{
1413}
1414
1415void ECBackend::dump_recovery_info(Formatter *f) const
1416{
1417 f->open_array_section("recovery_ops");
1418 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1419 i != recovery_ops.end();
1420 ++i) {
1421 f->open_object_section("op");
1422 i->second.dump(f);
1423 f->close_section();
1424 }
1425 f->close_section();
1426 f->open_array_section("read_ops");
1427 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1428 i != tid_to_read_map.end();
1429 ++i) {
1430 f->open_object_section("read_op");
1431 i->second.dump(f);
1432 f->close_section();
1433 }
1434 f->close_section();
1435}
1436
1437void ECBackend::submit_transaction(
1438 const hobject_t &hoid,
1439 const object_stat_sum_t &delta_stats,
1440 const eversion_t &at_version,
1441 PGTransactionUPtr &&t,
1442 const eversion_t &trim_to,
1443 const eversion_t &roll_forward_to,
1444 const vector<pg_log_entry_t> &log_entries,
1445 boost::optional<pg_hit_set_history_t> &hset_history,
1446 Context *on_local_applied_sync,
1447 Context *on_all_applied,
1448 Context *on_all_commit,
1449 ceph_tid_t tid,
1450 osd_reqid_t reqid,
1451 OpRequestRef client_op
1452 )
1453{
1454 assert(!tid_to_op_map.count(tid));
1455 Op *op = &(tid_to_op_map[tid]);
1456 op->hoid = hoid;
1457 op->delta_stats = delta_stats;
1458 op->version = at_version;
1459 op->trim_to = trim_to;
1460 op->roll_forward_to = MAX(roll_forward_to, committed_to);
1461 op->log_entries = log_entries;
1462 std::swap(op->updated_hit_set_history, hset_history);
1463 op->on_local_applied_sync = on_local_applied_sync;
1464 op->on_all_applied = on_all_applied;
1465 op->on_all_commit = on_all_commit;
1466 op->tid = tid;
1467 op->reqid = reqid;
1468 op->client_op = client_op;
1469 if (client_op)
1470 op->trace = client_op->pg_trace;
1471
1472 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1473 start_rmw(op, std::move(t));
1474 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1475}
1476
1477void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1478 if (!waiting_state.empty()) {
1479 waiting_state.back().on_write.emplace_back(std::move(cb));
1480 } else if (!waiting_reads.empty()) {
1481 waiting_reads.back().on_write.emplace_back(std::move(cb));
1482 } else {
1483 // Nothing earlier in the pipeline, just call it
1484 cb();
1485 }
1486}
1487
1488int ECBackend::get_min_avail_to_read_shards(
1489 const hobject_t &hoid,
1490 const set<int> &want,
1491 bool for_recovery,
1492 bool do_redundant_reads,
1493 set<pg_shard_t> *to_read)
1494{
1495 // Make sure we don't do redundant reads for recovery
1496 assert(!for_recovery || !do_redundant_reads);
1497
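 // Collect the shards that actually have the object -- the acting set,
 // plus backfill peers and missing_loc sources when reading for recovery --
 // then ask the plugin for the minimum set needed to decode 'want'.
 // With do_redundant_reads, every available shard is read instead.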
1498 set<int> have;
1499 map<shard_id_t, pg_shard_t> shards;
1500
1501 for (set<pg_shard_t>::const_iterator i =
1502 get_parent()->get_acting_shards().begin();
1503 i != get_parent()->get_acting_shards().end();
1504 ++i) {
1505 dout(10) << __func__ << ": checking acting " << *i << dendl;
1506 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1507 if (!missing.is_missing(hoid)) {
1508 assert(!have.count(i->shard));
1509 have.insert(i->shard);
1510 assert(!shards.count(i->shard));
1511 shards.insert(make_pair(i->shard, *i));
1512 }
1513 }
1514
1515 if (for_recovery) {
1516 for (set<pg_shard_t>::const_iterator i =
1517 get_parent()->get_backfill_shards().begin();
1518 i != get_parent()->get_backfill_shards().end();
1519 ++i) {
1520 if (have.count(i->shard)) {
1521 assert(shards.count(i->shard));
1522 continue;
1523 }
1524 dout(10) << __func__ << ": checking backfill " << *i << dendl;
1525 assert(!shards.count(i->shard));
1526 const pg_info_t &info = get_parent()->get_shard_info(*i);
1527 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1528 if (hoid < info.last_backfill &&
1529 !missing.is_missing(hoid)) {
1530 have.insert(i->shard);
1531 shards.insert(make_pair(i->shard, *i));
1532 }
1533 }
1534
1535 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1536 get_parent()->get_missing_loc_shards().find(hoid);
1537 if (miter != get_parent()->get_missing_loc_shards().end()) {
1538 for (set<pg_shard_t>::iterator i = miter->second.begin();
1539 i != miter->second.end();
1540 ++i) {
1541 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1542 auto m = get_parent()->maybe_get_shard_missing(*i);
1543 if (m) {
1544 assert(!(*m).is_missing(hoid));
1545 }
1546 have.insert(i->shard);
1547 shards.insert(make_pair(i->shard, *i));
1548 }
1549 }
1550 }
1551
1552 set<int> need;
1553 int r = ec_impl->minimum_to_decode(want, have, &need);
1554 if (r < 0)
1555 return r;
1556
1557 if (do_redundant_reads) {
1558 need.swap(have);
1559 }
1560
1561 if (!to_read)
1562 return 0;
1563
1564 for (set<int>::iterator i = need.begin();
1565 i != need.end();
1566 ++i) {
1567 assert(shards.count(shard_id_t(*i)));
1568 to_read->insert(shards[shard_id_t(*i)]);
1569 }
1570 return 0;
1571}
1572
1573int ECBackend::get_remaining_shards(
1574 const hobject_t &hoid,
1575 const set<int> &avail,
1576 set<pg_shard_t> *to_read)
1577{
1578 set<int> need;
1579 map<shard_id_t, pg_shard_t> shards;
1580
1581 for (set<pg_shard_t>::const_iterator i =
1582 get_parent()->get_acting_shards().begin();
1583 i != get_parent()->get_acting_shards().end();
1584 ++i) {
1585 dout(10) << __func__ << ": checking acting " << *i << dendl;
1586 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1587 if (!missing.is_missing(hoid)) {
1588 assert(!need.count(i->shard));
1589 need.insert(i->shard);
1590 assert(!shards.count(i->shard));
1591 shards.insert(make_pair(i->shard, *i));
1592 }
1593 }
1594
1595 if (!to_read)
1596 return 0;
1597
1598 for (set<int>::iterator i = need.begin();
1599 i != need.end();
1600 ++i) {
1601 assert(shards.count(shard_id_t(*i)));
1602 if (avail.find(*i) == avail.end())
1603 to_read->insert(shards[shard_id_t(*i)]);
1604 }
1605 return 0;
1606}
1607
1608void ECBackend::start_read_op(
1609 int priority,
1610 map<hobject_t, read_request_t> &to_read,
1611 OpRequestRef _op,
1612 bool do_redundant_reads,
1613 bool for_recovery)
1614{
1615 ceph_tid_t tid = get_parent()->get_tid();
1616 assert(!tid_to_read_map.count(tid));
1617 auto &op = tid_to_read_map.emplace(
1618 tid,
1619 ReadOp(
1620 priority,
1621 tid,
1622 do_redundant_reads,
1623 for_recovery,
1624 _op,
1625 std::move(to_read))).first->second;
1626 dout(10) << __func__ << ": starting " << op << dendl;
1627 if (_op) {
1628 op.trace = _op->pg_trace;
1629 op.trace.event("start ec read");
1630 }
1631 do_read_op(op);
1632}
1633
1634void ECBackend::do_read_op(ReadOp &op)
1635{
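  // Build one ECSubRead per shard: logical offsets are translated to
  // per-chunk offsets via aligned_offset_len_to_chunk, and attrs are
  // requested from only a single shard.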
1636 int priority = op.priority;
1637 ceph_tid_t tid = op.tid;
1638
1639 dout(10) << __func__ << ": starting read " << op << dendl;
1640
1641 map<pg_shard_t, ECSubRead> messages;
1642 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1643 i != op.to_read.end();
1644 ++i) {
1645 bool need_attrs = i->second.want_attrs;
1646 for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
1647 j != i->second.need.end();
1648 ++j) {
1649 if (need_attrs) {
1650 messages[*j].attrs_to_read.insert(i->first);
1651 need_attrs = false;
1652 }
1653 op.obj_to_source[i->first].insert(*j);
1654 op.source_to_obj[*j].insert(i->first);
1655 }
1656 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1657 i->second.to_read.begin();
1658 j != i->second.to_read.end();
1659 ++j) {
1660 pair<uint64_t, uint64_t> chunk_off_len =
1661 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1662 for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
1663 k != i->second.need.end();
1664 ++k) {
1665 messages[*k].to_read[i->first].push_back(
1666 boost::make_tuple(
1667 chunk_off_len.first,
1668 chunk_off_len.second,
1669 j->get<2>()));
1670 }
1671 assert(!need_attrs);
1672 }
1673 }
1674
1675 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1676 i != messages.end();
1677 ++i) {
1678 op.in_progress.insert(i->first);
1679 shard_to_read_map[i->first].insert(op.tid);
1680 i->second.tid = tid;
1681 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1682 msg->set_priority(priority);
1683 msg->pgid = spg_t(
1684 get_parent()->whoami_spg_t().pgid,
1685 i->first.shard);
1686 msg->map_epoch = get_parent()->get_epoch();
1687 msg->min_epoch = get_parent()->get_interval_start_epoch();
1688 msg->op = i->second;
1689 msg->op.from = get_parent()->whoami_shard();
1690 msg->op.tid = tid;
1691 if (op.trace) {
1692 // initialize a child span for this shard
1693 msg->trace.init("ec sub read", nullptr, &op.trace);
1694 msg->trace.keyval("shard", i->first.shard.id);
1695 }
1696 get_parent()->send_message_osd_cluster(
1697 i->first.osd,
1698 msg,
1699 get_parent()->get_epoch());
1700 }
1701 dout(10) << __func__ << ": started " << op << dendl;
1702}
1703
1704ECUtil::HashInfoRef ECBackend::get_hash_info(
1705 const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1706{
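  // Consult the in-memory registry first; on a miss, reload the hash info
  // from the object's hinfo xattr (or the supplied attrs) and, when
  // 'checks' is set, verify the recorded total chunk size against the
  // on-disk object size.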
1707 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1708 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1709 if (!ref) {
1710 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1711 struct stat st;
1712 int r = store->stat(
1713 ch,
1714 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1715 &st);
1716 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1717 // XXX: What does it mean if there is no object on disk?
1718 if (r >= 0) {
1719 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1720 bufferlist bl;
1721 if (attrs) {
1722 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1723 if (k == attrs->end()) {
1724 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1725 } else {
1726 bl.push_back(k->second);
1727 }
1728 } else {
1729 r = store->getattr(
1730 ch,
1731 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1732 ECUtil::get_hinfo_key(),
1733 bl);
1734 if (r < 0) {
1735 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1736 bl.clear(); // just in case
1737 }
1738 }
1739 if (bl.length() > 0) {
1740 bufferlist::iterator bp = bl.begin();
1741 ::decode(hinfo, bp);
1742 if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1743 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1744 << hinfo.get_total_chunk_size() << dendl;
1745 return ECUtil::HashInfoRef();
1746 }
1747 } else if (st.st_size > 0) { // no hinfo on a non-empty object is an error; an empty object gets a fresh hinfo below
1748 return ECUtil::HashInfoRef();
1749 }
1750 }
1751 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1752 }
1753 return ref;
1754}
1755
1756void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1757{
1758 assert(op);
1759
1760 op->plan = ECTransaction::get_write_plan(
1761 sinfo,
1762 std::move(t),
1763 [&](const hobject_t &i) {
1764 ECUtil::HashInfoRef ref = get_hash_info(i, false);
1765 if (!ref) {
1766 derr << __func__ << ": get_hash_info(" << i << ")"
1767 << " returned a null pointer and there is no "
1768 << " way to recover from such an error in this "
1769 << " context" << dendl;
1770 ceph_abort();
1771 }
1772 return ref;
1773 },
1774 get_parent()->get_dpp());
1775
1776 dout(10) << __func__ << ": " << *op << dendl;
1777
1778 waiting_state.push_back(*op);
1779 check_ops();
1780}
1781
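// Pipeline stage 1: move the front op from waiting_state to waiting_reads.
// An op that needs an RMW while the cache is invalid stays queued.  When
// the cache is usable, each object's read plan is split into extents the
// cache will supply (pending_read) and extents that must be fetched from
// the shards (remote_read); the remote portion is read asynchronously
// before the op can proceed to commit.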
1782bool ECBackend::try_state_to_reads()
1783{
1784 if (waiting_state.empty())
1785 return false;
1786
1787 Op *op = &(waiting_state.front());
1788 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1789 assert(get_parent()->get_pool().allows_ecoverwrites());
1790 dout(20) << __func__ << ": blocking " << *op
1791 << " because it requires an rmw and the cache is invalid "
1792 << pipeline_state
1793 << dendl;
1794 return false;
1795 }
1796
1797 if (op->invalidates_cache()) {
1798 dout(20) << __func__ << ": invalidating cache after this op"
1799 << dendl;
1800 pipeline_state.invalidate();
1801 op->using_cache = false;
1802 } else {
1803 op->using_cache = pipeline_state.caching_enabled();
1804 }
1805
1806 waiting_state.pop_front();
1807 waiting_reads.push_back(*op);
1808
1809 if (op->using_cache) {
1810 cache.open_write_pin(op->pin);
1811
1812 extent_set empty;
1813 for (auto &&hpair: op->plan.will_write) {
1814 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1815 const extent_set &to_read_plan =
1816 to_read_plan_iter == op->plan.to_read.end() ?
1817 empty :
1818 to_read_plan_iter->second;
1819
1820 extent_set remote_read = cache.reserve_extents_for_rmw(
1821 hpair.first,
1822 op->pin,
1823 hpair.second,
1824 to_read_plan);
1825
1826 extent_set pending_read = to_read_plan;
1827 pending_read.subtract(remote_read);
1828
1829 if (!remote_read.empty()) {
1830 op->remote_read[hpair.first] = std::move(remote_read);
1831 }
1832 if (!pending_read.empty()) {
1833 op->pending_read[hpair.first] = std::move(pending_read);
1834 }
1835 }
1836 } else {
1837 op->remote_read = op->plan.to_read;
1838 }
1839
1840 dout(10) << __func__ << ": " << *op << dendl;
1841
1842 if (!op->remote_read.empty()) {
1843 assert(get_parent()->get_pool().allows_ecoverwrites());
1844 objects_read_async_no_cache(
1845 op->remote_read,
1846 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1847 for (auto &&i: results) {
1848 op->remote_read_result.emplace(i.first, i.second.second);
1849 }
1850 check_ops();
1851 });
1852 }
1853
1854 return true;
1855}
1856
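// Pipeline stage 2: once the front op's reads have completed, merge any
// cache-supplied extents into remote_read_result, generate the per-shard
// transactions, and send an ECSubWrite to every acting/backfill shard.
// The local shard's write is applied directly via handle_sub_write.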
1857bool ECBackend::try_reads_to_commit()
1858{
1859 if (waiting_reads.empty())
1860 return false;
1861 Op *op = &(waiting_reads.front());
1862 if (op->read_in_progress())
1863 return false;
1864 waiting_reads.pop_front();
1865 waiting_commit.push_back(*op);
1866
1867 dout(10) << __func__ << ": starting commit on " << *op << dendl;
1868 dout(20) << __func__ << ": " << cache << dendl;
1869
1870 get_parent()->apply_stats(
1871 op->hoid,
1872 op->delta_stats);
1873
1874 if (op->using_cache) {
1875 for (auto &&hpair: op->pending_read) {
1876 op->remote_read_result[hpair.first].insert(
1877 cache.get_remaining_extents_for_rmw(
1878 hpair.first,
1879 op->pin,
1880 hpair.second));
1881 }
1882 op->pending_read.clear();
1883 } else {
1884 assert(op->pending_read.empty());
1885 }
1886
1887 map<shard_id_t, ObjectStore::Transaction> trans;
1888 for (set<pg_shard_t>::const_iterator i =
1889 get_parent()->get_actingbackfill_shards().begin();
1890 i != get_parent()->get_actingbackfill_shards().end();
1891 ++i) {
1892 trans[i->shard];
1893 }
1894
1895 op->trace.event("start ec write");
1896
1897 map<hobject_t,extent_map> written;
1898 if (op->plan.t) {
1899 ECTransaction::generate_transactions(
1900 op->plan,
1901 ec_impl,
1902 get_parent()->get_info().pgid.pgid,
31f18b77 1903 (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
7c673cae
FG
1904 sinfo,
1905 op->remote_read_result,
1906 op->log_entries,
1907 &written,
1908 &trans,
1909 &(op->temp_added),
1910 &(op->temp_cleared),
1911 get_parent()->get_dpp());
1912 }
1913
1914 dout(20) << __func__ << ": " << cache << dendl;
1915 dout(20) << __func__ << ": written: " << written << dendl;
1916 dout(20) << __func__ << ": op: " << *op << dendl;
1917
1918 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1919 for (auto &&i: op->log_entries) {
1920 if (i.requires_kraken()) {
1921 derr << __func__ << ": log entry " << i << " requires kraken"
1922 << " but overwrites are not enabled!" << dendl;
1923 ceph_abort();
1924 }
1925 }
1926 }
1927
1928 map<hobject_t,extent_set> written_set;
1929 for (auto &&i: written) {
1930 written_set[i.first] = i.second.get_interval_set();
1931 }
1932 dout(20) << __func__ << ": written_set: " << written_set << dendl;
1933 assert(written_set == op->plan.will_write);
1934
1935 if (op->using_cache) {
1936 for (auto &&hpair: written) {
1937 dout(20) << __func__ << ": " << hpair << dendl;
1938 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
1939 }
1940 }
1941 op->remote_read.clear();
1942 op->remote_read_result.clear();
1943
1944 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1945 ObjectStore::Transaction empty;
1946 bool should_write_local = false;
1947 ECSubWrite local_write_op;
1948 for (set<pg_shard_t>::const_iterator i =
1949 get_parent()->get_actingbackfill_shards().begin();
1950 i != get_parent()->get_actingbackfill_shards().end();
1951 ++i) {
1952 op->pending_apply.insert(*i);
1953 op->pending_commit.insert(*i);
1954 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
1955 trans.find(i->shard);
1956 assert(iter != trans.end());
1957 bool should_send = get_parent()->should_send_op(*i, op->hoid);
1958 const pg_stat_t &stats =
1959 should_send ?
1960 get_info().stats :
1961 parent->get_shard_info().find(*i)->second.stats;
1962
1963 ECSubWrite sop(
1964 get_parent()->whoami_shard(),
1965 op->tid,
1966 op->reqid,
1967 op->hoid,
1968 stats,
1969 should_send ? iter->second : empty,
1970 op->version,
1971 op->trim_to,
1972 op->roll_forward_to,
1973 op->log_entries,
1974 op->updated_hit_set_history,
1975 op->temp_added,
1976 op->temp_cleared,
1977 !should_send);
1978
1979 ZTracer::Trace trace;
1980 if (op->trace) {
1981 // initialize a child span for this shard
1982 trace.init("ec sub write", nullptr, &op->trace);
1983 trace.keyval("shard", i->shard.id);
1984 }
1985
1986 if (*i == get_parent()->whoami_shard()) {
1987 should_write_local = true;
1988 local_write_op.claim(sop);
1989 } else {
1990 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
1991 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
1992 r->map_epoch = get_parent()->get_epoch();
1993 r->min_epoch = get_parent()->get_interval_start_epoch();
1994 r->trace = trace;
1995 get_parent()->send_message_osd_cluster(
1996 i->osd, r, get_parent()->get_epoch());
1997 }
1998 }
1999 if (should_write_local) {
2000 handle_sub_write(
2001 get_parent()->whoami_shard(),
2002 op->client_op,
2003 local_write_op,
2004 op->trace,
2005 op->on_local_applied_sync);
2006 op->on_local_applied_sync = 0;
2007 }
2008
2009 for (auto i = op->on_write.begin();
2010 i != op->on_write.end();
2011 op->on_write.erase(i++)) {
2012 (*i)();
2013 }
2014
2015 return true;
2016}
2017
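// Pipeline stage 3: once the front op's writes have committed everywhere,
// advance completed_to/committed_to, possibly queue a no-op write to roll
// the log forward (kraken and later), release the cache pin, and clear
// pipeline_state when the pipeline drains.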
2018bool ECBackend::try_finish_rmw()
2019{
2020 if (waiting_commit.empty())
2021 return false;
2022 Op *op = &(waiting_commit.front());
2023 if (op->write_in_progress())
2024 return false;
2025 waiting_commit.pop_front();
2026
2027 dout(10) << __func__ << ": " << *op << dendl;
2028 dout(20) << __func__ << ": " << cache << dendl;
2029
2030 if (op->roll_forward_to > completed_to)
2031 completed_to = op->roll_forward_to;
2032 if (op->version > committed_to)
2033 committed_to = op->version;
2034
31f18b77 2035 if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
7c673cae
FG
2036 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2037 waiting_reads.empty() &&
2038 waiting_commit.empty()) {
2039 // submit a dummy transaction to kick the rollforward
2040 auto tid = get_parent()->get_tid();
2041 Op *nop = &(tid_to_op_map[tid]);
2042 nop->hoid = op->hoid;
2043 nop->trim_to = op->trim_to;
2044 nop->roll_forward_to = op->version;
2045 nop->tid = tid;
2046 nop->reqid = op->reqid;
2047 waiting_reads.push_back(*nop);
2048 }
2049 }
2050
2051 if (op->using_cache) {
2052 cache.release_write_pin(op->pin);
2053 }
2054 tid_to_op_map.erase(op->tid);
2055
2056 if (waiting_reads.empty() &&
2057 waiting_commit.empty()) {
2058 pipeline_state.clear();
2059 dout(20) << __func__ << ": clearing pipeline_state "
2060 << pipeline_state
2061 << dendl;
2062 }
2063 return true;
2064}
2065
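// Drive the write pipeline: keep running the three stages until none of
// them makes further progress.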
2066void ECBackend::check_ops()
2067{
2068 while (try_state_to_reads() ||
2069 try_reads_to_commit() ||
2070 try_finish_rmw());
2071}
2072
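// Synchronous reads are not supported on EC pools; data has to be
// reconstructed through the async path below.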
2073int ECBackend::objects_read_sync(
2074 const hobject_t &hoid,
2075 uint64_t off,
2076 uint64_t len,
2077 uint32_t op_flags,
2078 bufferlist *bl)
2079{
2080 return -EOPNOTSUPP;
2081}
2082
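// Round each requested (offset, length, flags) extent out to stripe
// boundaries, issue a single reconstructing read for the union of those
// extents, and, on completion, copy the requested sub-ranges back into
// the callers' bufferlists and fire their per-extent Contexts.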
2083void ECBackend::objects_read_async(
2084 const hobject_t &hoid,
2085 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2086 pair<bufferlist*, Context*> > > &to_read,
2087 Context *on_complete,
2088 bool fast_read)
2089{
2090 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2091 reads;
2092
2093 uint32_t flags = 0;
2094 extent_set es;
2095 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2096 pair<bufferlist*, Context*> > >::const_iterator i =
2097 to_read.begin();
2098 i != to_read.end();
2099 ++i) {
2100 pair<uint64_t, uint64_t> tmp =
2101 sinfo.offset_len_to_stripe_bounds(
2102 make_pair(i->first.get<0>(), i->first.get<1>()));
2103
2104 extent_set esnew;
2105 esnew.insert(tmp.first, tmp.second);
2106 es.union_of(esnew);
2107 flags |= i->first.get<2>();
2108 }
2109
2110 if (!es.empty()) {
2111 auto &offsets = reads[hoid];
2112 for (auto j = es.begin();
2113 j != es.end();
2114 ++j) {
2115 offsets.push_back(
2116 boost::make_tuple(
2117 j.get_start(),
2118 j.get_len(),
2119 flags));
2120 }
2121 }
2122
2123 struct cb {
2124 ECBackend *ec;
2125 hobject_t hoid;
2126 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2127 pair<bufferlist*, Context*> > > to_read;
2128 unique_ptr<Context> on_complete;
2129 cb(const cb&) = delete;
2130 cb(cb &&) = default;
2131 cb(ECBackend *ec,
2132 const hobject_t &hoid,
2133 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2134 pair<bufferlist*, Context*> > > &to_read,
2135 Context *on_complete)
2136 : ec(ec),
2137 hoid(hoid),
2138 to_read(to_read),
2139 on_complete(on_complete) {}
2140 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2141 auto dpp = ec->get_parent()->get_dpp();
2142 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2143 << dendl;
2144 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2145 << dendl;
2146
2147 auto &got = results[hoid];
2148
2149 int r = 0;
2150 for (auto &&read: to_read) {
2151 if (got.first < 0) {
2152 if (read.second.second) {
2153 read.second.second->complete(got.first);
2154 }
2155 if (r == 0)
2156 r = got.first;
2157 } else {
2158 assert(read.second.first);
2159 uint64_t offset = read.first.get<0>();
2160 uint64_t length = read.first.get<1>();
2161 auto range = got.second.get_containing_range(offset, length);
2162 assert(range.first != range.second);
2163 assert(range.first.get_off() <= offset);
2164 assert(
2165 (offset + length) <=
2166 (range.first.get_off() + range.first.get_len()));
2167 read.second.first->substr_of(
2168 range.first.get_val(),
2169 offset - range.first.get_off(),
2170 length);
2171 if (read.second.second) {
2172 read.second.second->complete(length);
2173 read.second.second = nullptr;
2174 }
2175 }
2176 }
2177 to_read.clear();
2178 if (on_complete) {
2179 on_complete.release()->complete(r);
2180 }
2181 }
2182 ~cb() {
2183 for (auto &&i: to_read) {
2184 delete i.second.second;
2185 }
2186 to_read.clear();
2187 }
2188 };
2189 objects_read_and_reconstruct(
2190 reads,
2191 fast_read,
2192 make_gen_lambda_context<
2193 map<hobject_t,pair<int, extent_map> > &&, cb>(
2194 cb(this,
2195 hoid,
2196 to_read,
2197 on_complete)));
2198}
2199
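// Per-object completion for a client read: decode the returned shard
// chunks back into logical object data, trim each piece to the extent
// that was actually requested, and hand the resulting extent_map to the
// ClientAsyncReadStatus.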
2200struct CallClientContexts :
2201 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2202 hobject_t hoid;
2203 ECBackend *ec;
2204 ECBackend::ClientAsyncReadStatus *status;
2205 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2206 CallClientContexts(
2207 hobject_t hoid,
2208 ECBackend *ec,
2209 ECBackend::ClientAsyncReadStatus *status,
2210 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2211 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2212 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2213 ECBackend::read_result_t &res = in.second;
2214 extent_map result;
2215 if (res.r != 0)
2216 goto out;
2217 assert(res.returned.size() == to_read.size());
2218 assert(res.r == 0);
2219 assert(res.errors.empty());
2220 for (auto &&read: to_read) {
2221 pair<uint64_t, uint64_t> adjusted =
2222 ec->sinfo.offset_len_to_stripe_bounds(
2223 make_pair(read.get<0>(), read.get<1>()));
2224 assert(res.returned.front().get<0>() == adjusted.first &&
2225 res.returned.front().get<1>() == adjusted.second);
2226 map<int, bufferlist> to_decode;
2227 bufferlist bl;
2228 for (map<pg_shard_t, bufferlist>::iterator j =
2229 res.returned.front().get<2>().begin();
2230 j != res.returned.front().get<2>().end();
2231 ++j) {
2232 to_decode[j->first.shard].claim(j->second);
2233 }
2234 int r = ECUtil::decode(
2235 ec->sinfo,
2236 ec->ec_impl,
2237 to_decode,
2238 &bl);
2239 if (r < 0) {
2240 res.r = r;
2241 goto out;
2242 }
2243 bufferlist trimmed;
2244 trimmed.substr_of(
2245 bl,
2246 read.get<0>() - adjusted.first,
2247 MIN(read.get<1>(),
2248 bl.length() - (read.get<0>() - adjusted.first)));
2249 result.insert(
2250 read.get<0>(), trimmed.length(), std::move(trimmed));
2251 res.returned.pop_front();
2252 }
2253out:
2254 status->complete_object(hoid, res.r, std::move(result));
2255 ec->kick_reads();
2256 }
2257};
2258
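// For each object choose a minimal set of available shards to read from,
// attach a CallClientContexts completion, and start a single ReadOp
// covering all of the requested objects.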
2259void ECBackend::objects_read_and_reconstruct(
2260 const map<hobject_t,
2261 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2262 > &reads,
2263 bool fast_read,
2264 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2265{
2266 in_progress_client_reads.emplace_back(
2267 reads.size(), std::move(func));
2268 if (!reads.size()) {
2269 kick_reads();
2270 return;
2271 }
2272
2273 set<int> want_to_read;
2274 get_want_to_read_shards(&want_to_read);
2275
2276 map<hobject_t, read_request_t> for_read_op;
2277 for (auto &&to_read: reads) {
2278 set<pg_shard_t> shards;
2279 int r = get_min_avail_to_read_shards(
2280 to_read.first,
2281 want_to_read,
2282 false,
2283 fast_read,
2284 &shards);
2285 assert(r == 0);
2286
2287 CallClientContexts *c = new CallClientContexts(
2288 to_read.first,
2289 this,
2290 &(in_progress_client_reads.back()),
2291 to_read.second);
2292 for_read_op.insert(
2293 make_pair(
2294 to_read.first,
2295 read_request_t(
2296 to_read.second,
2297 shards,
2298 false,
2299 c)));
2300 }
2301
2302 start_read_op(
2303 CEPH_MSG_PRIO_DEFAULT,
2304 for_read_op,
2305 OpRequestRef(),
2306 fast_read, false);
2307 return;
2308}
2309
2310
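// Error-recovery path for a read: reissue the same extents against every
// shard not already read from (or already in error).  Returns -EIO when
// no further shards are available.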
2311int ECBackend::send_all_remaining_reads(
2312 const hobject_t &hoid,
2313 ReadOp &rop)
2314{
2315 set<int> already_read;
2316 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2317 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2318 already_read.insert(i->shard);
2319 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2320 set<pg_shard_t> shards;
2321 int r = get_remaining_shards(hoid, already_read, &shards);
2322 if (r)
2323 return r;
2324 if (shards.empty())
2325 return -EIO;
2326
2327 dout(10) << __func__ << " Read remaining shards " << shards << dendl;
2328
2329 // TODOSAM: this doesn't seem right
2330 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2331 rop.to_read.find(hoid)->second.to_read;
2332 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2333 rop.to_read.find(hoid)->second.cb;
2334
2335 map<hobject_t, read_request_t> for_read_op;
2336 for_read_op.insert(
2337 make_pair(
2338 hoid,
2339 read_request_t(
2340 offsets,
2341 shards,
2342 false,
2343 c)));
2344
2345 rop.to_read.swap(for_read_op);
2346 do_read_op(rop);
2347 return 0;
2348}
2349
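// Fetch the object's xattrs from the local store, filtering out the
// internal hinfo key(s) so EC bookkeeping attrs are not exposed to callers.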
2350int ECBackend::objects_get_attrs(
2351 const hobject_t &hoid,
2352 map<string, bufferlist> *out)
2353{
2354 int r = store->getattrs(
2355 ch,
2356 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2357 *out);
2358 if (r < 0)
2359 return r;
2360
2361 for (map<string, bufferlist>::iterator i = out->begin();
2362 i != out->end();
2363 ) {
2364 if (ECUtil::is_hinfo_key_string(i->first))
2365 out->erase(i++);
2366 else
2367 ++i;
2368 }
2369 return r;
2370}
2371
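// Appends are stripe-aligned, so rolling one back is just a truncate of
// the local shard to the chunk offset corresponding to the old logical size.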
2372void ECBackend::rollback_append(
2373 const hobject_t &hoid,
2374 uint64_t old_size,
2375 ObjectStore::Transaction *t)
2376{
2377 assert(old_size % sinfo.get_stripe_width() == 0);
2378 t->truncate(
2379 coll,
2380 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2381 sinfo.aligned_logical_offset_to_chunk_offset(
2382 old_size));
2383}
2384
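// Deep scrub: read the local chunk in chunk-aligned strides, hashing as we
// go, then compare the total size and our chunk hash against the stored
// hinfo.  The digest reported upward is the stored hash of chunk 0 (or a
// dummy 0 when EC overwrites are enabled, which do not support deep scrub
// yet).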
2385void ECBackend::be_deep_scrub(
2386 const hobject_t &poid,
2387 uint32_t seed,
2388 ScrubMap::object &o,
2389 ThreadPool::TPHandle &handle) {
2390 bufferhash h(-1); // we always used -1
2391 int r;
2392 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2393 if (stride % sinfo.get_chunk_size())
2394 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
2395 uint64_t pos = 0;
2396
2397 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2398
2399 while (true) {
2400 bufferlist bl;
2401 handle.reset_tp_timeout();
2402 r = store->read(
2403 ch,
2404 ghobject_t(
2405 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2406 pos,
2407 stride, bl,
224ce89b 2408 fadvise_flags);
7c673cae
FG
2409 if (r < 0)
2410 break;
2411 if (bl.length() % sinfo.get_chunk_size()) {
2412 r = -EIO;
2413 break;
2414 }
2415 pos += r;
2416 h << bl;
2417 if ((unsigned)r < stride)
2418 break;
2419 }
2420
2421 if (r == -EIO) {
2422 dout(0) << "_scan_list " << poid << " got "
2423 << r << " on read, read_error" << dendl;
2424 o.read_error = true;
2425 return;
2426 }
2427
2428 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2429 if (!hinfo) {
2430 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2431 o.read_error = true;
2432 o.digest_present = false;
2433 return;
2434 } else {
2435 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2436 assert(hinfo->has_chunk_hash());
2437 if (hinfo->get_total_chunk_size() != pos) {
2438 dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
2439 o.ec_size_mismatch = true;
2440 return;
2441 }
2442
2443 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
2444 dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
2445 o.ec_hash_mismatch = true;
2446 return;
2447 }
2448
2449 /* We checked above that we match our own stored hash. We cannot
2450 * send a hash of the actual object, so instead we simply send
2451 * our locally stored hash of shard 0 on the assumption that if
2452 * we match our chunk hash and our recollection of the hash for
2453 * chunk 0 matches that of our peers, there is likely no corruption.
2454 */
2455 o.digest = hinfo->get_chunk_hash(0);
2456 o.digest_present = true;
2457 } else {
2458 /* Hack! We must be using partial overwrites, and partial overwrites
2459 * don't support deep-scrub yet
2460 */
2461 o.digest = 0;
2462 o.digest_present = true;
2463 }
2464 }
2465
2466 o.omap_digest = seed;
2467 o.omap_digest_present = true;
2468}