git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/pg.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / osd / pg.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <memory>
7 #include <optional>
8 #include <boost/intrusive_ptr.hpp>
9 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
10 #include <boost/smart_ptr/local_shared_ptr.hpp>
11 #include <seastar/core/future.hh>
12 #include <seastar/core/shared_future.hh>
13
14 #include "common/dout.h"
15 #include "crimson/net/Fwd.h"
16 #include "os/Transaction.h"
17 #include "osd/osd_types.h"
18 #include "crimson/osd/object_context.h"
19 #include "osd/PeeringState.h"
20
21 #include "crimson/common/type_helpers.h"
22 #include "crimson/os/futurized_collection.h"
23 #include "crimson/osd/osd_operations/client_request.h"
24 #include "crimson/osd/osd_operations/peering_event.h"
25 #include "crimson/osd/osd_operations/replicated_request.h"
26 #include "crimson/osd/shard_services.h"
27 #include "crimson/osd/osdmap_gate.h"
28
29 class OSDMap;
30 class MQuery;
31 class PGBackend;
32 class PGPeeringEvent;
33 namespace recovery {
34 class Context;
35 }
36
37 namespace crimson::net {
38 class Messenger;
39 }
40
41 namespace crimson::os {
42 class FuturizedStore;
43 }
44
45 namespace crimson::osd {
46 class ClientRequest;
47
// PG -- crimson (Seastar-based) OSD representation of a single placement
// group.  Intrusively ref-counted with boost::thread_unsafe_counter, i.e.
// non-atomic ref counting: the PG is assumed to be used from a single
// reactor shard.  Inherits PeeringState::PeeringListener to receive
// peering-state-machine callbacks (many are still stubs in this
// Octopus-era import) and DoutPrefixProvider so debug-log lines are
// prefixed via operator<<(std::ostream&, const PG&).
48 class PG : public boost::intrusive_ref_counter<
49   PG,
50   boost::thread_unsafe_counter>,
51   PeeringState::PeeringListener,
52   DoutPrefixProvider
53 {
// Erasure-code profile is a plain key/value string map; OSDMaps are held
// through boost::local_shared_ptr (non-atomic sharing within the shard).
54   using ec_profile_t = std::map<std::string,std::string>;
55   using cached_map_t = boost::local_shared_ptr<const OSDMap>;
56 
// Per-PG ordering pipelines, one for each operation category this PG
// processes (client I/O, peering events, replicated sub-ops).
57   ClientRequest::PGPipeline client_request_pg_pipeline;
58   PeeringEvent::PGPipeline peering_request_pg_pipeline;
59   RepRequest::PGPipeline replicated_request_pg_pipeline;
60 
61   spg_t pgid;
62   pg_shard_t pg_whoami;
63   crimson::os::CollectionRef coll_ref;
64   ghobject_t pgmeta_oid;
65 
// NOTE(review): presumably armed by queue_check_readable() /
// schedule_renew_lease() (declared below, defined in pg.cc) for the
// read-lease machinery -- confirm against pg.cc.
66   seastar::timer<seastar::lowres_clock> check_readable_timer;
67   seastar::timer<seastar::lowres_clock> renew_lease_timer;
68 
69 public:
// Constructor and destructor are defined out of line (pg.cc).
70   PG(spg_t pgid,
71      pg_shard_t pg_shard,
72      crimson::os::CollectionRef coll_ref,
73      pg_pool_t&& pool,
74      std::string&& name,
75      cached_map_t osdmap,
76      ShardServices &shard_services,
77      ec_profile_t profile);
78 
79   ~PG();
80 
// Simple accessors for identity and storage handles.
81   const pg_shard_t& get_pg_whoami() const {
82     return pg_whoami;
83   }
84 
85   const spg_t& get_pgid() const {
86     return pgid;
87   }
88 
89   PGBackend& get_backend() {
90     return *backend;
91   }
92   const PGBackend& get_backend() const {
93     return *backend;
94   }
95 
96   // EpochSource
97   epoch_t get_osdmap_epoch() const final {
98     return peering_state.get_osdmap_epoch();
99   }
100 
101   // DoutPrefixProvider
102   std::ostream& gen_prefix(std::ostream& out) const final {
103     return out << *this;
104   }
105   crimson::common::CephContext *get_cct() const final {
106     return shard_services.get_cct();
107   }
108   unsigned get_subsys() const final {
109     return ceph_subsys_osd;
110   }
111 
112   crimson::os::CollectionRef get_collection_ref() {
113     return coll_ref;
114   }
115 
116   // PeeringListener
// Persist PG metadata into transaction t; defined in pg.cc.
117   void prepare_write(
118     pg_info_t &info,
119     pg_info_t &last_written_info,
120     PastIntervals &past_intervals,
121     PGLog &pglog,
122     bool dirty_info,
123     bool dirty_big_info,
124     bool need_write_epoch,
125     ceph::os::Transaction &t) final;
126 
127   void on_info_history_change() final {
128     // Not needed yet -- mainly for scrub scheduling
129   }
130 
// NOTE(review): default argument on a virtual override -- defaults bind
// statically to the caller's declared type; confirm it matches the
// base-class declaration in osd/PeeringState.h.
131   void scrub_requested(bool deep, bool repair, bool need_auto = false) final {
132     ceph_assert(0 == "Not implemented");
133   }
134 
135   uint64_t get_snap_trimq_size() const final {
136     return 0;
137   }
138 
// Fire-and-forget: the future from send_to_osd() is deliberately
// discarded.  The share_map_update hint is ignored here.
139   void send_cluster_message(
140     int osd, Message *m,
141     epoch_t epoch, bool share_map_update=false) final {
142     (void)shard_services.send_cluster_message
143     (void)shard_services.send_to_osd(osd, m, epoch);
143   }
144 
145   void send_pg_created(pg_t pgid) final {
146     (void)shard_services.send_pg_created(pgid);
147   }
148 
149   bool try_flush_or_schedule_async() final;
150 
// Register an on-commit callback that marks the peering flush complete.
// NOTE(review): the LambdaContext captures `this` raw -- assumes the PG
// outlives the transaction commit; confirm lifetime handling.
151   void start_flush_on_transaction(
152     ceph::os::Transaction &t) final {
153     t.register_on_commit(
154       new LambdaContext([this](int r){
155 	peering_state.complete_flush();
156       }));
157   }
158 
159   void on_flushed() final {
160     // will be needed for unblocking IO operations/peering
161   }
162 
163   void schedule_event_after(
164     PGPeeringEventRef event,
165     float delay) final {
166     ceph_assert(0 == "Not implemented yet");
167   }
168 
169   void request_local_background_io_reservation(
170     unsigned priority,
171     PGPeeringEventRef on_grant,
172     PGPeeringEventRef on_preempt) final {
173     ceph_assert(0 == "Not implemented yet");
174   }
175 
176   void update_local_background_io_priority(
177     unsigned priority) final {
178     ceph_assert(0 == "Not implemented yet");
179   }
180 
181   void cancel_local_background_io_reservation() final {
182     // Not implemented yet, but gets called on exit() from some states
183   }
184 
185   void request_remote_recovery_reservation(
186     unsigned priority,
187     PGPeeringEventRef on_grant,
188     PGPeeringEventRef on_preempt) final {
189     ceph_assert(0 == "Not implemented yet");
190   }
191 
192   void cancel_remote_recovery_reservation() final {
193     // Not implemented yet, but gets called on exit() from some states
194   }
195 
// When `t` commits, feed `on_commit` back into this PG as a
// LocalPeeringEvent started through shard_services.  The event is moved
// out of the shared ref at commit time, so the ref must not be consumed
// elsewhere.  NOTE(review): captures `this` raw, same lifetime caveat as
// start_flush_on_transaction().
196   void schedule_event_on_commit(
197     ceph::os::Transaction &t,
198     PGPeeringEventRef on_commit) final {
199     t.register_on_commit(
200       new LambdaContext(
201 	[this, on_commit=std::move(on_commit)](int r){
202 	  shard_services.start_operation<LocalPeeringEvent>(
203 	    this,
204 	    shard_services,
205 	    pg_whoami,
206 	    pgid,
207 	    std::move(*on_commit));
208 	}));
209   }
210 
211   void update_heartbeat_peers(set<int> peers) final {
212     // Not needed yet
213   }
214   void set_probe_targets(const set<pg_shard_t> &probe_set) final {
215     // Not needed yet
216   }
217   void clear_probe_targets() final {
218     // Not needed yet
219   }
// pg_temp requests are forwarded to the shard-wide service keyed by the
// raw pg_t (pgid.pgid), not the shard-qualified spg_t.
220   void queue_want_pg_temp(const std::vector<int> &wanted) final {
221     shard_services.queue_want_pg_temp(pgid.pgid, wanted);
222   }
223   void clear_want_pg_temp() final {
224     shard_services.remove_want_pg_temp(pgid.pgid);
225   }
226   void publish_stats_to_osd() final {
227     // Not needed yet
228   }
229   void clear_publish_stats() final {
230     // Not needed yet
231   }
232   void check_recovery_sources(const OSDMapRef& newmap) final {
233     // Not needed yet
234   }
235   void check_blacklisted_watchers() final {
236     // Not needed yet
237   }
238   void clear_primary_state() final {
239     // Not needed yet
240   }
241 
// Read-lease plumbing; defined in pg.cc.
242   void queue_check_readable(epoch_t last_peering_reset,
243 			    ceph::timespan delay) final;
244   void recheck_readable() final;
245 
246   unsigned get_target_pg_log_entries() const final;
247 
248   void on_pool_change() final {
249     // Not needed yet
250   }
251   void on_role_change() final {
252     // Not needed yet
253   }
254   void on_change(ceph::os::Transaction &t) final {
255     // Not needed yet
256   }
257   void on_activate(interval_set<snapid_t> to_trim) final;
258   void on_activate_complete() final;
259   void on_new_interval() final {
260     // Not needed yet
261   }
262   Context *on_clean() final {
263     // Not needed yet (will be needed for IO unblocking)
264     return nullptr;
265   }
266   void on_activate_committed() final {
267     // Not needed yet (will be needed for IO unblocking)
268   }
269   void on_active_exit() final {
270     // Not needed yet
271   }
272 
273   void on_removal(ceph::os::Transaction &t) final {
274     // TODO
275   }
276   void do_delete_work(ceph::os::Transaction &t) final;
277 
278   // merge/split not ready
279   void clear_ready_to_merge() final {}
280   void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
281   void set_not_ready_to_merge_source(pg_t pgid) final {}
282   void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
283   void set_ready_to_merge_source(eversion_t lu) final {}
284 
285   void on_active_actmap() final {
286     // Not needed yet
287   }
288   void on_active_advmap(const OSDMapRef &osdmap) final {
289     // Not needed yet
290   }
291   epoch_t oldest_stored_osdmap() final {
292     // TODO
293     return 0;
294   }
295 
296 
297   void on_backfill_reserved() final {
298     ceph_assert(0 == "Not implemented");
299   }
300   void on_backfill_canceled() final {
301     ceph_assert(0 == "Not implemented");
302   }
303   void on_recovery_reserved() final {
304     ceph_assert(0 == "Not implemented");
305   }
306 
307 
// Always grants the reservation -- recovery-space accounting is not
// implemented in this import.
308   bool try_reserve_recovery_space(
309     int64_t primary_num_bytes, int64_t local_num_bytes) final {
310     return true;
311   }
312   void unreserve_recovery_space() final {}
313 
// PGLog::LogEntryHandler whose mutating operations are all TODO stubs;
// handed to the log machinery via get_log_handler() below.  Holds
// non-owning pointers to the PG and the transaction being built.
314   struct PGLogEntryHandler : public PGLog::LogEntryHandler {
315     PG *pg;
316     ceph::os::Transaction *t;
317     PGLogEntryHandler(PG *pg, ceph::os::Transaction *t) : pg(pg), t(t) {}
318 
319     // LogEntryHandler
320     void remove(const hobject_t &hoid) override {
321       // TODO
322     }
323     void try_stash(const hobject_t &hoid, version_t v) override {
324       // TODO
325     }
326     void rollback(const pg_log_entry_t &entry) override {
327       // TODO
328     }
329     void rollforward(const pg_log_entry_t &entry) override {
330       // TODO
331     }
332     void trim(const pg_log_entry_t &entry) override {
333       // TODO
334     }
335   };
336   PGLog::LogEntryHandlerRef get_log_handler(
337     ceph::os::Transaction &t) final {
338     return std::make_unique<PG::PGLogEntryHandler>(this, &t);
339   }
340 
341   void rebuild_missing_set_with_deletes(PGLog &pglog) final {
342     ceph_assert(0 == "Impossible for crimson");
343   }
344 
345   PerfCounters &get_peering_perf() final {
346     return shard_services.get_recoverystate_perf_logger();
347   }
348   PerfCounters &get_perf_logger() final {
349     return shard_services.get_perf_logger();
350   }
351 
352   void log_state_enter(const char *state) final;
353   void log_state_exit(
354     const char *state_name, utime_t enter_time,
355     uint64_t events, utime_t event_dur) final;
356 
357   void dump_recovery_info(Formatter *f) const final {
358   }
359 
// Cluster-log streams built with a null handler (not yet wired to monc),
// per the inline comments below.
360   OstreamTemp get_clog_info() final {
361     // not needed yet: replace with not a stub (needs to be wired up to monc)
362     return OstreamTemp(CLOG_INFO, nullptr);
363   }
364   OstreamTemp get_clog_debug() final {
365     // not needed yet: replace with not a stub (needs to be wired up to monc)
366     return OstreamTemp(CLOG_DEBUG, nullptr);
367   }
368   OstreamTemp get_clog_error() final {
369     // not needed yet: replace with not a stub (needs to be wired up to monc)
370     return OstreamTemp(CLOG_ERROR, nullptr);
371   }
372 
373   ceph::signedspan get_mnow() final;
374   HeartbeatStampsRef get_hb_stamps(int peer) final;
375   void schedule_renew_lease(epoch_t plr, ceph::timespan delay) final;
376 
377 
378   // Utility
379   bool is_primary() const {
380     return peering_state.is_primary();
381   }
// Snapshot of PG stats; prepare_stats_for_publish() is asserted to
// produce a value before dereferencing.
382   pg_stat_t get_stats() {
383     auto stats = peering_state.prepare_stats_for_publish(
384       false,
385       pg_stat_t(),
386       object_stat_collection_t());
387     ceph_assert(stats);
388     return *stats;
389   }
390   bool get_need_up_thru() const {
391     return peering_state.get_need_up_thru();
392   }
393 
394   const auto& get_pool() const {
395     return peering_state.get_pool();
396   }
397 
398   /// initialize created PG
399   void init(
400     int role,
401     const std::vector<int>& up,
402     int up_primary,
403     const std::vector<int>& acting,
404     int acting_primary,
405     const pg_history_t& history,
406     const PastIntervals& pim,
407     bool backfill,
408     ceph::os::Transaction &t);
409 
// Load persisted PG state from the object store (async).
410   seastar::future<> read_state(crimson::os::FuturizedStore* store);
411 
// Peering event entry points; advance/activate/initialize mirror the
// classic OSD's map-handling flow.  Defined in pg.cc.
412   void do_peering_event(
413     PGPeeringEvent& evt, PeeringCtx &rctx);
414 
415   void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
416   void handle_activate_map(PeeringCtx &rctx);
417   void handle_initialize(PeeringCtx &rctx);
418 
// Static helpers: derive the target object + lock mode from a client op,
// and map a requested oid through a SnapSet to the clone to read.
419   static std::pair<hobject_t, RWState::State> get_oid_and_lock(
420     const MOSDOp &m,
421     const OpInfo &op_info);
422   static std::optional<hobject_t> resolve_oid(
423     const SnapSet &snapset,
424     const hobject_t &oid);
425 
// Object-context loading can fail only with object_corrupted; the bool in
// the returned pair's meaning is not visible here -- see pg.cc.
426   using load_obc_ertr = crimson::errorator<
427     crimson::ct_error::object_corrupted>;
428   load_obc_ertr::future<
429     std::pair<crimson::osd::ObjectContextRef, bool>>
430   get_or_load_clone_obc(
431     hobject_t oid, crimson::osd::ObjectContextRef head_obc);
432 
433   load_obc_ertr::future<
434     std::pair<crimson::osd::ObjectContextRef, bool>>
435   get_or_load_head_obc(hobject_t oid);
436 
437   load_obc_ertr::future<ObjectContextRef> get_locked_obc(
438     Operation *op,
439     const hobject_t &oid,
440     RWState::State type);
441 public:
// Resolve the target object and required lock mode from the client op,
// acquire the object context under that lock, run f(obc), and always
// release the lock in the finally-block once f's future resolves.
// Loading errors (load_obc_ertr) propagate to the caller.
442   template <typename F>
443   auto with_locked_obc(
444     Ref<MOSDOp> &m,
445     const OpInfo &op_info,
446     Operation *op,
447     F &&f) {
448     auto [oid, type] = get_oid_and_lock(*m, op_info);
449     return get_locked_obc(op, oid, type)
450       .safe_then([this, f=std::forward<F>(f), type=type](auto obc) {
451 	return f(obc).finally([this, obc, type=type] {
452 	  obc->put_lock_type(type);
453 	  return load_obc_ertr::now();
454 	});
455       });
456   }
457 
458   seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
459   void handle_rep_op_reply(crimson::net::Connection* conn,
460 			   const MOSDRepOpReply& m);
461 
462   void print(std::ostream& os) const;
463 
464 private:
// Internal event dispatch directly on the boost.statechart event base.
465   void do_peering_event(
466     const boost::statechart::event_base &evt,
467     PeeringCtx &rctx);
// Client I/O execution path: do_osd_ops/do_pg_ops build the reply,
// do_osd_op applies a single OSDOp into txn, do_pgnls serves PGNLS
// listings, submit_transaction hands the built txn to the backend.
468   seastar::future<Ref<MOSDOpReply>> do_osd_ops(
469     Ref<MOSDOp> m,
470     ObjectContextRef obc);
471   seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
472   seastar::future<> do_osd_op(
473     ObjectState& os,
474     OSDOp& op,
475     ceph::os::Transaction& txn);
476   seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
477 					     const std::string& nspace,
478 					     uint64_t limit);
479   seastar::future<> submit_transaction(ObjectContextRef&& obc,
480 				       ceph::os::Transaction&& txn,
481 				       const MOSDOp& req);
482 
483 private:
484   OSDMapGate osdmap_gate;
485   ShardServices &shard_services;
486 
487   cached_map_t osdmap;
488 
489 public:
// Returns a copy of the shared map handle (cheap: local_shared_ptr).
490   cached_map_t get_osdmap() { return osdmap; }
491 
492 private:
493   std::unique_ptr<PGBackend> backend;
494 
495   PeeringState peering_state;
496   eversion_t projected_last_update;
497 
// Blocker that holds operations until the PG becomes active.
// NOTE(review): presumably on_active() fulfils the shared promise that
// wait() blocks on -- confirm in pg.cc (neither is defined here).
498   class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
499     PG *pg;
500 
501     const spg_t pgid;
502     seastar::shared_promise<> p;
503 
504   protected:
505     void dump_detail(Formatter *f) const;
506 
507   public:
508     static constexpr const char *type_name = "WaitForActiveBlocker";
509 
510     WaitForActiveBlocker(PG *pg) : pg(pg) {}
511     void on_active();
512     blocking_future<> wait();
513   } wait_for_active_blocker;
514 
515   friend std::ostream& operator<<(std::ostream&, const PG& pg);
516   friend class ClientRequest;
517   friend class PGAdvanceMap;
518   friend class PeeringEvent;
519   friend class RepRequest;
520 };
521 
522 std::ostream& operator<<(std::ostream&, const PG& pg);
523 
524 }
521
522 std::ostream& operator<<(std::ostream&, const PG& pg);
523
524 }