]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_OSD_H | |
16 | #define CEPH_OSD_H | |
17 | ||
18 | #include "PG.h" | |
19 | ||
20 | #include "msg/Dispatcher.h" | |
21 | ||
22 | #include "common/Mutex.h" | |
23 | #include "common/RWLock.h" | |
24 | #include "common/Timer.h" | |
25 | #include "common/WorkQueue.h" | |
26 | #include "common/AsyncReserver.h" | |
27 | #include "common/ceph_context.h" | |
11fdf7f2 | 28 | #include "common/config_cacher.h" |
7c673cae FG |
29 | #include "common/zipkin_trace.h" |
30 | ||
31 | #include "mgr/MgrClient.h" | |
32 | ||
33 | #include "os/ObjectStore.h" | |
34 | #include "OSDCap.h" | |
35 | ||
36 | #include "auth/KeyRing.h" | |
11fdf7f2 | 37 | |
7c673cae FG |
38 | #include "osd/ClassHandler.h" |
39 | ||
40 | #include "include/CompatSet.h" | |
41 | ||
42 | #include "OpRequest.h" | |
43 | #include "Session.h" | |
44 | ||
11fdf7f2 | 45 | #include "osd/OpQueueItem.h" |
224ce89b | 46 | |
7c673cae FG |
47 | #include <atomic> |
48 | #include <map> | |
49 | #include <memory> | |
11fdf7f2 | 50 | #include <string> |
7c673cae FG |
51 | |
52 | #include "include/unordered_map.h" | |
53 | ||
54 | #include "common/shared_cache.hpp" | |
55 | #include "common/simple_cache.hpp" | |
56 | #include "common/sharedptr_registry.hpp" | |
57 | #include "common/WeightedPriorityQueue.h" | |
58 | #include "common/PrioritizedQueue.h" | |
224ce89b WB |
59 | #include "osd/mClockOpClassQueue.h" |
60 | #include "osd/mClockClientQueue.h" | |
7c673cae | 61 | #include "messages/MOSDOp.h" |
7c673cae FG |
62 | #include "common/EventTrace.h" |
63 | ||
64 | #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ | |
65 | ||
11fdf7f2 TL |
66 | /* |
67 | ||
68 | lock ordering for pg map | |
69 | ||
70 | PG::lock | |
71 | ShardData::lock | |
72 | OSD::pg_map_lock | |
73 | ||
74 | */ | |
7c673cae FG |
75 | |
// OSD perf counter indices, registered with the OSD's 'logger'
// PerfCounters instance.  Values are consecutive starting at 10000;
// l_osd_first and l_osd_last are range sentinels, not real counters.
// Do not reorder or insert in the middle without understanding that the
// numeric values shift.
enum {
  l_osd_first = 10000,

  // client op counters (l_osd_op*): counts, in/out bytes, latencies
  l_osd_op_wip,
  l_osd_op,
  l_osd_op_inb,
  l_osd_op_outb,
  l_osd_op_lat,
  l_osd_op_process_lat,
  l_osd_op_prepare_lat,
  l_osd_op_r,
  l_osd_op_r_outb,
  l_osd_op_r_lat,
  l_osd_op_r_lat_outb_hist,
  l_osd_op_r_process_lat,
  l_osd_op_r_prepare_lat,
  l_osd_op_w,
  l_osd_op_w_inb,
  l_osd_op_w_lat,
  l_osd_op_w_lat_inb_hist,
  l_osd_op_w_process_lat,
  l_osd_op_w_prepare_lat,
  l_osd_op_rw,
  l_osd_op_rw_inb,
  l_osd_op_rw_outb,
  l_osd_op_rw_lat,
  l_osd_op_rw_lat_inb_hist,
  l_osd_op_rw_lat_outb_hist,
  l_osd_op_rw_process_lat,
  l_osd_op_rw_prepare_lat,

  // queueing latencies around the op queue
  l_osd_op_before_queue_op_lat,
  l_osd_op_before_dequeue_op_lat,

  // subop counters (l_osd_sop*)
  l_osd_sop,
  l_osd_sop_inb,
  l_osd_sop_lat,
  l_osd_sop_w,
  l_osd_sop_w_inb,
  l_osd_sop_w_lat,
  l_osd_sop_pull,
  l_osd_sop_pull_lat,
  l_osd_sop_push,
  l_osd_sop_push_inb,
  l_osd_sop_push_lat,

  // recovery push/pull traffic
  l_osd_pull,
  l_osd_push,
  l_osd_push_outb,

  l_osd_rop,
  l_osd_rbytes,

  l_osd_loadavg,

  // crc cache effectiveness
  l_osd_cached_crc,
  l_osd_cached_crc_adjusted,
  l_osd_missed_crc,

  // pg counts by role
  l_osd_pg,
  l_osd_pg_primary,
  l_osd_pg_replica,
  l_osd_pg_stray,
  l_osd_pg_removing,
  l_osd_hb_to,
  l_osd_map,
  l_osd_mape,
  l_osd_mape_dup,

  l_osd_waiting_for_map,

  // osdmap cache hit/miss stats
  l_osd_map_cache_hit,
  l_osd_map_cache_miss,
  l_osd_map_cache_miss_low,
  l_osd_map_cache_miss_low_avg,
  l_osd_map_bl_cache_hit,
  l_osd_map_bl_cache_miss,

  // store capacity stats
  l_osd_stat_bytes,
  l_osd_stat_bytes_used,
  l_osd_stat_bytes_avail,

  l_osd_copyfrom,

  // cache-tiering activity (l_osd_tier*)
  l_osd_tier_promote,
  l_osd_tier_flush,
  l_osd_tier_flush_fail,
  l_osd_tier_try_flush,
  l_osd_tier_try_flush_fail,
  l_osd_tier_evict,
  l_osd_tier_whiteout,
  l_osd_tier_dirty,
  l_osd_tier_clean,
  l_osd_tier_delay,
  l_osd_tier_proxy_read,
  l_osd_tier_proxy_write,

  // tiering agent activity (see OSDService agent_* below)
  l_osd_agent_wake,
  l_osd_agent_skip,
  l_osd_agent_flush,
  l_osd_agent_evict,

  // object context cache
  l_osd_object_ctx_cache_hit,
  l_osd_object_ctx_cache_total,

  l_osd_op_cache_hit,
  l_osd_tier_flush_lat,
  l_osd_tier_promote_lat,
  l_osd_tier_r_lat,

  // pg info persistence
  l_osd_pg_info,
  l_osd_pg_fastinfo,
  l_osd_pg_biginfo,

  l_osd_last,
};
190 | ||
// RecoveryState perf counters: time spent in each peering/recovery state
// machine state, registered with 'recoverystate_perf'.  Values are
// consecutive starting at 20000; rs_first/rs_last are range sentinels.
// Do not reorder without understanding that the numeric values shift.
enum {
  rs_first = 20000,
  rs_initial_latency,
  rs_started_latency,
  rs_reset_latency,
  rs_start_latency,
  rs_primary_latency,
  rs_peering_latency,
  rs_backfilling_latency,
  rs_waitremotebackfillreserved_latency,
  rs_waitlocalbackfillreserved_latency,
  rs_notbackfilling_latency,
  rs_repnotrecovering_latency,
  rs_repwaitrecoveryreserved_latency,
  rs_repwaitbackfillreserved_latency,
  rs_reprecovering_latency,
  rs_activating_latency,
  rs_waitlocalrecoveryreserved_latency,
  rs_waitremoterecoveryreserved_latency,
  rs_recovering_latency,
  rs_recovered_latency,
  rs_clean_latency,
  rs_active_latency,
  rs_replicaactive_latency,
  rs_stray_latency,
  rs_getinfo_latency,
  rs_getlog_latency,
  rs_waitactingchange_latency,
  rs_incomplete_latency,
  rs_down_latency,
  rs_getmissing_latency,
  rs_waitupthru_latency,
  rs_notrecovering_latency,
  rs_last,
};
227 | ||
228 | class Messenger; | |
229 | class Message; | |
230 | class MonClient; | |
231 | class PerfCounters; | |
232 | class ObjectStore; | |
233 | class FuseStore; | |
234 | class OSDMap; | |
235 | class MLog; | |
236 | class Objecter; | |
11fdf7f2 | 237 | class KeyStore; |
7c673cae FG |
238 | |
239 | class Watch; | |
240 | class PrimaryLogPG; | |
241 | ||
7c673cae | 242 | class TestOpsSocketHook; |
11fdf7f2 | 243 | struct C_FinishSplits; |
7c673cae FG |
244 | struct C_OpenPGs; |
245 | class LogChannel; | |
246 | class CephContext; | |
7c673cae FG |
247 | class MOSDOp; |
248 | ||
11fdf7f2 TL |
249 | class MOSDPGCreate2; |
250 | class MOSDPGQuery; | |
251 | class MOSDPGNotify; | |
252 | class MOSDPGInfo; | |
253 | class MOSDPGRemove; | |
254 | class MOSDForceRecovery; | |
7c673cae FG |
255 | |
256 | class OSD; | |
257 | ||
7c673cae FG |
258 | class OSDService { |
259 | public: | |
260 | OSD *osd; | |
261 | CephContext *cct; | |
11fdf7f2 | 262 | ObjectStore::CollectionHandle meta_ch; |
7c673cae FG |
263 | const int whoami; |
264 | ObjectStore *&store; | |
265 | LogClient &log_client; | |
266 | LogChannelRef clog; | |
267 | PGRecoveryStats &pg_recovery_stats; | |
268 | private: | |
269 | Messenger *&cluster_messenger; | |
270 | Messenger *&client_messenger; | |
271 | public: | |
272 | PerfCounters *&logger; | |
273 | PerfCounters *&recoverystate_perf; | |
274 | MonClient *&monc; | |
7c673cae FG |
275 | ClassHandler *&class_handler; |
276 | ||
11fdf7f2 TL |
277 | md_config_cacher_t<Option::size_t> osd_max_object_size; |
278 | md_config_cacher_t<bool> osd_skip_data_digest; | |
279 | ||
280 | void enqueue_back(OpQueueItem&& qi); | |
281 | void enqueue_front(OpQueueItem&& qi); | |
7c673cae FG |
282 | |
  // Debug/test hook: with probability
  // osd_debug_inject_dispatch_delay_probability, sleep for
  // osd_debug_inject_dispatch_delay_duration seconds before dispatch.
  // No-op when the probability config is <= 0.
  void maybe_inject_dispatch_delay() {
    if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
      // rand() % 10000 gives ~4 decimal digits of resolution on the
      // configured probability
      if (rand() % 10000 <
          g_conf()->osd_debug_inject_dispatch_delay_probability * 10000) {
        utime_t t;
        t.set_from_double(g_conf()->osd_debug_inject_dispatch_delay_duration);
        t.sleep();
      }
    }
  }
