]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2013 Inktank Storage, Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef REPBACKEND_H | |
16 | #define REPBACKEND_H | |
17 | ||
18 | #include "OSD.h" | |
19 | #include "PGBackend.h" | |
20 | #include "include/memory.h" | |
21 | ||
struct C_ReplicatedBackend_OnPullComplete;

/**
 * ReplicatedBackend
 *
 * PGBackend implementation for replicated pools: push/pull based
 * object recovery plus fan-out of client write transactions to the
 * other shards of the PG (see submit_transaction/issue_op below).
 */
class ReplicatedBackend : public PGBackend {
  /// Recovery handle: push/pull ops accumulated per peer shard,
  /// flushed by run_recovery_op via send_pushes/send_pulls.
  struct RPGHandle : public PGBackend::RecoveryHandle {
    map<pg_shard_t, vector<PushOp> > pushes;
    map<pg_shard_t, vector<PullOp> > pulls;
  };
  friend struct C_ReplicatedBackend_OnPullComplete;
public:
  ReplicatedBackend(
    PGBackend::Listener *pg,            ///< parent PG listener interface
    coll_t coll,                        ///< collection backing this PG
    ObjectStore::CollectionHandle &ch,  ///< open handle for coll
    ObjectStore *store,                 ///< underlying object store
    CephContext *cct);

  /// @see PGBackend::open_recovery_op
  RPGHandle *_open_recovery_op() {
    return new RPGHandle();
  }
  PGBackend::RecoveryHandle *open_recovery_op() override {
    return _open_recovery_op();
  }

  /// @see PGBackend::run_recovery_op
  void run_recovery_op(
    PGBackend::RecoveryHandle *h,
    int priority) override;

  /// @see PGBackend::recover_object
  int recover_object(
    const hobject_t &hoid,
    eversion_t v,
    ObjectContextRef head,
    ObjectContextRef obc,
    RecoveryHandle *h
    ) override;

  /// @see PGBackend::check_recovery_sources
  void check_recovery_sources(const OSDMapRef& osdmap) override;

  /// @see PGBackend::can_handle_while_inactive
  bool can_handle_while_inactive(OpRequestRef op) override;

  /// @see PGBackend::handle_message
  bool _handle_message(
    OpRequestRef op
    ) override;

  void on_change() override;
  void clear_recovery_state() override;
  void on_flushed() override;

  /// A replicated object is recoverable iff at least one shard has a copy
  class RPCRecPred : public IsPGRecoverablePredicate {
  public:
    bool operator()(const set<pg_shard_t> &have) const override {
      return !have.empty();
    }
  };
  IsPGRecoverablePredicate *get_is_recoverable_predicate() override {
    return new RPCRecPred;
  }

  /// A replicated object is readable iff this osd's own shard has it
  class RPCReadPred : public IsPGReadablePredicate {
    pg_shard_t whoami;
  public:
    explicit RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
    bool operator()(const set<pg_shard_t> &have) const override {
      return have.count(whoami);
    }
  };
  IsPGReadablePredicate *get_is_readable_predicate() override {
    return new RPCReadPred(get_parent()->whoami_shard());
  }

