// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef REPBACKEND_H
#define REPBACKEND_H

#include "OSD.h"
#include "PGBackend.h"
#include "include/memory.h"

struct C_ReplicatedBackend_OnPullComplete;
class ReplicatedBackend : public PGBackend {
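  // A recovery handle accumulates the PushOps/PullOps staged by
  // recover_object(); run_recovery_op() later hands them to
  // send_pushes()/send_pulls() as one batch per peer.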
  struct RPGHandle : public PGBackend::RecoveryHandle {
    map<pg_shard_t, vector<PushOp> > pushes;
    map<pg_shard_t, vector<PullOp> > pulls;
  };
  friend struct C_ReplicatedBackend_OnPullComplete;
public:
  ReplicatedBackend(
    PGBackend::Listener *pg,
    coll_t coll,
    ObjectStore::CollectionHandle &ch,
    ObjectStore *store,
    CephContext *cct);

  /// @see PGBackend::open_recovery_op
  RPGHandle *_open_recovery_op() {
    return new RPGHandle();
  }
  PGBackend::RecoveryHandle *open_recovery_op() override {
    return _open_recovery_op();
  }

  /// @see PGBackend::run_recovery_op
  void run_recovery_op(
    PGBackend::RecoveryHandle *h,
    int priority) override;

  /// @see PGBackend::recover_object
  int recover_object(
    const hobject_t &hoid,
    eversion_t v,
    ObjectContextRef head,
    ObjectContextRef obc,
    RecoveryHandle *h
    ) override;

  void check_recovery_sources(const OSDMapRef& osdmap) override;

  /// @see PGBackend::delay_message_until_active
  bool can_handle_while_inactive(OpRequestRef op) override;

  /// @see PGBackend::handle_message
  bool handle_message(
    OpRequestRef op
    ) override;

  void on_change() override;
  void clear_recovery_state() override;
  void on_flushed() override;

  class RPCRecPred : public IsPGRecoverablePredicate {
  public:
    bool operator()(const set<pg_shard_t> &have) const override {
      return !have.empty();
    }
  };
  IsPGRecoverablePredicate *get_is_recoverable_predicate() override {
    return new RPCRecPred;
  }

  class RPCReadPred : public IsPGReadablePredicate {
    pg_shard_t whoami;
  public:
    explicit RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
    bool operator()(const set<pg_shard_t> &have) const override {
      return have.count(whoami);
    }
  };
  IsPGReadablePredicate *get_is_readable_predicate() override {
    return new RPCReadPred(get_parent()->whoami_shard());
  }

  void dump_recovery_info(Formatter *f) const override {
    {
      f->open_array_section("pull_from_peer");
      for (map<pg_shard_t, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
           i != pull_from_peer.end();
           ++i) {
        f->open_object_section("pulling_from");
        f->dump_stream("pull_from") << i->first;
        {
          f->open_array_section("pulls");
          for (set<hobject_t>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("pull_info");
            assert(pulling.count(*j));
            pulling.find(*j)->second.dump(f);
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
    {
      f->open_array_section("pushing");
      for (map<hobject_t, map<pg_shard_t, PushInfo>>::const_iterator i =
             pushing.begin();
           i != pushing.end();
           ++i) {
        f->open_object_section("object");
        f->dump_stream("pushing") << i->first;
        {
          f->open_array_section("pushing_to");
          for (map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("push_progress");
            f->dump_stream("pushing_to") << j->first;
            {
              f->open_object_section("push_info");
              j->second.dump(f);
              f->close_section();
            }
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
  }

  int objects_read_sync(
    const hobject_t &hoid,
    uint64_t off,
    uint64_t len,
    uint32_t op_flags,
    bufferlist *bl) override;

  void objects_read_async(
    const hobject_t &hoid,
    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                    pair<bufferlist*, Context*> > > &to_read,
    Context *on_complete,
    bool fast_read = false) override;

private:
  // push
  struct PushInfo {
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }
  };
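  // In-flight pushes from this OSD, keyed by object and then by the
  // shard the data is being pushed to.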
  map<hobject_t, map<pg_shard_t, PushInfo>> pushing;

  // pull
  struct PullInfo {
    pg_shard_t from;
    hobject_t soid;
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef head_ctx;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    bool cache_dont_need;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }

    bool is_complete() const {
      return recovery_progress.is_complete(recovery_info);
    }
  };

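  // In-flight pulls on this OSD, keyed by the object being recovered.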
  map<hobject_t, PullInfo> pulling;

  // Reverse mapping from osd peer to objects being pulled from that peer
  map<pg_shard_t, set<hobject_t> > pull_from_peer;
  void clear_pull(
    map<hobject_t, PullInfo>::iterator piter,
    bool clear_pull_from_peer = true);
  void clear_pull_from(
    map<hobject_t, PullInfo>::iterator piter);

  void _do_push(OpRequestRef op);
  void _do_pull_response(OpRequestRef op);
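  // An incoming push is the response to one of our pulls if we are the
  // primary (the primary never has data pushed to it unsolicited);
  // otherwise it is an object being pushed to this replica.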
  void do_push(OpRequestRef op) {
    if (is_primary()) {
      _do_pull_response(op);
    } else {
      _do_push(op);
    }
  }
  void do_pull(OpRequestRef op);
  void do_push_reply(OpRequestRef op);

  bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply);
  void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);

  struct pull_complete_info {
    hobject_t hoid;
    object_stat_sum_t stat;
  };
  bool handle_pull_response(
    pg_shard_t from, const PushOp &op, PullOp *response,
    list<pull_complete_info> *to_continue,
    ObjectStore::Transaction *t);
  void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response,
                   ObjectStore::Transaction *t);

  static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
                               const interval_set<uint64_t> &intervals_received,
                               bufferlist data_received,
                               interval_set<uint64_t> *intervals_usable,
                               bufferlist *data_usable);
  void _failed_pull(pg_shard_t from, const hobject_t &soid);

  void send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes);
  void prep_push_op_blank(const hobject_t& soid, PushOp *op);
  void send_pulls(
    int priority,
    map<pg_shard_t, vector<PullOp> > &pulls);

  int build_push_op(const ObjectRecoveryInfo &recovery_info,
                    const ObjectRecoveryProgress &progress,
                    ObjectRecoveryProgress *out_progress,
                    PushOp *out_op,
                    object_stat_sum_t *stat = 0,
                    bool cache_dont_need = true);
  void submit_push_data(const ObjectRecoveryInfo &recovery_info,
                        bool first,
                        bool complete,
                        bool cache_dont_need,
                        const interval_set<uint64_t> &intervals_included,
                        bufferlist data_included,
                        bufferlist omap_header,
                        const map<string, bufferlist> &attrs,
                        const map<string, bufferlist> &omap_entries,
                        ObjectStore::Transaction *t);
  void submit_push_complete(const ObjectRecoveryInfo &recovery_info,
                            ObjectStore::Transaction *t);

  void calc_clone_subsets(
    SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  void prepare_pull(
    eversion_t v,
    const hobject_t& soid,
    ObjectContextRef headctx,
    RPGHandle *h);
  int start_pushes(
    const hobject_t &soid,
    ObjectContextRef obj,
    RPGHandle *h);
  int prep_push_to_replica(
    ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
    PushOp *pop, bool cache_dont_need = true);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& oid, pg_shard_t dest,
    PushOp *op,
    bool cache_dont_need);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& soid, pg_shard_t peer,
    eversion_t version,
    interval_set<uint64_t> &data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    PushOp *op,
    bool cache,
    ObcLockManager &&lock_manager);
  void calc_head_subsets(
    ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
    const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  ObjectRecoveryInfo recalc_subsets(
    const ObjectRecoveryInfo& recovery_info,
    SnapSetContext *ssc,
    ObcLockManager &lock_manager);

  /**
   * Client IO
   */
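  // Primary-side bookkeeping for one in-flight replicated write: the
  // shards we are still waiting on for applied/commit acks, and the
  // callbacks to invoke once done() becomes true.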
  struct InProgressOp {
    ceph_tid_t tid;
    set<pg_shard_t> waiting_for_commit;
    set<pg_shard_t> waiting_for_applied;
    Context *on_commit;
    Context *on_applied;
    OpRequestRef op;
    eversion_t v;
    InProgressOp(
      ceph_tid_t tid, Context *on_commit, Context *on_applied,
      OpRequestRef op, eversion_t v)
      : tid(tid), on_commit(on_commit), on_applied(on_applied),
        op(op), v(v) {}
    bool done() const {
      return waiting_for_commit.empty() &&
        waiting_for_applied.empty();
    }
  };
  map<ceph_tid_t, InProgressOp> in_progress_ops;
public:
  friend class C_OSD_OnOpCommit;
  friend class C_OSD_OnOpApplied;

  void call_write_ordered(std::function<void(void)> &&cb) override {
    // ReplicatedBackend submits writes inline in submit_transaction, so
    // we can just call the callback.
    cb();
  }

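  // Issue a replicated write: replicate the transaction to the other
  // shards (via issue_op) and queue it locally; on_all_applied /
  // on_all_commit fire once every shard, including this one, has acked.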
  void submit_transaction(
    const hobject_t &hoid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&t,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    Context *on_local_applied_sync,
    Context *on_all_applied,
    Context *on_all_commit,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef op
    ) override;

private:
  Message * generate_subop(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    ObjectStore::Transaction &op_t,
    pg_shard_t peer,
    const pg_info_t &pinfo);
  void issue_op(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    InProgressOp *op,
    ObjectStore::Transaction &op_t);
  void op_applied(InProgressOp *op);
  void op_commit(InProgressOp *op);
  void do_repop_reply(OpRequestRef op);
  void do_repop(OpRequestRef op);

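  // Replica-side state for a single update received from the primary:
  // the originating op, the transactions to apply, and whether the
  // local apply/commit have completed yet.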
  struct RepModify {
    OpRequestRef op;
    bool applied, committed;
    int ackerosd;
    eversion_t last_complete;
    epoch_t epoch_started;

    ObjectStore::Transaction opt, localt;

    RepModify() : applied(false), committed(false), ackerosd(-1),
                  epoch_started(0) {}
  };
  typedef ceph::shared_ptr<RepModify> RepModifyRef;

  struct C_OSD_RepModifyApply;
  struct C_OSD_RepModifyCommit;

  void repop_applied(RepModifyRef rm);
  void repop_commit(RepModifyRef rm);
  bool scrub_supported() override { return true; }
  bool auto_repair_supported() const override { return false; }

  void be_deep_scrub(
    const hobject_t &obj,
    uint32_t seed,
    ScrubMap::object &o,
    ThreadPool::TPHandle &handle) override;
  uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; }
};

#endif