293 | ||
7c673cae FG |
294 | private: |
295 | // -- superblock -- | |
11fdf7f2 | 296 | ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish |
7c673cae FG |
297 | OSDSuperblock superblock; |
298 | ||
299 | public: | |
300 | OSDSuperblock get_superblock() { | |
11fdf7f2 | 301 | std::lock_guard l(publish_lock); |
7c673cae FG |
302 | return superblock; |
303 | } | |
304 | void publish_superblock(const OSDSuperblock &block) { | |
11fdf7f2 | 305 | std::lock_guard l(publish_lock); |
7c673cae FG |
306 | superblock = block; |
307 | } | |
308 | ||
309 | int get_nodeid() const { return whoami; } | |
310 | ||
311 | std::atomic<epoch_t> max_oldest_map; | |
312 | private: | |
313 | OSDMapRef osdmap; | |
314 | ||
315 | public: | |
316 | OSDMapRef get_osdmap() { | |
11fdf7f2 | 317 | std::lock_guard l(publish_lock); |
7c673cae FG |
318 | return osdmap; |
319 | } | |
320 | epoch_t get_osdmap_epoch() { | |
11fdf7f2 | 321 | std::lock_guard l(publish_lock); |
7c673cae FG |
322 | return osdmap ? osdmap->get_epoch() : 0; |
323 | } | |
324 | void publish_map(OSDMapRef map) { | |
11fdf7f2 | 325 | std::lock_guard l(publish_lock); |
7c673cae FG |
326 | osdmap = map; |
327 | } | |
328 | ||
329 | /* | |
330 | * osdmap - current published map | |
331 | * next_osdmap - pre_published map that is about to be published. | |
332 | * | |
333 | * We use the next_osdmap to send messages and initiate connections, | |
334 | * but only if the target is the same instance as the one in the map | |
335 | * epoch the current user is working from (i.e., the result is | |
336 | * equivalent to what is in next_osdmap). | |
337 | * | |
338 | * This allows the helpers to start ignoring osds that are about to | |
339 | * go down, and let OSD::handle_osd_map()/note_down_osd() mark them | |
340 | * down, without worrying about reopening connections from threads | |
341 | * working from old maps. | |
342 | */ | |
343 | private: | |
344 | OSDMapRef next_osdmap; | |
11fdf7f2 | 345 | ceph::condition_variable pre_publish_cond; |
7c673cae FG |
346 | |
347 | public: | |
348 | void pre_publish_map(OSDMapRef map) { | |
11fdf7f2 | 349 | std::lock_guard l(pre_publish_lock); |
7c673cae FG |
350 | next_osdmap = std::move(map); |
351 | } | |
352 | ||
353 | void activate_map(); | |
354 | /// map epochs reserved below | |
355 | map<epoch_t, unsigned> map_reservations; | |
356 | ||
357 | /// gets ref to next_osdmap and registers the epoch as reserved | |
358 | OSDMapRef get_nextmap_reserved() { | |
11fdf7f2 | 359 | std::lock_guard l(pre_publish_lock); |
7c673cae FG |
360 | if (!next_osdmap) |
361 | return OSDMapRef(); | |
362 | epoch_t e = next_osdmap->get_epoch(); | |
363 | map<epoch_t, unsigned>::iterator i = | |
364 | map_reservations.insert(make_pair(e, 0)).first; | |
365 | i->second++; | |
366 | return next_osdmap; | |
367 | } | |
368 | /// releases reservation on map | |
369 | void release_map(OSDMapRef osdmap) { | |
11fdf7f2 | 370 | std::lock_guard l(pre_publish_lock); |
7c673cae FG |
371 | map<epoch_t, unsigned>::iterator i = |
372 | map_reservations.find(osdmap->get_epoch()); | |
11fdf7f2 TL |
373 | ceph_assert(i != map_reservations.end()); |
374 | ceph_assert(i->second > 0); | |
7c673cae FG |
375 | if (--(i->second) == 0) { |
376 | map_reservations.erase(i); | |
377 | } | |
11fdf7f2 | 378 | pre_publish_cond.notify_all(); |
7c673cae FG |
379 | } |
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    std::unique_lock l{pre_publish_lock};
    ceph_assert(next_osdmap);
    // map_reservations is keyed (and thus ordered) by epoch, so the first
    // entry is the oldest reserved epoch; wait until it is not older than
    // next_osdmap, or there are no reservations at all.  release_map()
    // notifies pre_publish_cond after each release.
    pre_publish_cond.wait(l, [this] {
      auto i = map_reservations.cbegin();
      return (i == map_reservations.cend() ||
              i->first >= next_osdmap->get_epoch());
    });
  }
390 | OSDMapRef get_next_osdmap() { | |
391 | std::lock_guard l(pre_publish_lock); | |
392 | if (!next_osdmap) | |
393 | return OSDMapRef(); | |
394 | return next_osdmap; | |
7c673cae FG |
395 | } |
396 | ||
397 | private: | |
398 | Mutex peer_map_epoch_lock; | |
399 | map<int, epoch_t> peer_map_epoch; | |
400 | public: | |
401 | epoch_t get_peer_epoch(int p); | |
402 | epoch_t note_peer_epoch(int p, epoch_t e); | |
403 | void forget_peer_epoch(int p, epoch_t e); | |
404 | ||
405 | void send_map(class MOSDMap *m, Connection *con); | |
406 | void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap); | |
407 | MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to, | |
408 | OSDSuperblock& superblock); | |
409 | bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch, | |
410 | const OSDMapRef& osdmap, const epoch_t *sent_epoch_p); | |
411 | void share_map(entity_name_t name, Connection *con, epoch_t epoch, | |
412 | OSDMapRef& osdmap, epoch_t *sent_epoch_p); | |
413 | void share_map_peer(int peer, Connection *con, | |
414 | OSDMapRef map = OSDMapRef()); | |
415 | ||
416 | ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch); | |
417 | pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front) | |
418 | void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch); | |
419 | void send_message_osd_cluster(Message *m, Connection *con) { | |
420 | con->send_message(m); | |
421 | } | |
422 | void send_message_osd_cluster(Message *m, const ConnectionRef& con) { | |
423 | con->send_message(m); | |
424 | } | |
425 | void send_message_osd_client(Message *m, Connection *con) { | |
426 | con->send_message(m); | |
427 | } | |
428 | void send_message_osd_client(Message *m, const ConnectionRef& con) { | |
429 | con->send_message(m); | |
430 | } | |
11fdf7f2 | 431 | entity_name_t get_cluster_msgr_name() const; |
7c673cae FG |
432 | |
433 | private: | |
434 | // -- scrub scheduling -- | |
435 | Mutex sched_scrub_lock; | |
eafe8130 TL |
436 | int scrubs_local; |
437 | int scrubs_remote; | |
7c673cae FG |
438 | |
439 | public: | |
  /// One entry in the scrub schedule; kept in sched_scrub_pg, which is a
  /// set ordered by operator< (i.e. by sched_time).
  struct ScrubJob {
    CephContext* cct;
    /// pg to be scrubbed
    spg_t pgid;
    /// a time scheduled for scrub. but the scrub could be delayed if system
    /// load is too high or it fails to fall in the scrub hours
    utime_t sched_time;
    /// the hard upper bound of scrub time
    utime_t deadline;
    ScrubJob() : cct(nullptr) {}
    explicit ScrubJob(CephContext* cct, const spg_t& pg,
                      const utime_t& timestamp,
                      double pool_scrub_min_interval = 0,
                      double pool_scrub_max_interval = 0, bool must = true);
    /// order the jobs by sched_time
    bool operator<(const ScrubJob& rhs) const;
  };
457 | set<ScrubJob> sched_scrub_pg; | |
458 | ||
459 | /// @returns the scrub_reg_stamp used for unregister the scrub job | |
460 | utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval, | |
461 | double pool_scrub_max_interval, bool must) { | |
462 | ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval, | |
463 | must); | |
11fdf7f2 | 464 | std::lock_guard l(sched_scrub_lock); |
7c673cae FG |
465 | sched_scrub_pg.insert(scrub); |
466 | return scrub.sched_time; | |
467 | } | |
468 | void unreg_pg_scrub(spg_t pgid, utime_t t) { | |
11fdf7f2 | 469 | std::lock_guard l(sched_scrub_lock); |
7c673cae | 470 | size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t)); |
11fdf7f2 | 471 | ceph_assert(removed); |
7c673cae FG |
472 | } |
473 | bool first_scrub_stamp(ScrubJob *out) { | |
11fdf7f2 | 474 | std::lock_guard l(sched_scrub_lock); |
7c673cae FG |
475 | if (sched_scrub_pg.empty()) |
476 | return false; | |
477 | set<ScrubJob>::iterator iter = sched_scrub_pg.begin(); | |
478 | *out = *iter; | |
479 | return true; | |
480 | } | |
  // Fetch the scrub job scheduled immediately after 'next' into *out.
  // Returns false when the schedule is empty or 'next' is the last job.
  // NOTE(review): this assumes 'next' is still present in sched_scrub_pg;
  // if it was removed in the meantime, lower_bound() already lands on its
  // successor and the ++iter below skips one job — confirm callers always
  // pass a job freshly obtained from first_scrub_stamp()/next_scrub_stamp().
  bool next_scrub_stamp(const ScrubJob& next,
                        ScrubJob *out) {
    std::lock_guard l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
    if (iter == sched_scrub_pg.cend())
      return false;
    ++iter;
    if (iter == sched_scrub_pg.cend())
      return false;
    *out = *iter;
    return true;
  }
495 | ||
496 | void dumps_scrub(Formatter *f) { | |
11fdf7f2 TL |
497 | ceph_assert(f != nullptr); |
498 | std::lock_guard l(sched_scrub_lock); | |
7c673cae FG |
499 | |
500 | f->open_array_section("scrubs"); | |
501 | for (const auto &i: sched_scrub_pg) { | |
502 | f->open_object_section("scrub"); | |
503 | f->dump_stream("pgid") << i.pgid; | |
504 | f->dump_stream("sched_time") << i.sched_time; | |
505 | f->dump_stream("deadline") << i.deadline; | |
494da23a | 506 | f->dump_bool("forced", i.sched_time == PG::Scrubber::scrub_must_stamp()); |
7c673cae FG |
507 | f->close_section(); |
508 | } | |
509 | f->close_section(); | |
510 | } | |
511 | ||
eafe8130 TL |
512 | bool can_inc_scrubs(); |
513 | bool inc_scrubs_local(); | |
514 | void dec_scrubs_local(); | |
515 | bool inc_scrubs_remote(); | |
516 | void dec_scrubs_remote(); | |
517 | void dump_scrub_reservations(Formatter *f); | |
7c673cae FG |
518 | |
519 | void reply_op_error(OpRequestRef op, int err); | |
520 | void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv); | |
521 | void handle_misdirected_op(PG *pg, OpRequestRef op); | |
522 | ||
523 | ||
524 | private: | |
525 | // -- agent shared state -- | |
526 | Mutex agent_lock; | |
527 | Cond agent_cond; | |
528 | map<uint64_t, set<PGRef> > agent_queue; | |
529 | set<PGRef>::iterator agent_queue_pos; | |
530 | bool agent_valid_iterator; | |
531 | int agent_ops; | |
532 | int flush_mode_high_count; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed | |
533 | set<hobject_t> agent_oids; | |
534 | bool agent_active; | |
535 | struct AgentThread : public Thread { | |
536 | OSDService *osd; | |
537 | explicit AgentThread(OSDService *o) : osd(o) {} | |
538 | void *entry() override { | |
539 | osd->agent_entry(); | |
540 | return NULL; | |
541 | } | |
542 | } agent_thread; | |
543 | bool agent_stop_flag; | |
544 | Mutex agent_timer_lock; | |
545 | SafeTimer agent_timer; | |
546 | ||
547 | public: | |
548 | void agent_entry(); | |
549 | void agent_stop(); | |
550 | ||
  // Add pg to the agent queue at the given priority; caller must hold
  // agent_lock (see agent_enable_pg).  Signals agent_cond when a
  // previously-empty priority bucket gains its first pg.
  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false; // inserting a new highest-priority bucket
    set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.Signal();
    nq.insert(pg);
  }
