1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <boost/intrusive_ptr.hpp>
7 #include <seastar/core/future.hh>
9 #include "include/common_fwd.h"
10 #include "osd_operation.h"
11 #include "msg/MessageRef.h"
12 #include "crimson/common/exception.h"
13 #include "crimson/os/futurized_collection.h"
14 #include "osd/PeeringState.h"
15 #include "crimson/osd/osdmap_service.h"
16 #include "crimson/osd/object_context.h"
17 #include "common/AsyncReserver.h"
19 namespace crimson::net
{
23 namespace crimson::mgr
{
27 namespace crimson::mon
{
31 namespace crimson::os
{
37 class BufferedRecoveryMessages
;
39 namespace crimson::osd
{
42 * Represents services available to each PG
44 class ShardServices
: public md_config_obs_t
{
45 using cached_map_t
= boost::local_shared_ptr
<const OSDMap
>;
46 OSDMapService
&osdmap_service
;
48 crimson::net::Messenger
&cluster_msgr
;
49 crimson::net::Messenger
&public_msgr
;
50 crimson::mon::Client
&monc
;
51 crimson::mgr::Client
&mgrc
;
52 crimson::os::FuturizedStore
&store
;
54 crimson::common::CephContext cct
;
56 PerfCounters
*perf
= nullptr;
57 PerfCounters
*recoverystate_perf
= nullptr;
59 const char** get_tracked_conf_keys() const final
;
60 void handle_conf_change(const ConfigProxy
& conf
,
61 const std::set
<std::string
> &changed
) final
;
65 OSDMapService
&osdmap_service
,
67 crimson::net::Messenger
&cluster_msgr
,
68 crimson::net::Messenger
&public_msgr
,
69 crimson::mon::Client
&monc
,
70 crimson::mgr::Client
&mgrc
,
71 crimson::os::FuturizedStore
&store
);
73 seastar::future
<> send_to_osd(
78 crimson::os::FuturizedStore
&get_store() {
82 crimson::common::CephContext
*get_cct() {
// Read-only accessor: hands back the `osdmap_service` reference member.
87 const OSDMapService
&get_osdmap_service() const {
88 return osdmap_service
;
92 OSDOperationRegistry registry
;
93 OperationThrottler throttler
;
// Creates an operation of type T via `registry.create_operation<T>()`,
// perfect-forwarding `args` to it, then calls `start()` on the new operation
// and chains a continuation that holds its own copy of the operation handle.
// Returns a std::pair of the operation handle and the chained future.
// Throws crimson::common::system_shutdown_exception if `stopping` is set,
// before any operation is created.
95 template <typename T
, typename
... Args
>
96 auto start_operation(Args
&&... args
) {
97 if (__builtin_expect(stopping
, false)) {
98 throw crimson::common::system_shutdown_exception();
100 auto op
= registry
.create_operation
<T
>(std::forward
<Args
>(args
)...);
101 auto fut
= op
->start().then([op
/* by copy */] {
102 // ensure the op's lifetime is appropriate. It is not enough to
103 // guarantee it's alive at the scheduling stages (i.e. `then()`
104 // calling) but also during the actual execution (i.e. when passed
105 // lambdas are actually run).
107 return std::make_pair(std::move(op
), std::move(fut
));
// Returns the future produced by `registry.stop()`.
110 seastar::future
<> stop() {
112 return registry
.stop();
// Returns a reference to the counters behind `recoverystate_perf` by
// dereferencing the pointer; no null check is performed here.
116 PerfCounters
&get_recoverystate_perf_logger() {
117 return *recoverystate_perf
;
119 PerfCounters
&get_perf_logger() {
123 /// Dispatch and reset ctx transaction
124 seastar::future
<> dispatch_context_transaction(
125 crimson::os::CollectionRef col
, PeeringCtx
&ctx
);
127 /// Dispatch and reset ctx messages
128 seastar::future
<> dispatch_context_messages(
129 BufferedRecoveryMessages
&&ctx
);
131 /// Dispatch ctx and dispose of context
132 seastar::future
<> dispatch_context(
133 crimson::os::CollectionRef col
,
136 /// Dispatch ctx and dispose of ctx, transaction must be empty
137 seastar::future
<> dispatch_context(
139 return dispatch_context({}, std::move(ctx
));
144 // TODO: hook into map processing and some kind of heartbeat/peering
145 // message processing
147 std::vector
<int> acting
;
150 std::map
<pg_t
, pg_temp_t
> pg_temp_wanted
;
151 std::map
<pg_t
, pg_temp_t
> pg_temp_pending
;
152 friend std::ostream
& operator<<(std::ostream
&, const pg_temp_t
&);
154 void queue_want_pg_temp(pg_t pgid
, const std::vector
<int>& want
,
155 bool forced
= false);
156 void remove_want_pg_temp(pg_t pgid
);
157 void requeue_pg_temp();
158 seastar::future
<> send_pg_temp();
160 // Shard-local OSDMap
164 void update_map(cached_map_t new_osdmap
);
165 cached_map_t
&get_osdmap();
169 std::set
<pg_t
> pg_created
;
171 seastar::future
<> send_pg_created(pg_t pgid
);
172 seastar::future
<> send_pg_created();
173 void prune_pg_created();
175 unsigned get_pg_num() const {
185 seastar::future
<> osdmap_subscribe(version_t epoch
, bool force_request
);
188 ceph::mono_time startup_time
= ceph::mono_clock::now();
// Returns the difference between the current ceph::mono_clock reading and
// `startup_time`.
189 ceph::signedspan
get_mnow() const {
190 return ceph::mono_clock::now() - startup_time
;
192 HeartbeatStampsRef
get_hb_stamps(int peer
);
193 std::map
<int, HeartbeatStampsRef
> heartbeat_stamps
;
195 crimson::osd::ObjectContextRegistry obc_registry
;
199 unsigned num_pgs
= 0;
201 struct DirectFinisher
{
202 void queue(Context
*c
) {
206 // Prevent creating new OSD operations when the system is shutting down.
207 // This is necessary because a new operation could otherwise be created
208 // after all ongoing operations have been interrupted, and then create
209 // and wait on a new future that may never resolve, in which case the
210 // shutdown may never succeed.
211 bool stopping
= false;
213 AsyncReserver
<spg_t
, DirectFinisher
> local_reserver
;
214 AsyncReserver
<spg_t
, DirectFinisher
> remote_reserver
;
217 epoch_t up_thru_wanted
= 0;
219 seastar::future
<> send_alive(epoch_t want
);