// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef REPBACKEND_H
#define REPBACKEND_H

#include "OSD.h"
#include "PGBackend.h"
#include "include/memory.h"

struct C_ReplicatedBackend_OnPullComplete;
class ReplicatedBackend : public PGBackend {
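  // Recovery handle for a replicated PG: accumulates the PushOp/PullOp
  // messages prepared for each peer shard during a recovery cycle so they
  // can be sent out together when run_recovery_op() is invoked.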
  struct RPGHandle : public PGBackend::RecoveryHandle {
    map<pg_shard_t, vector<PushOp> > pushes;
    map<pg_shard_t, vector<PullOp> > pulls;
  };
  friend struct C_ReplicatedBackend_OnPullComplete;
public:
  ReplicatedBackend(
    PGBackend::Listener *pg,
    coll_t coll,
    ObjectStore::CollectionHandle &ch,
    ObjectStore *store,
    CephContext *cct);

  /// @see PGBackend::open_recovery_op
  RPGHandle *_open_recovery_op() {
    return new RPGHandle();
  }
  PGBackend::RecoveryHandle *open_recovery_op() override {
    return _open_recovery_op();
  }

  /// @see PGBackend::run_recovery_op
  void run_recovery_op(
    PGBackend::RecoveryHandle *h,
    int priority) override;

  /// @see PGBackend::recover_object
  int recover_object(
    const hobject_t &hoid,
    eversion_t v,
    ObjectContextRef head,
    ObjectContextRef obc,
    RecoveryHandle *h
    ) override;

  void check_recovery_sources(const OSDMapRef& osdmap) override;

  bool can_handle_while_inactive(OpRequestRef op) override;

  /// @see PGBackend::handle_message
  bool _handle_message(
    OpRequestRef op
    ) override;

  void on_change() override;
  void clear_recovery_state() override;
  void on_flushed() override;
  class RPCRecPred : public IsPGRecoverablePredicate {
  public:
    bool operator()(const set<pg_shard_t> &have) const override {
      return !have.empty();
    }
  };
  IsPGRecoverablePredicate *get_is_recoverable_predicate() override {
    return new RPCRecPred;
  }

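  // Readability predicate for replicated pools: an object is readable
  // only if this OSD's own shard holds a copy, since replicated reads are
  // served from the local object store.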
  class RPCReadPred : public IsPGReadablePredicate {
    pg_shard_t whoami;
  public:
    explicit RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
    bool operator()(const set<pg_shard_t> &have) const override {
      return have.count(whoami);
    }
  };
  IsPGReadablePredicate *get_is_readable_predicate() override {
    return new RPCReadPred(get_parent()->whoami_shard());
  }

  void dump_recovery_info(Formatter *f) const override {
    {
      f->open_array_section("pull_from_peer");
      for (map<pg_shard_t, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
           i != pull_from_peer.end();
           ++i) {
        f->open_object_section("pulling_from");
        f->dump_stream("pull_from") << i->first;
        {
          f->open_array_section("pulls");
          for (set<hobject_t>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("pull_info");
            assert(pulling.count(*j));
            pulling.find(*j)->second.dump(f);
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
    {
      f->open_array_section("pushing");
      for (map<hobject_t, map<pg_shard_t, PushInfo>>::const_iterator i =
             pushing.begin();
           i != pushing.end();
           ++i) {
        f->open_object_section("object");
        f->dump_stream("pushing") << i->first;
        {
          f->open_array_section("pushing_to");
          for (map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("push_progress");
            f->dump_stream("pushing_to") << j->first;
            {
              f->open_object_section("push_info");
              j->second.dump(f);
              f->close_section();
            }
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
  }

  int objects_read_sync(
    const hobject_t &hoid,
    uint64_t off,
    uint64_t len,
    uint32_t op_flags,
    bufferlist *bl) override;

  void objects_read_async(
    const hobject_t &hoid,
    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                    pair<bufferlist*, Context*> > > &to_read,
    Context *on_complete,
    bool fast_read = false) override;

private:
  // push
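  // Per-(object, replica) state for an in-progress push: how far the push
  // has progressed, what is being recovered, the object context, the stat
  // delta accumulated for the push, and any object locks held while the
  // push is in flight.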
  struct PushInfo {
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }
  };
  map<hobject_t, map<pg_shard_t, PushInfo>> pushing;

  // pull
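  // Per-object state for an in-progress pull from a peer shard, including
  // the head object context (needed when recovering clones) and whether
  // the pulled data should be written with a "don't cache" hint.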
  struct PullInfo {
    pg_shard_t from;
    hobject_t soid;
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef head_ctx;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    bool cache_dont_need;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }

    bool is_complete() const {
      return recovery_progress.is_complete(recovery_info);
    }
  };

  map<hobject_t, PullInfo> pulling;

  // Reverse mapping from osd peer to objects being pulled from that peer
  map<pg_shard_t, set<hobject_t> > pull_from_peer;
  void clear_pull(
    map<hobject_t, PullInfo>::iterator piter,
    bool clear_pull_from_peer = true);
  void clear_pull_from(
    map<hobject_t, PullInfo>::iterator piter);

  void _do_push(OpRequestRef op);
  void _do_pull_response(OpRequestRef op);
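  // An incoming push (MOSDPGPush) is either the response to a pull we
  // issued (when we are the primary) or a push of an object we are
  // missing (when we are a replica); dispatch accordingly.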
  void do_push(OpRequestRef op) {
    if (is_primary()) {
      _do_pull_response(op);
    } else {
      _do_push(op);
    }
  }
  void do_pull(OpRequestRef op);
  void do_push_reply(OpRequestRef op);

  bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply);
  void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);

  struct pull_complete_info {
    hobject_t hoid;
    object_stat_sum_t stat;
  };
  bool handle_pull_response(
    pg_shard_t from, const PushOp &op, PullOp *response,
    list<pull_complete_info> *to_continue,
    ObjectStore::Transaction *t);
  void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response,
                   ObjectStore::Transaction *t);

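  // Trim the intervals/data received in a push down to the ranges we
  // actually asked for (copy_subset); only the intersection is returned
  // as usable intervals and data.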
  static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
                               const interval_set<uint64_t> &intervals_received,
                               bufferlist data_received,
                               interval_set<uint64_t> *intervals_usable,
                               bufferlist *data_usable);
  void _failed_pull(pg_shard_t from, const hobject_t &soid);

  void send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes);
  void prep_push_op_blank(const hobject_t& soid, PushOp *op);
  void send_pulls(
    int priority,
    map<pg_shard_t, vector<PullOp> > &pulls);

  int build_push_op(const ObjectRecoveryInfo &recovery_info,
                    const ObjectRecoveryProgress &progress,
                    ObjectRecoveryProgress *out_progress,
                    PushOp *out_op,
                    object_stat_sum_t *stat = 0,
                    bool cache_dont_need = true);
  void submit_push_data(const ObjectRecoveryInfo &recovery_info,
                        bool first,
                        bool complete,
                        bool cache_dont_need,
                        const interval_set<uint64_t> &intervals_included,
                        bufferlist data_included,
                        bufferlist omap_header,
                        const map<string, bufferlist> &attrs,
                        const map<string, bufferlist> &omap_entries,
                        ObjectStore::Transaction *t);
  void submit_push_complete(const ObjectRecoveryInfo &recovery_info,
                            ObjectStore::Transaction *t);

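  // Work out which byte ranges of a missing clone can be cloned locally
  // from adjacent clones (or the head) that we already have, and which
  // ranges (data_subset) still need to be copied from a peer.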
  void calc_clone_subsets(
    SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  void prepare_pull(
    eversion_t v,
    const hobject_t& soid,
    ObjectContextRef headctx,
    RPGHandle *h);
  int start_pushes(
    const hobject_t &soid,
    ObjectContextRef obj,
    RPGHandle *h);
  int prep_push_to_replica(
    ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
    PushOp *pop, bool cache_dont_need = true);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& oid, pg_shard_t dest,
    PushOp *op,
    bool cache_dont_need);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& soid, pg_shard_t peer,
    eversion_t version,
    interval_set<uint64_t> &data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    PushOp *op,
    bool cache,
    ObcLockManager &&lock_manager);
  void calc_head_subsets(
    ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
    const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  ObjectRecoveryInfo recalc_subsets(
    const ObjectRecoveryInfo& recovery_info,
    SnapSetContext *ssc,
    ObcLockManager &lock_manager);

  /**
   * Client IO
   */
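  // Primary-side bookkeeping for a single replicated write: the shards we
  // are still waiting on to apply/commit, and the contexts to complete
  // once every shard (the local store included) has acknowledged.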
  struct InProgressOp {
    ceph_tid_t tid;
    set<pg_shard_t> waiting_for_commit;
    set<pg_shard_t> waiting_for_applied;
    Context *on_commit;
    Context *on_applied;
    OpRequestRef op;
    eversion_t v;
    InProgressOp(
      ceph_tid_t tid, Context *on_commit, Context *on_applied,
      OpRequestRef op, eversion_t v)
      : tid(tid), on_commit(on_commit), on_applied(on_applied),
        op(op), v(v) {}
    bool done() const {
      return waiting_for_commit.empty() &&
        waiting_for_applied.empty();
    }
  };
  map<ceph_tid_t, InProgressOp> in_progress_ops;
public:
  friend class C_OSD_OnOpCommit;
  friend class C_OSD_OnOpApplied;

  void call_write_ordered(std::function<void(void)> &&cb) override {
    // ReplicatedBackend submits writes inline in submit_transaction, so
    // we can just call the callback.
    cb();
  }

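  // Client write path: queue the transaction on the local store and
  // replicate it to the other shards; on_all_applied/on_all_commit fire
  // once every shard has applied/committed the update.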
  void submit_transaction(
    const hobject_t &hoid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&t,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    Context *on_local_applied_sync,
    Context *on_all_applied,
    Context *on_all_commit,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef op
    ) override;

private:
  Message * generate_subop(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    ObjectStore::Transaction &op_t,
    pg_shard_t peer,
    const pg_info_t &pinfo);
  void issue_op(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    InProgressOp *op,
    ObjectStore::Transaction &op_t);
  void op_applied(InProgressOp *op);
  void op_commit(InProgressOp *op);
  void do_repop_reply(OpRequestRef op);
  void do_repop(OpRequestRef op);

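  // Replica-side state for an update received from the primary: the
  // transaction sent by the primary (opt) plus the local bookkeeping
  // transaction (localt), and flags recording whether the update has been
  // applied and committed locally so replies can be sent back.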
  struct RepModify {
    OpRequestRef op;
    bool applied, committed;
    int ackerosd;
    eversion_t last_complete;
    epoch_t epoch_started;

    ObjectStore::Transaction opt, localt;

    RepModify() : applied(false), committed(false), ackerosd(-1),
                  epoch_started(0) {}
  };
  typedef ceph::shared_ptr<RepModify> RepModifyRef;

  struct C_OSD_RepModifyApply;
  struct C_OSD_RepModifyCommit;

  void repop_applied(RepModifyRef rm);
  void repop_commit(RepModifyRef rm);
  bool scrub_supported() override { return true; }
  bool auto_repair_supported() const override { return false; }


  int be_deep_scrub(
    const hobject_t &poid,
    ScrubMap &map,
    ScrubMapBuilder &pos,
    ScrubMap::object &o) override;
  uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; }
};

#endif