560 | ||
  // Remove pg from its old priority bucket; caller must hold agent_lock
  // (see agent_disable_pg / agent_adjust_pg).
  void _dequeue(PG *pg, uint64_t old_priority) {
    set<PGRef>& oq = agent_queue[old_priority];
    set<PGRef>::iterator p = oq.find(pg);
    ceph_assert(p != oq.end());
    // keep the shared scan position valid if it pointed at the erased pg
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      // removing the current highest-priority bucket invalidates the
      // agent's cached iterator
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }
574 | ||
575 | /// enable agent for a pg | |
576 | void agent_enable_pg(PG *pg, uint64_t priority) { | |
11fdf7f2 | 577 | std::lock_guard l(agent_lock); |
7c673cae FG |
578 | _enqueue(pg, priority); |
579 | } | |
580 | ||
  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    std::lock_guard l(agent_lock);
    ceph_assert(new_priority != old_priority);
    // add under the new priority first, then drop the old entry
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }
588 | ||
589 | /// disable agent for a pg | |
590 | void agent_disable_pg(PG *pg, uint64_t old_priority) { | |
11fdf7f2 | 591 | std::lock_guard l(agent_lock); |
7c673cae FG |
592 | _dequeue(pg, old_priority); |
593 | } | |
594 | ||
595 | /// note start of an async (evict) op | |
596 | void agent_start_evict_op() { | |
11fdf7f2 | 597 | std::lock_guard l(agent_lock); |
7c673cae FG |
598 | ++agent_ops; |
599 | } | |
600 | ||
601 | /// note finish or cancellation of an async (evict) op | |
602 | void agent_finish_evict_op() { | |
11fdf7f2 TL |
603 | std::lock_guard l(agent_lock); |
604 | ceph_assert(agent_ops > 0); | |
7c673cae FG |
605 | --agent_ops; |
606 | agent_cond.Signal(); | |
607 | } | |
608 | ||
609 | /// note start of an async (flush) op | |
610 | void agent_start_op(const hobject_t& oid) { | |
11fdf7f2 | 611 | std::lock_guard l(agent_lock); |
7c673cae | 612 | ++agent_ops; |
11fdf7f2 | 613 | ceph_assert(agent_oids.count(oid) == 0); |
7c673cae FG |
614 | agent_oids.insert(oid); |
615 | } | |
616 | ||
617 | /// note finish or cancellation of an async (flush) op | |
618 | void agent_finish_op(const hobject_t& oid) { | |
11fdf7f2 TL |
619 | std::lock_guard l(agent_lock); |
620 | ceph_assert(agent_ops > 0); | |
7c673cae | 621 | --agent_ops; |
11fdf7f2 | 622 | ceph_assert(agent_oids.count(oid) == 1); |
7c673cae FG |
623 | agent_oids.erase(oid); |
624 | agent_cond.Signal(); | |
625 | } | |
626 | ||
627 | /// check if we are operating on an object | |
628 | bool agent_is_active_oid(const hobject_t& oid) { | |
11fdf7f2 | 629 | std::lock_guard l(agent_lock); |
7c673cae FG |
630 | return agent_oids.count(oid); |
631 | } | |
632 | ||
633 | /// get count of active agent ops | |
634 | int agent_get_num_ops() { | |
11fdf7f2 | 635 | std::lock_guard l(agent_lock); |
7c673cae FG |
636 | return agent_ops; |
637 | } | |
638 | ||
639 | void agent_inc_high_count() { | |
11fdf7f2 | 640 | std::lock_guard l(agent_lock); |
7c673cae FG |
641 | flush_mode_high_count ++; |
642 | } | |
643 | ||
644 | void agent_dec_high_count() { | |
11fdf7f2 | 645 | std::lock_guard l(agent_lock); |
7c673cae FG |
646 | flush_mode_high_count --; |
647 | } | |
648 | ||
649 | private: | |
650 | /// throttle promotion attempts | |
11fdf7f2 | 651 | std::atomic<unsigned int> promote_probability_millis{1000}; ///< probability thousands. one word. |
7c673cae FG |
652 | PromoteCounter promote_counter; |
653 | utime_t last_recalibrate; | |
654 | unsigned long promote_max_objects, promote_max_bytes; | |
655 | ||
656 | public: | |
  // Decide whether a tiering promote should be throttled.
  // Returns true to throttle (do NOT promote), false to allow.
  bool promote_throttle() {
    // NOTE: lockless! we rely on the probability being a single word.
    promote_counter.attempt();
    // probabilistic gate: promote_probability_millis is in thousandths
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    // hard caps on in-flight promoted objects/bytes (0 = unlimited)
    if (promote_max_objects &&
        promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
        promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   //  no throttle (promote)
  }
670 | void promote_finish(uint64_t bytes) { | |
671 | promote_counter.finish(bytes); | |
672 | } | |
673 | void promote_throttle_recalibrate(); | |
674 | ||
675 | // -- Objecter, for tiering reads/writes from/to other OSDs -- | |
676 | Objecter *objecter; | |
11fdf7f2 TL |
677 | int m_objecter_finishers; |
678 | vector<Finisher*> objecter_finishers; | |
7c673cae FG |
679 | |
680 | // -- Watch -- | |
681 | Mutex watch_lock; | |
682 | SafeTimer watch_timer; | |
683 | uint64_t next_notif_id; | |
684 | uint64_t get_next_id(epoch_t cur_epoch) { | |
11fdf7f2 | 685 | std::lock_guard l(watch_lock); |
7c673cae FG |
686 | return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++)); |
687 | } | |
688 | ||
689 | // -- Recovery/Backfill Request Scheduling -- | |
690 | Mutex recovery_request_lock; | |
691 | SafeTimer recovery_request_timer; | |
692 | ||
31f18b77 FG |
693 | // For async recovery sleep |
694 | bool recovery_needs_sleep = true; | |
695 | utime_t recovery_schedule_time = utime_t(); | |
696 | ||
11fdf7f2 TL |
697 | // For recovery & scrub & snap |
698 | Mutex sleep_lock; | |
699 | SafeTimer sleep_timer; | |
31f18b77 | 700 | |
7c673cae FG |
701 | // -- tids -- |
702 | // for ops i issue | |
11fdf7f2 | 703 | std::atomic<unsigned int> last_tid{0}; |
7c673cae FG |
704 | ceph_tid_t get_tid() { |
705 | return (ceph_tid_t)last_tid++; | |
706 | } | |
707 | ||
708 | // -- backfill_reservation -- | |
709 | Finisher reserver_finisher; | |
710 | AsyncReserver<spg_t> local_reserver; | |
711 | AsyncReserver<spg_t> remote_reserver; | |
712 | ||
11fdf7f2 TL |
713 | // -- pg merge -- |
714 | Mutex merge_lock = {"OSD::merge_lock"}; | |
715 | map<pg_t,eversion_t> ready_to_merge_source; // pg -> version | |
716 | map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec) | |
717 | set<pg_t> not_ready_to_merge_source; | |
718 | map<pg_t,pg_t> not_ready_to_merge_target; | |
719 | set<pg_t> sent_ready_to_merge_source; | |
720 | ||
721 | void set_ready_to_merge_source(PG *pg, | |
722 | eversion_t version); | |
723 | void set_ready_to_merge_target(PG *pg, | |
724 | eversion_t version, | |
725 | epoch_t last_epoch_started, | |
726 | epoch_t last_epoch_clean); | |
727 | void set_not_ready_to_merge_source(pg_t source); | |
728 | void set_not_ready_to_merge_target(pg_t target, pg_t source); | |
729 | void clear_ready_to_merge(PG *pg); | |
730 | void send_ready_to_merge(); | |
731 | void _send_ready_to_merge(); | |
732 | void clear_sent_ready_to_merge(); | |
733 | void prune_sent_ready_to_merge(OSDMapRef& osdmap); | |
734 | ||
7c673cae FG |
735 | // -- pg_temp -- |
736 | private: | |
737 | Mutex pg_temp_lock; | |
94b18763 | 738 | struct pg_temp_t { |
94b18763 FG |
739 | vector<int> acting; |
740 | bool forced = false; | |
741 | }; | |
742 | map<pg_t, pg_temp_t> pg_temp_wanted; | |
743 | map<pg_t, pg_temp_t> pg_temp_pending; | |
7c673cae | 744 | void _sent_pg_temp(); |
94b18763 | 745 | friend std::ostream& operator<<(std::ostream&, const pg_temp_t&); |
7c673cae | 746 | public: |
94b18763 FG |
747 | void queue_want_pg_temp(pg_t pgid, const vector<int>& want, |
748 | bool forced = false); | |
7c673cae FG |
749 | void remove_want_pg_temp(pg_t pgid); |
750 | void requeue_pg_temp(); | |
751 | void send_pg_temp(); | |
752 | ||
11fdf7f2 TL |
753 | ceph::mutex pg_created_lock = ceph::make_mutex("OSDService::pg_created_lock"); |
754 | set<pg_t> pg_created; | |
7c673cae | 755 | void send_pg_created(pg_t pgid); |
11fdf7f2 TL |
756 | void prune_pg_created(); |
757 | void send_pg_created(); | |
31f18b77 | 758 | |
7c673cae | 759 | AsyncReserver<spg_t> snap_reserver; |
11fdf7f2 | 760 | void queue_recovery_context(PG *pg, GenContext<ThreadPool::TPHandle&> *c); |
7c673cae | 761 | void queue_for_snap_trim(PG *pg); |
11fdf7f2 TL |
762 | void queue_for_scrub(PG *pg, bool with_high_priority); |
763 | void queue_for_pg_delete(spg_t pgid, epoch_t e); | |
764 | bool try_finish_pg_delete(PG *pg, unsigned old_pg_num); | |
7c673cae FG |
765 | |
766 | private: | |
767 | // -- pg recovery and associated throttling -- | |
768 | Mutex recovery_lock; | |
769 | list<pair<epoch_t, PGRef> > awaiting_throttle; | |
770 | ||
771 | utime_t defer_recovery_until; | |
772 | uint64_t recovery_ops_active; | |
773 | uint64_t recovery_ops_reserved; | |
774 | bool recovery_paused; | |
775 | #ifdef DEBUG_RECOVERY_OIDS | |
776 | map<spg_t, set<hobject_t> > recovery_oids; | |
777 | #endif | |
778 | bool _recover_now(uint64_t *available_pushes); | |
779 | void _maybe_queue_recovery(); | |
780 | void _queue_for_recovery( | |
11fdf7f2 | 781 | pair<epoch_t, PGRef> p, uint64_t reserved_pushes); |
7c673cae FG |
782 | public: |
783 | void start_recovery_op(PG *pg, const hobject_t& soid); | |
784 | void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue); | |
785 | bool is_recovery_active(); | |
11fdf7f2 | 786 | void release_reserved_pushes(uint64_t pushes); |
7c673cae FG |
787 | void defer_recovery(float defer_for) { |
788 | defer_recovery_until = ceph_clock_now(); | |
789 | defer_recovery_until += defer_for; | |
790 | } | |
791 | void pause_recovery() { | |
11fdf7f2 | 792 | std::lock_guard l(recovery_lock); |
7c673cae FG |
793 | recovery_paused = true; |
794 | } | |
795 | bool recovery_is_paused() { | |
11fdf7f2 | 796 | std::lock_guard l(recovery_lock); |
7c673cae FG |
797 | return recovery_paused; |
798 | } | |
799 | void unpause_recovery() { | |
11fdf7f2 | 800 | std::lock_guard l(recovery_lock); |
7c673cae FG |
801 | recovery_paused = false; |
802 | _maybe_queue_recovery(); | |
803 | } | |
804 | void kick_recovery_queue() { | |
11fdf7f2 | 805 | std::lock_guard l(recovery_lock); |
7c673cae FG |
806 | _maybe_queue_recovery(); |
807 | } | |
808 | void clear_queued_recovery(PG *pg) { | |
11fdf7f2 TL |
809 | std::lock_guard l(recovery_lock); |
810 | awaiting_throttle.remove_if( | |
811 | [pg](decltype(awaiting_throttle)::const_reference awaiting ) { | |
812 | return awaiting.second.get() == pg; | |
813 | }); | |
7c673cae FG |
814 | } |
815 | // delayed pg activation | |
c07f9fc5 | 816 | void queue_for_recovery(PG *pg) { |
11fdf7f2 | 817 | std::lock_guard l(recovery_lock); |
c07f9fc5 | 818 | |
11fdf7f2 | 819 | if (pg->is_forced_recovery_or_backfill()) { |
7c673cae FG |
820 | awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg)); |
821 | } else { | |
822 | awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg)); | |
823 | } | |
824 | _maybe_queue_recovery(); | |
825 | } | |
31f18b77 | 826 | void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) { |
11fdf7f2 | 827 | std::lock_guard l(recovery_lock); |
31f18b77 FG |
828 | _queue_for_recovery(make_pair(queued, pg), reserved_pushes); |
829 | } | |
7c673cae | 830 | |
7c673cae FG |
  // osd map cache (past osd maps)
  Mutex map_cache_lock;                         ///< protects the three caches and deleted_pool_pg_nums
  SharedLRU<epoch_t, const OSDMap> map_cache;   ///< decoded full maps, by epoch
  SimpleLRU<epoch_t, bufferlist> map_bl_cache;  ///< encoded full maps, by epoch
  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;  ///< encoded incremental maps, by epoch

  /// final pg_num values for recently deleted pools
  map<int64_t,int> deleted_pool_pg_nums;

  /// look up epoch e in the cache (loading from store if needed); may return null
  OSDMapRef try_get_map(epoch_t e);
  /// like try_get_map(), but asserts the map is available
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    ceph_assert(ret);
    return ret;
  }
  /// insert a decoded map into the cache; returns the (possibly shared) cached ref
  OSDMapRef add_map(OSDMap *o) {
    std::lock_guard l(map_cache_lock);
    return _add_map(o);
  }
  OSDMapRef _add_map(OSDMap *o);   ///< as add_map(), caller holds map_cache_lock

  /// cache the encoded full map for epoch e
  void add_map_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _add_map_bl(e, bl);
  }
  void _add_map_bl(epoch_t e, bufferlist& bl);   ///< caller holds map_cache_lock
  /// fetch the encoded full map for epoch e; false if not available
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _get_map_bl(e, bl);
  }
  bool _get_map_bl(epoch_t e, bufferlist& bl);   ///< caller holds map_cache_lock

  /// cache the encoded incremental map for epoch e
  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    std::lock_guard l(map_cache_lock);
    return _add_map_inc_bl(e, bl);
  }
  void _add_map_inc_bl(epoch_t e, bufferlist& bl);   ///< caller holds map_cache_lock
  bool get_inc_map_bl(epoch_t e, bufferlist& bl);    ///< fetch encoded incremental for e

  /// get last pg_num before a pool was deleted (if any)
  int get_deleted_pool_pg_num(int64_t pool);

  /// remember the final pg_num of a pool that is being deleted
  void store_deleted_pool_pg_num(int64_t pool, int pg_num) {
    std::lock_guard l(map_cache_lock);
    deleted_pool_pg_nums[pool] = pg_num;
  }

  /// get pgnum from newmap or, if pool was deleted, last map pool existed in
  int get_possibly_deleted_pool_pg_num(OSDMapRef newmap,
				       int64_t pool) {
    if (newmap->have_pg_pool(pool)) {
      return newmap->get_pg_num(pool);
    }
    return get_deleted_pool_pg_num(pool);
  }
