1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "PGBackend.h"
20 #include "include/memory.h"
22 struct C_ReplicatedBackend_OnPullComplete
;
23 class ReplicatedBackend
: public PGBackend
{
24 struct RPGHandle
: public PGBackend::RecoveryHandle
{
25 map
<pg_shard_t
, vector
<PushOp
> > pushes
;
26 map
<pg_shard_t
, vector
<PullOp
> > pulls
;
28 friend struct C_ReplicatedBackend_OnPullComplete
;
31 PGBackend::Listener
*pg
,
33 ObjectStore::CollectionHandle
&ch
,
37 /// @see PGBackend::open_recovery_op
38 RPGHandle
*_open_recovery_op() {
39 return new RPGHandle();
41 PGBackend::RecoveryHandle
*open_recovery_op() override
{
42 return _open_recovery_op();
45 /// @see PGBackend::run_recovery_op
47 PGBackend::RecoveryHandle
*h
,
48 int priority
) override
;
50 /// @see PGBackend::recover_object
52 const hobject_t
&hoid
,
54 ObjectContextRef head
,
59 void check_recovery_sources(const OSDMapRef
& osdmap
) override
;
61 /// @see PGBackend::delay_message_until_active
62 bool can_handle_while_inactive(OpRequestRef op
) override
;
64 /// @see PGBackend::handle_message
69 void on_change() override
;
70 void clear_recovery_state() override
;
71 void on_flushed() override
;
73 class RPCRecPred
: public IsPGRecoverablePredicate
{
75 bool operator()(const set
<pg_shard_t
> &have
) const override
{
79 IsPGRecoverablePredicate
*get_is_recoverable_predicate() override
{
80 return new RPCRecPred
;
83 class RPCReadPred
: public IsPGReadablePredicate
{
86 explicit RPCReadPred(pg_shard_t whoami
) : whoami(whoami
) {}
87 bool operator()(const set
<pg_shard_t
> &have
) const override
{
88 return have
.count(whoami
);
91 IsPGReadablePredicate
*get_is_readable_predicate() override
{
92 return new RPCReadPred(get_parent()->whoami_shard());
95 void dump_recovery_info(Formatter
*f
) const override
{
97 f
->open_array_section("pull_from_peer");
98 for (map
<pg_shard_t
, set
<hobject_t
> >::const_iterator i
= pull_from_peer
.begin();
99 i
!= pull_from_peer
.end();
101 f
->open_object_section("pulling_from");
102 f
->dump_stream("pull_from") << i
->first
;
104 f
->open_array_section("pulls");
105 for (set
<hobject_t
>::const_iterator j
= i
->second
.begin();
106 j
!= i
->second
.end();
108 f
->open_object_section("pull_info");
109 assert(pulling
.count(*j
));
110 pulling
.find(*j
)->second
.dump(f
);
120 f
->open_array_section("pushing");
121 for (map
<hobject_t
, map
<pg_shard_t
, PushInfo
>>::const_iterator i
=
125 f
->open_object_section("object");
126 f
->dump_stream("pushing") << i
->first
;
128 f
->open_array_section("pushing_to");
129 for (map
<pg_shard_t
, PushInfo
>::const_iterator j
= i
->second
.begin();
130 j
!= i
->second
.end();
132 f
->open_object_section("push_progress");
133 f
->dump_stream("pushing_to") << j
->first
;
135 f
->open_object_section("push_info");
149 int objects_read_sync(
150 const hobject_t
&hoid
,
154 bufferlist
*bl
) override
;
156 void objects_read_async(
157 const hobject_t
&hoid
,
158 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
159 pair
<bufferlist
*, Context
*> > > &to_read
,
160 Context
*on_complete
,
161 bool fast_read
= false) override
;
166 ObjectRecoveryProgress recovery_progress
;
167 ObjectRecoveryInfo recovery_info
;
168 ObjectContextRef obc
;
169 object_stat_sum_t stat
;
170 ObcLockManager lock_manager
;
172 void dump(Formatter
*f
) const {
174 f
->open_object_section("recovery_progress");
175 recovery_progress
.dump(f
);
179 f
->open_object_section("recovery_info");
180 recovery_info
.dump(f
);
185 map
<hobject_t
, map
<pg_shard_t
, PushInfo
>> pushing
;
191 ObjectRecoveryProgress recovery_progress
;
192 ObjectRecoveryInfo recovery_info
;
193 ObjectContextRef head_ctx
;
194 ObjectContextRef obc
;
195 object_stat_sum_t stat
;
196 bool cache_dont_need
;
197 ObcLockManager lock_manager
;
199 void dump(Formatter
*f
) const {
201 f
->open_object_section("recovery_progress");
202 recovery_progress
.dump(f
);
206 f
->open_object_section("recovery_info");
207 recovery_info
.dump(f
);
212 bool is_complete() const {
213 return recovery_progress
.is_complete(recovery_info
);
217 map
<hobject_t
, PullInfo
> pulling
;
// Reverse mapping from OSD peer to the objects being pulled from that peer
220 map
<pg_shard_t
, set
<hobject_t
> > pull_from_peer
;
222 map
<hobject_t
, PullInfo
>::iterator piter
,
223 bool clear_pull_from_peer
= true);
224 void clear_pull_from(
225 map
<hobject_t
, PullInfo
>::iterator piter
);
227 void _do_push(OpRequestRef op
);
228 void _do_pull_response(OpRequestRef op
);
229 void do_push(OpRequestRef op
) {
231 _do_pull_response(op
);
236 void do_pull(OpRequestRef op
);
237 void do_push_reply(OpRequestRef op
);
239 bool handle_push_reply(pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
);
240 void handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
);
242 struct pull_complete_info
{
244 object_stat_sum_t stat
;
246 bool handle_pull_response(
247 pg_shard_t from
, const PushOp
&op
, PullOp
*response
,
248 list
<pull_complete_info
> *to_continue
,
249 ObjectStore::Transaction
*t
);
250 void handle_push(pg_shard_t from
, const PushOp
&op
, PushReplyOp
*response
,
251 ObjectStore::Transaction
*t
);
253 static void trim_pushed_data(const interval_set
<uint64_t> ©_subset
,
254 const interval_set
<uint64_t> &intervals_received
,
255 bufferlist data_received
,
256 interval_set
<uint64_t> *intervals_usable
,
257 bufferlist
*data_usable
);
258 void _failed_push(pg_shard_t from
, const hobject_t
&soid
);
260 void send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
);
261 void prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
);
264 map
<pg_shard_t
, vector
<PullOp
> > &pulls
);
266 int build_push_op(const ObjectRecoveryInfo
&recovery_info
,
267 const ObjectRecoveryProgress
&progress
,
268 ObjectRecoveryProgress
*out_progress
,
270 object_stat_sum_t
*stat
= 0,
271 bool cache_dont_need
= true);
272 void submit_push_data(const ObjectRecoveryInfo
&recovery_info
,
275 bool cache_dont_need
,
276 const interval_set
<uint64_t> &intervals_included
,
277 bufferlist data_included
,
278 bufferlist omap_header
,
279 const map
<string
, bufferlist
> &attrs
,
280 const map
<string
, bufferlist
> &omap_entries
,
281 ObjectStore::Transaction
*t
);
282 void submit_push_complete(const ObjectRecoveryInfo
&recovery_info
,
283 ObjectStore::Transaction
*t
);
285 void calc_clone_subsets(
286 SnapSet
& snapset
, const hobject_t
& poid
, const pg_missing_t
& missing
,
287 const hobject_t
&last_backfill
,
288 interval_set
<uint64_t>& data_subset
,
289 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
290 ObcLockManager
&lock_manager
);
293 const hobject_t
& soid
,
294 ObjectContextRef headctx
,
297 const hobject_t
&soid
,
298 ObjectContextRef obj
,
300 void prep_push_to_replica(
301 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
302 PushOp
*pop
, bool cache_dont_need
= true);
304 ObjectContextRef obc
,
305 const hobject_t
& oid
, pg_shard_t dest
,
307 bool cache_dont_need
);
309 ObjectContextRef obc
,
310 const hobject_t
& soid
, pg_shard_t peer
,
312 interval_set
<uint64_t> &data_subset
,
313 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
316 ObcLockManager
&&lock_manager
);
317 void calc_head_subsets(
318 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
319 const pg_missing_t
& missing
,
320 const hobject_t
&last_backfill
,
321 interval_set
<uint64_t>& data_subset
,
322 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
323 ObcLockManager
&lock_manager
);
324 ObjectRecoveryInfo
recalc_subsets(
325 const ObjectRecoveryInfo
& recovery_info
,
327 ObcLockManager
&lock_manager
);
332 struct InProgressOp
{
334 set
<pg_shard_t
> waiting_for_commit
;
335 set
<pg_shard_t
> waiting_for_applied
;
341 ceph_tid_t tid
, Context
*on_commit
, Context
*on_applied
,
342 OpRequestRef op
, eversion_t v
)
343 : tid(tid
), on_commit(on_commit
), on_applied(on_applied
),
346 return waiting_for_commit
.empty() &&
347 waiting_for_applied
.empty();
350 map
<ceph_tid_t
, InProgressOp
> in_progress_ops
;
352 friend class C_OSD_OnOpCommit
;
353 friend class C_OSD_OnOpApplied
;
355 void call_write_ordered(std::function
<void(void)> &&cb
) override
{
356 // ReplicatedBackend submits writes inline in submit_transaction, so
357 // we can just call the callback.
361 void submit_transaction(
362 const hobject_t
&hoid
,
363 const object_stat_sum_t
&delta_stats
,
364 const eversion_t
&at_version
,
365 PGTransactionUPtr
&&t
,
366 const eversion_t
&trim_to
,
367 const eversion_t
&roll_forward_to
,
368 const vector
<pg_log_entry_t
> &log_entries
,
369 boost::optional
<pg_hit_set_history_t
> &hset_history
,
370 Context
*on_local_applied_sync
,
371 Context
*on_all_applied
,
372 Context
*on_all_commit
,
379 Message
* generate_subop(
380 const hobject_t
&soid
,
381 const eversion_t
&at_version
,
384 eversion_t pg_trim_to
,
385 eversion_t pg_roll_forward_to
,
386 hobject_t new_temp_oid
,
387 hobject_t discard_temp_oid
,
388 const vector
<pg_log_entry_t
> &log_entries
,
389 boost::optional
<pg_hit_set_history_t
> &hset_history
,
390 ObjectStore::Transaction
&op_t
,
392 const pg_info_t
&pinfo
);
394 const hobject_t
&soid
,
395 const eversion_t
&at_version
,
398 eversion_t pg_trim_to
,
399 eversion_t pg_roll_forward_to
,
400 hobject_t new_temp_oid
,
401 hobject_t discard_temp_oid
,
402 const vector
<pg_log_entry_t
> &log_entries
,
403 boost::optional
<pg_hit_set_history_t
> &hset_history
,
405 ObjectStore::Transaction
&op_t
);
406 void op_applied(InProgressOp
*op
);
407 void op_commit(InProgressOp
*op
);
408 void do_repop_reply(OpRequestRef op
);
409 void do_repop(OpRequestRef op
);
413 bool applied
, committed
;
415 eversion_t last_complete
;
416 epoch_t epoch_started
;
418 ObjectStore::Transaction opt
, localt
;
420 RepModify() : applied(false), committed(false), ackerosd(-1),
423 typedef ceph::shared_ptr
<RepModify
> RepModifyRef
;
425 struct C_OSD_RepModifyApply
;
426 struct C_OSD_RepModifyCommit
;
428 void repop_applied(RepModifyRef rm
);
429 void repop_commit(RepModifyRef rm
);
430 bool scrub_supported() override
{ return true; }
431 bool auto_repair_supported() const override
{ return false; }
435 const hobject_t
&obj
,
438 ThreadPool::TPHandle
&handle
) override
;
439 uint64_t be_get_ondisk_size(uint64_t logical_size
) override
{ return logical_size
; }