1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Inktank Storage, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <iostream>
16#include <sstream>
17
18#include "ECBackend.h"
19#include "messages/MOSDPGPush.h"
20#include "messages/MOSDPGPushReply.h"
21#include "messages/MOSDECSubOpWrite.h"
22#include "messages/MOSDECSubOpWriteReply.h"
23#include "messages/MOSDECSubOpRead.h"
24#include "messages/MOSDECSubOpReadReply.h"
25#include "ECMsgTypes.h"
26
27#include "PrimaryLogPG.h"
28
29#define dout_context cct
30#define dout_subsys ceph_subsys_osd
31#define DOUT_PREFIX_ARGS this
32#undef dout_prefix
33#define dout_prefix _prefix(_dout, this)
34static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35 return *_dout << pgb->get_parent()->gen_dbg_prefix();
36}
37
38struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39 list<ECBackend::RecoveryOp> ops;
40};
41
42ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43 switch (rhs.pipeline_state) {
44 case ECBackend::pipeline_state_t::CACHE_VALID:
45 return lhs << "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID:
47 return lhs << "CACHE_INVALID";
48 default:
49 assert(0 == "invalid pipeline state");
50 }
51 return lhs; // unreachable
52}
53
54static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
55{
56 lhs << "[";
57 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
58 i != rhs.end();
59 ++i) {
60 if (i != rhs.begin())
61 lhs << ", ";
62 lhs << make_pair(i->first, i->second.length());
63 }
64 return lhs << "]";
65}
66
67static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
68{
69 lhs << "[";
70 for (map<int, bufferlist>::const_iterator i = rhs.begin();
71 i != rhs.end();
72 ++i) {
73 if (i != rhs.begin())
74 lhs << ", ";
75 lhs << make_pair(i->first, i->second.length());
76 }
77 return lhs << "]";
78}
79
80static ostream &operator<<(
81 ostream &lhs,
82 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
83{
84 return lhs << "(" << rhs.get<0>() << ", "
85 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
86}
87
88ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
89{
90 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91 << ", need=" << rhs.need
92 << ", want_attrs=" << rhs.want_attrs
93 << ")";
94}
95
96ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
97{
98 lhs << "read_result_t(r=" << rhs.r
99 << ", errors=" << rhs.errors;
100 if (rhs.attrs) {
101 lhs << ", attrs=" << rhs.attrs.get();
102 } else {
103 lhs << ", noattrs";
104 }
105 return lhs << ", returned=" << rhs.returned << ")";
106}
107
108ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
109{
110 lhs << "ReadOp(tid=" << rhs.tid;
111 if (rhs.op && rhs.op->get_req()) {
112 lhs << ", op=";
113 rhs.op->get_req()->print(lhs);
114 }
115 return lhs << ", to_read=" << rhs.to_read
116 << ", complete=" << rhs.complete
117 << ", priority=" << rhs.priority
118 << ", obj_to_source=" << rhs.obj_to_source
119 << ", source_to_obj=" << rhs.source_to_obj
120 << ", in_progress=" << rhs.in_progress << ")";
121}
122
123void ECBackend::ReadOp::dump(Formatter *f) const
124{
125 f->dump_unsigned("tid", tid);
126 if (op && op->get_req()) {
127 f->dump_stream("op") << *(op->get_req());
128 }
129 f->dump_stream("to_read") << to_read;
130 f->dump_stream("complete") << complete;
131 f->dump_int("priority", priority);
132 f->dump_stream("obj_to_source") << obj_to_source;
133 f->dump_stream("source_to_obj") << source_to_obj;
134 f->dump_stream("in_progress") << in_progress;
135}
136
137ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
138{
139 lhs << "Op(" << rhs.hoid
140 << " v=" << rhs.version
141 << " tt=" << rhs.trim_to
142 << " tid=" << rhs.tid
143 << " reqid=" << rhs.reqid;
144 if (rhs.client_op && rhs.client_op->get_req()) {
145 lhs << " client_op=";
146 rhs.client_op->get_req()->print(lhs);
147 }
148 lhs << " roll_forward_to=" << rhs.roll_forward_to
149 << " temp_added=" << rhs.temp_added
150 << " temp_cleared=" << rhs.temp_cleared
151 << " pending_read=" << rhs.pending_read
152 << " remote_read=" << rhs.remote_read
153 << " remote_read_result=" << rhs.remote_read_result
154 << " pending_apply=" << rhs.pending_apply
155 << " pending_commit=" << rhs.pending_commit
156 << " plan.to_read=" << rhs.plan.to_read
157 << " plan.will_write=" << rhs.plan.will_write
158 << ")";
159 return lhs;
160}
161
162ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
163{
164 return lhs << "RecoveryOp("
165 << "hoid=" << rhs.hoid
166 << " v=" << rhs.v
167 << " missing_on=" << rhs.missing_on
168 << " missing_on_shards=" << rhs.missing_on_shards
169 << " recovery_info=" << rhs.recovery_info
170 << " recovery_progress=" << rhs.recovery_progress
171 << " obc refcount=" << rhs.obc.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173 << " waiting_on_pushes=" << rhs.waiting_on_pushes
174 << " extent_requested=" << rhs.extent_requested
175 << ")";
176}
177
178void ECBackend::RecoveryOp::dump(Formatter *f) const
179{
180 f->dump_stream("hoid") << hoid;
181 f->dump_stream("v") << v;
182 f->dump_stream("missing_on") << missing_on;
183 f->dump_stream("missing_on_shards") << missing_on_shards;
184 f->dump_stream("recovery_info") << recovery_info;
185 f->dump_stream("recovery_progress") << recovery_progress;
186 f->dump_stream("state") << tostr(state);
187 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188 f->dump_stream("extent_requested") << extent_requested;
189}
190
191ECBackend::ECBackend(
192 PGBackend::Listener *pg,
193 coll_t coll,
194 ObjectStore::CollectionHandle &ch,
195 ObjectStore *store,
196 CephContext *cct,
197 ErasureCodeInterfaceRef ec_impl,
198 uint64_t stripe_width)
199 : PGBackend(cct, pg, store, coll, ch),
200 ec_impl(ec_impl),
201 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
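  // The stripe is split into k data chunks; the plugin's chunk size for
  // this stripe width must multiply back out to exactly stripe_width.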
202 assert((ec_impl->get_data_chunk_count() *
203 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
204}
205
206PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
207{
208 return new ECRecoveryHandle;
209}
210
211void ECBackend::_failed_push(const hobject_t &hoid,
212 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
213{
214 ECBackend::read_result_t &res = in.second;
215 dout(10) << __func__ << ": Read error " << hoid << " r="
216 << res.r << " errors=" << res.errors << dendl;
217 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
218 << dendl;
219 assert(recovery_ops.count(hoid));
220 recovery_ops.erase(hoid);
221
222 list<pg_shard_t> fl;
223 for (auto&& i : res.errors) {
224 fl.push_back(i.first);
225 }
226 get_parent()->failed_push(fl, hoid);
227}
228
229struct OnRecoveryReadComplete :
230 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
231 ECBackend *pg;
232 hobject_t hoid;
233 set<int> want;
234 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
235 : pg(pg), hoid(hoid) {}
236 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
237 ECBackend::read_result_t &res = in.second;
238 if (!(res.r == 0 && res.errors.empty())) {
239 pg->_failed_push(hoid, in);
240 return;
241 }
242 assert(res.returned.size() == 1);
243 pg->handle_recovery_read_complete(
244 hoid,
245 res.returned.back(),
246 res.attrs,
247 in.first);
248 }
249};
250
251struct RecoveryMessages {
252 map<hobject_t,
253 ECBackend::read_request_t> reads;
254 void read(
255 ECBackend *ec,
256 const hobject_t &hoid, uint64_t off, uint64_t len,
257 const set<pg_shard_t> &need,
258 bool attrs) {
259 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
260 to_read.push_back(boost::make_tuple(off, len, 0));
261 assert(!reads.count(hoid));
262 reads.insert(
263 make_pair(
264 hoid,
265 ECBackend::read_request_t(
266 to_read,
267 need,
268 attrs,
269 new OnRecoveryReadComplete(
270 ec,
271 hoid))));
272 }
273
274 map<pg_shard_t, vector<PushOp> > pushes;
275 map<pg_shard_t, vector<PushReplyOp> > push_replies;
276 ObjectStore::Transaction t;
277 RecoveryMessages() {}
278 ~RecoveryMessages(){}
279};
280
281void ECBackend::handle_recovery_push(
282 const PushOp &op,
283 RecoveryMessages *m)
284{
285 ostringstream ss;
286 if (get_parent()->check_failsafe_full(ss)) {
287 dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
288 ceph_abort();
289 }
290
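  // A push is "oneshot" when it both begins and completes the object; in
  // that case we write straight to the final shard object and skip the
  // temp-object staging below.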
291 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
292 ghobject_t tobj;
293 if (oneshot) {
294 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
295 get_parent()->whoami_shard().shard);
296 } else {
297 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
298 op.version),
299 ghobject_t::NO_GEN,
300 get_parent()->whoami_shard().shard);
301 if (op.before_progress.first) {
302 dout(10) << __func__ << ": Adding oid "
303 << tobj.hobj << " in the temp collection" << dendl;
304 add_temp_obj(tobj.hobj);
305 }
306 }
307
308 if (op.before_progress.first) {
309 m->t.remove(coll, tobj);
310 m->t.touch(coll, tobj);
311 }
312
313 if (!op.data_included.empty()) {
314 uint64_t start = op.data_included.range_start();
315 uint64_t end = op.data_included.range_end();
316 assert(op.data.length() == (end - start));
317
318 m->t.write(
319 coll,
320 tobj,
321 start,
322 op.data.length(),
323 op.data);
324 } else {
325 assert(op.data.length() == 0);
326 }
327
328 if (op.before_progress.first) {
329 assert(op.attrset.count(string("_")));
330 m->t.setattrs(
331 coll,
332 tobj,
333 op.attrset);
334 }
335
336 if (op.after_progress.data_complete && !oneshot) {
337 dout(10) << __func__ << ": Removing oid "
338 << tobj.hobj << " from the temp collection" << dendl;
339 clear_temp_obj(tobj.hobj);
340 m->t.remove(coll, ghobject_t(
341 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
342 m->t.collection_move_rename(
343 coll, tobj,
344 coll, ghobject_t(
345 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
346 }
347 if (op.after_progress.data_complete) {
348 if ((get_parent()->pgb_is_primary())) {
349 assert(recovery_ops.count(op.soid));
350 assert(recovery_ops[op.soid].obc);
351 get_parent()->on_local_recover(
352 op.soid,
353 op.recovery_info,
354 recovery_ops[op.soid].obc,
355 &m->t);
356 } else {
357 get_parent()->on_local_recover(
358 op.soid,
359 op.recovery_info,
360 ObjectContextRef(),
361 &m->t);
362 }
363 }
364 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
365 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
366}
367
368void ECBackend::handle_recovery_push_reply(
369 const PushReplyOp &op,
370 pg_shard_t from,
371 RecoveryMessages *m)
372{
373 if (!recovery_ops.count(op.soid))
374 return;
375 RecoveryOp &rop = recovery_ops[op.soid];
376 assert(rop.waiting_on_pushes.count(from));
377 rop.waiting_on_pushes.erase(from);
378 continue_recovery_op(rop, m);
379}
380
381void ECBackend::handle_recovery_read_complete(
382 const hobject_t &hoid,
383 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
384 boost::optional<map<string, bufferlist> > attrs,
385 RecoveryMessages *m)
386{
387 dout(10) << __func__ << ": returned " << hoid << " "
388 << "(" << to_read.get<0>()
389 << ", " << to_read.get<1>()
390 << ", " << to_read.get<2>()
391 << ")"
392 << dendl;
393 assert(recovery_ops.count(hoid));
394 RecoveryOp &op = recovery_ops[hoid];
395 assert(op.returned_data.empty());
396 map<int, bufferlist*> target;
397 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
398 i != op.missing_on_shards.end();
399 ++i) {
400 target[*i] = &(op.returned_data[*i]);
401 }
402 map<int, bufferlist> from;
403 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
404 i != to_read.get<2>().end();
405 ++i) {
406 from[i->first.shard].claim(i->second);
407 }
408 dout(10) << __func__ << ": " << from << dendl;
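  // Reconstruct the chunks for the missing shards from the ones we read;
  // decode() fills op.returned_data through the target pointer map.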
409 int r = ECUtil::decode(sinfo, ec_impl, from, target);
410 assert(r == 0);
411 if (attrs) {
412 op.xattrs.swap(*attrs);
413
414 if (!op.obc) {
415 // The attrs only reference the original bufferlist (decoded from the
416 // ECSubReadReply message), which is much larger than the attrs needed
417 // in recovery. If the obc caches them (get_obc may cache the attrs),
418 // the whole original bufferlist cannot be freed until the obc is
419 // evicted from the obc cache. So rebuild the bufferlists before
420 // caching them.
421 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
422 it != op.xattrs.end();
423 ++it) {
424 it->second.rebuild();
425 }
426 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
427 // of the backend (see bug #12983)
428 map<string, bufferlist> sanitized_attrs(op.xattrs);
429 sanitized_attrs.erase(ECUtil::get_hinfo_key());
430 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
431 assert(op.obc);
432 op.recovery_info.size = op.obc->obs.oi.size;
433 op.recovery_info.oi = op.obc->obs.oi;
434 }
435
436 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
437 if (op.obc->obs.oi.size > 0) {
438 assert(op.xattrs.count(ECUtil::get_hinfo_key()));
439 bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
440 ::decode(hinfo, bp);
441 }
442 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
443 }
444 assert(op.xattrs.size());
445 assert(op.obc);
446 continue_recovery_op(op, m);
447}
448
449struct SendPushReplies : public Context {
450 PGBackend::Listener *l;
451 epoch_t epoch;
452 map<int, MOSDPGPushReply*> replies;
453 SendPushReplies(
454 PGBackend::Listener *l,
455 epoch_t epoch,
456 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
457 replies.swap(in);
458 }
459 void finish(int) override {
460 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
461 i != replies.end();
462 ++i) {
463 l->send_message_osd_cluster(i->first, i->second, epoch);
464 }
465 replies.clear();
466 }
467 ~SendPushReplies() override {
468 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
469 i != replies.end();
470 ++i) {
471 i->second->put();
472 }
473 replies.clear();
474 }
475};
476
477void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
478{
479 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
480 i != m.pushes.end();
481 m.pushes.erase(i++)) {
482 MOSDPGPush *msg = new MOSDPGPush();
483 msg->set_priority(priority);
484 msg->map_epoch = get_parent()->get_epoch();
485 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
486 msg->from = get_parent()->whoami_shard();
487 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
488 msg->pushes.swap(i->second);
489 msg->compute_cost(cct);
490 get_parent()->send_message(
491 i->first.osd,
492 msg);
493 }
494 map<int, MOSDPGPushReply*> replies;
495 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
496 m.push_replies.begin();
497 i != m.push_replies.end();
498 m.push_replies.erase(i++)) {
499 MOSDPGPushReply *msg = new MOSDPGPushReply();
500 msg->set_priority(priority);
501 msg->map_epoch = get_parent()->get_epoch();
502 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
503 msg->from = get_parent()->whoami_shard();
504 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
505 msg->replies.swap(i->second);
506 msg->compute_cost(cct);
507 replies.insert(make_pair(i->first.osd, msg));
508 }
509
510 if (!replies.empty()) {
511 (m.t).register_on_complete(
512 get_parent()->bless_context(
513 new SendPushReplies(
514 get_parent(),
515 get_parent()->get_epoch(),
516 replies)));
517 get_parent()->queue_transaction(std::move(m.t));
518 }
519
520 if (m.reads.empty())
521 return;
522 start_read_op(
523 priority,
524 m.reads,
525 OpRequestRef(),
526 false, true);
527}
528
529void ECBackend::continue_recovery_op(
530 RecoveryOp &op,
531 RecoveryMessages *m)
532{
533 dout(10) << __func__ << ": continuing " << op << dendl;
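  // Per-object recovery state machine:
  //   IDLE    -> issue reads for the next chunk of the object
  //   READING -> reads completed, turn returned_data into pushes to each
  //              missing shard
  //   WRITING -> all pushes acked: loop back to IDLE for the next chunk,
  //              or finish the object if data_complete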
534 while (1) {
535 switch (op.state) {
536 case RecoveryOp::IDLE: {
537 // start read
538 op.state = RecoveryOp::READING;
539 assert(!op.recovery_progress.data_complete);
540 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
541 uint64_t from = op.recovery_progress.data_recovered_to;
542 uint64_t amount = get_recovery_chunk_size();
543
544 if (op.recovery_progress.first && op.obc) {
545 /* We've got the attrs and the hinfo, might as well use them */
546 op.hinfo = get_hash_info(op.hoid);
547 assert(op.hinfo);
548 op.xattrs = op.obc->attr_cache;
549 ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
550 }
551
552 set<pg_shard_t> to_read;
553 int r = get_min_avail_to_read_shards(
554 op.hoid, want, true, false, &to_read);
555 if (r != 0) {
556 // we must have lost a recovery source
557 assert(!op.recovery_progress.first);
558 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
559 << dendl;
560 get_parent()->cancel_pull(op.hoid);
561 recovery_ops.erase(op.hoid);
562 return;
563 }
564 m->read(
565 this,
566 op.hoid,
567 op.recovery_progress.data_recovered_to,
568 amount,
569 to_read,
570 op.recovery_progress.first && !op.obc);
571 op.extent_requested = make_pair(
572 from,
573 amount);
574 dout(10) << __func__ << ": IDLE return " << op << dendl;
575 return;
576 }
577 case RecoveryOp::READING: {
578 // read completed, start write
579 assert(op.xattrs.size());
580 assert(op.returned_data.size());
581 op.state = RecoveryOp::WRITING;
582 ObjectRecoveryProgress after_progress = op.recovery_progress;
583 after_progress.data_recovered_to += op.extent_requested.second;
584 after_progress.first = false;
585 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
586 after_progress.data_recovered_to =
587 sinfo.logical_to_next_stripe_offset(
588 op.obc->obs.oi.size);
589 after_progress.data_complete = true;
590 }
591 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
592 mi != op.missing_on.end();
593 ++mi) {
594 assert(op.returned_data.count(mi->shard));
595 m->pushes[*mi].push_back(PushOp());
596 PushOp &pop = m->pushes[*mi].back();
597 pop.soid = op.hoid;
598 pop.version = op.v;
599 pop.data = op.returned_data[mi->shard];
600 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
601 << ", after_progress=" << after_progress
602 << ", pop.data.length()=" << pop.data.length()
603 << ", size=" << op.obc->obs.oi.size << dendl;
604 assert(
605 pop.data.length() ==
606 sinfo.aligned_logical_offset_to_chunk_offset(
607 after_progress.data_recovered_to -
608 op.recovery_progress.data_recovered_to)
609 );
610 if (pop.data.length())
611 pop.data_included.insert(
612 sinfo.aligned_logical_offset_to_chunk_offset(
613 op.recovery_progress.data_recovered_to),
614 pop.data.length()
615 );
616 if (op.recovery_progress.first) {
617 pop.attrset = op.xattrs;
618 }
619 pop.recovery_info = op.recovery_info;
620 pop.before_progress = op.recovery_progress;
621 pop.after_progress = after_progress;
622 if (*mi != get_parent()->primary_shard())
623 get_parent()->begin_peer_recover(
624 *mi,
625 op.hoid);
626 }
627 op.returned_data.clear();
628 op.waiting_on_pushes = op.missing_on;
629 op.recovery_progress = after_progress;
630 dout(10) << __func__ << ": READING return " << op << dendl;
631 return;
632 }
633 case RecoveryOp::WRITING: {
634 if (op.waiting_on_pushes.empty()) {
635 if (op.recovery_progress.data_complete) {
636 op.state = RecoveryOp::COMPLETE;
637 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
638 i != op.missing_on.end();
639 ++i) {
640 if (*i != get_parent()->primary_shard()) {
641 dout(10) << __func__ << ": on_peer_recover on " << *i
642 << ", obj " << op.hoid << dendl;
643 get_parent()->on_peer_recover(
644 *i,
645 op.hoid,
646 op.recovery_info);
647 }
648 }
649 object_stat_sum_t stat;
650 stat.num_bytes_recovered = op.recovery_info.size;
651 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
652 stat.num_objects_recovered = 1;
653 get_parent()->on_global_recover(op.hoid, stat);
654 dout(10) << __func__ << ": WRITING return " << op << dendl;
655 recovery_ops.erase(op.hoid);
656 return;
657 } else {
658 op.state = RecoveryOp::IDLE;
659 dout(10) << __func__ << ": WRITING continue " << op << dendl;
660 continue;
661 }
662 }
663 return;
664 }
665 // should never be called once complete
666 case RecoveryOp::COMPLETE:
667 default: {
668 ceph_abort();
669 };
670 }
671 }
672}
673
674void ECBackend::run_recovery_op(
675 RecoveryHandle *_h,
676 int priority)
677{
678 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
679 RecoveryMessages m;
680 for (list<RecoveryOp>::iterator i = h->ops.begin();
681 i != h->ops.end();
682 ++i) {
683 dout(10) << __func__ << ": starting " << *i << dendl;
684 assert(!recovery_ops.count(i->hoid));
685 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
686 continue_recovery_op(op, &m);
687 }
688 dispatch_recovery_messages(m, priority);
689 delete _h;
690}
691
692int ECBackend::recover_object(
693 const hobject_t &hoid,
694 eversion_t v,
695 ObjectContextRef head,
696 ObjectContextRef obc,
697 RecoveryHandle *_h)
698{
699 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
700 h->ops.push_back(RecoveryOp());
701 h->ops.back().v = v;
702 h->ops.back().hoid = hoid;
703 h->ops.back().obc = obc;
704 h->ops.back().recovery_info.soid = hoid;
705 h->ops.back().recovery_info.version = v;
706 if (obc) {
707 h->ops.back().recovery_info.size = obc->obs.oi.size;
708 h->ops.back().recovery_info.oi = obc->obs.oi;
709 }
710 if (hoid.is_snap()) {
711 if (obc) {
712 assert(obc->ssc);
713 h->ops.back().recovery_info.ss = obc->ssc->snapset;
714 } else if (head) {
715 assert(head->ssc);
716 h->ops.back().recovery_info.ss = head->ssc->snapset;
717 } else {
718 assert(0 == "neither obc nor head set for a snap object");
719 }
720 }
721 h->ops.back().recovery_progress.omap_complete = true;
722 for (set<pg_shard_t>::const_iterator i =
723 get_parent()->get_actingbackfill_shards().begin();
724 i != get_parent()->get_actingbackfill_shards().end();
725 ++i) {
726 dout(10) << "checking " << *i << dendl;
727 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
728 h->ops.back().missing_on.insert(*i);
729 h->ops.back().missing_on_shards.insert(i->shard);
730 }
731 }
732 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
733 return 0;
734}
735
736bool ECBackend::can_handle_while_inactive(
737 OpRequestRef _op)
738{
739 return false;
740}
741
742bool ECBackend::handle_message(
743 OpRequestRef _op)
744{
745 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
746 int priority = _op->get_req()->get_priority();
747 switch (_op->get_req()->get_type()) {
748 case MSG_OSD_EC_WRITE: {
749 // NOTE: this is non-const because handle_sub_write modifies the embedded
750 // ObjectStore::Transaction in place (and then std::move's it). It does
751 // not conflict with ECSubWrite's operator<<.
752 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
753 _op->get_nonconst_req());
754 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
755 return true;
756 }
757 case MSG_OSD_EC_WRITE_REPLY: {
758 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
759 _op->get_req());
760 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
761 return true;
762 }
763 case MSG_OSD_EC_READ: {
764 const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
765 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
766 reply->pgid = get_parent()->primary_spg_t();
767 reply->map_epoch = get_parent()->get_epoch();
768 reply->min_epoch = get_parent()->get_interval_start_epoch();
769 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
770 reply->trace = _op->pg_trace;
771 get_parent()->send_message_osd_cluster(
772 op->op.from.osd, reply, get_parent()->get_epoch());
773 return true;
774 }
775 case MSG_OSD_EC_READ_REPLY: {
776 // NOTE: this is non-const because handle_sub_read_reply steals resulting
777 // buffers. It does not conflict with ECSubReadReply operator<<.
778 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
779 _op->get_nonconst_req());
780 RecoveryMessages rm;
781 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
782 dispatch_recovery_messages(rm, priority);
783 return true;
784 }
785 case MSG_OSD_PG_PUSH: {
786 const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
787 RecoveryMessages rm;
788 for (vector<PushOp>::const_iterator i = op->pushes.begin();
789 i != op->pushes.end();
790 ++i) {
791 handle_recovery_push(*i, &rm);
792 }
793 dispatch_recovery_messages(rm, priority);
794 return true;
795 }
796 case MSG_OSD_PG_PUSH_REPLY: {
797 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
798 _op->get_req());
799 RecoveryMessages rm;
800 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
801 i != op->replies.end();
802 ++i) {
803 handle_recovery_push_reply(*i, op->from, &rm);
804 }
805 dispatch_recovery_messages(rm, priority);
806 return true;
807 }
808 default:
809 return false;
810 }
811 return false;
812}
813
814struct SubWriteCommitted : public Context {
815 ECBackend *pg;
816 OpRequestRef msg;
817 ceph_tid_t tid;
818 eversion_t version;
819 eversion_t last_complete;
820 const ZTracer::Trace trace;
821 SubWriteCommitted(
822 ECBackend *pg,
823 OpRequestRef msg,
824 ceph_tid_t tid,
825 eversion_t version,
826 eversion_t last_complete,
827 const ZTracer::Trace &trace)
828 : pg(pg), msg(msg), tid(tid),
829 version(version), last_complete(last_complete), trace(trace) {}
830 void finish(int) override {
831 if (msg)
832 msg->mark_event("sub_op_committed");
833 pg->sub_write_committed(tid, version, last_complete, trace);
834 }
835};
836void ECBackend::sub_write_committed(
837 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
838 const ZTracer::Trace &trace) {
839 if (get_parent()->pgb_is_primary()) {
840 ECSubWriteReply reply;
841 reply.tid = tid;
842 reply.last_complete = last_complete;
843 reply.committed = true;
844 reply.from = get_parent()->whoami_shard();
845 handle_sub_write_reply(
846 get_parent()->whoami_shard(),
847 reply, trace);
848 } else {
849 get_parent()->update_last_complete_ondisk(last_complete);
850 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
851 r->pgid = get_parent()->primary_spg_t();
852 r->map_epoch = get_parent()->get_epoch();
853 r->min_epoch = get_parent()->get_interval_start_epoch();
854 r->op.tid = tid;
855 r->op.last_complete = last_complete;
856 r->op.committed = true;
857 r->op.from = get_parent()->whoami_shard();
858 r->set_priority(CEPH_MSG_PRIO_HIGH);
859 r->trace = trace;
860 r->trace.event("sending sub op commit");
861 get_parent()->send_message_osd_cluster(
862 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
863 }
864}
865
866struct SubWriteApplied : public Context {
867 ECBackend *pg;
868 OpRequestRef msg;
869 ceph_tid_t tid;
870 eversion_t version;
871 const ZTracer::Trace trace;
872 SubWriteApplied(
873 ECBackend *pg,
874 OpRequestRef msg,
875 ceph_tid_t tid,
876 eversion_t version,
877 const ZTracer::Trace &trace)
878 : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
879 void finish(int) override {
880 if (msg)
881 msg->mark_event("sub_op_applied");
882 pg->sub_write_applied(tid, version, trace);
883 }
884};
885void ECBackend::sub_write_applied(
886 ceph_tid_t tid, eversion_t version,
887 const ZTracer::Trace &trace) {
888 parent->op_applied(version);
889 if (get_parent()->pgb_is_primary()) {
890 ECSubWriteReply reply;
891 reply.from = get_parent()->whoami_shard();
892 reply.tid = tid;
893 reply.applied = true;
894 handle_sub_write_reply(
895 get_parent()->whoami_shard(),
896 reply, trace);
897 } else {
898 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
899 r->pgid = get_parent()->primary_spg_t();
900 r->map_epoch = get_parent()->get_epoch();
901 r->min_epoch = get_parent()->get_interval_start_epoch();
902 r->op.from = get_parent()->whoami_shard();
903 r->op.tid = tid;
904 r->op.applied = true;
905 r->set_priority(CEPH_MSG_PRIO_HIGH);
906 r->trace = trace;
907 r->trace.event("sending sub op apply");
908 get_parent()->send_message_osd_cluster(
909 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
910 }
911}
912
913void ECBackend::handle_sub_write(
914 pg_shard_t from,
915 OpRequestRef msg,
916 ECSubWrite &op,
917 const ZTracer::Trace &trace,
918 Context *on_local_applied_sync)
919{
920 if (msg)
921 msg->mark_started();
922 trace.event("handle_sub_write");
923 assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
924 if (!get_parent()->pgb_is_primary())
925 get_parent()->update_stats(op.stats);
926 ObjectStore::Transaction localt;
927 if (!op.temp_added.empty()) {
928 add_temp_objs(op.temp_added);
929 }
930 if (op.backfill) {
931 for (set<hobject_t>::iterator i = op.temp_removed.begin();
932 i != op.temp_removed.end();
933 ++i) {
934 dout(10) << __func__ << ": removing object " << *i
935 << " since we won't get the transaction" << dendl;
936 localt.remove(
937 coll,
938 ghobject_t(
939 *i,
940 ghobject_t::NO_GEN,
941 get_parent()->whoami_shard().shard));
942 }
943 }
944 clear_temp_objs(op.temp_removed);
945 get_parent()->log_operation(
946 op.log_entries,
947 op.updated_hit_set_history,
948 op.trim_to,
949 op.roll_forward_to,
950 !op.backfill,
951 localt);
952
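  // Parity shards (shard index >= k) are not needed for normal client
  // reads, so when the PG is not undersized hint that their data need
  // not stay in the page cache.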
953 PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
954 if (_rPG && !_rPG->is_undersized() &&
955 (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
956 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
957
958 if (on_local_applied_sync) {
959 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
960 localt.register_on_applied_sync(on_local_applied_sync);
961 }
962 localt.register_on_commit(
963 get_parent()->bless_context(
964 new SubWriteCommitted(
965 this, msg, op.tid,
966 op.at_version,
967 get_parent()->get_info().last_complete, trace)));
968 localt.register_on_applied(
969 get_parent()->bless_context(
970 new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
971 vector<ObjectStore::Transaction> tls;
972 tls.reserve(2);
973 tls.push_back(std::move(op.t));
974 tls.push_back(std::move(localt));
975 get_parent()->queue_transactions(tls, msg);
976}
977
978void ECBackend::handle_sub_read(
979 pg_shard_t from,
980 const ECSubRead &op,
981 ECSubReadReply *reply,
982 const ZTracer::Trace &trace)
983{
984 trace.event("handle sub read");
985 shard_id_t shard = get_parent()->whoami_shard().shard;
986 for(auto i = op.to_read.begin();
987 i != op.to_read.end();
988 ++i) {
989 int r = 0;
990 ECUtil::HashInfoRef hinfo;
991 if (!get_parent()->get_pool().allows_ecoverwrites()) {
992 hinfo = get_hash_info(i->first);
993 if (!hinfo) {
994 r = -EIO;
995 get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first;
996 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
997 goto error;
998 }
999 }
1000 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1001 bufferlist bl;
1002 r = store->read(
1003 ch,
1004 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1005 j->get<0>(),
1006 j->get<1>(),
1007 bl, j->get<2>());
1008 if (r < 0) {
1009 get_parent()->clog_error() << __func__
1010 << ": Error " << r
1011 << " reading "
1012 << i->first;
1013 dout(5) << __func__ << ": Error " << r
1014 << " reading " << i->first << dendl;
1015 goto error;
1016 } else {
1017 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1018 reply->buffers_read[i->first].push_back(
1019 make_pair(
1020 j->get<0>(),
1021 bl)
1022 );
1023 }
1024
1025 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1026 // Note that we still need deep scrub: large objects are read in
1027 // sections, so the digest check below only runs when a whole chunk is
1027 // read at offset 0.
1028 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1029 // the state of our chunk in case other chunks could substitute.
1030 assert(hinfo->has_chunk_hash());
1031 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1032 (j->get<0>() == 0)) {
1033 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1034 bufferhash h(-1);
1035 h << bl;
1036 if (h.digest() != hinfo->get_chunk_hash(shard)) {
1037 get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
1038 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1039 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1040 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1041 r = -EIO;
1042 goto error;
1043 }
1044 }
1045 }
1046 }
1047 continue;
1048error:
1049 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1050 // the state of our chunk in case other chunks could substitute.
1051 reply->buffers_read.erase(i->first);
1052 reply->errors[i->first] = r;
1053 }
1054 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1055 i != op.attrs_to_read.end();
1056 ++i) {
1057 dout(10) << __func__ << ": fulfilling attr request on "
1058 << *i << dendl;
1059 if (reply->errors.count(*i))
1060 continue;
1061 int r = store->getattrs(
1062 ch,
1063 ghobject_t(
1064 *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1065 reply->attrs_read[*i]);
1066 if (r < 0) {
1067 reply->buffers_read.erase(*i);
1068 reply->errors[*i] = r;
1069 }
1070 }
1071 reply->from = get_parent()->whoami_shard();
1072 reply->tid = op.tid;
1073}
1074
1075void ECBackend::handle_sub_write_reply(
1076 pg_shard_t from,
1077 const ECSubWriteReply &op,
1078 const ZTracer::Trace &trace)
1079{
1080 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1081 assert(i != tid_to_op_map.end());
1082 if (op.committed) {
1083 trace.event("sub write committed");
1084 assert(i->second.pending_commit.count(from));
1085 i->second.pending_commit.erase(from);
1086 if (from != get_parent()->whoami_shard()) {
1087 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1088 }
1089 }
1090 if (op.applied) {
1091 trace.event("sub write applied");
1092 assert(i->second.pending_apply.count(from));
1093 i->second.pending_apply.erase(from);
1094 }
1095
1096 if (i->second.pending_apply.empty() && i->second.on_all_applied) {
1097 dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
1098 i->second.on_all_applied->complete(0);
1099 i->second.on_all_applied = 0;
1100 i->second.trace.event("ec write all applied");
1101 }
1102 if (i->second.pending_commit.empty() && i->second.on_all_commit) {
1103 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1104 i->second.on_all_commit->complete(0);
1105 i->second.on_all_commit = 0;
1106 i->second.trace.event("ec write all committed");
1107 }
1108 check_ops();
1109}
1110
1111void ECBackend::handle_sub_read_reply(
1112 pg_shard_t from,
1113 ECSubReadReply &op,
1114 RecoveryMessages *m,
1115 const ZTracer::Trace &trace)
1116{
1117 trace.event("ec sub read reply");
1118 dout(10) << __func__ << ": reply " << op << dendl;
1119 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1120 if (iter == tid_to_read_map.end()) {
1121 //canceled
1122 dout(20) << __func__ << ": dropped " << op << dendl;
1123 return;
1124 }
1125 ReadOp &rop = iter->second;
1126 for (auto i = op.buffers_read.begin();
1127 i != op.buffers_read.end();
1128 ++i) {
1129 assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1130 if (!rop.to_read.count(i->first)) {
1131 // We canceled this read! @see filter_read_op
1132 dout(20) << __func__ << " to_read skipping" << dendl;
1133 continue;
1134 }
1135 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1136 rop.to_read.find(i->first)->second.to_read.begin();
1137 list<
1138 boost::tuple<
1139 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1140 rop.complete[i->first].returned.begin();
1141 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1142 j != i->second.end();
1143 ++j, ++req_iter, ++riter) {
1144 assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1145 assert(riter != rop.complete[i->first].returned.end());
1146 pair<uint64_t, uint64_t> adjusted =
1147 sinfo.aligned_offset_len_to_chunk(
1148 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1149 assert(adjusted.first == j->first);
1150 riter->get<2>()[from].claim(j->second);
1151 }
1152 }
1153 for (auto i = op.attrs_read.begin();
1154 i != op.attrs_read.end();
1155 ++i) {
1156 assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1157 if (!rop.to_read.count(i->first)) {
1158 // We canceled this read! @see filter_read_op
1159 dout(20) << __func__ << " to_read skipping" << dendl;
1160 continue;
1161 }
1162 rop.complete[i->first].attrs = map<string, bufferlist>();
1163 (*(rop.complete[i->first].attrs)).swap(i->second);
1164 }
1165 for (auto i = op.errors.begin();
1166 i != op.errors.end();
1167 ++i) {
1168 rop.complete[i->first].errors.insert(
1169 make_pair(
1170 from,
1171 i->second));
1172 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1173 }
1174
1175 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1176 shard_to_read_map.find(from);
1177 assert(siter != shard_to_read_map.end());
1178 assert(siter->second.count(op.tid));
1179 siter->second.erase(op.tid);
1180
1181 assert(rop.in_progress.count(from));
1182 rop.in_progress.erase(from);
1183 unsigned is_complete = 0;
1184 // For redundant reads, check for completion as each shard comes in;
1185 // for a non-recovery read, check for completion once all shards have replied.
1186 // TODO: It would be nice if recovery could send more reads too
1187 if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
1188 for (map<hobject_t, read_result_t>::const_iterator iter =
1189 rop.complete.begin();
1190 iter != rop.complete.end();
1191 ++iter) {
1192 set<int> have;
1193 for (map<pg_shard_t, bufferlist>::const_iterator j =
1194 iter->second.returned.front().get<2>().begin();
1195 j != iter->second.returned.front().get<2>().end();
1196 ++j) {
1197 have.insert(j->first.shard);
1198 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1199 }
1200 set<int> want_to_read, dummy_minimum;
1201 get_want_to_read_shards(&want_to_read);
1202 int err;
1203 if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
1204 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1205 if (rop.in_progress.empty()) {
1206 // If we don't have enough copies and we haven't sent reads for all
1207 // shards, we can send the rest of the reads, if any.
1208 if (!rop.do_redundant_reads) {
1209 int r = send_all_remaining_reads(iter->first, rop);
1210 if (r == 0) {
1211 // We added to in_progress and not incrementing is_complete
1212 continue;
1213 }
1214 // Couldn't read any additional shards so handle as completed with errors
1215 }
1216 if (rop.complete[iter->first].errors.empty()) {
1217 dout(20) << __func__ << " simply not enough copies err=" << err << dendl;
1218 } else {
1219 // Grab the first error
1220 err = rop.complete[iter->first].errors.begin()->second;
1221 dout(20) << __func__ << ": Use one of the shard errors err=" << err << dendl;
1222 }
1223 rop.complete[iter->first].r = err;
1224 ++is_complete;
1225 }
1226 } else {
1227 assert(rop.complete[iter->first].r == 0);
1228 if (!rop.complete[iter->first].errors.empty()) {
1229 if (cct->_conf->osd_read_ec_check_for_errors) {
1230 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1231 err = rop.complete[iter->first].errors.begin()->second;
1232 rop.complete[iter->first].r = err;
1233 } else {
1234 get_parent()->clog_error() << __func__ << ": Error(s) ignored for "
1235 << iter->first << " enough copies available";
1236 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1237 << " enough copies available" << dendl;
1238 rop.complete[iter->first].errors.clear();
1239 }
1240 }
1241 ++is_complete;
1242 }
1243 }
1244 }
1245 if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1246 dout(20) << __func__ << " Complete: " << rop << dendl;
1247 rop.trace.event("ec read complete");
1248 complete_read_op(rop, m);
1249 } else {
1250 dout(10) << __func__ << " readop not complete: " << rop << dendl;
1251 }
1252}
1253
1254void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1255{
1256 map<hobject_t, read_request_t>::iterator reqiter =
1257 rop.to_read.begin();
1258 map<hobject_t, read_result_t>::iterator resiter =
1259 rop.complete.begin();
1260 assert(rop.to_read.size() == rop.complete.size());
1261 for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1262 if (reqiter->second.cb) {
1263 pair<RecoveryMessages *, read_result_t &> arg(
1264 m, resiter->second);
1265 reqiter->second.cb->complete(arg);
1266 reqiter->second.cb = NULL;
1267 }
1268 }
1269 tid_to_read_map.erase(rop.tid);
1270}
1271
1272struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1273 ECBackend *ec;
1274 ceph_tid_t tid;
1275 FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1276 void finish(ThreadPool::TPHandle &handle) override {
1277 auto ropiter = ec->tid_to_read_map.find(tid);
1278 assert(ropiter != ec->tid_to_read_map.end());
1279 int priority = ropiter->second.priority;
1280 RecoveryMessages rm;
1281 ec->complete_read_op(ropiter->second, &rm);
1282 ec->dispatch_recovery_messages(rm, priority);
1283 }
1284};
1285
1286void ECBackend::filter_read_op(
1287 const OSDMapRef& osdmap,
1288 ReadOp &op)
1289{
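  // Strip out sub-reads sourced from OSDs that are now down: cancel the
  // affected objects and, once nothing is left in flight, complete the
  // read op with whatever was gathered.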
1290 set<hobject_t> to_cancel;
1291 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1292 i != op.source_to_obj.end();
1293 ++i) {
1294 if (osdmap->is_down(i->first.osd)) {
1295 to_cancel.insert(i->second.begin(), i->second.end());
1296 op.in_progress.erase(i->first);
1297 continue;
1298 }
1299 }
1300
1301 if (to_cancel.empty())
1302 return;
1303
1304 for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1305 i != op.source_to_obj.end();
1306 ) {
1307 for (set<hobject_t>::iterator j = i->second.begin();
1308 j != i->second.end();
1309 ) {
1310 if (to_cancel.count(*j))
1311 i->second.erase(j++);
1312 else
1313 ++j;
1314 }
1315 if (i->second.empty()) {
1316 op.source_to_obj.erase(i++);
1317 } else {
1318 assert(!osdmap->is_down(i->first.osd));
1319 ++i;
1320 }
1321 }
1322
1323 for (set<hobject_t>::iterator i = to_cancel.begin();
1324 i != to_cancel.end();
1325 ++i) {
1326 get_parent()->cancel_pull(*i);
1327
1328 assert(op.to_read.count(*i));
1329 read_request_t &req = op.to_read.find(*i)->second;
1330 dout(10) << __func__ << ": canceling " << req
1331 << " for obj " << *i << dendl;
1332 assert(req.cb);
1333 delete req.cb;
1334 req.cb = NULL;
1335
1336 op.to_read.erase(*i);
1337 op.complete.erase(*i);
1338 recovery_ops.erase(*i);
1339 }
1340
1341 if (op.in_progress.empty()) {
1342 get_parent()->schedule_recovery_work(
1343 get_parent()->bless_gencontext(
1344 new FinishReadOp(this, op.tid)));
1345 }
1346}
1347
1348void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1349{
1350 set<ceph_tid_t> tids_to_filter;
1351 for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1352 i = shard_to_read_map.begin();
1353 i != shard_to_read_map.end();
1354 ) {
1355 if (osdmap->is_down(i->first.osd)) {
1356 tids_to_filter.insert(i->second.begin(), i->second.end());
1357 shard_to_read_map.erase(i++);
1358 } else {
1359 ++i;
1360 }
1361 }
1362 for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1363 i != tids_to_filter.end();
1364 ++i) {
1365 map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1366 assert(j != tid_to_read_map.end());
1367 filter_read_op(osdmap, j->second);
1368 }
1369}
1370
1371void ECBackend::on_change()
1372{
1373 dout(10) << __func__ << dendl;
1374
1375 completed_to = eversion_t();
1376 committed_to = eversion_t();
1377 pipeline_state.clear();
1378 waiting_reads.clear();
1379 waiting_state.clear();
1380 waiting_commit.clear();
1381 for (auto &&op: tid_to_op_map) {
1382 cache.release_write_pin(op.second.pin);
1383 }
1384 tid_to_op_map.clear();
1385
1386 for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1387 i != tid_to_read_map.end();
1388 ++i) {
1389 dout(10) << __func__ << ": cancelling " << i->second << dendl;
1390 for (map<hobject_t, read_request_t>::iterator j =
1391 i->second.to_read.begin();
1392 j != i->second.to_read.end();
1393 ++j) {
1394 delete j->second.cb;
1395 j->second.cb = 0;
1396 }
1397 }
1398 tid_to_read_map.clear();
1399 in_progress_client_reads.clear();
1400 shard_to_read_map.clear();
1401 clear_recovery_state();
1402}
1403
1404void ECBackend::clear_recovery_state()
1405{
1406 recovery_ops.clear();
1407}
1408
1409void ECBackend::on_flushed()
1410{
1411}
1412
1413void ECBackend::dump_recovery_info(Formatter *f) const
1414{
1415 f->open_array_section("recovery_ops");
1416 for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1417 i != recovery_ops.end();
1418 ++i) {
1419 f->open_object_section("op");
1420 i->second.dump(f);
1421 f->close_section();
1422 }
1423 f->close_section();
1424 f->open_array_section("read_ops");
1425 for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1426 i != tid_to_read_map.end();
1427 ++i) {
1428 f->open_object_section("read_op");
1429 i->second.dump(f);
1430 f->close_section();
1431 }
1432 f->close_section();
1433}
1434
1435void ECBackend::submit_transaction(
1436 const hobject_t &hoid,
1437 const object_stat_sum_t &delta_stats,
1438 const eversion_t &at_version,
1439 PGTransactionUPtr &&t,
1440 const eversion_t &trim_to,
1441 const eversion_t &roll_forward_to,
1442 const vector<pg_log_entry_t> &log_entries,
1443 boost::optional<pg_hit_set_history_t> &hset_history,
1444 Context *on_local_applied_sync,
1445 Context *on_all_applied,
1446 Context *on_all_commit,
1447 ceph_tid_t tid,
1448 osd_reqid_t reqid,
1449 OpRequestRef client_op
1450 )
1451{
1452 assert(!tid_to_op_map.count(tid));
1453 Op *op = &(tid_to_op_map[tid]);
1454 op->hoid = hoid;
1455 op->delta_stats = delta_stats;
1456 op->version = at_version;
1457 op->trim_to = trim_to;
1458 op->roll_forward_to = MAX(roll_forward_to, committed_to);
1459 op->log_entries = log_entries;
1460 std::swap(op->updated_hit_set_history, hset_history);
1461 op->on_local_applied_sync = on_local_applied_sync;
1462 op->on_all_applied = on_all_applied;
1463 op->on_all_commit = on_all_commit;
1464 op->tid = tid;
1465 op->reqid = reqid;
1466 op->client_op = client_op;
1467 if (client_op)
1468 op->trace = client_op->pg_trace;
1469
1470 dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1471 start_rmw(op, std::move(t));
1472 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1473}
1474
1475void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1476 if (!waiting_state.empty()) {
1477 waiting_state.back().on_write.emplace_back(std::move(cb));
1478 } else if (!waiting_reads.empty()) {
1479 waiting_reads.back().on_write.emplace_back(std::move(cb));
1480 } else {
1481 // Nothing earlier in the pipeline, just call it
1482 cb();
1483 }
1484}
1485
1486int ECBackend::get_min_avail_to_read_shards(
1487 const hobject_t &hoid,
1488 const set<int> &want,
1489 bool for_recovery,
1490 bool do_redundant_reads,
1491 set<pg_shard_t> *to_read)
1492{
1493 // Make sure we don't do redundant reads for recovery
1494 assert(!for_recovery || !do_redundant_reads);
1495
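  // Collect the shards that have the object (acting set, plus backfill
  // and missing_loc sources when reading for recovery), then ask the
  // plugin for the minimum subset needed to decode the 'want' shards.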
1496 set<int> have;
1497 map<shard_id_t, pg_shard_t> shards;
1498
1499 for (set<pg_shard_t>::const_iterator i =
1500 get_parent()->get_acting_shards().begin();
1501 i != get_parent()->get_acting_shards().end();
1502 ++i) {
1503 dout(10) << __func__ << ": checking acting " << *i << dendl;
1504 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1505 if (!missing.is_missing(hoid)) {
1506 assert(!have.count(i->shard));
1507 have.insert(i->shard);
1508 assert(!shards.count(i->shard));
1509 shards.insert(make_pair(i->shard, *i));
1510 }
1511 }
1512
1513 if (for_recovery) {
1514 for (set<pg_shard_t>::const_iterator i =
1515 get_parent()->get_backfill_shards().begin();
1516 i != get_parent()->get_backfill_shards().end();
1517 ++i) {
1518 if (have.count(i->shard)) {
1519 assert(shards.count(i->shard));
1520 continue;
1521 }
1522 dout(10) << __func__ << ": checking backfill " << *i << dendl;
1523 assert(!shards.count(i->shard));
1524 const pg_info_t &info = get_parent()->get_shard_info(*i);
1525 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1526 if (hoid < info.last_backfill &&
1527 !missing.is_missing(hoid)) {
1528 have.insert(i->shard);
1529 shards.insert(make_pair(i->shard, *i));
1530 }
1531 }
1532
1533 map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1534 get_parent()->get_missing_loc_shards().find(hoid);
1535 if (miter != get_parent()->get_missing_loc_shards().end()) {
1536 for (set<pg_shard_t>::iterator i = miter->second.begin();
1537 i != miter->second.end();
1538 ++i) {
1539 dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1540 auto m = get_parent()->maybe_get_shard_missing(*i);
1541 if (m) {
1542 assert(!(*m).is_missing(hoid));
1543 }
1544 have.insert(i->shard);
1545 shards.insert(make_pair(i->shard, *i));
1546 }
1547 }
1548 }
1549
1550 set<int> need;
1551 int r = ec_impl->minimum_to_decode(want, have, &need);
1552 if (r < 0)
1553 return r;
1554
1555 if (do_redundant_reads) {
1556 need.swap(have);
1557 }
1558
1559 if (!to_read)
1560 return 0;
1561
1562 for (set<int>::iterator i = need.begin();
1563 i != need.end();
1564 ++i) {
1565 assert(shards.count(shard_id_t(*i)));
1566 to_read->insert(shards[shard_id_t(*i)]);
1567 }
1568 return 0;
1569}
1570
1571int ECBackend::get_remaining_shards(
1572 const hobject_t &hoid,
1573 const set<int> &avail,
1574 set<pg_shard_t> *to_read)
1575{
1576 set<int> need;
1577 map<shard_id_t, pg_shard_t> shards;
1578
1579 for (set<pg_shard_t>::const_iterator i =
1580 get_parent()->get_acting_shards().begin();
1581 i != get_parent()->get_acting_shards().end();
1582 ++i) {
1583 dout(10) << __func__ << ": checking acting " << *i << dendl;
1584 const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1585 if (!missing.is_missing(hoid)) {
1586 assert(!need.count(i->shard));
1587 need.insert(i->shard);
1588 assert(!shards.count(i->shard));
1589 shards.insert(make_pair(i->shard, *i));
1590 }
1591 }
1592
1593 if (!to_read)
1594 return 0;
1595
1596 for (set<int>::iterator i = need.begin();
1597 i != need.end();
1598 ++i) {
1599 assert(shards.count(shard_id_t(*i)));
1600 if (avail.find(*i) == avail.end())
1601 to_read->insert(shards[shard_id_t(*i)]);
1602 }
1603 return 0;
1604}
1605
1606void ECBackend::start_read_op(
1607 int priority,
1608 map<hobject_t, read_request_t> &to_read,
1609 OpRequestRef _op,
1610 bool do_redundant_reads,
1611 bool for_recovery)
1612{
1613 ceph_tid_t tid = get_parent()->get_tid();
1614 assert(!tid_to_read_map.count(tid));
1615 auto &op = tid_to_read_map.emplace(
1616 tid,
1617 ReadOp(
1618 priority,
1619 tid,
1620 do_redundant_reads,
1621 for_recovery,
1622 _op,
1623 std::move(to_read))).first->second;
1624 dout(10) << __func__ << ": starting " << op << dendl;
1625 if (_op) {
1626 op.trace = _op->pg_trace;
1627 op.trace.event("start ec read");
1628 }
1629 do_read_op(op);
1630}
1631
1632void ECBackend::do_read_op(ReadOp &op)
1633{
1634 int priority = op.priority;
1635 ceph_tid_t tid = op.tid;
1636
1637 dout(10) << __func__ << ": starting read " << op << dendl;
1638
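  // Build one ECSubRead per shard: attrs are requested from only the
  // first shard that needs them, and each logical extent is converted to
  // a chunk-aligned offset/length and sent to every shard in 'need'.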
1639 map<pg_shard_t, ECSubRead> messages;
1640 for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1641 i != op.to_read.end();
1642 ++i) {
1643 bool need_attrs = i->second.want_attrs;
1644 for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
1645 j != i->second.need.end();
1646 ++j) {
1647 if (need_attrs) {
1648 messages[*j].attrs_to_read.insert(i->first);
1649 need_attrs = false;
1650 }
1651 op.obj_to_source[i->first].insert(*j);
1652 op.source_to_obj[*j].insert(i->first);
1653 }
1654 for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1655 i->second.to_read.begin();
1656 j != i->second.to_read.end();
1657 ++j) {
1658 pair<uint64_t, uint64_t> chunk_off_len =
1659 sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1660 for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
1661 k != i->second.need.end();
1662 ++k) {
1663 messages[*k].to_read[i->first].push_back(
1664 boost::make_tuple(
1665 chunk_off_len.first,
1666 chunk_off_len.second,
1667 j->get<2>()));
1668 }
1669 assert(!need_attrs);
1670 }
1671 }
1672
1673 for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1674 i != messages.end();
1675 ++i) {
1676 op.in_progress.insert(i->first);
1677 shard_to_read_map[i->first].insert(op.tid);
1678 i->second.tid = tid;
1679 MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1680 msg->set_priority(priority);
1681 msg->pgid = spg_t(
1682 get_parent()->whoami_spg_t().pgid,
1683 i->first.shard);
1684 msg->map_epoch = get_parent()->get_epoch();
1685 msg->min_epoch = get_parent()->get_interval_start_epoch();
1686 msg->op = i->second;
1687 msg->op.from = get_parent()->whoami_shard();
1688 msg->op.tid = tid;
1689 if (op.trace) {
1690 // initialize a child span for this shard
1691 msg->trace.init("ec sub read", nullptr, &op.trace);
1692 msg->trace.keyval("shard", i->first.shard.id);
1693 }
1694 get_parent()->send_message_osd_cluster(
1695 i->first.osd,
1696 msg,
1697 get_parent()->get_epoch());
1698 }
1699 dout(10) << __func__ << ": started " << op << dendl;
1700}
1701
1702ECUtil::HashInfoRef ECBackend::get_hash_info(
1703 const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1704{
1705 dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1706 ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1707 if (!ref) {
1708 dout(10) << __func__ << ": not in cache " << hoid << dendl;
1709 struct stat st;
1710 int r = store->stat(
1711 ch,
1712 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1713 &st);
1714 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1715 // XXX: What does it mean if there is no object on disk?
1716 if (r >= 0) {
1717 dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
1718 bufferlist bl;
1719 if (attrs) {
1720 map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1721 if (k == attrs->end()) {
1722 dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1723 } else {
1724 bl.push_back(k->second);
1725 }
1726 } else {
1727 r = store->getattr(
1728 ch,
1729 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1730 ECUtil::get_hinfo_key(),
1731 bl);
1732 if (r < 0) {
1733 dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1734 bl.clear(); // just in case
1735 }
1736 }
1737 if (bl.length() > 0) {
1738 bufferlist::iterator bp = bl.begin();
1739 ::decode(hinfo, bp);
1740 if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1741 dout(0) << __func__ << ": Mismatch of total_chunk_size "
1742 << hinfo.get_total_chunk_size() << dendl;
1743 return ECUtil::HashInfoRef();
1744 }
1745 } else if (st.st_size > 0) { // missing hinfo on a non-empty object is an error; an empty object gets a fresh hinfo below
1746 return ECUtil::HashInfoRef();
1747 }
1748 }
1749 ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
1750 }
1751 return ref;
1752}
1753
1754void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1755{
1756 assert(op);
1757
1758 op->plan = ECTransaction::get_write_plan(
1759 sinfo,
1760 std::move(t),
1761 [&](const hobject_t &i) {
1762 ECUtil::HashInfoRef ref = get_hash_info(i, false);
1763 if (!ref) {
1764 derr << __func__ << ": get_hash_info(" << i << ")"
1765 << " returned a null pointer and there is no "
1766 << " way to recover from such an error in this "
1767 << " context" << dendl;
1768 ceph_abort();
1769 }
1770 return ref;
1771 },
1772 get_parent()->get_dpp());
1773
1774 dout(10) << __func__ << ": " << *op << dendl;
1775
1776 waiting_state.push_back(*op);
1777 check_ops();
1778}
1779
1780bool ECBackend::try_state_to_reads()
1781{
1782 if (waiting_state.empty())
1783 return false;
1784
1785 Op *op = &(waiting_state.front());
1786 if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1787 assert(get_parent()->get_pool().allows_ecoverwrites());
1788 dout(20) << __func__ << ": blocking " << *op
1789 << " because it requires an rmw and the cache is invalid "
1790 << pipeline_state
1791 << dendl;
1792 return false;
1793 }
1794
1795 if (op->invalidates_cache()) {
1796 dout(20) << __func__ << ": invalidating cache after this op"
1797 << dendl;
1798 pipeline_state.invalidate();
1799 op->using_cache = false;
1800 } else {
1801 op->using_cache = pipeline_state.caching_enabled();
1802 }
1803
1804 waiting_state.pop_front();
1805 waiting_reads.push_back(*op);
1806
1807 if (op->using_cache) {
1808 cache.open_write_pin(op->pin);
1809
1810 extent_set empty;
1811 for (auto &&hpair: op->plan.will_write) {
1812 auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
1813 const extent_set &to_read_plan =
1814 to_read_plan_iter == op->plan.to_read.end() ?
1815 empty :
1816 to_read_plan_iter->second;
1817
1818 extent_set remote_read = cache.reserve_extents_for_rmw(
1819 hpair.first,
1820 op->pin,
1821 hpair.second,
1822 to_read_plan);
1823
1824 extent_set pending_read = to_read_plan;
1825 pending_read.subtract(remote_read);
1826
1827 if (!remote_read.empty()) {
1828 op->remote_read[hpair.first] = std::move(remote_read);
1829 }
1830 if (!pending_read.empty()) {
1831 op->pending_read[hpair.first] = std::move(pending_read);
1832 }
1833 }
1834 } else {
1835 op->remote_read = op->plan.to_read;
1836 }
1837
1838 dout(10) << __func__ << ": " << *op << dendl;
1839
1840 if (!op->remote_read.empty()) {
1841 assert(get_parent()->get_pool().allows_ecoverwrites());
1842 objects_read_async_no_cache(
1843 op->remote_read,
1844 [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1845 for (auto &&i: results) {
1846 op->remote_read_result.emplace(i.first, i.second.second);
1847 }
1848 check_ops();
1849 });
1850 }
1851
1852 return true;
1853}
1854
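/* Pipeline stage 2: once any preliminary reads have completed, fold the
 * cached extents into remote_read_result, encode the write into one
 * ObjectStore transaction per acting/backfill shard, and ship an ECSubWrite
 * to each remote shard; the local shard's transaction is applied directly
 * through handle_sub_write(). */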
1855bool ECBackend::try_reads_to_commit()
1856{
1857 if (waiting_reads.empty())
1858 return false;
1859 Op *op = &(waiting_reads.front());
1860 if (op->read_in_progress())
1861 return false;
1862 waiting_reads.pop_front();
1863 waiting_commit.push_back(*op);
1864
1865 dout(10) << __func__ << ": starting commit on " << *op << dendl;
1866 dout(20) << __func__ << ": " << cache << dendl;
1867
1868 get_parent()->apply_stats(
1869 op->hoid,
1870 op->delta_stats);
1871
1872 if (op->using_cache) {
1873 for (auto &&hpair: op->pending_read) {
1874 op->remote_read_result[hpair.first].insert(
1875 cache.get_remaining_extents_for_rmw(
1876 hpair.first,
1877 op->pin,
1878 hpair.second));
1879 }
1880 op->pending_read.clear();
1881 } else {
1882 assert(op->pending_read.empty());
1883 }
1884
1885 map<shard_id_t, ObjectStore::Transaction> trans;
1886 for (set<pg_shard_t>::const_iterator i =
1887 get_parent()->get_actingbackfill_shards().begin();
1888 i != get_parent()->get_actingbackfill_shards().end();
1889 ++i) {
1890 trans[i->shard];
1891 }
1892
1893 op->trace.event("start ec write");
1894
1895 map<hobject_t,extent_map> written;
1896 if (op->plan.t) {
1897 ECTransaction::generate_transactions(
1898 op->plan,
1899 ec_impl,
1900 get_parent()->get_info().pgid.pgid,
1901 (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
1902 sinfo,
1903 op->remote_read_result,
1904 op->log_entries,
1905 &written,
1906 &trans,
1907 &(op->temp_added),
1908 &(op->temp_cleared),
1909 get_parent()->get_dpp());
1910 }
1911
1912 dout(20) << __func__ << ": " << cache << dendl;
1913 dout(20) << __func__ << ": written: " << written << dendl;
1914 dout(20) << __func__ << ": op: " << *op << dendl;
1915
1916 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1917 for (auto &&i: op->log_entries) {
1918 if (i.requires_kraken()) {
1919 derr << __func__ << ": log entry " << i << " requires kraken"
1920 << " but overwrites are not enabled!" << dendl;
1921 ceph_abort();
1922 }
1923 }
1924 }
1925
1926 map<hobject_t,extent_set> written_set;
1927 for (auto &&i: written) {
1928 written_set[i.first] = i.second.get_interval_set();
1929 }
1930 dout(20) << __func__ << ": written_set: " << written_set << dendl;
1931 assert(written_set == op->plan.will_write);
1932
1933 if (op->using_cache) {
1934 for (auto &&hpair: written) {
1935 dout(20) << __func__ << ": " << hpair << dendl;
1936 cache.present_rmw_update(hpair.first, op->pin, hpair.second);
1937 }
1938 }
1939 op->remote_read.clear();
1940 op->remote_read_result.clear();
1941
1942 dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1943 ObjectStore::Transaction empty;
1944 bool should_write_local = false;
1945 ECSubWrite local_write_op;
1946 for (set<pg_shard_t>::const_iterator i =
1947 get_parent()->get_actingbackfill_shards().begin();
1948 i != get_parent()->get_actingbackfill_shards().end();
1949 ++i) {
1950 op->pending_apply.insert(*i);
1951 op->pending_commit.insert(*i);
1952 map<shard_id_t, ObjectStore::Transaction>::iterator iter =
1953 trans.find(i->shard);
1954 assert(iter != trans.end());
1955 bool should_send = get_parent()->should_send_op(*i, op->hoid);
1956 const pg_stat_t &stats =
1957 should_send ?
1958 get_info().stats :
1959 parent->get_shard_info().find(*i)->second.stats;
1960
1961 ECSubWrite sop(
1962 get_parent()->whoami_shard(),
1963 op->tid,
1964 op->reqid,
1965 op->hoid,
1966 stats,
1967 should_send ? iter->second : empty,
1968 op->version,
1969 op->trim_to,
1970 op->roll_forward_to,
1971 op->log_entries,
1972 op->updated_hit_set_history,
1973 op->temp_added,
1974 op->temp_cleared,
1975 !should_send);
1976
1977 ZTracer::Trace trace;
1978 if (op->trace) {
1979 // initialize a child span for this shard
1980 trace.init("ec sub write", nullptr, &op->trace);
1981 trace.keyval("shard", i->shard.id);
1982 }
1983
1984 if (*i == get_parent()->whoami_shard()) {
1985 should_write_local = true;
1986 local_write_op.claim(sop);
1987 } else {
1988 MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
1989 r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
1990 r->map_epoch = get_parent()->get_epoch();
1991 r->min_epoch = get_parent()->get_interval_start_epoch();
1992 r->trace = trace;
1993 get_parent()->send_message_osd_cluster(
1994 i->osd, r, get_parent()->get_epoch());
1995 }
1996 }
1997 if (should_write_local) {
1998 handle_sub_write(
1999 get_parent()->whoami_shard(),
2000 op->client_op,
2001 local_write_op,
2002 op->trace,
2003 op->on_local_applied_sync);
2004 op->on_local_applied_sync = 0;
2005 }
2006
2007 for (auto i = op->on_write.begin();
2008 i != op->on_write.end();
2009 op->on_write.erase(i++)) {
2010 (*i)();
2011 }
2012
2013 return true;
2014}
2015
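/* Pipeline stage 3: once all shards have applied and committed, advance
 * completed_to/committed_to. On maps requiring >= kraken, if this was the
 * last queued op and the log cannot yet roll forward to op->version, queue a
 * no-op write to push the roll-forward point; finally release any cache pins
 * and drop the Op. */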
2016bool ECBackend::try_finish_rmw()
2017{
2018 if (waiting_commit.empty())
2019 return false;
2020 Op *op = &(waiting_commit.front());
2021 if (op->write_in_progress())
2022 return false;
2023 waiting_commit.pop_front();
2024
2025 dout(10) << __func__ << ": " << *op << dendl;
2026 dout(20) << __func__ << ": " << cache << dendl;
2027
2028 if (op->roll_forward_to > completed_to)
2029 completed_to = op->roll_forward_to;
2030 if (op->version > committed_to)
2031 committed_to = op->version;
2032
2033 if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
2034 if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2035 waiting_reads.empty() &&
2036 waiting_commit.empty()) {
2037 // submit a dummy transaction to kick the rollforward
2038 auto tid = get_parent()->get_tid();
2039 Op *nop = &(tid_to_op_map[tid]);
2040 nop->hoid = op->hoid;
2041 nop->trim_to = op->trim_to;
2042 nop->roll_forward_to = op->version;
2043 nop->tid = tid;
2044 nop->reqid = op->reqid;
2045 waiting_reads.push_back(*nop);
2046 }
2047 }
2048
2049 if (op->using_cache) {
2050 cache.release_write_pin(op->pin);
2051 }
2052 tid_to_op_map.erase(op->tid);
2053
2054 if (waiting_reads.empty() &&
2055 waiting_commit.empty()) {
2056 pipeline_state.clear();
2057 dout(20) << __func__ << ": clearing pipeline_state "
2058 << pipeline_state
2059 << dendl;
2060 }
2061 return true;
2062}
2063
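/* Run the three pipeline stages in a loop until none of them makes progress. */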
2064void ECBackend::check_ops()
2065{
2066 while (try_state_to_reads() ||
2067 try_reads_to_commit() ||
2068 try_finish_rmw());
2069}
2070
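/* Synchronous reads cannot be served from a single EC shard, so this always
 * fails with -EOPNOTSUPP; clients go through the async reconstruct path. */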
2071int ECBackend::objects_read_sync(
2072 const hobject_t &hoid,
2073 uint64_t off,
2074 uint64_t len,
2075 uint32_t op_flags,
2076 bufferlist *bl)
2077{
2078 return -EOPNOTSUPP;
2079}
2080
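/* Client read entry point: the requested extents are rounded out to full
 * stripes before submission and trimmed back to the caller's (offset, length)
 * pairs in the completion callback below. For example, with an 8 KiB stripe
 * width a request for (off=1000, len=3000) is issued as the stripe-aligned
 * extent (0, 8192). */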
2081void ECBackend::objects_read_async(
2082 const hobject_t &hoid,
2083 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2084 pair<bufferlist*, Context*> > > &to_read,
2085 Context *on_complete,
2086 bool fast_read)
2087{
2088 map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2089 reads;
2090
2091 uint32_t flags = 0;
2092 extent_set es;
2093 for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2094 pair<bufferlist*, Context*> > >::const_iterator i =
2095 to_read.begin();
2096 i != to_read.end();
2097 ++i) {
2098 pair<uint64_t, uint64_t> tmp =
2099 sinfo.offset_len_to_stripe_bounds(
2100 make_pair(i->first.get<0>(), i->first.get<1>()));
2101
2102 extent_set esnew;
2103 esnew.insert(tmp.first, tmp.second);
2104 es.union_of(esnew);
2105 flags |= i->first.get<2>();
2106 }
2107
2108 if (!es.empty()) {
2109 auto &offsets = reads[hoid];
2110 for (auto j = es.begin();
2111 j != es.end();
2112 ++j) {
2113 offsets.push_back(
2114 boost::make_tuple(
2115 j.get_start(),
2116 j.get_len(),
2117 flags));
2118 }
2119 }
2120
2121 struct cb {
2122 ECBackend *ec;
2123 hobject_t hoid;
2124 list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2125 pair<bufferlist*, Context*> > > to_read;
2126 unique_ptr<Context> on_complete;
2127 cb(const cb&) = delete;
2128 cb(cb &&) = default;
2129 cb(ECBackend *ec,
2130 const hobject_t &hoid,
2131 const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2132 pair<bufferlist*, Context*> > > &to_read,
2133 Context *on_complete)
2134 : ec(ec),
2135 hoid(hoid),
2136 to_read(to_read),
2137 on_complete(on_complete) {}
2138 void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2139 auto dpp = ec->get_parent()->get_dpp();
2140 ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2141 << dendl;
2142 ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2143 << dendl;
2144
2145 auto &got = results[hoid];
2146
2147 int r = 0;
2148 for (auto &&read: to_read) {
2149 if (got.first < 0) {
2150 if (read.second.second) {
2151 read.second.second->complete(got.first);
2152 }
2153 if (r == 0)
2154 r = got.first;
2155 } else {
2156 assert(read.second.first);
2157 uint64_t offset = read.first.get<0>();
2158 uint64_t length = read.first.get<1>();
2159 auto range = got.second.get_containing_range(offset, length);
2160 assert(range.first != range.second);
2161 assert(range.first.get_off() <= offset);
2162 assert(
2163 (offset + length) <=
2164 (range.first.get_off() + range.first.get_len()));
2165 read.second.first->substr_of(
2166 range.first.get_val(),
2167 offset - range.first.get_off(),
2168 length);
2169 if (read.second.second) {
2170 read.second.second->complete(length);
2171 read.second.second = nullptr;
2172 }
2173 }
2174 }
2175 to_read.clear();
2176 if (on_complete) {
2177 on_complete.release()->complete(r);
2178 }
2179 }
2180 ~cb() {
2181 for (auto &&i: to_read) {
2182 delete i.second.second;
2183 }
2184 to_read.clear();
2185 }
2186 };
2187 objects_read_and_reconstruct(
2188 reads,
2189 fast_read,
2190 make_gen_lambda_context<
2191 map<hobject_t,pair<int, extent_map> > &&, cb>(
2192 cb(this,
2193 hoid,
2194 to_read,
2195 on_complete)));
2196}
2197
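/* Per-object completion for a client read: decode each stripe-aligned result
 * from the returned shard buffers, trim it back to the requested (offset,
 * length), accumulate the pieces into an extent_map, and hand the result to
 * the waiting ClientAsyncReadStatus before kicking any queued reads. */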
2198struct CallClientContexts :
2199 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2200 hobject_t hoid;
2201 ECBackend *ec;
2202 ECBackend::ClientAsyncReadStatus *status;
2203 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2204 CallClientContexts(
2205 hobject_t hoid,
2206 ECBackend *ec,
2207 ECBackend::ClientAsyncReadStatus *status,
2208 const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2209 : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2210 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2211 ECBackend::read_result_t &res = in.second;
2212 extent_map result;
2213 if (res.r != 0)
2214 goto out;
2215 assert(res.returned.size() == to_read.size());
2216 assert(res.r == 0);
2217 assert(res.errors.empty());
2218 for (auto &&read: to_read) {
2219 pair<uint64_t, uint64_t> adjusted =
2220 ec->sinfo.offset_len_to_stripe_bounds(
2221 make_pair(read.get<0>(), read.get<1>()));
2222 assert(res.returned.front().get<0>() == adjusted.first &&
2223 res.returned.front().get<1>() == adjusted.second);
2224 map<int, bufferlist> to_decode;
2225 bufferlist bl;
2226 for (map<pg_shard_t, bufferlist>::iterator j =
2227 res.returned.front().get<2>().begin();
2228 j != res.returned.front().get<2>().end();
2229 ++j) {
2230 to_decode[j->first.shard].claim(j->second);
2231 }
2232 int r = ECUtil::decode(
2233 ec->sinfo,
2234 ec->ec_impl,
2235 to_decode,
2236 &bl);
2237 if (r < 0) {
2238 res.r = r;
2239 goto out;
2240 }
2241 bufferlist trimmed;
2242 trimmed.substr_of(
2243 bl,
2244 read.get<0>() - adjusted.first,
2245 MIN(read.get<1>(),
2246 bl.length() - (read.get<0>() - adjusted.first)));
2247 result.insert(
2248 read.get<0>(), trimmed.length(), std::move(trimmed));
2249 res.returned.pop_front();
2250 }
2251out:
2252 status->complete_object(hoid, res.r, std::move(result));
2253 ec->kick_reads();
2254 }
2255};
2256
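/* Issue a batch of client reads: pick the minimal set of shards able to serve
 * each object (or a redundant set when fast_read is requested), attach a
 * CallClientContexts completion per object, and start a single ReadOp. */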
2257void ECBackend::objects_read_and_reconstruct(
2258 const map<hobject_t,
2259 std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2260 > &reads,
2261 bool fast_read,
2262 GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
2263{
2264 in_progress_client_reads.emplace_back(
2265 reads.size(), std::move(func));
2266 if (!reads.size()) {
2267 kick_reads();
2268 return;
2269 }
2270
2271 set<int> want_to_read;
2272 get_want_to_read_shards(&want_to_read);
2273
2274 map<hobject_t, read_request_t> for_read_op;
2275 for (auto &&to_read: reads) {
2276 set<pg_shard_t> shards;
2277 int r = get_min_avail_to_read_shards(
2278 to_read.first,
2279 want_to_read,
2280 false,
2281 fast_read,
2282 &shards);
2283 assert(r == 0);
2284
2285 CallClientContexts *c = new CallClientContexts(
2286 to_read.first,
2287 this,
2288 &(in_progress_client_reads.back()),
2289 to_read.second);
2290 for_read_op.insert(
2291 make_pair(
2292 to_read.first,
2293 read_request_t(
2294 to_read.second,
2295 shards,
2296 false,
2297 c)));
2298 }
2299
2300 start_read_op(
2301 CEPH_MSG_PRIO_DEFAULT,
2302 for_read_op,
2303 OpRequestRef(),
2304 fast_read, false);
2305 return;
2306}
2307
2308
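/* Read retry path: after a shard returned an error, re-issue the same extents
 * against every shard that has not been tried yet; if no shards remain the
 * read fails with -EIO. */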
2309int ECBackend::send_all_remaining_reads(
2310 const hobject_t &hoid,
2311 ReadOp &rop)
2312{
2313 set<int> already_read;
2314 const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2315 for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2316 already_read.insert(i->shard);
2317 dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2318 set<pg_shard_t> shards;
2319 int r = get_remaining_shards(hoid, already_read, &shards);
2320 if (r)
2321 return r;
2322 if (shards.empty())
2323 return -EIO;
2324
2325 dout(10) << __func__ << " Read remaining shards " << shards << dendl;
2326
2327 // TODOSAM: this doesn't seem right
2328 list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2329 rop.to_read.find(hoid)->second.to_read;
2330 GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2331 rop.to_read.find(hoid)->second.cb;
2332
2333 map<hobject_t, read_request_t> for_read_op;
2334 for_read_op.insert(
2335 make_pair(
2336 hoid,
2337 read_request_t(
2338 offsets,
2339 shards,
2340 false,
2341 c)));
2342
2343 rop.to_read.swap(for_read_op);
2344 do_read_op(rop);
2345 return 0;
2346}
2347
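/* Fetch the object's xattrs from the local shard and strip the internal hinfo
 * key, which is EC bookkeeping rather than a client-visible attribute. */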
2348int ECBackend::objects_get_attrs(
2349 const hobject_t &hoid,
2350 map<string, bufferlist> *out)
2351{
2352 int r = store->getattrs(
2353 ch,
2354 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2355 *out);
2356 if (r < 0)
2357 return r;
2358
2359 for (map<string, bufferlist>::iterator i = out->begin();
2360 i != out->end();
2361 ) {
2362 if (ECUtil::is_hinfo_key_string(i->first))
2363 out->erase(i++);
2364 else
2365 ++i;
2366 }
2367 return r;
2368}
2369
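/* Roll back an append by truncating the local shard to the chunk offset that
 * corresponds to old_size. For example, with a 2+1 profile and 4 KiB chunks
 * (8 KiB stripe width), an old logical size of 64 KiB truncates this shard
 * back to 32 KiB. */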
2370void ECBackend::rollback_append(
2371 const hobject_t &hoid,
2372 uint64_t old_size,
2373 ObjectStore::Transaction *t)
2374{
2375 assert(old_size % sinfo.get_stripe_width() == 0);
2376 t->truncate(
2377 coll,
2378 ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2379 sinfo.aligned_logical_offset_to_chunk_offset(
2380 old_size));
2381}
2382
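/* Deep scrub of the local shard: read the shard in strides rounded up to a
 * multiple of the chunk size (e.g. a 4 KiB chunk size rounds a 5000-byte
 * osd_deep_scrub_stride up to 8192), hash the data, and compare the digest
 * against the chunk hash recorded in the object's hinfo (unless the pool
 * allows overwrites, where deep scrub of the hash is not yet supported). */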
2383void ECBackend::be_deep_scrub(
2384 const hobject_t &poid,
2385 uint32_t seed,
2386 ScrubMap::object &o,
2387 ThreadPool::TPHandle &handle) {
2388 bufferhash h(-1); // we always used -1
2389 int r;
2390 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2391 if (stride % sinfo.get_chunk_size())
2392 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
2393 uint64_t pos = 0;
2394
2395 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2396
2397 while (true) {
2398 bufferlist bl;
2399 handle.reset_tp_timeout();
2400 r = store->read(
2401 ch,
2402 ghobject_t(
2403 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2404 pos,
2405 stride, bl,
2406 fadvise_flags);
2407 if (r < 0)
2408 break;
2409 if (bl.length() % sinfo.get_chunk_size()) {
2410 r = -EIO;
2411 break;
2412 }
2413 pos += r;
2414 h << bl;
2415 if ((unsigned)r < stride)
2416 break;
2417 }
2418
2419 if (r == -EIO) {
2420 dout(0) << "_scan_list " << poid << " got "
2421 << r << " on read, read_error" << dendl;
2422 o.read_error = true;
2423 return;
2424 }
2425
2426 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2427 if (!hinfo) {
2428 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2429 o.read_error = true;
2430 o.digest_present = false;
2431 return;
2432 } else {
2433 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2434 assert(hinfo->has_chunk_hash());
2435 if (hinfo->get_total_chunk_size() != pos) {
2436 dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
2437 o.ec_size_mismatch = true;
2438 return;
2439 }
2440
2441 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
2442 dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
2443 o.ec_hash_mismatch = true;
2444 return;
2445 }
2446
2447 /* We checked above that we match our own stored hash. We cannot
2448 * send a hash of the actual object, so instead we simply send
2449 * our locally stored hash of shard 0 on the assumption that if
2450 * we match our chunk hash and our recollection of the hash for
2451 * chunk 0 matches that of our peers, there is likely no corruption.
2452 */
2453 o.digest = hinfo->get_chunk_hash(0);
2454 o.digest_present = true;
2455 } else {
2456 /* Hack! We must be using partial overwrites, and partial overwrites
2457 * don't support deep-scrub yet
2458 */
2459 o.digest = 0;
2460 o.digest_present = true;
2461 }
2462 }
2463
2464 o.omap_digest = seed;
2465 o.omap_digest_present = true;
2466}