886 | ||
887 | /// identify split child pgids over a osdmap interval | |
888 | void identify_splits_and_merges( | |
889 | OSDMapRef old_map, | |
890 | OSDMapRef new_map, | |
891 | spg_t pgid, | |
892 | set<pair<spg_t,epoch_t>> *new_children, | |
893 | set<pair<spg_t,epoch_t>> *merge_pgs); | |
894 | ||
895 | void need_heartbeat_peer_update(); | |
7c673cae FG |
896 | |
897 | void init(); | |
898 | void final_init(); | |
899 | void start_shutdown(); | |
31f18b77 | 900 | void shutdown_reserver(); |
7c673cae FG |
901 | void shutdown(); |
902 | ||
7c673cae FG |
  // -- stats --
  Mutex stat_lock;       ///< protects osd_stat and seq
  osd_stat_t osd_stat;   ///< most recently composed stats for this OSD
  uint32_t seq = 0;      ///< low 32 bits of osd_stat.seq; bumped on every get_osd_stat()

  void set_statfs(const struct store_statfs_t &stbuf,
    osd_alert_list_t& alerts);
  osd_stat_t set_osd_stat(vector<int>& hb_peers, int num_pgs);
  void inc_osd_stat_repaired(void);
  float compute_adjusted_ratio(osd_stat_t new_stat, float *pratio, uint64_t adjust_used = 0);
  /// Return a copy of the current stats, stamping a fresh sequence number.
  /// osd_stat.seq packs the boot lifetime and a per-call counter:
  /// high 32 bits = up_from epoch, low 32 bits = monotonically bumped seq.
  osd_stat_t get_osd_stat() {
    std::lock_guard l(stat_lock);
    ++seq;
    osd_stat.up_from = up_epoch;
    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
    return osd_stat;
  }
  /// Sequence number of the last stats snapshot (see get_osd_stat()).
  uint64_t get_osd_stat_seq() {
    std::lock_guard l(stat_lock);
    return osd_stat.seq;
  }
eafe8130 TL |
924 | void get_hb_pingtime(map<int, osd_stat_t::Interfaces> *pp) |
925 | { | |
926 | std::lock_guard l(stat_lock); | |
927 | *pp = osd_stat.hb_pingtime; | |
928 | return; | |
929 | } | |
7c673cae FG |
930 | |
  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable Mutex full_status_lock;
  /// fullness severity levels, in strictly ascending order of severity
  /// (INVALID is a parse-failure sentinel, not a real state)
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
  /// human-readable name for a fullness state ("???" for out-of-range input)
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
946 | s_names get_full_state(string type) const { | |
947 | if (type == "none") | |
948 | return NONE; | |
949 | else if (type == "failsafe") | |
950 | return FAILSAFE; | |
951 | else if (type == "full") | |
952 | return FULL; | |
953 | else if (type == "backfillfull") | |
954 | return BACKFILLFULL; | |
955 | else if (type == "nearfull") | |
956 | return NEARFULL; | |
957 | else | |
958 | return INVALID; | |
959 | } | |
11fdf7f2 | 960 | double cur_ratio, physical_ratio; ///< current utilization |
7c673cae FG |
961 | mutable int64_t injectfull = 0; |
962 | s_names injectfull_state = NONE; | |
963 | float get_failsafe_full_ratio(); | |
11fdf7f2 TL |
964 | bool _check_inject_full(DoutPrefixProvider *dpp, s_names type) const; |
965 | bool _check_full(DoutPrefixProvider *dpp, s_names type) const; | |
7c673cae | 966 | public: |
11fdf7f2 TL |
967 | void check_full_status(float ratio, float pratio); |
968 | s_names recalc_full_state(float ratio, float pratio, string &inject); | |
969 | bool _tentative_full(DoutPrefixProvider *dpp, s_names type, uint64_t adjust_used, osd_stat_t); | |
970 | bool check_failsafe_full(DoutPrefixProvider *dpp) const; | |
971 | bool check_full(DoutPrefixProvider *dpp) const; | |
972 | bool tentative_backfill_full(DoutPrefixProvider *dpp, uint64_t adjust_used, osd_stat_t); | |
973 | bool check_backfill_full(DoutPrefixProvider *dpp) const; | |
974 | bool check_nearfull(DoutPrefixProvider *dpp) const; | |
7c673cae FG |
975 | bool is_failsafe_full() const; |
976 | bool is_full() const; | |
977 | bool is_backfillfull() const; | |
978 | bool is_nearfull() const; | |
979 | bool need_fullness_update(); ///< osdmap state needs update | |
980 | void set_injectfull(s_names type, int64_t count); | |
981 | bool check_osdmap_full(const set<pg_shard_t> &missing_on); | |
982 | ||
983 | ||
984 | // -- epochs -- | |
985 | private: | |
986 | mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch | |
987 | epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started) | |
988 | epoch_t up_epoch; // _most_recent_ epoch we were marked up | |
989 | epoch_t bind_epoch; // epoch we last did a bind to new ip:ports | |
990 | public: | |
991 | /** | |
992 | * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params | |
993 | * can be NULL if you don't care about them. | |
994 | */ | |
995 | void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch, | |
996 | epoch_t *_bind_epoch) const; | |
997 | /** | |
998 | * Set the boot, up, and bind epochs. Any NULL params will not be set. | |
999 | */ | |
1000 | void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch, | |
1001 | const epoch_t *_bind_epoch); | |
1002 | epoch_t get_boot_epoch() const { | |
1003 | epoch_t ret; | |
1004 | retrieve_epochs(&ret, NULL, NULL); | |
1005 | return ret; | |
1006 | } | |
1007 | epoch_t get_up_epoch() const { | |
1008 | epoch_t ret; | |
1009 | retrieve_epochs(NULL, &ret, NULL); | |
1010 | return ret; | |
1011 | } | |
1012 | epoch_t get_bind_epoch() const { | |
1013 | epoch_t ret; | |
1014 | retrieve_epochs(NULL, NULL, &ret); | |
1015 | return ret; | |
1016 | } | |
1017 | ||
181888fb FG |
1018 | void request_osdmap_update(epoch_t e); |
1019 | ||
7c673cae FG |
  // -- stopping --
  Mutex is_stopping_lock;
  Cond is_stopping_cond;
  /// shutdown progression; values compared against the atomic `state` below
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING };
  std::atomic<int> state{NOT_STOPPING};
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();   ///< begin the stop handshake; see also got_stop_ack()
  void got_stop_ack();
1042 | ||
1043 | ||
1044 | #ifdef PG_DEBUG_REFS | |
1045 | Mutex pgid_lock; | |
1046 | map<spg_t, int> pgid_tracker; | |
1047 | map<spg_t, PG*> live_pgs; | |
31f18b77 FG |
1048 | void add_pgid(spg_t pgid, PG *pg); |
1049 | void remove_pgid(spg_t pgid, PG *pg); | |
1050 | void dump_live_pgids(); | |
7c673cae FG |
1051 | #endif |
1052 | ||
1053 | explicit OSDService(OSD *osd); | |
1054 | ~OSDService(); | |
1055 | }; | |
1056 | ||
11fdf7f2 TL |
1057 | |
/// Which op-queue implementation an OSDShard instantiates for its pqueue
/// (see the OSDShard constructor for the mapping to concrete queue types).
enum class io_queue {
  prioritized,       ///< PrioritizedQueue
  weightedpriority,  ///< WeightedPriorityQueue
  mclock_opclass,    ///< ceph::mClockOpClassQueue
  mclock_client,     ///< ceph::mClockClientQueue
};
1064 | ||
1065 | ||
1066 | /* | |
1067 | ||
1068 | Each PG slot includes queues for events that are processing and/or waiting | |
1069 | for a PG to be materialized in the slot. | |
1070 | ||
1071 | These are the constraints: | |
1072 | ||
1073 | - client ops must remain ordered by client, regardless of map epoch | |
1074 | - peering messages/events from peers must remain ordered by peer | |
1075 | - peering messages and client ops need not be ordered relative to each other | |
1076 | ||
1077 | - some peering events can create a pg (e.g., notify) | |
1078 | - the query peering event can proceed when a PG doesn't exist | |
1079 | ||
1080 | Implementation notes: | |
1081 | ||
1082 | - everybody waits for split. If the OSD has the parent PG it will instantiate | |
1083 | the PGSlot early and mark it waiting_for_split. Everything will wait until | |
1084 | the parent is able to commit the split operation and the child PG's are | |
1085 | materialized in the child slots. | |
1086 | ||
1087 | - every event has an epoch property and will wait for the OSDShard to catch | |
1088 | up to that epoch. For example, if we get a peering event from a future | |
1089 | epoch, the event will wait in the slot until the local OSD has caught up. | |
1090 | (We should be judicious in specifying the required epoch [by, e.g., setting | |
1091 | it to the same_interval_since epoch] so that we don't wait for epochs that | |
1092 | don't affect the given PG.) | |
1093 | ||
1094 | - we maintain two separate wait lists, *waiting* and *waiting_peering*. The | |
1095 | OpQueueItem has an is_peering() bool to determine which we use. Waiting | |
1096 | peering events are queued up by epoch required. | |
1097 | ||
1098 | - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or | |
1099 | materialized the PG), we wake *all* waiting items. (This could be optimized, | |
1100 | probably, but we don't bother.) We always requeue peering items ahead of | |
1101 | client ops. | |
1102 | ||
1103 | - some peering events are marked !peering_requires_pg (PGQuery). if we do | |
1104 | not have a PG these are processed immediately (under the shard lock). | |
1105 | ||
1106 | - if we do not have a PG present, we check if the slot maps to the current host. | |
1107 | if so, we either queue the item and wait for the PG to materialize, or | |
1108 | (if the event is a pg creating event like PGNotify), we materialize the PG. | |
1109 | ||
1110 | - when we advance the osdmap on the OSDShard, we scan pg slots and | |
1111 | discard any slots with no pg (and not waiting_for_split) that no | |
1112 | longer map to the current host. | |
1113 | ||
1114 | */ | |
1115 | ||
/// Per-PG state within an OSDShard: the PG reference (if instantiated)
/// plus the queues of items processing and/or waiting for the PG or for
/// an osdmap epoch.  See the design comment above for ordering rules.
struct OSDShardPGSlot {
  PGRef pg;                      ///< pg reference
  deque<OpQueueItem> to_process; ///< order items for this slot
  int num_running = 0;           ///< _process threads doing pg lookup/lock

  deque<OpQueueItem> waiting;    ///< waiting for pg (or map + pg)

  /// waiting for map (peering evt)
  map<epoch_t,deque<OpQueueItem>> waiting_peering;

  /// incremented by wake_pg_waiters; indicates racing _process threads
  /// should bail out (their op has been requeued)
  uint64_t requeue_seq = 0;

  /// waiting for split child to materialize in these epoch(s)
  set<epoch_t> waiting_for_split;

  epoch_t epoch = 0;             ///< key for OSDShard::pg_slots_by_epoch
  boost::intrusive::set_member_hook<> pg_epoch_item;

  /// waiting for a merge (source or target) by this epoch
  epoch_t waiting_for_merge_epoch = 0;
};
1139 | ||
/// One shard of the OSD's sharded op work queue: owns a priority queue of
/// OpQueueItems, the per-PG slots tracking in-flight/waiting work, and a
/// per-shard view of the osdmap.
struct OSDShard {
  const unsigned shard_id;
  CephContext *cct;
  OSD *osd;

  string shard_name;

  string sdata_wait_lock_name;
  ceph::mutex sdata_wait_lock;
  ceph::condition_variable sdata_cond;

  string osdmap_lock_name;
  ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
  OSDMapRef shard_osdmap;

  /// thread-safe snapshot of this shard's current osdmap ref
  OSDMapRef get_osdmap() {
    std::lock_guard l(osdmap_lock);
    return shard_osdmap;
  }

  string shard_lock_name;
  ceph::mutex shard_lock;   ///< protects remaining members below

  /// map of slots for each spg_t.  maintains ordering of items dequeued
  /// from pqueue while _process thread drops shard lock to acquire the
  /// pg lock.  stale slots are removed by consume_map.
  unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;

  struct pg_slot_compare_by_epoch {
    bool operator()(const OSDShardPGSlot& l, const OSDShardPGSlot& r) const {
      return l.epoch < r.epoch;
    }
  };

  /// maintain an ordering of pg slots by pg epoch
  boost::intrusive::multiset<
    OSDShardPGSlot,
    boost::intrusive::member_hook<
      OSDShardPGSlot,
      boost::intrusive::set_member_hook<>,
      &OSDShardPGSlot::pg_epoch_item>,
    boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
  int waiting_for_min_pg_epoch = 0;
  ceph::condition_variable min_pg_epoch_cond;

  /// priority queue
  std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;

  bool stop_waiting = false;

  ContextQueue context_queue;

