// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef ECBACKEND_H
#define ECBACKEND_H

#include <boost/intrusive/set.hpp>
#include <boost/intrusive/list.hpp>

#include "OSD.h"
#include "PGBackend.h"
#include "erasure-code/ErasureCodeInterface.h"
#include "ECUtil.h"
#include "ECTransaction.h"
#include "ExtentCache.h"
// forward declarations
struct ECSubWrite;
struct ECSubWriteReply;
struct ECSubRead;
struct ECSubReadReply;

struct RecoveryMessages;
class ECBackend : public PGBackend {
public:
  RecoveryHandle *open_recovery_op() override;

  void run_recovery_op(
    RecoveryHandle *h,
    int priority
    ) override;

  int recover_object(
    const hobject_t &hoid,
    eversion_t v,
    ObjectContextRef head,
    ObjectContextRef obc,
    RecoveryHandle *h
    ) override;

  bool _handle_message(
    OpRequestRef op
    ) override;
  bool can_handle_while_inactive(
    OpRequestRef op
    ) override;
  friend struct SubWriteApplied;
  friend struct SubWriteCommitted;
  void sub_write_committed(
    ceph_tid_t tid,
    eversion_t version,
    eversion_t last_complete,
    const ZTracer::Trace &trace);
  void handle_sub_write(
    pg_shard_t from,
    OpRequestRef msg,
    ECSubWrite &op,
    const ZTracer::Trace &trace
    );
  void handle_sub_read(
    pg_shard_t from,
    const ECSubRead &op,
    ECSubReadReply *reply,
    const ZTracer::Trace &trace
    );
  void handle_sub_write_reply(
    pg_shard_t from,
    const ECSubWriteReply &op,
    const ZTracer::Trace &trace
    );
  void handle_sub_read_reply(
    pg_shard_t from,
    ECSubReadReply &op,
    RecoveryMessages *m,
    const ZTracer::Trace &trace
    );

  /// @see ReadOp below
  void check_recovery_sources(const OSDMapRef& osdmap) override;

  void on_change() override;
  void clear_recovery_state() override;

  void dump_recovery_info(ceph::Formatter *f) const override;

  void call_write_ordered(std::function<void(void)> &&cb) override;

  void submit_transaction(
    const hobject_t &hoid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&t,
    const eversion_t &trim_to,
    const eversion_t &min_last_complete_ondisk,
    std::vector<pg_log_entry_t>&& log_entries,
    std::optional<pg_hit_set_history_t> &hset_history,
    Context *on_all_commit,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef op
    ) override;

  int objects_read_sync(
    const hobject_t &hoid,
    uint64_t off,
    uint64_t len,
    uint32_t op_flags,
    ceph::buffer::list *bl) override;

  /**
   * Async read mechanism
   *
   * Async reads use the same async read mechanism as does recovery.
   * CallClientContexts is responsible for reconstructing the response
   * buffer as well as for calling the callbacks.
   *
   * One tricky bit is that two reads may possibly not read from the same
   * set of replicas.  This could result in two reads completing in the
   * wrong (from the interface user's point of view) order.  Thus, we
   * maintain a queue of in progress reads (@see in_progress_client_reads)
   * to ensure that we always call the completion callback in order.
   *
   * Another subtlety is that while we may read a degraded object, we will
   * still only perform a client read from shards in the acting set.  This
   * ensures that we won't ever have to restart a client initiated read in
   * check_recovery_sources.
   */
  void objects_read_and_reconstruct(
    const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
    > &reads,
    bool fast_read,
    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);

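  /*
   * Illustrative sketch (hypothetical caller, not part of this
   * interface): because completions are queued in issue order, a later
   * read's callback never fires before an earlier read's, even if its
   * shard replies arrive first.
   *
   *   ec->objects_read_and_reconstruct(reads_a, false, make_cb(on_a));
   *   ec->objects_read_and_reconstruct(reads_b, false, make_cb(on_b));
   *   // on_a always runs before on_b; see in_progress_client_reads
   *   // and kick_reads() below.
   *
   * ec, reads_a/reads_b, make_cb, and on_a/on_b are assumed names.
   */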
  friend struct CallClientContexts;
  struct ClientAsyncReadStatus {
    unsigned objects_to_read;
    GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
    std::map<hobject_t,std::pair<int, extent_map> > results;
    explicit ClientAsyncReadStatus(
      unsigned objects_to_read,
      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
      : objects_to_read(objects_to_read), func(std::move(func)) {}
    void complete_object(
      const hobject_t &hoid,
      int err,
      extent_map &&buffers) {
      ceph_assert(objects_to_read);
      --objects_to_read;
      ceph_assert(!results.count(hoid));
      results.emplace(hoid, std::make_pair(err, std::move(buffers)));
    }
    bool is_complete() const {
      return objects_to_read == 0;
    }
    void run() {
      func.release()->complete(std::move(results));
    }
  };
  std::list<ClientAsyncReadStatus> in_progress_client_reads;
  void objects_read_async(
    const hobject_t &hoid,
    const std::list<std::pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                              std::pair<ceph::buffer::list*, Context*> > > &to_read,
    Context *on_complete,
    bool fast_read = false) override;

  template <typename Func>
  void objects_read_async_no_cache(
    const std::map<hobject_t,extent_set> &to_read,
    Func &&on_complete) {
    std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
    for (auto &&hpair: to_read) {
      auto &l = _to_read[hpair.first];
      for (auto extent: hpair.second) {
        l.emplace_back(extent.first, extent.second, 0);
      }
    }
    objects_read_and_reconstruct(
      _to_read,
      false,
      make_gen_lambda_context<
        std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
          std::forward<Func>(on_complete)));
  }
  void kick_reads() {
    while (in_progress_client_reads.size() &&
           in_progress_client_reads.front().is_complete()) {
      in_progress_client_reads.front().run();
      in_progress_client_reads.pop_front();
    }
  }

private:
  friend struct ECRecoveryHandle;
  uint64_t get_recovery_chunk_size() const {
    return round_up_to(cct->_conf->osd_recovery_max_chunk,
                       sinfo.get_stripe_width());
  }
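  // Worked example (illustrative numbers only): with
  // osd_recovery_max_chunk = 8 MiB and a 4+2 profile using 4 KiB chunks
  // (stripe_width = 16 KiB), this returns round_up_to(8 MiB, 16 KiB)
  // = 8 MiB, so recovery always proceeds in whole stripes.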

  void get_want_to_read_shards(std::set<int> *want_to_read) const {
    const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
    for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
      int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
      want_to_read->insert(chunk);
    }
  }
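  // E.g. (illustrative): with k=4 data chunks and an empty
  // chunk_mapping, want_to_read becomes {0,1,2,3}; with a plugin
  // remapping such as chunk_mapping = [2,0,3,1], data chunk i is
  // stored as chunk chunk_mapping[i], so the set becomes {2,0,3,1}.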

  /**
   * Recovery
   *
   * Recovery uses the same underlying read mechanism as client reads
   * with the slight difference that recovery reads may come from non
   * acting shards.  Thus, check_recovery_sources may wind up calling
   * cancel_pull for a read originating with RecoveryOp.
   *
   * The recovery process is expressed as a state machine:
   * - IDLE: Nothing is currently in progress; reads will be started and
   *   we will transition to READING.
   * - READING: We are awaiting a pending read op.  Once complete, we will
   *   decode the buffers and proceed to WRITING.
   * - WRITING: We are awaiting a completed push.  Once complete, we will
   *   either transition to COMPLETE or to IDLE to continue.
   * - COMPLETE: complete.
   *
   * We use the existing Push and PushReply messages and structures to
   * handle actually shuffling the data over to the replicas.  recovery_info
   * and recovery_progress are expressed in terms of the logical offset
   * space except for data_included which is in terms of the chunked object
   * space (to match the passed buffer).
   *
   * xattrs are requested on the first read and used to initialize the
   * object_context if missing on completion of the first read.
   *
   * In order to batch up reads and writes, we batch Push, PushReply,
   * Transaction, and reads in a RecoveryMessages object which is passed
   * among the recovery methods.
   */
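  /*
   * Rough transition sketch of the state machine above (orientation
   * only; continue_recovery_op() is the authority):
   *
   *   IDLE     -> READING   reads issued for the next chunk of the object
   *   READING  -> WRITING   read op complete; buffers decoded, pushes sent
   *   WRITING  -> IDLE      pushes acked, but more of the object remains
   *   WRITING  -> COMPLETE  pushes acked and the object is fully recovered
   */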
  struct RecoveryOp {
    hobject_t hoid;
    eversion_t v;
    std::set<pg_shard_t> missing_on;
    std::set<shard_id_t> missing_on_shards;

    ObjectRecoveryInfo recovery_info;
    ObjectRecoveryProgress recovery_progress;

    enum state_t { IDLE, READING, WRITING, COMPLETE } state;

    static const char* tostr(state_t state) {
      switch (state) {
      case ECBackend::RecoveryOp::IDLE:
        return "IDLE";
      case ECBackend::RecoveryOp::READING:
        return "READING";
      case ECBackend::RecoveryOp::WRITING:
        return "WRITING";
      case ECBackend::RecoveryOp::COMPLETE:
        return "COMPLETE";
      default:
        ceph_abort();
        return "";
      }
    }

    // must be filled if state == WRITING
    std::map<int, ceph::buffer::list> returned_data;
    std::map<std::string, ceph::buffer::list, std::less<>> xattrs;
    ECUtil::HashInfoRef hinfo;
    ObjectContextRef obc;
    std::set<pg_shard_t> waiting_on_pushes;

    // valid in state READING
    std::pair<uint64_t, uint64_t> extent_requested;

    void dump(ceph::Formatter *f) const;

    RecoveryOp() : state(IDLE) {}
  };
  friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
  std::map<hobject_t, RecoveryOp> recovery_ops;

  void continue_recovery_op(
    RecoveryOp &op,
    RecoveryMessages *m);
  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
  friend struct OnRecoveryReadComplete;
  void handle_recovery_read_complete(
    const hobject_t &hoid,
    boost::tuple<uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > &to_read,
    std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs,
    RecoveryMessages *m);
  void handle_recovery_push(
    const PushOp &op,
    RecoveryMessages *m,
    bool is_repair);
  void handle_recovery_push_reply(
    const PushReplyOp &op,
    pg_shard_t from,
    RecoveryMessages *m);
  void get_all_avail_shards(
    const hobject_t &hoid,
    const std::set<pg_shard_t> &error_shards,
    std::set<int> &have,
    std::map<shard_id_t, pg_shard_t> &shards,
    bool for_recovery);

public:
  /**
   * Low level async read mechanism
   *
   * To avoid duplicating the logic for requesting and waiting for
   * multiple object shards, there is a common async read mechanism
   * taking a map of hobject_t->read_request_t which defines callbacks
   * taking read_result_ts as arguments.
   *
   * tid_to_read_map gives open read ops.  check_recovery_sources uses
   * shard_to_read_map and ReadOp::source_to_obj to restart reads
   * involving down osds.
   *
   * The user is responsible for specifying replicas on which to read
   * and for reassembling the buffer on the other side since client
   * reads require the original object buffer while recovery only needs
   * the missing pieces.
   *
   * Rather than handling reads on the primary directly, we simply send
   * ourselves a message.  This avoids a dedicated primary path for that
   * part.
   */
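  /*
   * Assumed usage sketch (internal API; names below are hypothetical):
   * a caller builds one read_request_t per object and hands the map to
   * start_read_op(); cb fires with the read_result_t once every shard
   * has replied or errored.
   *
   *   std::map<hobject_t, read_request_t> to_read;
   *   to_read.emplace(hoid, read_request_t(
   *     extents,       // list of (offset, length, flags) tuples
   *     shards_needed, // shard -> subchunks, e.g. from
   *                    // get_min_avail_to_read_shards()
   *     false,         // want_attrs
   *     on_complete)); // GenContext consuming the result
   */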
  struct read_result_t {
    int r;
    std::map<pg_shard_t, int> errors;
    std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
    std::list<
      boost::tuple<
        uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
    read_result_t() : r(0) {}
  };
  struct read_request_t {
    const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
    std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
    bool want_attrs;
    GenContext<std::pair<RecoveryMessages *, read_result_t& > &> *cb;
    read_request_t(
      const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
      const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
      bool want_attrs,
      GenContext<std::pair<RecoveryMessages *, read_result_t& > &> *cb)
      : to_read(to_read), need(need), want_attrs(want_attrs),
        cb(cb) {}
  };
  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);

  struct ReadOp {
    int priority;
    ceph_tid_t tid;
    OpRequestRef op; // may be null if not on behalf of a client

    // True if redundant reads are issued, false otherwise.  This is
    // useful for trading some resources (redundant ops) for lower-latency
    // reads, especially on a relatively idle cluster.
    bool do_redundant_reads;

    // True if reading for recovery, which may read only a subset of the
    // available shards.
    bool for_recovery;

    ZTracer::Trace trace;

    std::map<hobject_t, std::set<int>> want_to_read;
    std::map<hobject_t, read_request_t> to_read;
    std::map<hobject_t, read_result_t> complete;

    std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
    std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;

    void dump(ceph::Formatter *f) const;

    std::set<pg_shard_t> in_progress;

    ReadOp(
      int priority,
      ceph_tid_t tid,
      bool do_redundant_reads,
      bool for_recovery,
      OpRequestRef op,
      std::map<hobject_t, std::set<int>> &&_want_to_read,
      std::map<hobject_t, read_request_t> &&_to_read)
      : priority(priority), tid(tid), op(op), do_redundant_reads(do_redundant_reads),
        for_recovery(for_recovery), want_to_read(std::move(_want_to_read)),
        to_read(std::move(_to_read)) {
      for (auto &&hpair: to_read) {
        auto &returned = complete[hpair.first].returned;
        for (auto &&extent: hpair.second.to_read) {
          returned.push_back(
            boost::make_tuple(
              extent.get<0>(),
              extent.get<1>(),
              std::map<pg_shard_t, ceph::buffer::list>()));
        }
      }
    }
    ReadOp() = delete;
    ReadOp(const ReadOp &) = default;
    ReadOp(ReadOp &&) = default;
  };
  friend struct FinishReadOp;
  void filter_read_op(
    const OSDMapRef& osdmap,
    ReadOp &op);
  void complete_read_op(ReadOp &rop, RecoveryMessages *m);
  friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
  std::map<ceph_tid_t, ReadOp> tid_to_read_map;
  std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
  void start_read_op(
    int priority,
    std::map<hobject_t, std::set<int>> &want_to_read,
    std::map<hobject_t, read_request_t> &to_read,
    OpRequestRef op,
    bool do_redundant_reads, bool for_recovery);

  void do_read_op(ReadOp &rop);
  int send_all_remaining_reads(
    const hobject_t &hoid,
    ReadOp &rop);

  /**
   * Client writes
   *
   * ECTransaction is responsible for generating a transaction for
   * each shard to which we need to send the write.  As required
   * by the PGBackend interface, the ECBackend write mechanism
   * passes trim information with the write and last_complete back
   * with the reply.
   *
   * As with client reads, there is a possibility of out-of-order
   * completions.  Thus, callbacks and completion are called in order
   * on the writing list.
   */
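  /*
   * Condensed flow sketch (assumed from the declarations below): a
   * client write entering submit_transaction() becomes an Op and drains
   * through the ordered waitlists declared further down:
   *
   *   submit_transaction()
   *     -> start_rmw()            Op enqueued on waiting_state
   *     -> try_state_to_reads()   partial-stripe reads issued if needed;
   *                               Op moves to waiting_reads
   *     -> try_reads_to_commit()  per-shard transactions sent;
   *                               Op moves to waiting_commit
   *     -> try_finish_rmw()       all shards committed; on_all_commit fires
   *
   * Since each list drains strictly from the front, completions are
   * reported in submission order even when shard acks race.
   */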
  struct Op : boost::intrusive::list_base_hook<> {
    /// From submit_transaction caller, describes operation
    hobject_t hoid;
    object_stat_sum_t delta_stats;
    eversion_t version;
    eversion_t trim_to;
    std::optional<pg_hit_set_history_t> updated_hit_set_history;
    std::vector<pg_log_entry_t> log_entries;
    ceph_tid_t tid;
    osd_reqid_t reqid;
    ZTracer::Trace trace;

    eversion_t roll_forward_to; /// Soon to be generated internally

    /// Ancillary state, also provided by the submit_transaction caller
    std::map<hobject_t, ObjectContextRef> obc_map;

    /// see call_write_ordered
    std::list<std::function<void(void)> > on_write;

    /// Generated internally
    std::set<hobject_t> temp_added;
    std::set<hobject_t> temp_cleared;

    ECTransaction::WritePlan plan;
    bool requires_rmw() const { return !plan.to_read.empty(); }
    bool invalidates_cache() const { return plan.invalidates_cache; }

    // must be true if requires_rmw(), must be false if invalidates_cache()
    bool using_cache = true;

    /// In progress read state
    std::map<hobject_t,extent_set> pending_read; // subset already being read
    std::map<hobject_t,extent_set> remote_read;  // subset we must read
    std::map<hobject_t,extent_map> remote_read_result;
    bool read_in_progress() const {
      return !remote_read.empty() && remote_read_result.empty();
    }

    /// In progress write state
    std::set<pg_shard_t> pending_commit;
    // we need pending_apply for pre-mimic peers so that we don't issue a
    // read on a remote shard before it has applied a previous write.  We
    // can remove this after nautilus.
    std::set<pg_shard_t> pending_apply;
    bool write_in_progress() const {
      return !pending_commit.empty() || !pending_apply.empty();
    }

    /// optional, may be null, for tracking purposes
    OpRequestRef client_op;

    /// pin for cache
    ExtentCache::write_pin pin;

    /// Callbacks
    Context *on_all_commit = nullptr;
    ~Op() {
      delete on_all_commit;
    }
  };
  using op_list = boost::intrusive::list<Op>;
  friend ostream &operator<<(ostream &lhs, const Op &rhs);

  ExtentCache cache;
  std::map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure

  /**
   * We model the possible rmw states as a set of waitlists.
   * All writes at this time complete in order, so a write blocked
   * at waiting_state blocks all writes behind it as well (same for
   * other states).
   *
   * Future work: We can break this up into a per-object pipeline
   * (almost).  First, provide an ordering token to submit_transaction
   * and require that all operations within a single transaction take
   * place on a subset of hobject_t space partitioned by that token
   * (the hashid seems about right to me -- even works for temp objects
   * if you recall that a temp object created for object head foo will
   * only ever be referenced by other transactions on foo and aren't
   * reused).  Next, factor this part into a class and maintain one per
   * ordering token.  Next, fix up PrimaryLogPG's repop queue to be
   * partitioned by ordering token.  Finally, refactor the op pipeline
   * so that the log entries passed into submit_transaction aren't
   * versioned.  We can't assign versions to them until we actually
   * submit the operation.  That's probably going to be the hard part.
   */
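  /*
   * Consequence of the ordered waitlists (illustrative): if write A
   * requires a partial-stripe read (rmw) and write B behind it does not,
   * B still cannot pass A at any stage, and B's completion is reported
   * only after A's -- the head-of-line blocking that the per-object
   * pipeline sketched above would eliminate.
   */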
  class pipeline_state_t {
    enum {
      CACHE_VALID = 0,
      CACHE_INVALID = 1
    } pipeline_state = CACHE_VALID;
  public:
    bool caching_enabled() const {
      return pipeline_state == CACHE_VALID;
    }
    bool cache_invalid() const {
      return !caching_enabled();
    }
    void invalidate() {
      pipeline_state = CACHE_INVALID;
    }
    void clear() {
      pipeline_state = CACHE_VALID;
    }
    friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
  } pipeline_state;

  op_list waiting_state;  /// writes waiting on pipe_state
  op_list waiting_reads;  /// writes waiting on partial stripe reads
  op_list waiting_commit; /// writes waiting on initial commit
  eversion_t completed_to;
  eversion_t committed_to;
  void start_rmw(Op *op, PGTransactionUPtr &&t);
  bool try_state_to_reads();
  bool try_reads_to_commit();
  bool try_finish_rmw();
  void check_ops();

  ceph::ErasureCodeInterfaceRef ec_impl;

  /**
   * ECRecPred
   *
   * Determines whether _have is sufficient to recover an object.
   */
  class ECRecPred : public IsPGRecoverablePredicate {
    std::set<int> want;
    ceph::ErasureCodeInterfaceRef ec_impl;
  public:
    explicit ECRecPred(ceph::ErasureCodeInterfaceRef ec_impl) : ec_impl(ec_impl) {
      for (unsigned i = 0; i < ec_impl->get_chunk_count(); ++i) {
        want.insert(i);
      }
    }
    bool operator()(const std::set<pg_shard_t> &_have) const override {
      std::set<int> have;
      for (std::set<pg_shard_t>::const_iterator i = _have.begin();
           i != _have.end();
           ++i) {
        have.insert(i->shard);
      }
      std::map<int, std::vector<std::pair<int, int>>> min;
      return ec_impl->minimum_to_decode(want, have, &min) == 0;
    }
  };
  IsPGRecoverablePredicate *get_is_recoverable_predicate() const override {
    return new ECRecPred(ec_impl);
  }
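  // Illustrative (profile numbers assumed): with a k=4, m=2 code,
  // minimum_to_decode() succeeds whenever at least four distinct shards
  // survive, so the predicate accepts a _have set covering shards
  // {0,2,3,5} and rejects one covering only {0,1,4}.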

  int get_ec_data_chunk_count() const override {
    return ec_impl->get_data_chunk_count();
  }
  int get_ec_stripe_chunk_size() const override {
    return sinfo.get_chunk_size();
  }

  /**
   * ECReadPred
   *
   * Determines whether _have is sufficient to read an object.
   */
  class ECReadPred : public IsPGReadablePredicate {
    pg_shard_t whoami;
    ECRecPred rec_pred;
  public:
    ECReadPred(
      pg_shard_t whoami,
      ceph::ErasureCodeInterfaceRef ec_impl) : whoami(whoami), rec_pred(ec_impl) {}
    bool operator()(const std::set<pg_shard_t> &_have) const override {
      return _have.count(whoami) && rec_pred(_have);
    }
  };
  IsPGReadablePredicate *get_is_readable_predicate() const override {
    return new ECReadPred(get_parent()->whoami_shard(), ec_impl);
  }

  const ECUtil::stripe_info_t sinfo;
  /// If modified, ensure that the ref is held until the update is applied
  SharedPtrRegistry<hobject_t, ECUtil::HashInfo> unstable_hashinfo_registry;
  ECUtil::HashInfoRef get_hash_info(const hobject_t &hoid, bool create = false,
                                    const std::map<std::string, ceph::buffer::ptr, std::less<>> *attr = NULL);

public:
  ECBackend(
    PGBackend::Listener *pg,
    const coll_t &coll,
    ObjectStore::CollectionHandle &ch,
    ObjectStore *store,
    CephContext *cct,
    ceph::ErasureCodeInterfaceRef ec_impl,
    uint64_t stripe_width);

  /// Returns to_read replicas sufficient to reconstruct want
  int get_min_avail_to_read_shards(
    const hobject_t &hoid,     ///< [in] object
    const std::set<int> &want, ///< [in] desired shards
    bool for_recovery,         ///< [in] true if we may use non-acting replicas
    bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
    std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read ///< [out] shards and corresponding subchunks to read
    ); ///< @return error code, 0 on success

  int get_remaining_shards(
    const hobject_t &hoid,
    const std::set<int> &avail,
    const std::set<int> &want,
    const read_result_t &result,
    std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
    bool for_recovery);

  int objects_get_attrs(
    const hobject_t &hoid,
    std::map<std::string, ceph::buffer::list, std::less<>> *out) override;

  void rollback_append(
    const hobject_t &hoid,
    uint64_t old_size,
    ObjectStore::Transaction *t) override;

  bool auto_repair_supported() const override { return true; }

  int be_deep_scrub(
    const hobject_t &poid,
    ScrubMap &map,
    ScrubMapBuilder &pos,
    ScrubMap::object &o) override;
  uint64_t be_get_ondisk_size(uint64_t logical_size) override {
    return sinfo.logical_to_next_chunk_offset(logical_size);
  }
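  // E.g. (illustrative numbers): with a 4+2 profile and 4 KiB chunks
  // (stripe_width 16 KiB), a 10000-byte object occupies one partial
  // stripe, so each shard stores be_get_ondisk_size(10000) = 4096
  // bytes on disk.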
  void _failed_push(const hobject_t &hoid,
                    std::pair<RecoveryMessages *, ECBackend::read_result_t &> &in);
};
ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs);

#endif