// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
18 #include "PGBackend.h"
20 struct C_ReplicatedBackend_OnPullComplete
;
21 class ReplicatedBackend
: public PGBackend
{
22 struct RPGHandle
: public PGBackend::RecoveryHandle
{
23 map
<pg_shard_t
, vector
<PushOp
> > pushes
;
24 map
<pg_shard_t
, vector
<PullOp
> > pulls
;
26 friend struct C_ReplicatedBackend_OnPullComplete
;
29 PGBackend::Listener
*pg
,
31 ObjectStore::CollectionHandle
&ch
,
35 /// @see PGBackend::open_recovery_op
36 RPGHandle
*_open_recovery_op() {
37 return new RPGHandle();
39 PGBackend::RecoveryHandle
*open_recovery_op() override
{
40 return _open_recovery_op();
43 /// @see PGBackend::run_recovery_op
45 PGBackend::RecoveryHandle
*h
,
46 int priority
) override
;
48 /// @see PGBackend::recover_object
50 const hobject_t
&hoid
,
52 ObjectContextRef head
,
57 void check_recovery_sources(const OSDMapRef
& osdmap
) override
;
59 bool can_handle_while_inactive(OpRequestRef op
) override
;
61 /// @see PGBackend::handle_message
66 void on_change() override
;
67 void clear_recovery_state() override
;
69 class RPCRecPred
: public IsPGRecoverablePredicate
{
71 bool operator()(const set
<pg_shard_t
> &have
) const override
{
75 IsPGRecoverablePredicate
*get_is_recoverable_predicate() const override
{
76 return new RPCRecPred
;
79 class RPCReadPred
: public IsPGReadablePredicate
{
82 explicit RPCReadPred(pg_shard_t whoami
) : whoami(whoami
) {}
83 bool operator()(const set
<pg_shard_t
> &have
) const override
{
84 return have
.count(whoami
);
87 IsPGReadablePredicate
*get_is_readable_predicate() const override
{
88 return new RPCReadPred(get_parent()->whoami_shard());
91 void dump_recovery_info(Formatter
*f
) const override
{
93 f
->open_array_section("pull_from_peer");
94 for (map
<pg_shard_t
, set
<hobject_t
> >::const_iterator i
= pull_from_peer
.begin();
95 i
!= pull_from_peer
.end();
97 f
->open_object_section("pulling_from");
98 f
->dump_stream("pull_from") << i
->first
;
100 f
->open_array_section("pulls");
101 for (set
<hobject_t
>::const_iterator j
= i
->second
.begin();
102 j
!= i
->second
.end();
104 f
->open_object_section("pull_info");
105 ceph_assert(pulling
.count(*j
));
106 pulling
.find(*j
)->second
.dump(f
);
116 f
->open_array_section("pushing");
117 for (map
<hobject_t
, map
<pg_shard_t
, PushInfo
>>::const_iterator i
=
121 f
->open_object_section("object");
122 f
->dump_stream("pushing") << i
->first
;
124 f
->open_array_section("pushing_to");
125 for (map
<pg_shard_t
, PushInfo
>::const_iterator j
= i
->second
.begin();
126 j
!= i
->second
.end();
128 f
->open_object_section("push_progress");
129 f
->dump_stream("pushing_to") << j
->first
;
131 f
->open_object_section("push_info");
145 int objects_read_sync(
146 const hobject_t
&hoid
,
150 bufferlist
*bl
) override
;
152 int objects_readv_sync(
153 const hobject_t
&hoid
,
154 map
<uint64_t, uint64_t>&& m
,
156 bufferlist
*bl
) override
;
158 void objects_read_async(
159 const hobject_t
&hoid
,
160 const list
<pair
<boost::tuple
<uint64_t, uint64_t, uint32_t>,
161 pair
<bufferlist
*, Context
*> > > &to_read
,
162 Context
*on_complete
,
163 bool fast_read
= false) override
;
168 ObjectRecoveryProgress recovery_progress
;
169 ObjectRecoveryInfo recovery_info
;
170 ObjectContextRef obc
;
171 object_stat_sum_t stat
;
172 ObcLockManager lock_manager
;
174 void dump(Formatter
*f
) const {
176 f
->open_object_section("recovery_progress");
177 recovery_progress
.dump(f
);
181 f
->open_object_section("recovery_info");
182 recovery_info
.dump(f
);
187 map
<hobject_t
, map
<pg_shard_t
, PushInfo
>> pushing
;
193 ObjectRecoveryProgress recovery_progress
;
194 ObjectRecoveryInfo recovery_info
;
195 ObjectContextRef head_ctx
;
196 ObjectContextRef obc
;
197 object_stat_sum_t stat
;
198 bool cache_dont_need
;
199 ObcLockManager lock_manager
;
201 void dump(Formatter
*f
) const {
203 f
->open_object_section("recovery_progress");
204 recovery_progress
.dump(f
);
208 f
->open_object_section("recovery_info");
209 recovery_info
.dump(f
);
214 bool is_complete() const {
215 return recovery_progress
.is_complete(recovery_info
);
219 map
<hobject_t
, PullInfo
> pulling
;
221 // Reverse mapping from osd peer to objects being pulled from that peer
222 map
<pg_shard_t
, set
<hobject_t
> > pull_from_peer
;
224 map
<hobject_t
, PullInfo
>::iterator piter
,
225 bool clear_pull_from_peer
= true);
226 void clear_pull_from(
227 map
<hobject_t
, PullInfo
>::iterator piter
);
229 void _do_push(OpRequestRef op
);
230 void _do_pull_response(OpRequestRef op
);
231 void do_push(OpRequestRef op
) {
233 _do_pull_response(op
);
238 void do_pull(OpRequestRef op
);
239 void do_push_reply(OpRequestRef op
);
241 bool handle_push_reply(pg_shard_t peer
, const PushReplyOp
&op
, PushOp
*reply
);
242 void handle_pull(pg_shard_t peer
, PullOp
&op
, PushOp
*reply
);
244 struct pull_complete_info
{
246 object_stat_sum_t stat
;
248 bool handle_pull_response(
249 pg_shard_t from
, const PushOp
&op
, PullOp
*response
,
250 list
<pull_complete_info
> *to_continue
,
251 ObjectStore::Transaction
*t
);
252 void handle_push(pg_shard_t from
, const PushOp
&op
, PushReplyOp
*response
,
253 ObjectStore::Transaction
*t
, bool is_repair
);
255 static void trim_pushed_data(const interval_set
<uint64_t> ©_subset
,
256 const interval_set
<uint64_t> &intervals_received
,
257 bufferlist data_received
,
258 interval_set
<uint64_t> *intervals_usable
,
259 bufferlist
*data_usable
);
260 void _failed_pull(pg_shard_t from
, const hobject_t
&soid
);
262 void send_pushes(int prio
, map
<pg_shard_t
, vector
<PushOp
> > &pushes
);
263 void prep_push_op_blank(const hobject_t
& soid
, PushOp
*op
);
266 map
<pg_shard_t
, vector
<PullOp
> > &pulls
);
268 int build_push_op(const ObjectRecoveryInfo
&recovery_info
,
269 const ObjectRecoveryProgress
&progress
,
270 ObjectRecoveryProgress
*out_progress
,
272 object_stat_sum_t
*stat
= 0,
273 bool cache_dont_need
= true);
274 void submit_push_data(const ObjectRecoveryInfo
&recovery_info
,
278 bool cache_dont_need
,
279 interval_set
<uint64_t> &data_zeros
,
280 const interval_set
<uint64_t> &intervals_included
,
281 bufferlist data_included
,
282 bufferlist omap_header
,
283 const map
<string
, bufferlist
> &attrs
,
284 const map
<string
, bufferlist
> &omap_entries
,
285 ObjectStore::Transaction
*t
);
286 void submit_push_complete(const ObjectRecoveryInfo
&recovery_info
,
287 ObjectStore::Transaction
*t
);
289 void calc_clone_subsets(
290 SnapSet
& snapset
, const hobject_t
& poid
, const pg_missing_t
& missing
,
291 const hobject_t
&last_backfill
,
292 interval_set
<uint64_t>& data_subset
,
293 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
294 ObcLockManager
&lock_manager
);
297 const hobject_t
& soid
,
298 ObjectContextRef headctx
,
301 const hobject_t
&soid
,
302 ObjectContextRef obj
,
304 int prep_push_to_replica(
305 ObjectContextRef obc
, const hobject_t
& soid
, pg_shard_t peer
,
306 PushOp
*pop
, bool cache_dont_need
= true);
308 ObjectContextRef obc
,
309 const hobject_t
& oid
, pg_shard_t dest
,
311 bool cache_dont_need
);
313 ObjectContextRef obc
,
314 const hobject_t
& soid
, pg_shard_t peer
,
316 interval_set
<uint64_t> &data_subset
,
317 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
320 ObcLockManager
&&lock_manager
);
321 void calc_head_subsets(
322 ObjectContextRef obc
, SnapSet
& snapset
, const hobject_t
& head
,
323 const pg_missing_t
& missing
,
324 const hobject_t
&last_backfill
,
325 interval_set
<uint64_t>& data_subset
,
326 map
<hobject_t
, interval_set
<uint64_t>>& clone_subsets
,
327 ObcLockManager
&lock_manager
);
328 ObjectRecoveryInfo
recalc_subsets(
329 const ObjectRecoveryInfo
& recovery_info
,
331 ObcLockManager
&lock_manager
);
336 struct InProgressOp
: public RefCountedObject
{
338 set
<pg_shard_t
> waiting_for_commit
;
343 return waiting_for_commit
.empty();
346 FRIEND_MAKE_REF(InProgressOp
);
347 InProgressOp(ceph_tid_t tid
, Context
*on_commit
, OpRequestRef op
, eversion_t v
)
349 tid(tid
), on_commit(on_commit
),
352 map
<ceph_tid_t
, ceph::ref_t
<InProgressOp
>> in_progress_ops
;
354 friend class C_OSD_OnOpCommit
;
356 void call_write_ordered(std::function
<void(void)> &&cb
) override
{
357 // ReplicatedBackend submits writes inline in submit_transaction, so
358 // we can just call the callback.
362 void submit_transaction(
363 const hobject_t
&hoid
,
364 const object_stat_sum_t
&delta_stats
,
365 const eversion_t
&at_version
,
366 PGTransactionUPtr
&&t
,
367 const eversion_t
&trim_to
,
368 const eversion_t
&min_last_complete_ondisk
,
369 const vector
<pg_log_entry_t
> &log_entries
,
370 std::optional
<pg_hit_set_history_t
> &hset_history
,
371 Context
*on_all_commit
,
378 Message
* generate_subop(
379 const hobject_t
&soid
,
380 const eversion_t
&at_version
,
383 eversion_t pg_trim_to
,
384 eversion_t min_last_complete_ondisk
,
385 hobject_t new_temp_oid
,
386 hobject_t discard_temp_oid
,
387 const bufferlist
&log_entries
,
388 std::optional
<pg_hit_set_history_t
> &hset_history
,
389 ObjectStore::Transaction
&op_t
,
391 const pg_info_t
&pinfo
);
393 const hobject_t
&soid
,
394 const eversion_t
&at_version
,
397 eversion_t pg_trim_to
,
398 eversion_t min_last_complete_ondisk
,
399 hobject_t new_temp_oid
,
400 hobject_t discard_temp_oid
,
401 const vector
<pg_log_entry_t
> &log_entries
,
402 std::optional
<pg_hit_set_history_t
> &hset_history
,
404 ObjectStore::Transaction
&op_t
);
405 void op_commit(const ceph::ref_t
<InProgressOp
>& op
);
406 void do_repop_reply(OpRequestRef op
);
407 void do_repop(OpRequestRef op
);
413 eversion_t last_complete
;
414 epoch_t epoch_started
;
416 ObjectStore::Transaction opt
, localt
;
418 RepModify() : committed(false), ackerosd(-1),
421 typedef std::shared_ptr
<RepModify
> RepModifyRef
;
423 struct C_OSD_RepModifyCommit
;
425 void repop_commit(RepModifyRef rm
);
426 bool auto_repair_supported() const override
{ return store
->has_builtin_csum(); }
430 const hobject_t
&poid
,
432 ScrubMapBuilder
&pos
,
433 ScrubMap::object
&o
) override
;
434 uint64_t be_get_ondisk_size(uint64_t logical_size
) override
{ return logical_size
; }