  /// Requeue an item at the front of the queue.  Items at or above the
  /// `cutoff` priority go into the strict (starvation-capable) sub-queue;
  /// everything else is cost-weighted.
  void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
    unsigned priority = item.get_priority();
    unsigned cost = item.get_cost();
    if (priority >= cutoff)
      pqueue->enqueue_strict_front(
	item.get_owner(),
	priority, std::move(item));
    else
      pqueue->enqueue_front(
	item.get_owner(),
	priority, cost, std::move(item));
  }

  void _attach_pg(OSDShardPGSlot *slot, PG *pg);
  void _detach_pg(OSDShardPGSlot *slot);

  void update_pg_epoch(OSDShardPGSlot *slot, epoch_t epoch);
  epoch_t get_min_pg_epoch();
  void wait_min_pg_epoch(epoch_t need);

  /// return newest epoch we are waiting for
  epoch_t get_max_waiting_epoch();

  /// push osdmap into shard
  void consume_map(
    OSDMapRef& osdmap,
    unsigned *pushes_to_free);

  void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);

  void identify_splits_and_merges(
    const OSDMapRef& as_of_osdmap,
    set<pair<spg_t,epoch_t>> *split_children,
    set<pair<spg_t,epoch_t>> *merge_pgs);
  void _prime_splits(set<pair<spg_t,epoch_t>> *pgids);
  void prime_splits(const OSDMapRef& as_of_osdmap,
		    set<pair<spg_t,epoch_t>> *pgids);
  void prime_merges(const OSDMapRef& as_of_osdmap,
		    set<pair<spg_t,epoch_t>> *merge_pgs);
  void register_and_wake_split_child(PG *pg);
  void unprime_split_children(spg_t parent, unsigned old_pg_num);

  // NOTE: member-initializer order below matters and must follow the
  // declaration order above: the *_lock_name strings are built from
  // shard_name, each mutex from its name, and context_queue from
  // sdata_wait_lock/sdata_cond — all declared earlier in this struct.
  OSDShard(
    int id,
    CephContext *cct,
    OSD *osd,
    uint64_t max_tok_per_prio, uint64_t min_cost,
    io_queue opqueue)
    : shard_id(id),
      cct(cct),
      osd(osd),
      shard_name(string("OSDShard.") + stringify(id)),
      sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
      sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
      osdmap_lock_name(shard_name + "::osdmap_lock"),
      osdmap_lock{make_mutex(osdmap_lock_name)},
      shard_lock_name(shard_name + "::shard_lock"),
      shard_lock{make_mutex(shard_lock_name)},
      context_queue(sdata_wait_lock, sdata_cond) {
    // Instantiate the configured queue implementation.  The io_queue enum
    // has exactly these four values, so one branch always matches; pqueue
    // would remain null only if a new enumerator were added without a
    // branch here.
    if (opqueue == io_queue::weightedpriority) {
      pqueue = std::make_unique<
	WeightedPriorityQueue<OpQueueItem,uint64_t>>(
	  max_tok_per_prio, min_cost);
    } else if (opqueue == io_queue::prioritized) {
      pqueue = std::make_unique<
	PrioritizedQueue<OpQueueItem,uint64_t>>(
	  max_tok_per_prio, min_cost);
    } else if (opqueue == io_queue::mclock_opclass) {
      pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
    } else if (opqueue == io_queue::mclock_client) {
      pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
    }
  }
};
1266 | ||
7c673cae FG |
1267 | class OSD : public Dispatcher, |
1268 | public md_config_obs_t { | |
1269 | /** OSD **/ | |
11fdf7f2 | 1270 | Mutex osd_lock; // global lock |
7c673cae FG |
1271 | SafeTimer tick_timer; // safe timer (osd_lock) |
1272 | ||
1273 | // Tick timer for those stuff that do not need osd_lock | |
1274 | Mutex tick_timer_lock; | |
1275 | SafeTimer tick_timer_without_osd_lock; | |
11fdf7f2 TL |
1276 | std::string gss_ktfile_client{}; |
1277 | ||
7c673cae FG |
1278 | public: |
1279 | // config observer bits | |
1280 | const char** get_tracked_conf_keys() const override; | |
11fdf7f2 | 1281 | void handle_conf_change(const ConfigProxy& conf, |
7c673cae FG |
1282 | const std::set <std::string> &changed) override; |
1283 | void update_log_config(); | |
1284 | void check_config(); | |
1285 | ||
1286 | protected: | |
1287 | ||
91327a77 AA |
1288 | const double OSD_TICK_INTERVAL = { 1.0 }; |
1289 | double get_tick_interval() const; | |
7c673cae | 1290 | |
7c673cae FG |
1291 | Messenger *cluster_messenger; |
1292 | Messenger *client_messenger; | |
1293 | Messenger *objecter_messenger; | |
1294 | MonClient *monc; // check the "monc helpers" list before accessing directly | |
1295 | MgrClient mgrc; | |
1296 | PerfCounters *logger; | |
1297 | PerfCounters *recoverystate_perf; | |
1298 | ObjectStore *store; | |
1299 | #ifdef HAVE_LIBFUSE | |
1300 | FuseStore *fuse_store = nullptr; | |
1301 | #endif | |
1302 | LogClient log_client; | |
1303 | LogChannelRef clog; | |
1304 | ||
1305 | int whoami; | |
1306 | std::string dev_path, journal_path; | |
1307 | ||
11fdf7f2 TL |
1308 | int last_require_osd_release = 0; |
1309 | ||
1310 | int numa_node = -1; | |
1311 | size_t numa_cpu_set_size = 0; | |
1312 | cpu_set_t numa_cpu_set; | |
1313 | ||
31f18b77 | 1314 | bool store_is_rotational = true; |
d2e6a577 | 1315 | bool journal_is_rotational = true; |
31f18b77 | 1316 | |
7c673cae FG |
1317 | ZTracer::Endpoint trace_endpoint; |
1318 | void create_logger(); | |
1319 | void create_recoverystate_perf(); | |
1320 | void tick(); | |
1321 | void tick_without_osd_lock(); | |
1322 | void _dispatch(Message *m); | |
1323 | void dispatch_op(OpRequestRef op); | |
1324 | ||
11fdf7f2 | 1325 | void check_osdmap_features(); |
7c673cae FG |
1326 | |
1327 | // asok | |
1328 | friend class OSDSocketHook; | |
1329 | class OSDSocketHook *asok_hook; | |
11fdf7f2 TL |
1330 | bool asok_command(std::string_view admin_command, const cmdmap_t& cmdmap, |
1331 | std::string_view format, std::ostream& ss); | |
7c673cae FG |
1332 | |
1333 | public: | |
1334 | ClassHandler *class_handler = nullptr; | |
1335 | int get_nodeid() { return whoami; } | |
1336 | ||
  /// Object name under which the full osdmap for `epoch` is stored.
  /// Buffer: "osdmap." (7) + up to 10 digits + NUL = 18 <= 20.
  /// NOTE(review): format is %d; if epoch_t is unsigned (not visible here)
  /// this relies on values staying below 2^31 — confirm against types.h.
  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  /// Object name for the incremental osdmap for `epoch`.
  /// Buffer: "inc_osdmap." (11) + up to 10 digits + NUL = 22, exactly fits.
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }

  /// Object holding the snap mapper's state.
  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
	object_t("snapmapper"),
	0)));
  }

  /// Object name for a PG's log ("pglog_<pgid>").
  static ghobject_t make_pg_log_oid(spg_t pg) {
    stringstream ss;
    ss << "pglog_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  /// Object name for a PG's extended info ("pginfo_<pgid>").
  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    stringstream ss;
    ss << "pginfo_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  /// Object holding per-PG info omap entries.
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }

  /// Object recording the final pool info for a deleted pool.
  static ghobject_t make_final_pool_info_oid(int64_t pool) {
    return ghobject_t(
      hobject_t(
	sobject_t(
	  object_t(string("final_pool_") + stringify(pool)),
	  CEPH_NOSNAP)));
  }

  /// Object recording the pg_num history across pool lifetimes.
  static ghobject_t make_pg_num_history_oid() {
    return ghobject_t(hobject_t(sobject_t("pg_num_history", CEPH_NOSNAP)));
  }
1386 | ||
7c673cae FG |
1387 | static void recursive_remove_collection(CephContext* cct, |
1388 | ObjectStore *store, | |
1389 | spg_t pgid, | |
1390 | coll_t tmp); | |
1391 | ||
1392 | /** | |
1393 | * get_osd_initial_compat_set() | |
1394 | * | |
1395 | * Get the initial feature set for this OSD. Features | |
1396 | * here are automatically upgraded. | |
1397 | * | |
1398 | * Return value: Initial osd CompatSet | |
1399 | */ | |
1400 | static CompatSet get_osd_initial_compat_set(); | |
1401 | ||
1402 | /** | |
1403 | * get_osd_compat_set() | |
1404 | * | |
1405 | * Get all features supported by this OSD | |
1406 | * | |
1407 | * Return value: CompatSet of all supported features | |
1408 | */ | |
1409 | static CompatSet get_osd_compat_set(); | |
1410 | ||
1411 | ||
1412 | private: | |
1413 | class C_Tick; | |
1414 | class C_Tick_WithoutOSDLock; | |
1415 | ||
11fdf7f2 TL |
1416 | // -- config settings -- |
1417 | float m_osd_pg_epoch_max_lag_factor; | |
1418 | ||
7c673cae FG |
1419 | // -- superblock -- |
1420 | OSDSuperblock superblock; | |
1421 | ||
1422 | void write_superblock(); | |
1423 | void write_superblock(ObjectStore::Transaction& t); | |
1424 | int read_superblock(); | |
1425 | ||
1426 | void clear_temp_objects(); | |
1427 | ||
1428 | CompatSet osd_compat; | |
1429 | ||
1430 | // -- state -- | |
1431 | public: | |
1432 | typedef enum { | |
1433 | STATE_INITIALIZING = 1, | |
1434 | STATE_PREBOOT, | |
1435 | STATE_BOOTING, | |
1436 | STATE_ACTIVE, | |
1437 | STATE_STOPPING, | |
1438 | STATE_WAITING_FOR_HEALTHY | |
1439 | } osd_state_t; | |
1440 | ||
1441 | static const char *get_state_name(int s) { | |
1442 | switch (s) { | |
1443 | case STATE_INITIALIZING: return "initializing"; | |
1444 | case STATE_PREBOOT: return "preboot"; | |
1445 | case STATE_BOOTING: return "booting"; | |
1446 | case STATE_ACTIVE: return "active"; | |
1447 | case STATE_STOPPING: return "stopping"; | |
1448 | case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy"; | |
1449 | default: return "???"; | |
1450 | } | |
1451 | } | |
1452 | ||
private:
  std::atomic<int> state{STATE_INITIALIZING};   ///< current osd_state_t value

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }
1481 | ||
1482 | private: | |
1483 | ||
7c673cae | 1484 | ShardedThreadPool osd_op_tp; |
7c673cae FG |
1485 | ThreadPool command_tp; |
1486 | ||
7c673cae FG |
1487 | void get_latest_osdmap(); |
1488 | ||
  // -- sessions --