  /// Dump in-flight recovery state (pull_from_peer/pulling and pushing
  /// maps) for debugging output.
  void dump_recovery_info(Formatter *f) const override {
    {
      f->open_array_section("pull_from_peer");
      for (map<pg_shard_t, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
           i != pull_from_peer.end();
           ++i) {
        f->open_object_section("pulling_from");
        f->dump_stream("pull_from") << i->first;
        {
          f->open_array_section("pulls");
          for (set<hobject_t>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("pull_info");
            // pull_from_peer is a reverse index of pulling, so every
            // object listed here must have a pulling entry
            assert(pulling.count(*j));
            pulling.find(*j)->second.dump(f);
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
    {
      f->open_array_section("pushing");
      for (map<hobject_t, map<pg_shard_t, PushInfo>>::const_iterator i =
             pushing.begin();
           i != pushing.end();
           ++i) {
        f->open_object_section("object");
        f->dump_stream("pushing") << i->first;
        {
          f->open_array_section("pushing_to");
          for (map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("push_progress");
            f->dump_stream("pushing_to") << j->first;
            {
              f->open_object_section("push_info");
              j->second.dump(f);
              f->close_section();
            }
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
  }

  int objects_read_sync(
    const hobject_t &hoid,
    uint64_t off,
    uint64_t len,
    uint32_t op_flags,
    bufferlist *bl) override;

  void objects_read_async(
    const hobject_t &hoid,
    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                    pair<bufferlist*, Context*> > > &to_read,
    Context *on_complete,
    bool fast_read = false) override;

private:
  // push
  /// State for one in-progress push of an object to a single peer
  struct PushInfo {
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }
  };
  /// Outstanding pushes: object -> (peer shard -> push state)
  map<hobject_t, map<pg_shard_t, PushInfo>> pushing;

  // pull
  /// State for one in-progress pull of an object from a single peer
  struct PullInfo {
    pg_shard_t from;
    hobject_t soid;
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef head_ctx;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    bool cache_dont_need;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }

    bool is_complete() const {
      return recovery_progress.is_complete(recovery_info);
    }
  };

  map<hobject_t, PullInfo> pulling;

  // Reverse mapping from osd peer to objects being pulled from that peer
  map<pg_shard_t, set<hobject_t> > pull_from_peer;
  void clear_pull(
    map<hobject_t, PullInfo>::iterator piter,
    bool clear_pull_from_peer = true);
  void clear_pull_from(
    map<hobject_t, PullInfo>::iterator piter);

  void _do_push(OpRequestRef op);
  void _do_pull_response(OpRequestRef op);
  // On the primary an incoming push is the response to one of our pulls;
  // on a replica it is a genuine push from the primary.
  void do_push(OpRequestRef op) {
    if (is_primary()) {
      _do_pull_response(op);
    } else {
      _do_push(op);
    }
  }
  void do_pull(OpRequestRef op);
  void do_push_reply(OpRequestRef op);

  bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply);
  void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);

  struct pull_complete_info {
    hobject_t hoid;
    object_stat_sum_t stat;
  };
  bool handle_pull_response(
    pg_shard_t from, const PushOp &op, PullOp *response,
    list<pull_complete_info> *to_continue,
    ObjectStore::Transaction *t);
  void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response,
                   ObjectStore::Transaction *t);

  static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
                               const interval_set<uint64_t> &intervals_received,
                               bufferlist data_received,
                               interval_set<uint64_t> *intervals_usable,
                               bufferlist *data_usable);
  void _failed_pull(pg_shard_t from, const hobject_t &soid);

  void send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes);
  void prep_push_op_blank(const hobject_t& soid, PushOp *op);
  void send_pulls(
    int priority,
    map<pg_shard_t, vector<PullOp> > &pulls);

  int build_push_op(const ObjectRecoveryInfo &recovery_info,
                    const ObjectRecoveryProgress &progress,
                    ObjectRecoveryProgress *out_progress,
                    PushOp *out_op,
                    object_stat_sum_t *stat = 0,
                    bool cache_dont_need = true);
  void submit_push_data(const ObjectRecoveryInfo &recovery_info,
                        bool first,
                        bool complete,
                        bool cache_dont_need,
                        const interval_set<uint64_t> &intervals_included,
                        bufferlist data_included,
                        bufferlist omap_header,
                        const map<string, bufferlist> &attrs,
                        const map<string, bufferlist> &omap_entries,
                        ObjectStore::Transaction *t);
  void submit_push_complete(const ObjectRecoveryInfo &recovery_info,
                            ObjectStore::Transaction *t);

  void calc_clone_subsets(
    SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  void prepare_pull(
    eversion_t v,
    const hobject_t& soid,
    ObjectContextRef headctx,
    RPGHandle *h);
  int start_pushes(
    const hobject_t &soid,
    ObjectContextRef obj,
    RPGHandle *h);
  int prep_push_to_replica(
    ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
    PushOp *pop, bool cache_dont_need = true);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& oid, pg_shard_t dest,
    PushOp *op,
    bool cache_dont_need);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& soid, pg_shard_t peer,
    eversion_t version,
    interval_set<uint64_t> &data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    PushOp *op,
    bool cache,
    ObcLockManager &&lock_manager);
  void calc_head_subsets(
    ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
    const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  ObjectRecoveryInfo recalc_subsets(
    const ObjectRecoveryInfo& recovery_info,
    SnapSetContext *ssc,
    ObcLockManager &lock_manager);

  /**
   * Client IO
   */
  /// In-flight client write; tracked until every shard in
  /// waiting_for_commit/waiting_for_applied has acked.
  struct InProgressOp {
    ceph_tid_t tid;
    set<pg_shard_t> waiting_for_commit;
    set<pg_shard_t> waiting_for_applied;
    Context *on_commit;
    Context *on_applied;
    OpRequestRef op;
    eversion_t v;
    InProgressOp(
      ceph_tid_t tid, Context *on_commit, Context *on_applied,
      OpRequestRef op, eversion_t v)
      : tid(tid), on_commit(on_commit), on_applied(on_applied),
        op(op), v(v) {}
    /// true once all shards have both committed and applied
    bool done() const {
      return waiting_for_commit.empty() &&
        waiting_for_applied.empty();
    }
  };
  map<ceph_tid_t, InProgressOp> in_progress_ops;
public:
  friend class C_OSD_OnOpCommit;
  friend class C_OSD_OnOpApplied;

  void call_write_ordered(std::function<void(void)> &&cb) override {
    // ReplicatedBackend submits writes inline in submit_transaction, so
    // we can just call the callback.
    cb();
  }

  void submit_transaction(
    const hobject_t &hoid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&t,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    Context *on_local_applied_sync,
    Context *on_all_applied,
    Context *on_all_commit,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef op
    ) override;

private:
  Message * generate_subop(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    ObjectStore::Transaction &op_t,
    pg_shard_t peer,
    const pg_info_t &pinfo);
  void issue_op(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    InProgressOp *op,
    ObjectStore::Transaction &op_t);
  void op_applied(InProgressOp *op);
  void op_commit(InProgressOp *op);
  void do_repop_reply(OpRequestRef op);
  void do_repop(OpRequestRef op);

  /// Replica-side state for a replicated write (repop): the originating
  /// op, ack target, and the transactions to apply.
  struct RepModify {
    OpRequestRef op;
    bool applied, committed;
    int ackerosd;
    eversion_t last_complete;
    epoch_t epoch_started;

    ObjectStore::Transaction opt, localt;

    RepModify() : applied(false), committed(false), ackerosd(-1),
                  epoch_started(0) {}
  };
  typedef ceph::shared_ptr<RepModify> RepModifyRef;

  struct C_OSD_RepModifyApply;
  struct C_OSD_RepModifyCommit;

  void repop_applied(RepModifyRef rm);
  void repop_commit(RepModifyRef rm);
  bool scrub_supported() override { return true; }
  bool auto_repair_supported() const override { return false; }


  void be_deep_scrub(
    const hobject_t &obj,
    uint32_t seed,
    ScrubMap::object &o,
    ThreadPool::TPHandle &handle) override;
  /// Replicated pools store objects verbatim: on-disk size == logical size
  uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; }
};
440 | ||
441 | #endif |