]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14 | ||
15 | #ifndef REPBACKEND_H | |
16 | #define REPBACKEND_H | |
17 | ||
18 | #include "OSD.h" | |
19 | #include "PGBackend.h" | |
20 | #include "include/memory.h" | |
21 | ||
22 | struct C_ReplicatedBackend_OnPullComplete; | |
23 | class ReplicatedBackend : public PGBackend { | |
24 | struct RPGHandle : public PGBackend::RecoveryHandle { | |
25 | map<pg_shard_t, vector<PushOp> > pushes; | |
26 | map<pg_shard_t, vector<PullOp> > pulls; | |
27 | }; | |
28 | friend struct C_ReplicatedBackend_OnPullComplete; | |
29 | public: | |
30 | ReplicatedBackend( | |
31 | PGBackend::Listener *pg, | |
32 | coll_t coll, | |
33 | ObjectStore::CollectionHandle &ch, | |
34 | ObjectStore *store, | |
35 | CephContext *cct); | |
36 | ||
37 | /// @see PGBackend::open_recovery_op | |
38 | RPGHandle *_open_recovery_op() { | |
39 | return new RPGHandle(); | |
40 | } | |
41 | PGBackend::RecoveryHandle *open_recovery_op() override { | |
42 | return _open_recovery_op(); | |
43 | } | |
44 | ||
45 | /// @see PGBackend::run_recovery_op | |
46 | void run_recovery_op( | |
47 | PGBackend::RecoveryHandle *h, | |
48 | int priority) override; | |
49 | ||
50 | /// @see PGBackend::recover_object | |
224ce89b | 51 | int recover_object( |
7c673cae FG |
52 | const hobject_t &hoid, |
53 | eversion_t v, | |
54 | ObjectContextRef head, | |
55 | ObjectContextRef obc, | |
56 | RecoveryHandle *h | |
57 | ) override; | |
58 | ||
59 | void check_recovery_sources(const OSDMapRef& osdmap) override; | |
60 | ||
61 | /// @see PGBackend::delay_message_until_active | |
62 | bool can_handle_while_inactive(OpRequestRef op) override; | |
63 | ||
64 | /// @see PGBackend::handle_message | |
65 | bool handle_message( | |
66 | OpRequestRef op | |
67 | ) override; | |
68 | ||
69 | void on_change() override; | |
70 | void clear_recovery_state() override; | |
71 | void on_flushed() override; | |
72 | ||
73 | class RPCRecPred : public IsPGRecoverablePredicate { | |
74 | public: | |
75 | bool operator()(const set<pg_shard_t> &have) const override { | |
76 | return !have.empty(); | |
77 | } | |
78 | }; | |
79 | IsPGRecoverablePredicate *get_is_recoverable_predicate() override { | |
80 | return new RPCRecPred; | |
81 | } | |
82 | ||
83 | class RPCReadPred : public IsPGReadablePredicate { | |
84 | pg_shard_t whoami; | |
85 | public: | |
86 | explicit RPCReadPred(pg_shard_t whoami) : whoami(whoami) {} | |
87 | bool operator()(const set<pg_shard_t> &have) const override { | |
88 | return have.count(whoami); | |
89 | } | |
90 | }; | |
91 | IsPGReadablePredicate *get_is_readable_predicate() override { | |
92 | return new RPCReadPred(get_parent()->whoami_shard()); | |
93 | } | |
94 | ||
95 | void dump_recovery_info(Formatter *f) const override { | |
96 | { | |
97 | f->open_array_section("pull_from_peer"); | |
98 | for (map<pg_shard_t, set<hobject_t> >::const_iterator i = pull_from_peer.begin(); | |
99 | i != pull_from_peer.end(); | |
100 | ++i) { | |
101 | f->open_object_section("pulling_from"); | |
102 | f->dump_stream("pull_from") << i->first; | |
103 | { | |
104 | f->open_array_section("pulls"); | |
105 | for (set<hobject_t>::const_iterator j = i->second.begin(); | |
106 | j != i->second.end(); | |
107 | ++j) { | |
108 | f->open_object_section("pull_info"); | |
109 | assert(pulling.count(*j)); | |
110 | pulling.find(*j)->second.dump(f); | |
111 | f->close_section(); | |
112 | } | |
113 | f->close_section(); | |
114 | } | |
115 | f->close_section(); | |
116 | } | |
117 | f->close_section(); | |
118 | } | |
119 | { | |
120 | f->open_array_section("pushing"); | |
121 | for (map<hobject_t, map<pg_shard_t, PushInfo>>::const_iterator i = | |
122 | pushing.begin(); | |
123 | i != pushing.end(); | |
124 | ++i) { | |
125 | f->open_object_section("object"); | |
126 | f->dump_stream("pushing") << i->first; | |
127 | { | |
128 | f->open_array_section("pushing_to"); | |
129 | for (map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin(); | |
130 | j != i->second.end(); | |
131 | ++j) { | |
132 | f->open_object_section("push_progress"); | |
133 | f->dump_stream("pushing_to") << j->first; | |
134 | { | |
135 | f->open_object_section("push_info"); | |
136 | j->second.dump(f); | |
137 | f->close_section(); | |
138 | } | |
139 | f->close_section(); | |
140 | } | |
141 | f->close_section(); | |
142 | } | |
143 | f->close_section(); | |
144 | } | |
145 | f->close_section(); | |
146 | } | |
147 | } | |
148 | ||
149 | int objects_read_sync( | |
150 | const hobject_t &hoid, | |
151 | uint64_t off, | |
152 | uint64_t len, | |
153 | uint32_t op_flags, | |
154 | bufferlist *bl) override; | |
155 | ||
156 | void objects_read_async( | |
157 | const hobject_t &hoid, | |
158 | const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>, | |
159 | pair<bufferlist*, Context*> > > &to_read, | |
160 | Context *on_complete, | |
161 | bool fast_read = false) override; | |
162 | ||
163 | private: | |
164 | // push | |
165 | struct PushInfo { | |
166 | ObjectRecoveryProgress recovery_progress; | |
167 | ObjectRecoveryInfo recovery_info; | |
168 | ObjectContextRef obc; | |
169 | object_stat_sum_t stat; | |
170 | ObcLockManager lock_manager; | |
171 | ||
172 | void dump(Formatter *f) const { | |
173 | { | |
174 | f->open_object_section("recovery_progress"); | |
175 | recovery_progress.dump(f); | |
176 | f->close_section(); | |
177 | } | |
178 | { | |
179 | f->open_object_section("recovery_info"); | |
180 | recovery_info.dump(f); | |
181 | f->close_section(); | |
182 | } | |
183 | } | |
184 | }; | |
185 | map<hobject_t, map<pg_shard_t, PushInfo>> pushing; | |
186 | ||
187 | // pull | |
188 | struct PullInfo { | |
189 | pg_shard_t from; | |
190 | hobject_t soid; | |
191 | ObjectRecoveryProgress recovery_progress; | |
192 | ObjectRecoveryInfo recovery_info; | |
193 | ObjectContextRef head_ctx; | |
194 | ObjectContextRef obc; | |
195 | object_stat_sum_t stat; | |
196 | bool cache_dont_need; | |
197 | ObcLockManager lock_manager; | |
198 | ||
199 | void dump(Formatter *f) const { | |
200 | { | |
201 | f->open_object_section("recovery_progress"); | |
202 | recovery_progress.dump(f); | |
203 | f->close_section(); | |
204 | } | |
205 | { | |
206 | f->open_object_section("recovery_info"); | |
207 | recovery_info.dump(f); | |
208 | f->close_section(); | |
209 | } | |
210 | } | |
211 | ||
212 | bool is_complete() const { | |
213 | return recovery_progress.is_complete(recovery_info); | |
214 | } | |
215 | }; | |
216 | ||
217 | map<hobject_t, PullInfo> pulling; | |
218 | ||
219 | // Reverse mapping from osd peer to objects beging pulled from that peer | |
220 | map<pg_shard_t, set<hobject_t> > pull_from_peer; | |
221 | void clear_pull( | |
222 | map<hobject_t, PullInfo>::iterator piter, | |
223 | bool clear_pull_from_peer = true); | |
224 | void clear_pull_from( | |
225 | map<hobject_t, PullInfo>::iterator piter); | |
226 | ||
227 | void _do_push(OpRequestRef op); | |
228 | void _do_pull_response(OpRequestRef op); | |
229 | void do_push(OpRequestRef op) { | |
230 | if (is_primary()) { | |
231 | _do_pull_response(op); | |
232 | } else { | |
233 | _do_push(op); | |
234 | } | |
235 | } | |
236 | void do_pull(OpRequestRef op); | |
237 | void do_push_reply(OpRequestRef op); | |
238 | ||
239 | bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply); | |
240 | void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply); | |
241 | ||
242 | struct pull_complete_info { | |
243 | hobject_t hoid; | |
244 | object_stat_sum_t stat; | |
245 | }; | |
246 | bool handle_pull_response( | |
247 | pg_shard_t from, const PushOp &op, PullOp *response, | |
248 | list<pull_complete_info> *to_continue, | |
249 | ObjectStore::Transaction *t); | |
250 | void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response, | |
251 | ObjectStore::Transaction *t); | |
252 | ||
253 | static void trim_pushed_data(const interval_set<uint64_t> ©_subset, | |
254 | const interval_set<uint64_t> &intervals_received, | |
255 | bufferlist data_received, | |
256 | interval_set<uint64_t> *intervals_usable, | |
257 | bufferlist *data_usable); | |
224ce89b | 258 | void _failed_pull(pg_shard_t from, const hobject_t &soid); |
7c673cae FG |
259 | |
260 | void send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes); | |
261 | void prep_push_op_blank(const hobject_t& soid, PushOp *op); | |
262 | void send_pulls( | |
263 | int priority, | |
264 | map<pg_shard_t, vector<PullOp> > &pulls); | |
265 | ||
266 | int build_push_op(const ObjectRecoveryInfo &recovery_info, | |
267 | const ObjectRecoveryProgress &progress, | |
268 | ObjectRecoveryProgress *out_progress, | |
269 | PushOp *out_op, | |
270 | object_stat_sum_t *stat = 0, | |
271 | bool cache_dont_need = true); | |
272 | void submit_push_data(const ObjectRecoveryInfo &recovery_info, | |
273 | bool first, | |
274 | bool complete, | |
275 | bool cache_dont_need, | |
276 | const interval_set<uint64_t> &intervals_included, | |
277 | bufferlist data_included, | |
278 | bufferlist omap_header, | |
279 | const map<string, bufferlist> &attrs, | |
280 | const map<string, bufferlist> &omap_entries, | |
281 | ObjectStore::Transaction *t); | |
282 | void submit_push_complete(const ObjectRecoveryInfo &recovery_info, | |
283 | ObjectStore::Transaction *t); | |
284 | ||
285 | void calc_clone_subsets( | |
286 | SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing, | |
287 | const hobject_t &last_backfill, | |
288 | interval_set<uint64_t>& data_subset, | |
289 | map<hobject_t, interval_set<uint64_t>>& clone_subsets, | |
290 | ObcLockManager &lock_manager); | |
291 | void prepare_pull( | |
292 | eversion_t v, | |
293 | const hobject_t& soid, | |
294 | ObjectContextRef headctx, | |
295 | RPGHandle *h); | |
296 | int start_pushes( | |
297 | const hobject_t &soid, | |
298 | ObjectContextRef obj, | |
299 | RPGHandle *h); | |
224ce89b | 300 | int prep_push_to_replica( |
7c673cae FG |
301 | ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, |
302 | PushOp *pop, bool cache_dont_need = true); | |
224ce89b | 303 | int prep_push( |
7c673cae FG |
304 | ObjectContextRef obc, |
305 | const hobject_t& oid, pg_shard_t dest, | |
306 | PushOp *op, | |
307 | bool cache_dont_need); | |
224ce89b | 308 | int prep_push( |
7c673cae FG |
309 | ObjectContextRef obc, |
310 | const hobject_t& soid, pg_shard_t peer, | |
311 | eversion_t version, | |
312 | interval_set<uint64_t> &data_subset, | |
313 | map<hobject_t, interval_set<uint64_t>>& clone_subsets, | |
314 | PushOp *op, | |
315 | bool cache, | |
316 | ObcLockManager &&lock_manager); | |
317 | void calc_head_subsets( | |
318 | ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, | |
319 | const pg_missing_t& missing, | |
320 | const hobject_t &last_backfill, | |
321 | interval_set<uint64_t>& data_subset, | |
322 | map<hobject_t, interval_set<uint64_t>>& clone_subsets, | |
323 | ObcLockManager &lock_manager); | |
324 | ObjectRecoveryInfo recalc_subsets( | |
325 | const ObjectRecoveryInfo& recovery_info, | |
326 | SnapSetContext *ssc, | |
327 | ObcLockManager &lock_manager); | |
328 | ||
329 | /** | |
330 | * Client IO | |
331 | */ | |
332 | struct InProgressOp { | |
333 | ceph_tid_t tid; | |
334 | set<pg_shard_t> waiting_for_commit; | |
335 | set<pg_shard_t> waiting_for_applied; | |
336 | Context *on_commit; | |
337 | Context *on_applied; | |
338 | OpRequestRef op; | |
339 | eversion_t v; | |
340 | InProgressOp( | |
341 | ceph_tid_t tid, Context *on_commit, Context *on_applied, | |
342 | OpRequestRef op, eversion_t v) | |
343 | : tid(tid), on_commit(on_commit), on_applied(on_applied), | |
344 | op(op), v(v) {} | |
345 | bool done() const { | |
346 | return waiting_for_commit.empty() && | |
347 | waiting_for_applied.empty(); | |
348 | } | |
349 | }; | |
350 | map<ceph_tid_t, InProgressOp> in_progress_ops; | |
351 | public: | |
352 | friend class C_OSD_OnOpCommit; | |
353 | friend class C_OSD_OnOpApplied; | |
354 | ||
355 | void call_write_ordered(std::function<void(void)> &&cb) override { | |
356 | // ReplicatedBackend submits writes inline in submit_transaction, so | |
357 | // we can just call the callback. | |
358 | cb(); | |
359 | } | |
360 | ||
361 | void submit_transaction( | |
362 | const hobject_t &hoid, | |
363 | const object_stat_sum_t &delta_stats, | |
364 | const eversion_t &at_version, | |
365 | PGTransactionUPtr &&t, | |
366 | const eversion_t &trim_to, | |
367 | const eversion_t &roll_forward_to, | |
368 | const vector<pg_log_entry_t> &log_entries, | |
369 | boost::optional<pg_hit_set_history_t> &hset_history, | |
370 | Context *on_local_applied_sync, | |
371 | Context *on_all_applied, | |
372 | Context *on_all_commit, | |
373 | ceph_tid_t tid, | |
374 | osd_reqid_t reqid, | |
375 | OpRequestRef op | |
376 | ) override; | |
377 | ||
378 | private: | |
379 | Message * generate_subop( | |
380 | const hobject_t &soid, | |
381 | const eversion_t &at_version, | |
382 | ceph_tid_t tid, | |
383 | osd_reqid_t reqid, | |
384 | eversion_t pg_trim_to, | |
385 | eversion_t pg_roll_forward_to, | |
386 | hobject_t new_temp_oid, | |
387 | hobject_t discard_temp_oid, | |
388 | const vector<pg_log_entry_t> &log_entries, | |
389 | boost::optional<pg_hit_set_history_t> &hset_history, | |
390 | ObjectStore::Transaction &op_t, | |
391 | pg_shard_t peer, | |
392 | const pg_info_t &pinfo); | |
393 | void issue_op( | |
394 | const hobject_t &soid, | |
395 | const eversion_t &at_version, | |
396 | ceph_tid_t tid, | |
397 | osd_reqid_t reqid, | |
398 | eversion_t pg_trim_to, | |
399 | eversion_t pg_roll_forward_to, | |
400 | hobject_t new_temp_oid, | |
401 | hobject_t discard_temp_oid, | |
402 | const vector<pg_log_entry_t> &log_entries, | |
403 | boost::optional<pg_hit_set_history_t> &hset_history, | |
404 | InProgressOp *op, | |
405 | ObjectStore::Transaction &op_t); | |
406 | void op_applied(InProgressOp *op); | |
407 | void op_commit(InProgressOp *op); | |
408 | void do_repop_reply(OpRequestRef op); | |
409 | void do_repop(OpRequestRef op); | |
410 | ||
411 | struct RepModify { | |
412 | OpRequestRef op; | |
413 | bool applied, committed; | |
414 | int ackerosd; | |
415 | eversion_t last_complete; | |
416 | epoch_t epoch_started; | |
417 | ||
418 | ObjectStore::Transaction opt, localt; | |
419 | ||
420 | RepModify() : applied(false), committed(false), ackerosd(-1), | |
421 | epoch_started(0) {} | |
422 | }; | |
423 | typedef ceph::shared_ptr<RepModify> RepModifyRef; | |
424 | ||
425 | struct C_OSD_RepModifyApply; | |
426 | struct C_OSD_RepModifyCommit; | |
427 | ||
428 | void repop_applied(RepModifyRef rm); | |
429 | void repop_commit(RepModifyRef rm); | |
430 | bool scrub_supported() override { return true; } | |
431 | bool auto_repair_supported() const override { return false; } | |
432 | ||
433 | ||
434 | void be_deep_scrub( | |
435 | const hobject_t &obj, | |
436 | uint32_t seed, | |
437 | ScrubMap::object &o, | |
438 | ThreadPool::TPHandle &handle) override; | |
439 | uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; } | |
440 | }; | |
441 | ||
442 | #endif |