private:
  void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);

  Mutex session_waiting_lock;              ///< protects session_waiting_for_map
  set<SessionRef> session_waiting_for_map; ///< sessions blocked on a newer osdmap

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(set<SessionRef> *out) {
    std::lock_guard l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(SessionRef session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.insert(session);
  }
  void clear_session_waiting_on_map(SessionRef session) {
    std::lock_guard l(session_waiting_lock);
    session_waiting_for_map.erase(session);
  }
  /// Drain the waiting set and dispatch each session's queued work.
  /// The set is emptied via swap first, so sessions that re-register
  /// while we iterate are picked up on the next call.
  void dispatch_sessions_waiting_on_map() {
    set<SessionRef> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    // erase(i++) in the increment clause drops our ref to each session
    // as soon as it has been dispatched
    for (auto i = sessions_to_check.begin();
	 i != sessions_to_check.end();
	 sessions_to_check.erase(i++)) {
      std::lock_guard l{(*i)->session_dispatch_lock};
      SessionRef session = *i;
      dispatch_session_waiting(session, osdmap);
    }
  }
  /// Tear down per-session state when the session's connection resets.
  void session_handle_reset(SessionRef session) {
    std::lock_guard l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }
1534 | ||
1535 | private: | |
1536 | /** | |
1537 | * @defgroup monc helpers | |
1538 | * @{ | |
1539 | * Right now we only have the one | |
1540 | */ | |
1541 | ||
1542 | /** | |
1543 | * Ask the Monitors for a sequence of OSDMaps. | |
1544 | * | |
1545 | * @param epoch The epoch to start with when replying | |
1546 | * @param force_request True if this request forces a new subscription to | |
1547 | * the monitors; false if an outstanding request that encompasses it is | |
1548 | * sufficient. | |
1549 | */ | |
1550 | void osdmap_subscribe(version_t epoch, bool force_request); | |
1551 | /** @} monc helpers */ | |
1552 | ||
181888fb FG |
1553 | Mutex osdmap_subscribe_lock; |
1554 | epoch_t latest_subscribed_epoch{0}; | |
1555 | ||
7c673cae FG |
1556 | // -- heartbeat -- |
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;                  ///< peer
    ConnectionRef con_front;   ///< peer connection (front)
    ConnectionRef con_back;    ///< peer connection (back)
    utime_t first_tx;          ///< time we sent our first ping request
    utime_t last_tx;           ///< last time we sent a ping request
    utime_t last_rx_front;     ///< last time we got a ping reply on the front side
    utime_t last_rx_back;      ///< last time we got a ping reply on the back side
    epoch_t epoch;             ///< most recent epoch we wanted this peer
    /// number of connections we send and receive heartbeat pings/replies
    static constexpr int HEARTBEAT_MAX_CONN = 2;
    /// history of inflight pings, arranged by the timestamp we sent them:
    /// send time -> deadline -> remaining replies
    map<utime_t, pair<utime_t, int>> ping_history;

    // rolling ping-time statistics; presumably consumed by the hb_pingtime
    // reporting in osd_stat_t — see OSD::heartbeat* for maintenance
    utime_t hb_interval_start;
    uint32_t hb_average_count = 0;
    uint32_t hb_index = 0;

    uint32_t hb_total_back = 0;
    uint32_t hb_min_back = UINT_MAX;
    uint32_t hb_max_back = 0;
    vector<uint32_t> hb_back_pingtime;
    vector<uint32_t> hb_back_min;
    vector<uint32_t> hb_back_max;

    uint32_t hb_total_front = 0;
    uint32_t hb_min_front = UINT_MAX;
    uint32_t hb_max_front = 0;
    vector<uint32_t> hb_front_pingtime;
    vector<uint32_t> hb_front_min;
    vector<uint32_t> hb_front_max;

    /// true if the oldest outstanding ping's deadline is at or before `stale`
    bool is_stale(utime_t stale) {
      if (ping_history.empty()) {
	return false;
      }
      utime_t oldest_deadline = ping_history.begin()->second.first;
      return oldest_deadline <= stale;
    }

    /// true if any outstanding ping has missed its deadline as of `now`
    bool is_unhealthy(utime_t now) {
      if (ping_history.empty()) {
	/// we haven't sent a ping yet or we have got all replies,
	/// in either way we are safe and healthy for now
	return false;
      }

      utime_t oldest_deadline = ping_history.begin()->second.first;
      return now > oldest_deadline;
    }

    /// healthy = at least one reply seen on BOTH interfaces and no
    /// outstanding ping past its deadline
    bool is_healthy(utime_t now) {
      if (last_rx_front == utime_t() || last_rx_back == utime_t()) {
	// only declare to be healthy until we have received the first
	// replies from both front/back connections
	return false;
      }
      return !is_unhealthy(now);
    }
  };
  /// state attached to outgoing heartbeat connections
  struct HeartbeatSession : public RefCountedObject {
    int peer;   ///< osd id of the peer this connection targets
    explicit HeartbeatSession(int p) : peer(p) {}
  };
1624 | Mutex heartbeat_lock; | |
1625 | map<int, int> debug_heartbeat_drops_remaining; | |
1626 | Cond heartbeat_cond; | |
1627 | bool heartbeat_stop; | |
11fdf7f2 | 1628 | std::atomic<bool> heartbeat_need_update; |
7c673cae FG |
1629 | map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo |
1630 | utime_t last_mon_heartbeat; | |
1631 | Messenger *hb_front_client_messenger; | |
1632 | Messenger *hb_back_client_messenger; | |
1633 | Messenger *hb_front_server_messenger; | |
1634 | Messenger *hb_back_server_messenger; | |
1635 | utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state | |
1636 | double daily_loadavg; | |
eafe8130 TL |
1637 | |
1638 | // Track ping repsonse times using vector as a circular buffer | |
1639 | // MUST BE A POWER OF 2 | |
1640 | const uint32_t hb_vector_size = 16; | |
1641 | ||
7c673cae FG |
1642 | void _add_heartbeat_peer(int p); |
1643 | void _remove_heartbeat_peer(int p); | |
1644 | bool heartbeat_reset(Connection *con); | |
1645 | void maybe_update_heartbeat_peers(); | |
494da23a | 1646 | void reset_heartbeat_peers(bool all); |
7c673cae FG |
  /// lock-free accessors for the "heartbeat peer set needs refresh" flag
  bool heartbeat_peers_need_update() {
    return heartbeat_need_update.load();
  }
  void heartbeat_set_peers_need_update() {
    heartbeat_need_update.store(true);
  }
  void heartbeat_clear_peers_need_update() {
    heartbeat_need_update.store(false);
  }
  void heartbeat();
  void heartbeat_check();
  void heartbeat_entry();
  void need_heartbeat_peer_update();

  /// wake the heartbeat thread so it re-evaluates immediately
  void heartbeat_kick() {
    std::lock_guard l(heartbeat_lock);
    heartbeat_cond.Signal();
  }
1665 | ||
1666 | struct T_Heartbeat : public Thread { | |
1667 | OSD *osd; | |
1668 | explicit T_Heartbeat(OSD *o) : osd(o) {} | |
1669 | void *entry() override { | |
1670 | osd->heartbeat_entry(); | |
1671 | return 0; | |
1672 | } | |
1673 | } heartbeat_thread; | |
1674 | ||
1675 | public: | |
1676 | bool heartbeat_dispatch(Message *m); | |
1677 | ||
  /// Dispatcher bound to the heartbeat messengers; fast-dispatches ping
  /// traffic straight into OSD::heartbeat_dispatch().
  struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_can_fast_dispatch_any() const override { return true; }
    // only ping messages travel on these messengers; everything else is refused
    bool ms_can_fast_dispatch(const Message *m) const override {
      switch (m->get_type()) {
      case CEPH_MSG_PING:
      case MSG_OSD_PING:
	return true;
      default:
	return false;
      }
    }
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    bool ms_handle_reset(Connection *con) override {
      return osd->heartbeat_reset(con);
    }
    void ms_handle_remote_reset(Connection *con) override {}
    bool ms_handle_refused(Connection *con) override {
      return osd->ms_handle_refused(con);
    }
    // NOTE(review): returns `true` (i.e. 1) for an int result — nonzero
    // presumably signals success here; confirm against the Dispatcher API.
    int ms_handle_authentication(Connection *con) override {
      return true;
    }
    bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override {
      // some pre-nautilus OSDs get confused if you include an
      // authorizer but they are not expecting it.  do not try to authorize
      // heartbeat connections until all OSDs are nautilus.
      if (osd->get_osdmap()->require_osd_release >= CEPH_RELEASE_NAUTILUS) {
	return osd->ms_get_authorizer(dest_type, authorizer);
      }
      return false;
    }
    KeyStore *ms_get_auth1_authorizer_keystore() override {
      return osd->ms_get_auth1_authorizer_keystore();
    }
  } heartbeat_dispatcher;
1721 | ||
1722 | private: | |
1723 | // -- waiters -- | |
1724 | list<OpRequestRef> finished; | |
1725 | ||
  // Move all ops from @p ls onto the 'finished' queue; they are run later
  // by do_waiters().  Caller must hold osd_lock.
  void take_waiters(list<OpRequestRef>& ls) {
    ceph_assert(osd_lock.is_locked());
    // splice is O(1) and leaves 'ls' empty
    finished.splice(finished.end(), ls);
  }
1730 | void do_waiters(); | |
1731 | ||
1732 | // -- op tracking -- | |
1733 | OpTracker op_tracker; | |
7c673cae FG |
1734 | void test_ops(std::string command, std::string args, ostream& ss); |
1735 | friend class TestOpsSocketHook; | |
1736 | TestOpsSocketHook *test_ops_hook; | |
11fdf7f2 | 1737 | friend struct C_FinishSplits; |
7c673cae FG |
1738 | friend struct C_OpenPGs; |
1739 | ||
1740 | // -- op queue -- | |
11fdf7f2 | 1741 | friend std::ostream& operator<<(std::ostream& out, const io_queue& q); |
224ce89b | 1742 | |
7c673cae | 1743 | const io_queue op_queue; |
11fdf7f2 | 1744 | public: |
7c673cae | 1745 | const unsigned int op_prio_cutoff; |
11fdf7f2 | 1746 | protected: |
7c673cae FG |
1747 | |
1748 | /* | |
1749 | * The ordered op delivery chain is: | |
1750 | * | |
1751 | * fast dispatch -> pqueue back | |
1752 | * pqueue front <-> to_process back | |
1753 | * to_process front -> RunVis(item) | |
1754 | * <- queue_front() | |
1755 | * | |
1756 | * The pqueue is per-shard, and to_process is per pg_slot. Items can be | |
1757 | * pushed back up into to_process and/or pqueue while order is preserved. | |
1758 | * | |
1759 | * Multiple worker threads can operate on each shard. | |
1760 | * | |
11fdf7f2 | 1761 | * Under normal circumstances, num_running == to_process.size(). There are |
7c673cae FG |
1762 | * two times when that is not true: (1) when waiting_for_pg == true and |
1763 | * to_process is accumulating requests that are waiting for the pg to be | |
1764 | * instantiated; in that case they will all get requeued together by | |
1765 | * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg | |
1766 | * and already requeued the items. | |
1767 | */ | |
11fdf7f2 TL |
1768 | friend class PGOpItem; |
1769 | friend class PGPeeringItem; | |
1770 | friend class PGRecovery; | |
1771 | friend class PGDelete; | |
224ce89b | 1772 | |
7c673cae | 1773 | class ShardedOpWQ |
11fdf7f2 | 1774 | : public ShardedThreadPool::ShardedWQ<OpQueueItem> |
7c673cae | 1775 | { |
7c673cae | 1776 | OSD *osd; |
7c673cae FG |
1777 | |
1778 | public: | |
11fdf7f2 | 1779 | ShardedOpWQ(OSD *o, |
7c673cae FG |
1780 | time_t ti, |
1781 | time_t si, | |
1782 | ShardedThreadPool* tp) | |
11fdf7f2 TL |
1783 | : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp), |
1784 | osd(o) { | |
7c673cae FG |
1785 | } |
1786 | ||
11fdf7f2 TL |
1787 | void _add_slot_waiter( |
1788 | spg_t token, | |
1789 | OSDShardPGSlot *slot, | |
1790 | OpQueueItem&& qi); | |
7c673cae FG |
1791 | |
1792 | /// try to do some work | |
1793 | void _process(uint32_t thread_index, heartbeat_handle_d *hb) override; | |
1794 | ||
1795 | /// enqueue a new item | |
11fdf7f2 | 1796 | void _enqueue(OpQueueItem&& item) override; |
7c673cae FG |
1797 | |
1798 | /// requeue an old item (at the front of the line) | |
11fdf7f2 | 1799 | void _enqueue_front(OpQueueItem&& item) override; |
7c673cae FG |
1800 | |
1801 | void return_waiting_threads() override { | |
11fdf7f2 TL |
1802 | for(uint32_t i = 0; i < osd->num_shards; i++) { |
1803 | OSDShard* sdata = osd->shards[i]; | |
1804 | assert (NULL != sdata); | |
1805 | std::scoped_lock l{sdata->sdata_wait_lock}; | |
1806 | sdata->stop_waiting = true; | |
1807 | sdata->sdata_cond.notify_all(); | |
7c673cae FG |
1808 | } |
1809 | } | |
1810 | ||
11fdf7f2 TL |
1811 | void stop_return_waiting_threads() override { |
1812 | for(uint32_t i = 0; i < osd->num_shards; i++) { | |
1813 | OSDShard* sdata = osd->shards[i]; | |
7c673cae | 1814 | assert (NULL != sdata); |
11fdf7f2 TL |
1815 | std::scoped_lock l{sdata->sdata_wait_lock}; |
1816 | sdata->stop_waiting = false; | |
1817 | } | |
1818 | } | |
1819 | ||
1820 | void dump(Formatter *f) { | |
1821 | for(uint32_t i = 0; i < osd->num_shards; i++) { | |
1822 | auto &&sdata = osd->shards[i]; | |
1823 | ||
1824 | char queue_name[32] = {0}; | |
1825 | snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i); | |
1826 | ceph_assert(NULL != sdata); | |
1827 | ||
1828 | std::scoped_lock l{sdata->shard_lock}; | |
1829 | f->open_object_section(queue_name); | |
7c673cae FG |
1830 | sdata->pqueue->dump(f); |
1831 | f->close_section(); | |
7c673cae FG |
1832 | } |
1833 | } | |
1834 | ||
11fdf7f2 TL |
1835 | bool is_shard_empty(uint32_t thread_index) override { |
1836 | uint32_t shard_index = thread_index % osd->num_shards; | |
1837 | auto &&sdata = osd->shards[shard_index]; | |
1838 | ceph_assert(sdata); | |
1839 | std::lock_guard l(sdata->shard_lock); | |
1840 | if (thread_index < osd->num_shards) { | |
1841 | return sdata->pqueue->empty() && sdata->context_queue.empty(); | |
1842 | } else { | |
1843 | return sdata->pqueue->empty(); | |
7c673cae | 1844 | } |
11fdf7f2 | 1845 | } |
7c673cae | 1846 | |
11fdf7f2 TL |
1847 | void handle_oncommits(list<Context*>& oncommits) { |
1848 | for (auto p : oncommits) { | |
1849 | p->complete(0); | |
1850 | } | |
7c673cae FG |
1851 | } |
1852 | } op_shardedwq; | |
1853 | ||
1854 | ||
11fdf7f2 | 1855 | void enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch); |
7c673cae FG |
1856 | void dequeue_op( |
1857 | PGRef pg, OpRequestRef op, | |
1858 | ThreadPool::TPHandle &handle); | |
1859 | ||
11fdf7f2 TL |
1860 | void enqueue_peering_evt( |
1861 | spg_t pgid, | |
1862 | PGPeeringEventRef ref); | |
1863 | void enqueue_peering_evt_front( | |
1864 | spg_t pgid, | |
1865 | PGPeeringEventRef ref); | |
1866 | void dequeue_peering_evt( | |
1867 | OSDShard *sdata, | |
1868 | PG *pg, | |
1869 | PGPeeringEventRef ref, | |
1870 | ThreadPool::TPHandle& handle); | |
1871 | ||
1872 | void dequeue_delete( | |
1873 | OSDShard *sdata, | |
1874 | PG *pg, | |
1875 | epoch_t epoch, | |
1876 | ThreadPool::TPHandle& handle); | |
7c673cae FG |
1877 | |
1878 | friend class PG; | |
11fdf7f2 | 1879 | friend class OSDShard; |
7c673cae FG |
1880 | friend class PrimaryLogPG; |
1881 | ||
1882 | ||
1883 | protected: | |
1884 | ||
  // -- osd map --
  OSDMapRef osdmap;  // current map; writers go through map_lock
  // Return a ref-counted snapshot of the current osdmap.
  // NOTE(review): read here without taking map_lock -- presumably safe for
  // callers on the dispatch path; confirm against the locking rules in OSD.cc.
  OSDMapRef get_osdmap() {
    return osdmap;
  }
  // Epoch of the current map, or 0 if no map has been received yet.
  epoch_t get_osdmap_epoch() const {
    return osdmap ? osdmap->get_epoch() : 0;
  }
1893 | ||
11fdf7f2 TL |
1894 | pool_pg_num_history_t pg_num_history; |
1895 | ||
7c673cae FG |
1896 | utime_t had_map_since; |
1897 | RWLock map_lock; | |
1898 | list<OpRequestRef> waiting_for_osdmap; | |
1899 | deque<utime_t> osd_markdown_log; | |
1900 | ||
1901 | friend struct send_map_on_destruct; | |
1902 | ||
1903 | void wait_for_new_map(OpRequestRef op); | |
1904 | void handle_osd_map(class MOSDMap *m); | |
1905 | void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m); | |
1906 | void trim_maps(epoch_t oldest, int nreceived, bool skip_maps); | |
1907 | void note_down_osd(int osd); | |
1908 | void note_up_osd(int osd); | |
1909 | friend class C_OnMapCommit; | |
1910 | ||
1911 | bool advance_pg( | |
11fdf7f2 TL |
1912 | epoch_t advance_to, |
1913 | PG *pg, | |
7c673cae | 1914 | ThreadPool::TPHandle &handle, |
11fdf7f2 | 1915 | PG::RecoveryCtx *rctx); |
7c673cae FG |
1916 | void consume_map(); |
1917 | void activate_map(); | |
1918 | ||
  // osd map cache (past osd maps) -- thin forwarders into OSDService
  /// fetch the full map for epoch @p e from the cache
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  /// hand @p o to the cache; returns the cached reference
  OSDMapRef add_map(OSDMap *o) {
    return service.add_map(o);
  }
  /// cache the encoded full map for epoch @p e
  void add_map_bl(epoch_t e, bufferlist& bl) {
    return service.add_map_bl(e, bl);
  }
  /// fetch the encoded full map for @p e; false if not cached
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    return service.get_map_bl(e, bl);
  }
  /// cache the encoded incremental map for epoch @p e
  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    return service.add_map_inc_bl(e, bl);
  }
11fdf7f2 TL |
1935 | |
1936 | public: | |
1937 | // -- shards -- | |
1938 | vector<OSDShard*> shards; | |
1939 | uint32_t num_shards = 0; | |
1940 | ||
  // bump the atomic count of PGs registered on this OSD
  void inc_num_pgs() {
    ++num_pgs;
  }
  void dec_num_pgs() {
    --num_pgs;
  }
  // current PG count (narrowed from atomic<size_t> to int; fine for
  // realistic per-OSD PG counts)
  int get_num_pgs() const {
    return num_pgs;
  }
1950 | ||
1951 | protected: | |
11fdf7f2 TL |
1952 | Mutex merge_lock = {"OSD::merge_lock"}; |
1953 | /// merge epoch -> target pgid -> source pgid -> pg | |
1954 | map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters; | |
1955 | ||
1956 | bool add_merge_waiter(OSDMapRef nextmap, spg_t target, PGRef source, | |
1957 | unsigned need); | |
1958 | ||
7c673cae | 1959 | // -- placement groups -- |
11fdf7f2 | 1960 | std::atomic<size_t> num_pgs = {0}; |
7c673cae | 1961 | |
3efd9988 | 1962 | std::mutex pending_creates_lock; |
b32b8144 FG |
1963 | using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>; |
1964 | std::set<create_from_osd_t> pending_creates_from_osd; | |
3efd9988 FG |
1965 | unsigned pending_creates_from_mon = 0; |
1966 | ||
7c673cae FG |
1967 | PGRecoveryStats pg_recovery_stats; |
1968 | ||
11fdf7f2 TL |
1969 | PGRef _lookup_pg(spg_t pgid); |
1970 | PGRef _lookup_lock_pg(spg_t pgid); | |
1971 | void register_pg(PGRef pg); | |
1972 | bool try_finish_pg_delete(PG *pg, unsigned old_pg_num); | |
7c673cae | 1973 | |
11fdf7f2 TL |
1974 | void _get_pgs(vector<PGRef> *v, bool clear_too=false); |
1975 | void _get_pgids(vector<spg_t> *v); | |
31f18b77 FG |
1976 | |
1977 | public: | |
11fdf7f2 | 1978 | PGRef lookup_lock_pg(spg_t pgid); |
31f18b77 | 1979 | |
11fdf7f2 | 1980 | std::set<int64_t> get_mapped_pools(); |
35e4c445 | 1981 | |
31f18b77 | 1982 | protected: |
7c673cae | 1983 | PG* _make_pg(OSDMapRef createmap, spg_t pgid); |
7c673cae | 1984 | |
11fdf7f2 TL |
1985 | bool maybe_wait_for_max_pg(const OSDMapRef& osdmap, |
1986 | spg_t pgid, bool is_mon_create); | |
3efd9988 FG |
1987 | void resume_creating_pg(); |
1988 | ||
7c673cae | 1989 | void load_pgs(); |
7c673cae FG |
1990 | |
1991 | /// build initial pg history and intervals on create | |
1992 | void build_initial_pg_history( | |
1993 | spg_t pgid, | |
1994 | epoch_t created, | |
1995 | utime_t created_stamp, | |
1996 | pg_history_t *h, | |
1997 | PastIntervals *pi); | |
1998 | ||
7c673cae FG |
1999 | epoch_t last_pg_create_epoch; |
2000 | ||
2001 | void handle_pg_create(OpRequestRef op); | |
2002 | ||
2003 | void split_pgs( | |
2004 | PG *parent, | |
31f18b77 | 2005 | const set<spg_t> &childpgids, set<PGRef> *out_pgs, |
7c673cae FG |
2006 | OSDMapRef curmap, |
2007 | OSDMapRef nextmap, | |
2008 | PG::RecoveryCtx *rctx); | |
11fdf7f2 | 2009 | void _finish_splits(set<PGRef>& pgs); |
7c673cae FG |
2010 | |
2011 | // == monitor interaction == | |
2012 | Mutex mon_report_lock; | |
2013 | utime_t last_mon_report; | |
11fdf7f2 | 2014 | Finisher boot_finisher; |
7c673cae FG |
2015 | |
2016 | // -- boot -- | |
2017 | void start_boot(); | |
2018 | void _got_mon_epochs(epoch_t oldest, epoch_t newest); | |
2019 | void _preboot(epoch_t oldest, epoch_t newest); | |
2020 | void _send_boot(); | |
2021 | void _collect_metadata(map<string,string> *pmeta); | |
2022 | ||
2023 | void start_waiting_for_healthy(); | |
2024 | bool _is_healthy(); | |
2025 | ||
2026 | void send_full_update(); | |
2027 | ||
2028 | friend struct C_OSD_GetVersion; | |
2029 | ||
2030 | // -- alive -- | |
2031 | epoch_t up_thru_wanted; | |
2032 | ||
2033 | void queue_want_up_thru(epoch_t want); | |
2034 | void send_alive(); | |
2035 | ||
2036 | // -- full map requests -- | |
2037 | epoch_t requested_full_first, requested_full_last; | |
2038 | ||
2039 | void request_full_map(epoch_t first, epoch_t last); | |
  // Re-issue an outstanding full-map request (e.g. after reconnecting to
  // a monitor).  The recorded range is cleared *before* calling
  // request_full_map() so it does not look like a request is already in
  // flight.
  void rerequest_full_maps() {
    epoch_t first = requested_full_first;
    epoch_t last = requested_full_last;
    requested_full_first = 0;
    requested_full_last = 0;
    request_full_map(first, last);
  }
2047 | void got_full_map(epoch_t e); | |
2048 | ||
2049 | // -- failures -- | |
2050 | map<int,utime_t> failure_queue; | |
11fdf7f2 | 2051 | map<int,pair<utime_t,entity_addrvec_t> > failure_pending; |
7c673cae FG |
2052 | |
2053 | void requeue_failures(); | |
2054 | void send_failures(); | |
11fdf7f2 TL |
2055 | void send_still_alive(epoch_t epoch, int osd, const entity_addrvec_t &addrs); |
2056 | void cancel_pending_failures(); | |
7c673cae FG |
2057 | |
2058 | ceph::coarse_mono_clock::time_point last_sent_beacon; | |
2059 | Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"}; | |
2060 | epoch_t min_last_epoch_clean = 0; | |
2061 | // which pgs were scanned for min_lec | |
2062 | std::vector<pg_t> min_last_epoch_clean_pgs; | |
2063 | void send_beacon(const ceph::coarse_mono_clock::time_point& now); | |
2064 | ||
7c673cae FG |
  /// next unique transaction id, delegated to OSDService
  ceph_tid_t get_tid() {
    return service.get_tid();
  }
2068 | ||
2069 | // -- generic pg peering -- | |
2070 | PG::RecoveryCtx create_context(); | |
2071 | void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, | |
2072 | ThreadPool::TPHandle *handle = NULL); | |
2073 | void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, | |
2074 | ThreadPool::TPHandle *handle = NULL); | |
11fdf7f2 | 2075 | void discard_context(PG::RecoveryCtx &ctx); |
7c673cae FG |
2076 | void do_notifies(map<int, |
2077 | vector<pair<pg_notify_t, PastIntervals> > >& | |
2078 | notify_list, | |
2079 | OSDMapRef map); | |
2080 | void do_queries(map<int, map<spg_t,pg_query_t> >& query_map, | |
2081 | OSDMapRef map); | |
2082 | void do_infos(map<int, | |
2083 | vector<pair<pg_notify_t, PastIntervals> > >& info_map, | |
2084 | OSDMapRef map); | |
2085 | ||
2086 | bool require_mon_peer(const Message *m); | |
2087 | bool require_mon_or_mgr_peer(const Message *m); | |
2088 | bool require_osd_peer(const Message *m); | |
2089 | /*** | |
2090 | * Verifies that we were alive in the given epoch, and that | |
2091 | * still are. | |
2092 | */ | |
2093 | bool require_self_aliveness(const Message *m, epoch_t alive_since); | |
2094 | /** | |
2095 | * Verifies that the OSD who sent the given op has the same | |
2096 | * address as in the given map. | |
2097 | * @pre op was sent by an OSD using the cluster messenger | |
2098 | */ | |
2099 | bool require_same_peer_instance(const Message *m, OSDMapRef& map, | |
2100 | bool is_fast_dispatch); | |
2101 | ||
2102 | bool require_same_or_newer_map(OpRequestRef& op, epoch_t e, | |
2103 | bool is_fast_dispatch); | |
2104 | ||
11fdf7f2 TL |
2105 | void handle_fast_pg_create(MOSDPGCreate2 *m); |
2106 | void handle_fast_pg_query(MOSDPGQuery *m); | |
2107 | void handle_pg_query_nopg(const MQuery& q); | |
2108 | void handle_fast_pg_notify(MOSDPGNotify *m); | |
2109 | void handle_pg_notify_nopg(const MNotifyRec& q); | |
2110 | void handle_fast_pg_info(MOSDPGInfo *m); | |
2111 | void handle_fast_pg_remove(MOSDPGRemove *m); | |
7c673cae | 2112 | |
11fdf7f2 TL |
2113 | public: |
2114 | // used by OSDShard | |
2115 | PGRef handle_pg_create_info(const OSDMapRef& osdmap, const PGCreateInfo *info); | |
2116 | protected: | |
c07f9fc5 | 2117 | |
11fdf7f2 | 2118 | void handle_fast_force_recovery(MOSDForceRecovery *m); |
7c673cae FG |
2119 | |
2120 | // -- commands -- | |
  // -- commands --
  /// a queued admin/mon command awaiting execution
  struct Command {
    vector<string> cmd;   // parsed command vector
    ceph_tid_t tid;       // tid to reply with
    bufferlist indata;    // input payload
    ConnectionRef con;    // connection to send the reply on

    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
      : cmd(c), tid(t), indata(bl), con(co) {}
  };
  list<Command*> command_queue;  // drained one at a time by command_wq
  /// work queue that executes queued commands under osd_lock
  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
    OSD *osd;
    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}

    bool _empty() override {
      return osd->command_queue.empty();
    }
    bool _enqueue(Command *c) override {
      osd->command_queue.push_back(c);
      return true;
    }
    void _dequeue(Command *pg) override {
      // targeted dequeue of a specific command is not supported
      ceph_abort();
    }
    Command *_dequeue() override {
      if (osd->command_queue.empty())
	return NULL;
      Command *c = osd->command_queue.front();
      osd->command_queue.pop_front();
      return c;
    }
    void _process(Command *c, ThreadPool::TPHandle &) override {
      // commands run with osd_lock held; bail out early on shutdown,
      // freeing the command in either path
      osd->osd_lock.lock();
      if (osd->is_stopping()) {
	osd->osd_lock.unlock();
	delete c;
	return;
      }
      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
      osd->osd_lock.unlock();
      delete c;
    }
    void _clear() override {
      // drop (and free) any commands that never ran
      while (!osd->command_queue.empty()) {
	Command *c = osd->command_queue.front();
	osd->command_queue.pop_front();
	delete c;
      }
    }
  } command_wq;
2172 | ||
2173 | void handle_command(class MMonCommand *m); | |
2174 | void handle_command(class MCommand *m); | |
2175 | void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data); | |
11fdf7f2 TL |
2176 | int _do_command( |
2177 | Connection *con, cmdmap_t& cmdmap, ceph_tid_t tid, bufferlist& data, | |
2178 | bufferlist& odata, stringstream& ss, stringstream& ds); | |
2179 | ||
7c673cae FG |
2180 | |
2181 | // -- pg recovery -- | |
2182 | void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved, | |
2183 | ThreadPool::TPHandle &handle); | |
2184 | ||
2185 | ||
2186 | // -- scrubbing -- | |
2187 | void sched_scrub(); | |
494da23a | 2188 | void resched_all_scrubs(); |
7c673cae FG |
2189 | bool scrub_random_backoff(); |
2190 | bool scrub_load_below_threshold(); | |
2191 | bool scrub_time_permit(utime_t now); | |
2192 | ||
b32b8144 FG |
2193 | // -- status reporting -- |
2194 | MPGStats *collect_pg_stats(); | |
11fdf7f2 TL |
2195 | std::vector<DaemonHealthMetric> get_health_metrics(); |
2196 | ||
b32b8144 | 2197 | |
224ce89b | 2198 | private: |
7c673cae FG |
  bool ms_can_fast_dispatch_any() const override { return true; }
  // Whitelist of message types that may be handled in the messenger's
  // fast-dispatch context (no osd_lock; routed straight to the shards).
  // Anything not listed falls back to the slower ms_dispatch() path.
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_PING:
    case CEPH_MSG_OSD_OP:
    case CEPH_MSG_OSD_BACKOFF:
    case MSG_OSD_SCRUB2:
    case MSG_OSD_FORCE_RECOVERY:
    case MSG_MON_COMMAND:
    case MSG_OSD_PG_CREATE2:
    case MSG_OSD_PG_QUERY:
    case MSG_OSD_PG_INFO:
    case MSG_OSD_PG_NOTIFY:
    case MSG_OSD_PG_LOG:
    case MSG_OSD_PG_TRIM:
    case MSG_OSD_PG_REMOVE:
    case MSG_OSD_BACKFILL_RESERVE:
    case MSG_OSD_RECOVERY_RESERVE:
    case MSG_OSD_REPOP:
    case MSG_OSD_REPOPREPLY:
    case MSG_OSD_PG_PUSH:
    case MSG_OSD_PG_PULL:
    case MSG_OSD_PG_PUSH_REPLY:
    case MSG_OSD_PG_SCAN:
    case MSG_OSD_PG_BACKFILL:
    case MSG_OSD_PG_BACKFILL_REMOVE:
    case MSG_OSD_EC_WRITE:
    case MSG_OSD_EC_WRITE_REPLY:
    case MSG_OSD_EC_READ:
    case MSG_OSD_EC_READ_REPLY:
    case MSG_OSD_SCRUB_RESERVE:
    case MSG_OSD_REP_SCRUB:
    case MSG_OSD_REP_SCRUBMAP:
    case MSG_OSD_PG_UPDATE_LOG_MISSING:
    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    case MSG_OSD_PG_RECOVERY_DELETE:
    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
      return true;
    default:
      return false;
    }
  }
2241 | void ms_fast_dispatch(Message *m) override; | |
7c673cae | 2242 | bool ms_dispatch(Message *m) override; |
11fdf7f2 | 2243 | bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override; |
7c673cae FG |
2244 | void ms_handle_connect(Connection *con) override; |
2245 | void ms_handle_fast_connect(Connection *con) override; | |
2246 | void ms_handle_fast_accept(Connection *con) override; | |
11fdf7f2 TL |
2247 | int ms_handle_authentication(Connection *con) override; |
2248 | KeyStore *ms_get_auth1_authorizer_keystore() override; | |
7c673cae FG |
2249 | bool ms_handle_reset(Connection *con) override; |
2250 | void ms_handle_remote_reset(Connection *con) override {} | |
2251 | bool ms_handle_refused(Connection *con) override; | |
2252 | ||
2253 | io_queue get_io_queue() const { | |
2254 | if (cct->_conf->osd_op_queue == "debug_random") { | |
224ce89b WB |
2255 | static io_queue index_lookup[] = { io_queue::prioritized, |
2256 | io_queue::weightedpriority, | |
2257 | io_queue::mclock_opclass, | |
2258 | io_queue::mclock_client }; | |
7c673cae | 2259 | srand(time(NULL)); |
224ce89b WB |
2260 | unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); |
2261 | return index_lookup[which]; | |
d2e6a577 FG |
2262 | } else if (cct->_conf->osd_op_queue == "prioritized") { |
2263 | return io_queue::prioritized; | |
224ce89b WB |
2264 | } else if (cct->_conf->osd_op_queue == "mclock_opclass") { |
2265 | return io_queue::mclock_opclass; | |
2266 | } else if (cct->_conf->osd_op_queue == "mclock_client") { | |
2267 | return io_queue::mclock_client; | |
7c673cae | 2268 | } else { |
d2e6a577 FG |
2269 | // default / catch-all is 'wpq' |
2270 | return io_queue::weightedpriority; | |
7c673cae FG |
2271 | } |
2272 | } | |
2273 | ||
2274 | unsigned int get_io_prio_cut() const { | |
2275 | if (cct->_conf->osd_op_queue_cut_off == "debug_random") { | |
2276 | srand(time(NULL)); | |
2277 | return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; | |
d2e6a577 | 2278 | } else if (cct->_conf->osd_op_queue_cut_off == "high") { |
7c673cae | 2279 | return CEPH_MSG_PRIO_HIGH; |
d2e6a577 FG |
2280 | } else { |
2281 | // default / catch-all is 'low' | |
2282 | return CEPH_MSG_PRIO_LOW; | |
7c673cae FG |
2283 | } |
2284 | } | |
2285 | ||
2286 | public: | |
2287 | /* internal and external can point to the same messenger, they will still | |
2288 | * be cleaned up properly*/ | |
2289 | OSD(CephContext *cct_, | |
2290 | ObjectStore *store_, | |
2291 | int id, | |
2292 | Messenger *internal, | |
2293 | Messenger *external, | |
2294 | Messenger *hb_front_client, | |
2295 | Messenger *hb_back_client, | |
2296 | Messenger *hb_front_server, | |
2297 | Messenger *hb_back_server, | |
2298 | Messenger *osdc_messenger, | |
2299 | MonClient *mc, const std::string &dev, const std::string &jdev); | |
2300 | ~OSD() override; | |
2301 | ||
2302 | // static bits | |
11fdf7f2 TL |
2303 | static int mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami); |
2304 | ||
7c673cae FG |
2305 | /* remove any non-user xattrs from a map of them */ |
2306 | void filter_xattrs(map<string, bufferptr>& attrs) { | |
2307 | for (map<string, bufferptr>::iterator iter = attrs.begin(); | |
2308 | iter != attrs.end(); | |
2309 | ) { | |
2310 | if (('_' != iter->first.at(0)) || (iter->first.size() == 1)) | |
2311 | attrs.erase(iter++); | |
2312 | else ++iter; | |
2313 | } | |
2314 | } | |
2315 | ||
2316 | private: | |
2317 | int mon_cmd_maybe_osd_create(string &cmd); | |
2318 | int update_crush_device_class(); | |
2319 | int update_crush_location(); | |
2320 | ||
3efd9988 FG |
2321 | static int write_meta(CephContext *cct, |
2322 | ObjectStore *store, | |
7c673cae FG |
2323 | uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami); |
2324 | ||
7c673cae | 2325 | void handle_scrub(struct MOSDScrub *m); |
11fdf7f2 | 2326 | void handle_fast_scrub(struct MOSDScrub2 *m); |
7c673cae FG |
2327 | void handle_osd_ping(class MOSDPing *m); |
2328 | ||
2329 | int init_op_flags(OpRequestRef& op); | |
2330 | ||
31f18b77 FG |
2331 | int get_num_op_shards(); |
2332 | int get_num_op_threads(); | |
2333 | ||
c07f9fc5 | 2334 | float get_osd_recovery_sleep(); |
11fdf7f2 | 2335 | float get_osd_delete_sleep(); |
494da23a | 2336 | float get_osd_snap_trim_sleep(); |
11fdf7f2 TL |
2337 | |
2338 | void probe_smart(const string& devid, ostream& ss); | |
c07f9fc5 | 2339 | |
7c673cae | 2340 | public: |
11fdf7f2 TL |
2341 | static int peek_meta(ObjectStore *store, |
2342 | string *magic, | |
2343 | uuid_d *cluster_fsid, | |
2344 | uuid_d *osd_fsid, | |
2345 | int *whoami, | |
2346 | int *min_osd_release); | |
7c673cae FG |
2347 | |
2348 | ||
2349 | // startup/shutdown | |
2350 | int pre_init(); | |
2351 | int init(); | |
2352 | void final_init(); | |
2353 | ||
2354 | int enable_disable_fuse(bool stop); | |
11fdf7f2 | 2355 | int set_numa_affinity(); |
7c673cae FG |
2356 | |
2357 | void suicide(int exitcode); | |
2358 | int shutdown(); | |
2359 | ||
2360 | void handle_signal(int signum); | |
2361 | ||
2362 | /// check if we can throw out op from a disconnected client | |
2363 | static bool op_is_discardable(const MOSDOp *m); | |
2364 | ||
2365 | public: | |
2366 | OSDService service; | |
2367 | friend class OSDService; | |
11fdf7f2 TL |
2368 | |
2369 | private: | |
2370 | void set_perf_queries( | |
2371 | const std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> &queries); | |
2372 | void get_perf_reports( | |
2373 | std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports); | |
2374 | ||
2375 | Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"}; | |
2376 | std::list<OSDPerfMetricQuery> m_perf_queries; | |
2377 | std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits; | |
7c673cae FG |
2378 | }; |
2379 | ||
224ce89b | 2380 | |
11fdf7f2 | 2381 | std::ostream& operator<<(std::ostream& out, const io_queue& q); |
224ce89b WB |
2382 | |
2383 | ||
7c673cae FG |
2384 | //compatibility of the executable |
2385 | extern const CompatSet::Feature ceph_osd_feature_compat[]; | |
2386 | extern const CompatSet::Feature ceph_osd_feature_ro_compat[]; | |
2387 | extern const CompatSet::Feature ceph_osd_feature_incompat[]; | |
2388 | ||
224ce89b | 2389 | #endif // CEPH_OSD_H |