]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_OSD_H | |
16 | #define CEPH_OSD_H | |
17 | ||
18 | #include "PG.h" | |
19 | ||
20 | #include "msg/Dispatcher.h" | |
21 | ||
22 | #include "common/Mutex.h" | |
23 | #include "common/RWLock.h" | |
24 | #include "common/Timer.h" | |
25 | #include "common/WorkQueue.h" | |
26 | #include "common/AsyncReserver.h" | |
27 | #include "common/ceph_context.h" | |
28 | #include "common/zipkin_trace.h" | |
29 | ||
30 | #include "mgr/MgrClient.h" | |
31 | ||
32 | #include "os/ObjectStore.h" | |
33 | #include "OSDCap.h" | |
34 | ||
35 | #include "auth/KeyRing.h" | |
36 | #include "osd/ClassHandler.h" | |
37 | ||
38 | #include "include/CompatSet.h" | |
39 | ||
40 | #include "OpRequest.h" | |
41 | #include "Session.h" | |
42 | ||
224ce89b WB |
43 | #include "osd/PGQueueable.h" |
44 | ||
7c673cae FG |
45 | #include <atomic> |
46 | #include <map> | |
47 | #include <memory> | |
48 | #include "include/memory.h" | |
49 | using namespace std; | |
50 | ||
51 | #include "include/unordered_map.h" | |
52 | ||
53 | #include "common/shared_cache.hpp" | |
54 | #include "common/simple_cache.hpp" | |
55 | #include "common/sharedptr_registry.hpp" | |
56 | #include "common/WeightedPriorityQueue.h" | |
57 | #include "common/PrioritizedQueue.h" | |
224ce89b WB |
58 | #include "osd/mClockOpClassQueue.h" |
59 | #include "osd/mClockClientQueue.h" | |
7c673cae FG |
60 | #include "messages/MOSDOp.h" |
61 | #include "include/Spinlock.h" | |
62 | #include "common/EventTrace.h" | |
63 | ||
64 | #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ | |
65 | ||
66 | ||
67 | enum { | |
68 | l_osd_first = 10000, | |
69 | l_osd_op_wip, | |
70 | l_osd_op, | |
71 | l_osd_op_inb, | |
72 | l_osd_op_outb, | |
73 | l_osd_op_lat, | |
74 | l_osd_op_process_lat, | |
75 | l_osd_op_prepare_lat, | |
76 | l_osd_op_r, | |
77 | l_osd_op_r_outb, | |
78 | l_osd_op_r_lat, | |
79 | l_osd_op_r_lat_outb_hist, | |
80 | l_osd_op_r_process_lat, | |
81 | l_osd_op_r_prepare_lat, | |
82 | l_osd_op_w, | |
83 | l_osd_op_w_inb, | |
84 | l_osd_op_w_lat, | |
85 | l_osd_op_w_lat_inb_hist, | |
86 | l_osd_op_w_process_lat, | |
87 | l_osd_op_w_prepare_lat, | |
88 | l_osd_op_rw, | |
89 | l_osd_op_rw_inb, | |
90 | l_osd_op_rw_outb, | |
91 | l_osd_op_rw_lat, | |
92 | l_osd_op_rw_lat_inb_hist, | |
93 | l_osd_op_rw_lat_outb_hist, | |
94 | l_osd_op_rw_process_lat, | |
95 | l_osd_op_rw_prepare_lat, | |
96 | ||
224ce89b WB |
97 | l_osd_op_before_queue_op_lat, |
98 | l_osd_op_before_dequeue_op_lat, | |
99 | ||
7c673cae FG |
100 | l_osd_sop, |
101 | l_osd_sop_inb, | |
102 | l_osd_sop_lat, | |
103 | l_osd_sop_w, | |
104 | l_osd_sop_w_inb, | |
105 | l_osd_sop_w_lat, | |
106 | l_osd_sop_pull, | |
107 | l_osd_sop_pull_lat, | |
108 | l_osd_sop_push, | |
109 | l_osd_sop_push_inb, | |
110 | l_osd_sop_push_lat, | |
111 | ||
112 | l_osd_pull, | |
113 | l_osd_push, | |
114 | l_osd_push_outb, | |
115 | ||
116 | l_osd_rop, | |
117 | ||
118 | l_osd_loadavg, | |
119 | l_osd_buf, | |
120 | l_osd_history_alloc_bytes, | |
121 | l_osd_history_alloc_num, | |
122 | l_osd_cached_crc, | |
123 | l_osd_cached_crc_adjusted, | |
124 | l_osd_missed_crc, | |
125 | ||
126 | l_osd_pg, | |
127 | l_osd_pg_primary, | |
128 | l_osd_pg_replica, | |
129 | l_osd_pg_stray, | |
94b18763 | 130 | l_osd_pg_removing, |
7c673cae FG |
131 | l_osd_hb_to, |
132 | l_osd_map, | |
133 | l_osd_mape, | |
134 | l_osd_mape_dup, | |
135 | ||
136 | l_osd_waiting_for_map, | |
137 | ||
138 | l_osd_map_cache_hit, | |
139 | l_osd_map_cache_miss, | |
140 | l_osd_map_cache_miss_low, | |
141 | l_osd_map_cache_miss_low_avg, | |
31f18b77 FG |
142 | l_osd_map_bl_cache_hit, |
143 | l_osd_map_bl_cache_miss, | |
7c673cae FG |
144 | |
145 | l_osd_stat_bytes, | |
146 | l_osd_stat_bytes_used, | |
147 | l_osd_stat_bytes_avail, | |
148 | ||
149 | l_osd_copyfrom, | |
150 | ||
151 | l_osd_tier_promote, | |
152 | l_osd_tier_flush, | |
153 | l_osd_tier_flush_fail, | |
154 | l_osd_tier_try_flush, | |
155 | l_osd_tier_try_flush_fail, | |
156 | l_osd_tier_evict, | |
157 | l_osd_tier_whiteout, | |
158 | l_osd_tier_dirty, | |
159 | l_osd_tier_clean, | |
160 | l_osd_tier_delay, | |
161 | l_osd_tier_proxy_read, | |
162 | l_osd_tier_proxy_write, | |
163 | ||
164 | l_osd_agent_wake, | |
165 | l_osd_agent_skip, | |
166 | l_osd_agent_flush, | |
167 | l_osd_agent_evict, | |
168 | ||
169 | l_osd_object_ctx_cache_hit, | |
170 | l_osd_object_ctx_cache_total, | |
171 | ||
172 | l_osd_op_cache_hit, | |
173 | l_osd_tier_flush_lat, | |
174 | l_osd_tier_promote_lat, | |
175 | l_osd_tier_r_lat, | |
176 | ||
177 | l_osd_pg_info, | |
178 | l_osd_pg_fastinfo, | |
179 | l_osd_pg_biginfo, | |
180 | ||
181 | l_osd_last, | |
182 | }; | |
183 | ||
184 | // RecoveryState perf counters | |
185 | enum { | |
186 | rs_first = 20000, | |
187 | rs_initial_latency, | |
188 | rs_started_latency, | |
189 | rs_reset_latency, | |
190 | rs_start_latency, | |
191 | rs_primary_latency, | |
192 | rs_peering_latency, | |
193 | rs_backfilling_latency, | |
194 | rs_waitremotebackfillreserved_latency, | |
195 | rs_waitlocalbackfillreserved_latency, | |
196 | rs_notbackfilling_latency, | |
197 | rs_repnotrecovering_latency, | |
198 | rs_repwaitrecoveryreserved_latency, | |
199 | rs_repwaitbackfillreserved_latency, | |
200 | rs_reprecovering_latency, | |
201 | rs_activating_latency, | |
202 | rs_waitlocalrecoveryreserved_latency, | |
203 | rs_waitremoterecoveryreserved_latency, | |
204 | rs_recovering_latency, | |
205 | rs_recovered_latency, | |
206 | rs_clean_latency, | |
207 | rs_active_latency, | |
208 | rs_replicaactive_latency, | |
209 | rs_stray_latency, | |
210 | rs_getinfo_latency, | |
211 | rs_getlog_latency, | |
212 | rs_waitactingchange_latency, | |
213 | rs_incomplete_latency, | |
214 | rs_down_latency, | |
215 | rs_getmissing_latency, | |
216 | rs_waitupthru_latency, | |
217 | rs_notrecovering_latency, | |
218 | rs_last, | |
219 | }; | |
220 | ||
221 | class Messenger; | |
222 | class Message; | |
223 | class MonClient; | |
224 | class PerfCounters; | |
225 | class ObjectStore; | |
226 | class FuseStore; | |
227 | class OSDMap; | |
228 | class MLog; | |
229 | class Objecter; | |
230 | ||
231 | class Watch; | |
232 | class PrimaryLogPG; | |
233 | ||
234 | class AuthAuthorizeHandlerRegistry; | |
235 | ||
236 | class TestOpsSocketHook; | |
237 | struct C_CompleteSplits; | |
238 | struct C_OpenPGs; | |
239 | class LogChannel; | |
240 | class CephContext; | |
241 | typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef; | |
242 | class MOSDOp; | |
243 | ||
244 | class DeletingState { | |
245 | Mutex lock; | |
246 | Cond cond; | |
247 | enum { | |
248 | QUEUED, | |
249 | CLEARING_DIR, | |
250 | CLEARING_WAITING, | |
251 | DELETING_DIR, | |
252 | DELETED_DIR, | |
253 | CANCELED, | |
254 | } status; | |
255 | bool stop_deleting; | |
256 | public: | |
257 | const spg_t pgid; | |
258 | const PGRef old_pg_state; | |
259 | explicit DeletingState(const pair<spg_t, PGRef> &in) : | |
260 | lock("DeletingState::lock"), status(QUEUED), stop_deleting(false), | |
261 | pgid(in.first), old_pg_state(in.second) { | |
262 | } | |
263 | ||
264 | /// transition status to CLEARING_WAITING | |
265 | bool pause_clearing() { | |
266 | Mutex::Locker l(lock); | |
267 | assert(status == CLEARING_DIR); | |
268 | if (stop_deleting) { | |
269 | status = CANCELED; | |
270 | cond.Signal(); | |
271 | return false; | |
272 | } | |
273 | status = CLEARING_WAITING; | |
274 | return true; | |
275 | } ///< @return false if we should cancel deletion | |
276 | ||
277 | /// start or resume the clearing - transition the status to CLEARING_DIR | |
278 | bool start_or_resume_clearing() { | |
279 | Mutex::Locker l(lock); | |
280 | assert( | |
281 | status == QUEUED || | |
282 | status == DELETED_DIR || | |
283 | status == CLEARING_WAITING); | |
284 | if (stop_deleting) { | |
285 | status = CANCELED; | |
286 | cond.Signal(); | |
287 | return false; | |
288 | } | |
289 | status = CLEARING_DIR; | |
290 | return true; | |
291 | } ///< @return false if we should cancel the deletion | |
292 | ||
293 | /// transition status to CLEARING_DIR | |
294 | bool resume_clearing() { | |
295 | Mutex::Locker l(lock); | |
296 | assert(status == CLEARING_WAITING); | |
297 | if (stop_deleting) { | |
298 | status = CANCELED; | |
299 | cond.Signal(); | |
300 | return false; | |
301 | } | |
302 | status = CLEARING_DIR; | |
303 | return true; | |
304 | } ///< @return false if we should cancel deletion | |
305 | ||
306 | /// transition status to deleting | |
307 | bool start_deleting() { | |
308 | Mutex::Locker l(lock); | |
309 | assert(status == CLEARING_DIR); | |
310 | if (stop_deleting) { | |
311 | status = CANCELED; | |
312 | cond.Signal(); | |
313 | return false; | |
314 | } | |
315 | status = DELETING_DIR; | |
316 | return true; | |
317 | } ///< @return false if we should cancel deletion | |
318 | ||
319 | /// signal collection removal queued | |
320 | void finish_deleting() { | |
321 | Mutex::Locker l(lock); | |
322 | assert(status == DELETING_DIR); | |
323 | status = DELETED_DIR; | |
324 | cond.Signal(); | |
325 | } | |
326 | ||
327 | /// try to halt the deletion | |
328 | bool try_stop_deletion() { | |
329 | Mutex::Locker l(lock); | |
330 | stop_deleting = true; | |
331 | /** | |
332 | * If we are in DELETING_DIR or CLEARING_DIR, there are in progress | |
333 | * operations we have to wait for before continuing on. States | |
334 | * CLEARING_WAITING and QUEUED indicate that the remover will check | |
335 | * stop_deleting before queueing any further operations. CANCELED | |
336 | * indicates that the remover has already halted. DELETED_DIR | |
337 | * indicates that the deletion has been fully queued. | |
338 | */ | |
339 | while (status == DELETING_DIR || status == CLEARING_DIR) | |
340 | cond.Wait(lock); | |
341 | return status != DELETED_DIR; | |
342 | } ///< @return true if we don't need to recreate the collection | |
343 | }; | |
344 | typedef ceph::shared_ptr<DeletingState> DeletingStateRef; | |
345 | ||
346 | class OSD; | |
347 | ||
7c673cae FG |
348 | class OSDService { |
349 | public: | |
350 | OSD *osd; | |
351 | CephContext *cct; | |
352 | SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry; | |
353 | ceph::shared_ptr<ObjectStore::Sequencer> meta_osr; | |
354 | SharedPtrRegistry<spg_t, DeletingState> deleting_pgs; | |
355 | const int whoami; | |
356 | ObjectStore *&store; | |
357 | LogClient &log_client; | |
358 | LogChannelRef clog; | |
359 | PGRecoveryStats &pg_recovery_stats; | |
360 | private: | |
361 | Messenger *&cluster_messenger; | |
362 | Messenger *&client_messenger; | |
363 | public: | |
364 | PerfCounters *&logger; | |
365 | PerfCounters *&recoverystate_perf; | |
366 | MonClient *&monc; | |
367 | ThreadPool::BatchWorkQueue<PG> &peering_wq; | |
368 | GenContextWQ recovery_gen_wq; | |
369 | ClassHandler *&class_handler; | |
370 | ||
371 | void enqueue_back(spg_t pgid, PGQueueable qi); | |
372 | void enqueue_front(spg_t pgid, PGQueueable qi); | |
373 | ||
374 | void maybe_inject_dispatch_delay() { | |
375 | if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) { | |
376 | if (rand() % 10000 < | |
377 | g_conf->osd_debug_inject_dispatch_delay_probability * 10000) { | |
378 | utime_t t; | |
379 | t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration); | |
380 | t.sleep(); | |
381 | } | |
382 | } | |
383 | } | |
384 | ||
385 | private: | |
386 | // -- map epoch lower bound -- | |
387 | Mutex pg_epoch_lock; | |
388 | multiset<epoch_t> pg_epochs; | |
389 | map<spg_t,epoch_t> pg_epoch; | |
390 | ||
391 | public: | |
392 | void pg_add_epoch(spg_t pgid, epoch_t epoch) { | |
393 | Mutex::Locker l(pg_epoch_lock); | |
394 | map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid); | |
395 | assert(t == pg_epoch.end()); | |
396 | pg_epoch[pgid] = epoch; | |
397 | pg_epochs.insert(epoch); | |
398 | } | |
399 | void pg_update_epoch(spg_t pgid, epoch_t epoch) { | |
400 | Mutex::Locker l(pg_epoch_lock); | |
401 | map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid); | |
402 | assert(t != pg_epoch.end()); | |
403 | pg_epochs.erase(pg_epochs.find(t->second)); | |
404 | t->second = epoch; | |
405 | pg_epochs.insert(epoch); | |
406 | } | |
407 | void pg_remove_epoch(spg_t pgid) { | |
408 | Mutex::Locker l(pg_epoch_lock); | |
409 | map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid); | |
410 | if (t != pg_epoch.end()) { | |
411 | pg_epochs.erase(pg_epochs.find(t->second)); | |
412 | pg_epoch.erase(t); | |
413 | } | |
414 | } | |
415 | epoch_t get_min_pg_epoch() { | |
416 | Mutex::Locker l(pg_epoch_lock); | |
417 | if (pg_epochs.empty()) | |
418 | return 0; | |
419 | else | |
420 | return *pg_epochs.begin(); | |
421 | } | |
422 | ||
423 | private: | |
424 | // -- superblock -- | |
425 | Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish | |
426 | OSDSuperblock superblock; | |
427 | ||
428 | public: | |
429 | OSDSuperblock get_superblock() { | |
430 | Mutex::Locker l(publish_lock); | |
431 | return superblock; | |
432 | } | |
433 | void publish_superblock(const OSDSuperblock &block) { | |
434 | Mutex::Locker l(publish_lock); | |
435 | superblock = block; | |
436 | } | |
437 | ||
438 | int get_nodeid() const { return whoami; } | |
439 | ||
440 | std::atomic<epoch_t> max_oldest_map; | |
441 | private: | |
442 | OSDMapRef osdmap; | |
443 | ||
444 | public: | |
445 | OSDMapRef get_osdmap() { | |
446 | Mutex::Locker l(publish_lock); | |
447 | return osdmap; | |
448 | } | |
449 | epoch_t get_osdmap_epoch() { | |
450 | Mutex::Locker l(publish_lock); | |
451 | return osdmap ? osdmap->get_epoch() : 0; | |
452 | } | |
453 | void publish_map(OSDMapRef map) { | |
454 | Mutex::Locker l(publish_lock); | |
455 | osdmap = map; | |
456 | } | |
457 | ||
458 | /* | |
459 | * osdmap - current published map | |
460 | * next_osdmap - pre_published map that is about to be published. | |
461 | * | |
462 | * We use the next_osdmap to send messages and initiate connections, | |
463 | * but only if the target is the same instance as the one in the map | |
464 | * epoch the current user is working from (i.e., the result is | |
465 | * equivalent to what is in next_osdmap). | |
466 | * | |
467 | * This allows the helpers to start ignoring osds that are about to | |
468 | * go down, and let OSD::handle_osd_map()/note_down_osd() mark them | |
469 | * down, without worrying about reopening connections from threads | |
470 | * working from old maps. | |
471 | */ | |
472 | private: | |
473 | OSDMapRef next_osdmap; | |
474 | Cond pre_publish_cond; | |
475 | ||
476 | public: | |
477 | void pre_publish_map(OSDMapRef map) { | |
478 | Mutex::Locker l(pre_publish_lock); | |
479 | next_osdmap = std::move(map); | |
480 | } | |
481 | ||
482 | void activate_map(); | |
483 | /// map epochs reserved below | |
484 | map<epoch_t, unsigned> map_reservations; | |
485 | ||
486 | /// gets ref to next_osdmap and registers the epoch as reserved | |
487 | OSDMapRef get_nextmap_reserved() { | |
488 | Mutex::Locker l(pre_publish_lock); | |
489 | if (!next_osdmap) | |
490 | return OSDMapRef(); | |
491 | epoch_t e = next_osdmap->get_epoch(); | |
492 | map<epoch_t, unsigned>::iterator i = | |
493 | map_reservations.insert(make_pair(e, 0)).first; | |
494 | i->second++; | |
495 | return next_osdmap; | |
496 | } | |
497 | /// releases reservation on map | |
498 | void release_map(OSDMapRef osdmap) { | |
499 | Mutex::Locker l(pre_publish_lock); | |
500 | map<epoch_t, unsigned>::iterator i = | |
501 | map_reservations.find(osdmap->get_epoch()); | |
502 | assert(i != map_reservations.end()); | |
503 | assert(i->second > 0); | |
504 | if (--(i->second) == 0) { | |
505 | map_reservations.erase(i); | |
506 | } | |
507 | pre_publish_cond.Signal(); | |
508 | } | |
509 | /// blocks until there are no reserved maps prior to next_osdmap | |
510 | void await_reserved_maps() { | |
511 | Mutex::Locker l(pre_publish_lock); | |
512 | assert(next_osdmap); | |
513 | while (true) { | |
514 | map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin(); | |
515 | if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) { | |
516 | break; | |
517 | } else { | |
518 | pre_publish_cond.Wait(pre_publish_lock); | |
519 | } | |
520 | } | |
521 | } | |
522 | ||
523 | private: | |
524 | Mutex peer_map_epoch_lock; | |
525 | map<int, epoch_t> peer_map_epoch; | |
526 | public: | |
527 | epoch_t get_peer_epoch(int p); | |
528 | epoch_t note_peer_epoch(int p, epoch_t e); | |
529 | void forget_peer_epoch(int p, epoch_t e); | |
530 | ||
531 | void send_map(class MOSDMap *m, Connection *con); | |
532 | void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap); | |
533 | MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to, | |
534 | OSDSuperblock& superblock); | |
535 | bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch, | |
536 | const OSDMapRef& osdmap, const epoch_t *sent_epoch_p); | |
537 | void share_map(entity_name_t name, Connection *con, epoch_t epoch, | |
538 | OSDMapRef& osdmap, epoch_t *sent_epoch_p); | |
539 | void share_map_peer(int peer, Connection *con, | |
540 | OSDMapRef map = OSDMapRef()); | |
541 | ||
542 | ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch); | |
543 | pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front) | |
544 | void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch); | |
545 | void send_message_osd_cluster(Message *m, Connection *con) { | |
546 | con->send_message(m); | |
547 | } | |
548 | void send_message_osd_cluster(Message *m, const ConnectionRef& con) { | |
549 | con->send_message(m); | |
550 | } | |
551 | void send_message_osd_client(Message *m, Connection *con) { | |
552 | con->send_message(m); | |
553 | } | |
554 | void send_message_osd_client(Message *m, const ConnectionRef& con) { | |
555 | con->send_message(m); | |
556 | } | |
557 | entity_name_t get_cluster_msgr_name() { | |
558 | return cluster_messenger->get_myname(); | |
559 | } | |
560 | ||
561 | private: | |
562 | // -- scrub scheduling -- | |
563 | Mutex sched_scrub_lock; | |
564 | int scrubs_pending; | |
565 | int scrubs_active; | |
566 | ||
567 | public: | |
568 | struct ScrubJob { | |
569 | CephContext* cct; | |
570 | /// pg to be scrubbed | |
571 | spg_t pgid; | |
572 | /// a time scheduled for scrub. but the scrub could be delayed if system | |
573 | /// load is too high or it fails to fall in the scrub hours | |
574 | utime_t sched_time; | |
575 | /// the hard upper bound of scrub time | |
576 | utime_t deadline; | |
577 | ScrubJob() : cct(nullptr) {} | |
578 | explicit ScrubJob(CephContext* cct, const spg_t& pg, | |
579 | const utime_t& timestamp, | |
580 | double pool_scrub_min_interval = 0, | |
581 | double pool_scrub_max_interval = 0, bool must = true); | |
582 | /// order the jobs by sched_time | |
583 | bool operator<(const ScrubJob& rhs) const; | |
584 | }; | |
585 | set<ScrubJob> sched_scrub_pg; | |
586 | ||
587 | /// @returns the scrub_reg_stamp used for unregister the scrub job | |
588 | utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval, | |
589 | double pool_scrub_max_interval, bool must) { | |
590 | ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval, | |
591 | must); | |
592 | Mutex::Locker l(sched_scrub_lock); | |
593 | sched_scrub_pg.insert(scrub); | |
594 | return scrub.sched_time; | |
595 | } | |
596 | void unreg_pg_scrub(spg_t pgid, utime_t t) { | |
597 | Mutex::Locker l(sched_scrub_lock); | |
598 | size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t)); | |
599 | assert(removed); | |
600 | } | |
601 | bool first_scrub_stamp(ScrubJob *out) { | |
602 | Mutex::Locker l(sched_scrub_lock); | |
603 | if (sched_scrub_pg.empty()) | |
604 | return false; | |
605 | set<ScrubJob>::iterator iter = sched_scrub_pg.begin(); | |
606 | *out = *iter; | |
607 | return true; | |
608 | } | |
609 | bool next_scrub_stamp(const ScrubJob& next, | |
610 | ScrubJob *out) { | |
611 | Mutex::Locker l(sched_scrub_lock); | |
612 | if (sched_scrub_pg.empty()) | |
613 | return false; | |
614 | set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next); | |
615 | if (iter == sched_scrub_pg.cend()) | |
616 | return false; | |
617 | ++iter; | |
618 | if (iter == sched_scrub_pg.cend()) | |
619 | return false; | |
620 | *out = *iter; | |
621 | return true; | |
622 | } | |
623 | ||
624 | void dumps_scrub(Formatter *f) { | |
625 | assert(f != nullptr); | |
626 | Mutex::Locker l(sched_scrub_lock); | |
627 | ||
628 | f->open_array_section("scrubs"); | |
629 | for (const auto &i: sched_scrub_pg) { | |
630 | f->open_object_section("scrub"); | |
631 | f->dump_stream("pgid") << i.pgid; | |
632 | f->dump_stream("sched_time") << i.sched_time; | |
633 | f->dump_stream("deadline") << i.deadline; | |
634 | f->dump_bool("forced", i.sched_time == i.deadline); | |
635 | f->close_section(); | |
636 | } | |
637 | f->close_section(); | |
638 | } | |
639 | ||
640 | bool can_inc_scrubs_pending(); | |
641 | bool inc_scrubs_pending(); | |
642 | void inc_scrubs_active(bool reserved); | |
643 | void dec_scrubs_pending(); | |
644 | void dec_scrubs_active(); | |
645 | ||
646 | void reply_op_error(OpRequestRef op, int err); | |
647 | void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv); | |
648 | void handle_misdirected_op(PG *pg, OpRequestRef op); | |
649 | ||
650 | ||
651 | private: | |
652 | // -- agent shared state -- | |
653 | Mutex agent_lock; | |
654 | Cond agent_cond; | |
655 | map<uint64_t, set<PGRef> > agent_queue; | |
656 | set<PGRef>::iterator agent_queue_pos; | |
657 | bool agent_valid_iterator; | |
658 | int agent_ops; | |
659 | int flush_mode_high_count; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed | |
660 | set<hobject_t> agent_oids; | |
661 | bool agent_active; | |
662 | struct AgentThread : public Thread { | |
663 | OSDService *osd; | |
664 | explicit AgentThread(OSDService *o) : osd(o) {} | |
665 | void *entry() override { | |
666 | osd->agent_entry(); | |
667 | return NULL; | |
668 | } | |
669 | } agent_thread; | |
670 | bool agent_stop_flag; | |
671 | Mutex agent_timer_lock; | |
672 | SafeTimer agent_timer; | |
673 | ||
674 | public: | |
675 | void agent_entry(); | |
676 | void agent_stop(); | |
677 | ||
678 | void _enqueue(PG *pg, uint64_t priority) { | |
679 | if (!agent_queue.empty() && | |
680 | agent_queue.rbegin()->first < priority) | |
681 | agent_valid_iterator = false; // inserting higher-priority queue | |
682 | set<PGRef>& nq = agent_queue[priority]; | |
683 | if (nq.empty()) | |
684 | agent_cond.Signal(); | |
685 | nq.insert(pg); | |
686 | } | |
687 | ||
688 | void _dequeue(PG *pg, uint64_t old_priority) { | |
689 | set<PGRef>& oq = agent_queue[old_priority]; | |
690 | set<PGRef>::iterator p = oq.find(pg); | |
691 | assert(p != oq.end()); | |
692 | if (p == agent_queue_pos) | |
693 | ++agent_queue_pos; | |
694 | oq.erase(p); | |
695 | if (oq.empty()) { | |
696 | if (agent_queue.rbegin()->first == old_priority) | |
697 | agent_valid_iterator = false; | |
698 | agent_queue.erase(old_priority); | |
699 | } | |
700 | } | |
701 | ||
702 | /// enable agent for a pg | |
703 | void agent_enable_pg(PG *pg, uint64_t priority) { | |
704 | Mutex::Locker l(agent_lock); | |
705 | _enqueue(pg, priority); | |
706 | } | |
707 | ||
708 | /// adjust priority for an enagled pg | |
709 | void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) { | |
710 | Mutex::Locker l(agent_lock); | |
711 | assert(new_priority != old_priority); | |
712 | _enqueue(pg, new_priority); | |
713 | _dequeue(pg, old_priority); | |
714 | } | |
715 | ||
716 | /// disable agent for a pg | |
717 | void agent_disable_pg(PG *pg, uint64_t old_priority) { | |
718 | Mutex::Locker l(agent_lock); | |
719 | _dequeue(pg, old_priority); | |
720 | } | |
721 | ||
722 | /// note start of an async (evict) op | |
723 | void agent_start_evict_op() { | |
724 | Mutex::Locker l(agent_lock); | |
725 | ++agent_ops; | |
726 | } | |
727 | ||
728 | /// note finish or cancellation of an async (evict) op | |
729 | void agent_finish_evict_op() { | |
730 | Mutex::Locker l(agent_lock); | |
731 | assert(agent_ops > 0); | |
732 | --agent_ops; | |
733 | agent_cond.Signal(); | |
734 | } | |
735 | ||
736 | /// note start of an async (flush) op | |
737 | void agent_start_op(const hobject_t& oid) { | |
738 | Mutex::Locker l(agent_lock); | |
739 | ++agent_ops; | |
740 | assert(agent_oids.count(oid) == 0); | |
741 | agent_oids.insert(oid); | |
742 | } | |
743 | ||
744 | /// note finish or cancellation of an async (flush) op | |
745 | void agent_finish_op(const hobject_t& oid) { | |
746 | Mutex::Locker l(agent_lock); | |
747 | assert(agent_ops > 0); | |
748 | --agent_ops; | |
749 | assert(agent_oids.count(oid) == 1); | |
750 | agent_oids.erase(oid); | |
751 | agent_cond.Signal(); | |
752 | } | |
753 | ||
754 | /// check if we are operating on an object | |
755 | bool agent_is_active_oid(const hobject_t& oid) { | |
756 | Mutex::Locker l(agent_lock); | |
757 | return agent_oids.count(oid); | |
758 | } | |
759 | ||
760 | /// get count of active agent ops | |
761 | int agent_get_num_ops() { | |
762 | Mutex::Locker l(agent_lock); | |
763 | return agent_ops; | |
764 | } | |
765 | ||
766 | void agent_inc_high_count() { | |
767 | Mutex::Locker l(agent_lock); | |
768 | flush_mode_high_count ++; | |
769 | } | |
770 | ||
771 | void agent_dec_high_count() { | |
772 | Mutex::Locker l(agent_lock); | |
773 | flush_mode_high_count --; | |
774 | } | |
775 | ||
776 | private: | |
777 | /// throttle promotion attempts | |
778 | std::atomic_uint promote_probability_millis{1000}; ///< probability thousands. one word. | |
779 | PromoteCounter promote_counter; | |
780 | utime_t last_recalibrate; | |
781 | unsigned long promote_max_objects, promote_max_bytes; | |
782 | ||
783 | public: | |
784 | bool promote_throttle() { | |
785 | // NOTE: lockless! we rely on the probability being a single word. | |
786 | promote_counter.attempt(); | |
787 | if ((unsigned)rand() % 1000 > promote_probability_millis) | |
788 | return true; // yes throttle (no promote) | |
789 | if (promote_max_objects && | |
790 | promote_counter.objects > promote_max_objects) | |
791 | return true; // yes throttle | |
792 | if (promote_max_bytes && | |
793 | promote_counter.bytes > promote_max_bytes) | |
794 | return true; // yes throttle | |
795 | return false; // no throttle (promote) | |
796 | } | |
797 | void promote_finish(uint64_t bytes) { | |
798 | promote_counter.finish(bytes); | |
799 | } | |
800 | void promote_throttle_recalibrate(); | |
801 | ||
802 | // -- Objecter, for tiering reads/writes from/to other OSDs -- | |
803 | Objecter *objecter; | |
804 | Finisher objecter_finisher; | |
805 | ||
806 | // -- Watch -- | |
807 | Mutex watch_lock; | |
808 | SafeTimer watch_timer; | |
809 | uint64_t next_notif_id; | |
810 | uint64_t get_next_id(epoch_t cur_epoch) { | |
811 | Mutex::Locker l(watch_lock); | |
812 | return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++)); | |
813 | } | |
814 | ||
815 | // -- Recovery/Backfill Request Scheduling -- | |
816 | Mutex recovery_request_lock; | |
817 | SafeTimer recovery_request_timer; | |
818 | ||
31f18b77 FG |
819 | // For async recovery sleep |
820 | bool recovery_needs_sleep = true; | |
821 | utime_t recovery_schedule_time = utime_t(); | |
822 | ||
823 | Mutex recovery_sleep_lock; | |
824 | SafeTimer recovery_sleep_timer; | |
825 | ||
7c673cae FG |
826 | // -- tids -- |
827 | // for ops i issue | |
828 | std::atomic_uint last_tid{0}; | |
829 | ceph_tid_t get_tid() { | |
830 | return (ceph_tid_t)last_tid++; | |
831 | } | |
832 | ||
833 | // -- backfill_reservation -- | |
834 | Finisher reserver_finisher; | |
835 | AsyncReserver<spg_t> local_reserver; | |
836 | AsyncReserver<spg_t> remote_reserver; | |
837 | ||
838 | // -- pg_temp -- | |
839 | private: | |
840 | Mutex pg_temp_lock; | |
94b18763 FG |
841 | struct pg_temp_t { |
842 | pg_temp_t() | |
843 | {} | |
844 | pg_temp_t(vector<int> v, bool f) | |
845 | : acting{v}, forced{f} | |
846 | {} | |
847 | vector<int> acting; | |
848 | bool forced = false; | |
849 | }; | |
850 | map<pg_t, pg_temp_t> pg_temp_wanted; | |
851 | map<pg_t, pg_temp_t> pg_temp_pending; | |
7c673cae | 852 | void _sent_pg_temp(); |
94b18763 | 853 | friend std::ostream& operator<<(std::ostream&, const pg_temp_t&); |
7c673cae | 854 | public: |
94b18763 FG |
855 | void queue_want_pg_temp(pg_t pgid, const vector<int>& want, |
856 | bool forced = false); | |
7c673cae FG |
857 | void remove_want_pg_temp(pg_t pgid); |
858 | void requeue_pg_temp(); | |
859 | void send_pg_temp(); | |
860 | ||
861 | void send_pg_created(pg_t pgid); | |
862 | ||
863 | void queue_for_peering(PG *pg); | |
864 | ||
865 | Mutex snap_sleep_lock; | |
866 | SafeTimer snap_sleep_timer; | |
867 | ||
31f18b77 FG |
868 | Mutex scrub_sleep_lock; |
869 | SafeTimer scrub_sleep_timer; | |
870 | ||
7c673cae FG |
871 | AsyncReserver<spg_t> snap_reserver; |
872 | void queue_for_snap_trim(PG *pg); | |
873 | ||
31f18b77 FG |
874 | void queue_for_scrub(PG *pg, bool with_high_priority) { |
875 | unsigned scrub_queue_priority = pg->scrubber.priority; | |
876 | if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) { | |
877 | scrub_queue_priority = cct->_conf->osd_client_op_priority; | |
878 | } | |
7c673cae FG |
879 | enqueue_back( |
880 | pg->info.pgid, | |
881 | PGQueueable( | |
882 | PGScrub(pg->get_osdmap()->get_epoch()), | |
883 | cct->_conf->osd_scrub_cost, | |
31f18b77 | 884 | scrub_queue_priority, |
7c673cae FG |
885 | ceph_clock_now(), |
886 | entity_inst_t(), | |
887 | pg->get_osdmap()->get_epoch())); | |
888 | } | |
889 | ||
890 | private: | |
891 | // -- pg recovery and associated throttling -- | |
892 | Mutex recovery_lock; | |
893 | list<pair<epoch_t, PGRef> > awaiting_throttle; | |
894 | ||
895 | utime_t defer_recovery_until; | |
896 | uint64_t recovery_ops_active; | |
897 | uint64_t recovery_ops_reserved; | |
898 | bool recovery_paused; | |
899 | #ifdef DEBUG_RECOVERY_OIDS | |
900 | map<spg_t, set<hobject_t> > recovery_oids; | |
901 | #endif | |
902 | bool _recover_now(uint64_t *available_pushes); | |
903 | void _maybe_queue_recovery(); | |
904 | void _queue_for_recovery( | |
905 | pair<epoch_t, PGRef> p, uint64_t reserved_pushes) { | |
906 | assert(recovery_lock.is_locked_by_me()); | |
907 | enqueue_back( | |
908 | p.second->info.pgid, | |
909 | PGQueueable( | |
910 | PGRecovery(p.first, reserved_pushes), | |
911 | cct->_conf->osd_recovery_cost, | |
912 | cct->_conf->osd_recovery_priority, | |
913 | ceph_clock_now(), | |
914 | entity_inst_t(), | |
915 | p.first)); | |
916 | } | |
917 | public: | |
918 | void start_recovery_op(PG *pg, const hobject_t& soid); | |
919 | void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue); | |
920 | bool is_recovery_active(); | |
921 | void release_reserved_pushes(uint64_t pushes) { | |
922 | Mutex::Locker l(recovery_lock); | |
923 | assert(recovery_ops_reserved >= pushes); | |
924 | recovery_ops_reserved -= pushes; | |
925 | _maybe_queue_recovery(); | |
926 | } | |
927 | void defer_recovery(float defer_for) { | |
928 | defer_recovery_until = ceph_clock_now(); | |
929 | defer_recovery_until += defer_for; | |
930 | } | |
931 | void pause_recovery() { | |
932 | Mutex::Locker l(recovery_lock); | |
933 | recovery_paused = true; | |
934 | } | |
935 | bool recovery_is_paused() { | |
936 | Mutex::Locker l(recovery_lock); | |
937 | return recovery_paused; | |
938 | } | |
939 | void unpause_recovery() { | |
940 | Mutex::Locker l(recovery_lock); | |
941 | recovery_paused = false; | |
942 | _maybe_queue_recovery(); | |
943 | } | |
944 | void kick_recovery_queue() { | |
945 | Mutex::Locker l(recovery_lock); | |
946 | _maybe_queue_recovery(); | |
947 | } | |
948 | void clear_queued_recovery(PG *pg) { | |
949 | Mutex::Locker l(recovery_lock); | |
950 | for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin(); | |
951 | i != awaiting_throttle.end(); | |
952 | ) { | |
953 | if (i->second.get() == pg) { | |
954 | awaiting_throttle.erase(i); | |
955 | return; | |
956 | } else { | |
957 | ++i; | |
958 | } | |
959 | } | |
960 | } | |
961 | // delayed pg activation | |
c07f9fc5 | 962 | void queue_for_recovery(PG *pg) { |
7c673cae | 963 | Mutex::Locker l(recovery_lock); |
c07f9fc5 FG |
964 | |
965 | if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) { | |
7c673cae FG |
966 | awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg)); |
967 | } else { | |
968 | awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg)); | |
969 | } | |
970 | _maybe_queue_recovery(); | |
971 | } | |
31f18b77 FG |
972 | void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) { |
973 | Mutex::Locker l(recovery_lock); | |
974 | _queue_for_recovery(make_pair(queued, pg), reserved_pushes); | |
975 | } | |
7c673cae | 976 | |
d2e6a577 | 977 | void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags); |
7c673cae FG |
978 | |
979 | // osd map cache (past osd maps) | |
980 | Mutex map_cache_lock; | |
981 | SharedLRU<epoch_t, const OSDMap> map_cache; | |
982 | SimpleLRU<epoch_t, bufferlist> map_bl_cache; | |
983 | SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache; | |
984 | ||
985 | OSDMapRef try_get_map(epoch_t e); | |
986 | OSDMapRef get_map(epoch_t e) { | |
987 | OSDMapRef ret(try_get_map(e)); | |
988 | assert(ret); | |
989 | return ret; | |
990 | } | |
991 | OSDMapRef add_map(OSDMap *o) { | |
992 | Mutex::Locker l(map_cache_lock); | |
993 | return _add_map(o); | |
994 | } | |
995 | OSDMapRef _add_map(OSDMap *o); | |
996 | ||
997 | void add_map_bl(epoch_t e, bufferlist& bl) { | |
998 | Mutex::Locker l(map_cache_lock); | |
999 | return _add_map_bl(e, bl); | |
1000 | } | |
1001 | void pin_map_bl(epoch_t e, bufferlist &bl); | |
1002 | void _add_map_bl(epoch_t e, bufferlist& bl); | |
1003 | bool get_map_bl(epoch_t e, bufferlist& bl) { | |
1004 | Mutex::Locker l(map_cache_lock); | |
1005 | return _get_map_bl(e, bl); | |
1006 | } | |
1007 | bool _get_map_bl(epoch_t e, bufferlist& bl); | |
1008 | ||
1009 | void add_map_inc_bl(epoch_t e, bufferlist& bl) { | |
1010 | Mutex::Locker l(map_cache_lock); | |
1011 | return _add_map_inc_bl(e, bl); | |
1012 | } | |
1013 | void pin_map_inc_bl(epoch_t e, bufferlist &bl); | |
1014 | void _add_map_inc_bl(epoch_t e, bufferlist& bl); | |
1015 | bool get_inc_map_bl(epoch_t e, bufferlist& bl); | |
1016 | ||
1017 | void clear_map_bl_cache_pins(epoch_t e); | |
1018 | ||
1019 | void need_heartbeat_peer_update(); | |
1020 | ||
1021 | void pg_stat_queue_enqueue(PG *pg); | |
1022 | void pg_stat_queue_dequeue(PG *pg); | |
1023 | ||
1024 | void init(); | |
1025 | void final_init(); | |
1026 | void start_shutdown(); | |
31f18b77 | 1027 | void shutdown_reserver(); |
7c673cae FG |
1028 | void shutdown(); |
1029 | ||
1030 | private: | |
1031 | // split | |
1032 | Mutex in_progress_split_lock; | |
1033 | map<spg_t, spg_t> pending_splits; // child -> parent | |
1034 | map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children] | |
1035 | set<spg_t> in_progress_splits; // child | |
1036 | ||
1037 | public: | |
1038 | void _start_split(spg_t parent, const set<spg_t> &children); | |
1039 | void start_split(spg_t parent, const set<spg_t> &children) { | |
1040 | Mutex::Locker l(in_progress_split_lock); | |
1041 | return _start_split(parent, children); | |
1042 | } | |
1043 | void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs); | |
1044 | void complete_split(const set<spg_t> &pgs); | |
1045 | void cancel_pending_splits_for_parent(spg_t parent); | |
1046 | void _cancel_pending_splits_for_parent(spg_t parent); | |
1047 | bool splitting(spg_t pgid); | |
1048 | void expand_pg_num(OSDMapRef old_map, | |
1049 | OSDMapRef new_map); | |
1050 | void _maybe_split_pgid(OSDMapRef old_map, | |
1051 | OSDMapRef new_map, | |
1052 | spg_t pgid); | |
1053 | void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap); | |
1054 | ||
1055 | // -- stats -- | |
1056 | Mutex stat_lock; | |
1057 | osd_stat_t osd_stat; | |
31f18b77 | 1058 | uint32_t seq = 0; |
7c673cae FG |
1059 | |
1060 | void update_osd_stat(vector<int>& hb_peers); | |
224ce89b | 1061 | osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf, |
35e4c445 FG |
1062 | vector<int>& hb_peers, |
1063 | int num_pgs); | |
7c673cae FG |
1064 | osd_stat_t get_osd_stat() { |
1065 | Mutex::Locker l(stat_lock); | |
31f18b77 FG |
1066 | ++seq; |
1067 | osd_stat.up_from = up_epoch; | |
1068 | osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq; | |
7c673cae FG |
1069 | return osd_stat; |
1070 | } | |
31f18b77 FG |
1071 | uint64_t get_osd_stat_seq() { |
1072 | Mutex::Locker l(stat_lock); | |
1073 | return osd_stat.seq; | |
1074 | } | |
7c673cae FG |
1075 | |
1076 | // -- OSD Full Status -- | |
1077 | private: | |
1078 | friend TestOpsSocketHook; | |
1079 | mutable Mutex full_status_lock; | |
1080 | enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending | |
1081 | const char *get_full_state_name(s_names s) const { | |
1082 | switch (s) { | |
1083 | case NONE: return "none"; | |
1084 | case NEARFULL: return "nearfull"; | |
1085 | case BACKFILLFULL: return "backfillfull"; | |
1086 | case FULL: return "full"; | |
1087 | case FAILSAFE: return "failsafe"; | |
1088 | default: return "???"; | |
1089 | } | |
1090 | } | |
1091 | s_names get_full_state(string type) const { | |
1092 | if (type == "none") | |
1093 | return NONE; | |
1094 | else if (type == "failsafe") | |
1095 | return FAILSAFE; | |
1096 | else if (type == "full") | |
1097 | return FULL; | |
1098 | else if (type == "backfillfull") | |
1099 | return BACKFILLFULL; | |
1100 | else if (type == "nearfull") | |
1101 | return NEARFULL; | |
1102 | else | |
1103 | return INVALID; | |
1104 | } | |
1105 | double cur_ratio; ///< current utilization | |
1106 | mutable int64_t injectfull = 0; | |
1107 | s_names injectfull_state = NONE; | |
1108 | float get_failsafe_full_ratio(); | |
224ce89b | 1109 | void check_full_status(float ratio); |
7c673cae FG |
1110 | bool _check_full(s_names type, ostream &ss) const; |
1111 | public: | |
1112 | bool check_failsafe_full(ostream &ss) const; | |
1113 | bool check_full(ostream &ss) const; | |
1114 | bool check_backfill_full(ostream &ss) const; | |
1115 | bool check_nearfull(ostream &ss) const; | |
1116 | bool is_failsafe_full() const; | |
1117 | bool is_full() const; | |
1118 | bool is_backfillfull() const; | |
1119 | bool is_nearfull() const; | |
1120 | bool need_fullness_update(); ///< osdmap state needs update | |
1121 | void set_injectfull(s_names type, int64_t count); | |
1122 | bool check_osdmap_full(const set<pg_shard_t> &missing_on); | |
1123 | ||
1124 | ||
1125 | // -- epochs -- | |
1126 | private: | |
1127 | mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch | |
1128 | epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started) | |
1129 | epoch_t up_epoch; // _most_recent_ epoch we were marked up | |
1130 | epoch_t bind_epoch; // epoch we last did a bind to new ip:ports | |
1131 | public: | |
1132 | /** | |
1133 | * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params | |
1134 | * can be NULL if you don't care about them. | |
1135 | */ | |
1136 | void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch, | |
1137 | epoch_t *_bind_epoch) const; | |
1138 | /** | |
1139 | * Set the boot, up, and bind epochs. Any NULL params will not be set. | |
1140 | */ | |
1141 | void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch, | |
1142 | const epoch_t *_bind_epoch); | |
1143 | epoch_t get_boot_epoch() const { | |
1144 | epoch_t ret; | |
1145 | retrieve_epochs(&ret, NULL, NULL); | |
1146 | return ret; | |
1147 | } | |
1148 | epoch_t get_up_epoch() const { | |
1149 | epoch_t ret; | |
1150 | retrieve_epochs(NULL, &ret, NULL); | |
1151 | return ret; | |
1152 | } | |
1153 | epoch_t get_bind_epoch() const { | |
1154 | epoch_t ret; | |
1155 | retrieve_epochs(NULL, NULL, &ret); | |
1156 | return ret; | |
1157 | } | |
1158 | ||
181888fb FG |
1159 | void request_osdmap_update(epoch_t e); |
1160 | ||
7c673cae FG |
1161 | // -- stopping -- |
1162 | Mutex is_stopping_lock; | |
1163 | Cond is_stopping_cond; | |
1164 | enum { | |
1165 | NOT_STOPPING, | |
1166 | PREPARING_TO_STOP, | |
1167 | STOPPING }; | |
1168 | std::atomic_int state{NOT_STOPPING}; | |
1169 | int get_state() { | |
1170 | return state; | |
1171 | } | |
1172 | void set_state(int s) { | |
1173 | state = s; | |
1174 | } | |
1175 | bool is_stopping() const { | |
1176 | return state == STOPPING; | |
1177 | } | |
1178 | bool is_preparing_to_stop() const { | |
1179 | return state == PREPARING_TO_STOP; | |
1180 | } | |
1181 | bool prepare_to_stop(); | |
1182 | void got_stop_ack(); | |
1183 | ||
1184 | ||
1185 | #ifdef PG_DEBUG_REFS | |
1186 | Mutex pgid_lock; | |
1187 | map<spg_t, int> pgid_tracker; | |
1188 | map<spg_t, PG*> live_pgs; | |
31f18b77 FG |
1189 | void add_pgid(spg_t pgid, PG *pg); |
1190 | void remove_pgid(spg_t pgid, PG *pg); | |
1191 | void dump_live_pgids(); | |
7c673cae FG |
1192 | #endif |
1193 | ||
1194 | explicit OSDService(OSD *osd); | |
1195 | ~OSDService(); | |
1196 | }; | |
1197 | ||
1198 | class OSD : public Dispatcher, | |
1199 | public md_config_obs_t { | |
1200 | /** OSD **/ | |
1201 | Mutex osd_lock; // global lock | |
1202 | SafeTimer tick_timer; // safe timer (osd_lock) | |
1203 | ||
1204 | // Tick timer for those stuff that do not need osd_lock | |
1205 | Mutex tick_timer_lock; | |
1206 | SafeTimer tick_timer_without_osd_lock; | |
1207 | public: | |
1208 | // config observer bits | |
1209 | const char** get_tracked_conf_keys() const override; | |
1210 | void handle_conf_change(const struct md_config_t *conf, | |
1211 | const std::set <std::string> &changed) override; | |
1212 | void update_log_config(); | |
1213 | void check_config(); | |
1214 | ||
1215 | protected: | |
1216 | ||
1217 | static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock | |
1218 | ||
1219 | AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry; | |
1220 | AuthAuthorizeHandlerRegistry *authorize_handler_service_registry; | |
1221 | ||
1222 | Messenger *cluster_messenger; | |
1223 | Messenger *client_messenger; | |
1224 | Messenger *objecter_messenger; | |
1225 | MonClient *monc; // check the "monc helpers" list before accessing directly | |
1226 | MgrClient mgrc; | |
1227 | PerfCounters *logger; | |
1228 | PerfCounters *recoverystate_perf; | |
1229 | ObjectStore *store; | |
1230 | #ifdef HAVE_LIBFUSE | |
1231 | FuseStore *fuse_store = nullptr; | |
1232 | #endif | |
1233 | LogClient log_client; | |
1234 | LogChannelRef clog; | |
1235 | ||
1236 | int whoami; | |
1237 | std::string dev_path, journal_path; | |
1238 | ||
31f18b77 | 1239 | bool store_is_rotational = true; |
d2e6a577 | 1240 | bool journal_is_rotational = true; |
31f18b77 | 1241 | |
7c673cae FG |
1242 | ZTracer::Endpoint trace_endpoint; |
1243 | void create_logger(); | |
1244 | void create_recoverystate_perf(); | |
1245 | void tick(); | |
1246 | void tick_without_osd_lock(); | |
1247 | void _dispatch(Message *m); | |
1248 | void dispatch_op(OpRequestRef op); | |
1249 | ||
1250 | void check_osdmap_features(ObjectStore *store); | |
1251 | ||
1252 | // asok | |
1253 | friend class OSDSocketHook; | |
1254 | class OSDSocketHook *asok_hook; | |
1255 | bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss); | |
1256 | ||
1257 | public: | |
1258 | ClassHandler *class_handler = nullptr; | |
1259 | int get_nodeid() { return whoami; } | |
1260 | ||
1261 | static ghobject_t get_osdmap_pobject_name(epoch_t epoch) { | |
1262 | char foo[20]; | |
1263 | snprintf(foo, sizeof(foo), "osdmap.%d", epoch); | |
1264 | return ghobject_t(hobject_t(sobject_t(object_t(foo), 0))); | |
1265 | } | |
1266 | static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) { | |
1267 | char foo[22]; | |
1268 | snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch); | |
1269 | return ghobject_t(hobject_t(sobject_t(object_t(foo), 0))); | |
1270 | } | |
1271 | ||
1272 | static ghobject_t make_snapmapper_oid() { | |
1273 | return ghobject_t(hobject_t( | |
1274 | sobject_t( | |
1275 | object_t("snapmapper"), | |
1276 | 0))); | |
1277 | } | |
1278 | ||
1279 | static ghobject_t make_pg_log_oid(spg_t pg) { | |
1280 | stringstream ss; | |
1281 | ss << "pglog_" << pg; | |
1282 | string s; | |
1283 | getline(ss, s); | |
1284 | return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0))); | |
1285 | } | |
1286 | ||
1287 | static ghobject_t make_pg_biginfo_oid(spg_t pg) { | |
1288 | stringstream ss; | |
1289 | ss << "pginfo_" << pg; | |
1290 | string s; | |
1291 | getline(ss, s); | |
1292 | return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0))); | |
1293 | } | |
1294 | static ghobject_t make_infos_oid() { | |
1295 | hobject_t oid(sobject_t("infos", CEPH_NOSNAP)); | |
1296 | return ghobject_t(oid); | |
1297 | } | |
1298 | static void recursive_remove_collection(CephContext* cct, | |
1299 | ObjectStore *store, | |
1300 | spg_t pgid, | |
1301 | coll_t tmp); | |
1302 | ||
1303 | /** | |
1304 | * get_osd_initial_compat_set() | |
1305 | * | |
1306 | * Get the initial feature set for this OSD. Features | |
1307 | * here are automatically upgraded. | |
1308 | * | |
1309 | * Return value: Initial osd CompatSet | |
1310 | */ | |
1311 | static CompatSet get_osd_initial_compat_set(); | |
1312 | ||
1313 | /** | |
1314 | * get_osd_compat_set() | |
1315 | * | |
1316 | * Get all features supported by this OSD | |
1317 | * | |
1318 | * Return value: CompatSet of all supported features | |
1319 | */ | |
1320 | static CompatSet get_osd_compat_set(); | |
1321 | ||
1322 | ||
1323 | private: | |
1324 | class C_Tick; | |
1325 | class C_Tick_WithoutOSDLock; | |
1326 | ||
1327 | // -- superblock -- | |
1328 | OSDSuperblock superblock; | |
1329 | ||
1330 | void write_superblock(); | |
1331 | void write_superblock(ObjectStore::Transaction& t); | |
1332 | int read_superblock(); | |
1333 | ||
1334 | void clear_temp_objects(); | |
1335 | ||
1336 | CompatSet osd_compat; | |
1337 | ||
1338 | // -- state -- | |
1339 | public: | |
1340 | typedef enum { | |
1341 | STATE_INITIALIZING = 1, | |
1342 | STATE_PREBOOT, | |
1343 | STATE_BOOTING, | |
1344 | STATE_ACTIVE, | |
1345 | STATE_STOPPING, | |
1346 | STATE_WAITING_FOR_HEALTHY | |
1347 | } osd_state_t; | |
1348 | ||
1349 | static const char *get_state_name(int s) { | |
1350 | switch (s) { | |
1351 | case STATE_INITIALIZING: return "initializing"; | |
1352 | case STATE_PREBOOT: return "preboot"; | |
1353 | case STATE_BOOTING: return "booting"; | |
1354 | case STATE_ACTIVE: return "active"; | |
1355 | case STATE_STOPPING: return "stopping"; | |
1356 | case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy"; | |
1357 | default: return "???"; | |
1358 | } | |
1359 | } | |
1360 | ||
1361 | private: | |
1362 | std::atomic_int state{STATE_INITIALIZING}; | |
224ce89b | 1363 | bool waiting_for_luminous_mons = false; |
7c673cae FG |
1364 | |
1365 | public: | |
1366 | int get_state() const { | |
1367 | return state; | |
1368 | } | |
1369 | void set_state(int s) { | |
1370 | state = s; | |
1371 | } | |
1372 | bool is_initializing() const { | |
1373 | return state == STATE_INITIALIZING; | |
1374 | } | |
1375 | bool is_preboot() const { | |
1376 | return state == STATE_PREBOOT; | |
1377 | } | |
1378 | bool is_booting() const { | |
1379 | return state == STATE_BOOTING; | |
1380 | } | |
1381 | bool is_active() const { | |
1382 | return state == STATE_ACTIVE; | |
1383 | } | |
1384 | bool is_stopping() const { | |
1385 | return state == STATE_STOPPING; | |
1386 | } | |
1387 | bool is_waiting_for_healthy() const { | |
1388 | return state == STATE_WAITING_FOR_HEALTHY; | |
1389 | } | |
1390 | ||
1391 | private: | |
1392 | ||
31f18b77 | 1393 | ThreadPool peering_tp; |
7c673cae FG |
1394 | ShardedThreadPool osd_op_tp; |
1395 | ThreadPool disk_tp; | |
1396 | ThreadPool command_tp; | |
1397 | ||
1398 | void set_disk_tp_priority(); | |
1399 | void get_latest_osdmap(); | |
1400 | ||
1401 | // -- sessions -- | |
1402 | private: | |
1403 | void dispatch_session_waiting(Session *session, OSDMapRef osdmap); | |
1404 | void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap); | |
1405 | ||
1406 | Mutex session_waiting_lock; | |
1407 | set<Session*> session_waiting_for_map; | |
1408 | ||
1409 | /// Caller assumes refs for included Sessions | |
1410 | void get_sessions_waiting_for_map(set<Session*> *out) { | |
1411 | Mutex::Locker l(session_waiting_lock); | |
1412 | out->swap(session_waiting_for_map); | |
1413 | } | |
1414 | void register_session_waiting_on_map(Session *session) { | |
1415 | Mutex::Locker l(session_waiting_lock); | |
1416 | if (session_waiting_for_map.insert(session).second) { | |
1417 | session->get(); | |
1418 | } | |
1419 | } | |
1420 | void clear_session_waiting_on_map(Session *session) { | |
1421 | Mutex::Locker l(session_waiting_lock); | |
1422 | set<Session*>::iterator i = session_waiting_for_map.find(session); | |
1423 | if (i != session_waiting_for_map.end()) { | |
1424 | (*i)->put(); | |
1425 | session_waiting_for_map.erase(i); | |
1426 | } | |
1427 | } | |
1428 | void dispatch_sessions_waiting_on_map() { | |
1429 | set<Session*> sessions_to_check; | |
1430 | get_sessions_waiting_for_map(&sessions_to_check); | |
1431 | for (set<Session*>::iterator i = sessions_to_check.begin(); | |
1432 | i != sessions_to_check.end(); | |
1433 | sessions_to_check.erase(i++)) { | |
1434 | (*i)->session_dispatch_lock.Lock(); | |
1435 | dispatch_session_waiting(*i, osdmap); | |
1436 | (*i)->session_dispatch_lock.Unlock(); | |
1437 | (*i)->put(); | |
1438 | } | |
1439 | } | |
1440 | void session_handle_reset(Session *session) { | |
1441 | Mutex::Locker l(session->session_dispatch_lock); | |
1442 | clear_session_waiting_on_map(session); | |
1443 | ||
1444 | session->clear_backoffs(); | |
1445 | ||
1446 | /* Messages have connection refs, we need to clear the | |
1447 | * connection->session->message->connection | |
1448 | * cycles which result. | |
1449 | * Bug #12338 | |
1450 | */ | |
1451 | session->waiting_on_map.clear_and_dispose(TrackedOp::Putter()); | |
1452 | } | |
1453 | ||
1454 | private: | |
1455 | /** | |
1456 | * @defgroup monc helpers | |
1457 | * @{ | |
1458 | * Right now we only have the one | |
1459 | */ | |
1460 | ||
1461 | /** | |
1462 | * Ask the Monitors for a sequence of OSDMaps. | |
1463 | * | |
1464 | * @param epoch The epoch to start with when replying | |
1465 | * @param force_request True if this request forces a new subscription to | |
1466 | * the monitors; false if an outstanding request that encompasses it is | |
1467 | * sufficient. | |
1468 | */ | |
1469 | void osdmap_subscribe(version_t epoch, bool force_request); | |
1470 | /** @} monc helpers */ | |
1471 | ||
181888fb FG |
1472 | Mutex osdmap_subscribe_lock; |
1473 | epoch_t latest_subscribed_epoch{0}; | |
1474 | ||
7c673cae FG |
1475 | // -- heartbeat -- |
1476 | /// information about a heartbeat peer | |
1477 | struct HeartbeatInfo { | |
1478 | int peer; ///< peer | |
1479 | ConnectionRef con_front; ///< peer connection (front) | |
1480 | ConnectionRef con_back; ///< peer connection (back) | |
1481 | utime_t first_tx; ///< time we sent our first ping request | |
1482 | utime_t last_tx; ///< last time we sent a ping request | |
1483 | utime_t last_rx_front; ///< last time we got a ping reply on the front side | |
1484 | utime_t last_rx_back; ///< last time we got a ping reply on the back side | |
1485 | epoch_t epoch; ///< most recent epoch we wanted this peer | |
1486 | ||
1487 | bool is_unhealthy(utime_t cutoff) const { | |
1488 | return | |
1489 | ! ((last_rx_front > cutoff || | |
1490 | (last_rx_front == utime_t() && (last_tx == utime_t() || | |
1491 | first_tx > cutoff))) && | |
1492 | (last_rx_back > cutoff || | |
1493 | (last_rx_back == utime_t() && (last_tx == utime_t() || | |
1494 | first_tx > cutoff)))); | |
1495 | } | |
1496 | bool is_healthy(utime_t cutoff) const { | |
1497 | return last_rx_front > cutoff && last_rx_back > cutoff; | |
1498 | } | |
1499 | ||
1500 | }; | |
1501 | /// state attached to outgoing heartbeat connections | |
1502 | struct HeartbeatSession : public RefCountedObject { | |
1503 | int peer; | |
1504 | explicit HeartbeatSession(int p) : peer(p) {} | |
1505 | }; | |
1506 | Mutex heartbeat_lock; | |
1507 | map<int, int> debug_heartbeat_drops_remaining; | |
1508 | Cond heartbeat_cond; | |
1509 | bool heartbeat_stop; | |
1510 | std::atomic_bool heartbeat_need_update; | |
1511 | map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo | |
1512 | utime_t last_mon_heartbeat; | |
1513 | Messenger *hb_front_client_messenger; | |
1514 | Messenger *hb_back_client_messenger; | |
1515 | Messenger *hb_front_server_messenger; | |
1516 | Messenger *hb_back_server_messenger; | |
1517 | utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state | |
1518 | double daily_loadavg; | |
1519 | ||
1520 | void _add_heartbeat_peer(int p); | |
1521 | void _remove_heartbeat_peer(int p); | |
1522 | bool heartbeat_reset(Connection *con); | |
1523 | void maybe_update_heartbeat_peers(); | |
1524 | void reset_heartbeat_peers(); | |
1525 | bool heartbeat_peers_need_update() { | |
1526 | return heartbeat_need_update.load(); | |
1527 | } | |
1528 | void heartbeat_set_peers_need_update() { | |
1529 | heartbeat_need_update.store(true); | |
1530 | } | |
1531 | void heartbeat_clear_peers_need_update() { | |
1532 | heartbeat_need_update.store(false); | |
1533 | } | |
1534 | void heartbeat(); | |
1535 | void heartbeat_check(); | |
1536 | void heartbeat_entry(); | |
1537 | void need_heartbeat_peer_update(); | |
1538 | ||
1539 | void heartbeat_kick() { | |
1540 | Mutex::Locker l(heartbeat_lock); | |
1541 | heartbeat_cond.Signal(); | |
1542 | } | |
1543 | ||
1544 | struct T_Heartbeat : public Thread { | |
1545 | OSD *osd; | |
1546 | explicit T_Heartbeat(OSD *o) : osd(o) {} | |
1547 | void *entry() override { | |
1548 | osd->heartbeat_entry(); | |
1549 | return 0; | |
1550 | } | |
1551 | } heartbeat_thread; | |
1552 | ||
1553 | public: | |
1554 | bool heartbeat_dispatch(Message *m); | |
1555 | ||
1556 | struct HeartbeatDispatcher : public Dispatcher { | |
1557 | OSD *osd; | |
1558 | explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {} | |
1559 | ||
1560 | bool ms_can_fast_dispatch_any() const override { return true; } | |
1561 | bool ms_can_fast_dispatch(const Message *m) const override { | |
1562 | switch (m->get_type()) { | |
1563 | case CEPH_MSG_PING: | |
1564 | case MSG_OSD_PING: | |
1565 | return true; | |
1566 | default: | |
1567 | return false; | |
1568 | } | |
1569 | } | |
1570 | void ms_fast_dispatch(Message *m) override { | |
1571 | osd->heartbeat_dispatch(m); | |
1572 | } | |
1573 | bool ms_dispatch(Message *m) override { | |
1574 | return osd->heartbeat_dispatch(m); | |
1575 | } | |
1576 | bool ms_handle_reset(Connection *con) override { | |
1577 | return osd->heartbeat_reset(con); | |
1578 | } | |
1579 | void ms_handle_remote_reset(Connection *con) override {} | |
1580 | bool ms_handle_refused(Connection *con) override { | |
1581 | return osd->ms_handle_refused(con); | |
1582 | } | |
1583 | bool ms_verify_authorizer(Connection *con, int peer_type, | |
1584 | int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply, | |
28e407b8 AA |
1585 | bool& isvalid, CryptoKey& session_key, |
1586 | std::unique_ptr<AuthAuthorizerChallenge> *challenge) override { | |
7c673cae FG |
1587 | isvalid = true; |
1588 | return true; | |
1589 | } | |
1590 | } heartbeat_dispatcher; | |
1591 | ||
1592 | private: | |
1593 | // -- waiters -- | |
1594 | list<OpRequestRef> finished; | |
1595 | ||
1596 | void take_waiters(list<OpRequestRef>& ls) { | |
1597 | assert(osd_lock.is_locked()); | |
1598 | finished.splice(finished.end(), ls); | |
1599 | } | |
1600 | void do_waiters(); | |
1601 | ||
1602 | // -- op tracking -- | |
1603 | OpTracker op_tracker; | |
1604 | void check_ops_in_flight(); | |
1605 | void test_ops(std::string command, std::string args, ostream& ss); | |
1606 | friend class TestOpsSocketHook; | |
1607 | TestOpsSocketHook *test_ops_hook; | |
1608 | friend struct C_CompleteSplits; | |
1609 | friend struct C_OpenPGs; | |
1610 | ||
1611 | // -- op queue -- | |
224ce89b | 1612 | enum class io_queue { |
7c673cae | 1613 | prioritized, |
224ce89b WB |
1614 | weightedpriority, |
1615 | mclock_opclass, | |
1616 | mclock_client, | |
7c673cae | 1617 | }; |
224ce89b WB |
1618 | friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q); |
1619 | ||
7c673cae FG |
1620 | const io_queue op_queue; |
1621 | const unsigned int op_prio_cutoff; | |
1622 | ||
1623 | /* | |
1624 | * The ordered op delivery chain is: | |
1625 | * | |
1626 | * fast dispatch -> pqueue back | |
1627 | * pqueue front <-> to_process back | |
1628 | * to_process front -> RunVis(item) | |
1629 | * <- queue_front() | |
1630 | * | |
1631 | * The pqueue is per-shard, and to_process is per pg_slot. Items can be | |
1632 | * pushed back up into to_process and/or pqueue while order is preserved. | |
1633 | * | |
1634 | * Multiple worker threads can operate on each shard. | |
1635 | * | |
1636 | * Under normal circumstances, num_running == to_proces.size(). There are | |
1637 | * two times when that is not true: (1) when waiting_for_pg == true and | |
1638 | * to_process is accumulating requests that are waiting for the pg to be | |
1639 | * instantiated; in that case they will all get requeued together by | |
1640 | * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg | |
1641 | * and already requeued the items. | |
1642 | */ | |
1643 | friend class PGQueueable; | |
224ce89b | 1644 | |
7c673cae FG |
1645 | class ShardedOpWQ |
1646 | : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>> | |
1647 | { | |
1648 | struct ShardData { | |
1649 | Mutex sdata_lock; | |
1650 | Cond sdata_cond; | |
1651 | ||
1652 | Mutex sdata_op_ordering_lock; ///< protects all members below | |
1653 | ||
1654 | OSDMapRef waiting_for_pg_osdmap; | |
1655 | struct pg_slot { | |
1656 | PGRef pg; ///< cached pg reference [optional] | |
1657 | list<PGQueueable> to_process; ///< order items for this slot | |
1658 | int num_running = 0; ///< _process threads doing pg lookup/lock | |
1659 | ||
1660 | /// true if pg does/did not exist. if so all new items go directly to | |
1661 | /// to_process. cleared by prune_pg_waiters. | |
1662 | bool waiting_for_pg = false; | |
1663 | ||
1664 | /// incremented by wake_pg_waiters; indicates racing _process threads | |
1665 | /// should bail out (their op has been requeued) | |
1666 | uint64_t requeue_seq = 0; | |
1667 | }; | |
1668 | ||
1669 | /// map of slots for each spg_t. maintains ordering of items dequeued | |
1670 | /// from pqueue while _process thread drops shard lock to acquire the | |
1671 | /// pg lock. slots are removed only by prune_pg_waiters. | |
1672 | unordered_map<spg_t,pg_slot> pg_slots; | |
1673 | ||
1674 | /// priority queue | |
1675 | std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue; | |
1676 | ||
1677 | void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) { | |
1678 | unsigned priority = item.second.get_priority(); | |
1679 | unsigned cost = item.second.get_cost(); | |
1680 | if (priority >= cutoff) | |
1681 | pqueue->enqueue_strict_front( | |
1682 | item.second.get_owner(), | |
1683 | priority, item); | |
1684 | else | |
1685 | pqueue->enqueue_front( | |
1686 | item.second.get_owner(), | |
1687 | priority, cost, item); | |
1688 | } | |
1689 | ||
1690 | ShardData( | |
1691 | string lock_name, string ordering_lock, | |
1692 | uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct, | |
1693 | io_queue opqueue) | |
1694 | : sdata_lock(lock_name.c_str(), false, true, false, cct), | |
1695 | sdata_op_ordering_lock(ordering_lock.c_str(), false, true, | |
1696 | false, cct) { | |
224ce89b | 1697 | if (opqueue == io_queue::weightedpriority) { |
7c673cae FG |
1698 | pqueue = std::unique_ptr |
1699 | <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>( | |
1700 | new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>( | |
1701 | max_tok_per_prio, min_cost)); | |
224ce89b | 1702 | } else if (opqueue == io_queue::prioritized) { |
7c673cae FG |
1703 | pqueue = std::unique_ptr |
1704 | <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>( | |
1705 | new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>( | |
1706 | max_tok_per_prio, min_cost)); | |
224ce89b WB |
1707 | } else if (opqueue == io_queue::mclock_opclass) { |
1708 | pqueue = std::unique_ptr | |
1709 | <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct)); | |
1710 | } else if (opqueue == io_queue::mclock_client) { | |
1711 | pqueue = std::unique_ptr | |
1712 | <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct)); | |
7c673cae FG |
1713 | } |
1714 | } | |
224ce89b | 1715 | }; // struct ShardData |
7c673cae FG |
1716 | |
1717 | vector<ShardData*> shard_list; | |
1718 | OSD *osd; | |
1719 | uint32_t num_shards; | |
1720 | ||
1721 | public: | |
1722 | ShardedOpWQ(uint32_t pnum_shards, | |
1723 | OSD *o, | |
1724 | time_t ti, | |
1725 | time_t si, | |
1726 | ShardedThreadPool* tp) | |
1727 | : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp), | |
1728 | osd(o), | |
1729 | num_shards(pnum_shards) { | |
1730 | for (uint32_t i = 0; i < num_shards; i++) { | |
1731 | char lock_name[32] = {0}; | |
1732 | snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i); | |
1733 | char order_lock[32] = {0}; | |
1734 | snprintf(order_lock, sizeof(order_lock), "%s.%d", | |
1735 | "OSD:ShardedOpWQ:order:", i); | |
1736 | ShardData* one_shard = new ShardData( | |
1737 | lock_name, order_lock, | |
1738 | osd->cct->_conf->osd_op_pq_max_tokens_per_priority, | |
1739 | osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue); | |
1740 | shard_list.push_back(one_shard); | |
1741 | } | |
1742 | } | |
1743 | ~ShardedOpWQ() override { | |
1744 | while (!shard_list.empty()) { | |
1745 | delete shard_list.back(); | |
1746 | shard_list.pop_back(); | |
1747 | } | |
1748 | } | |
1749 | ||
1750 | /// wake any pg waiters after a PG is created/instantiated | |
1751 | void wake_pg_waiters(spg_t pgid); | |
1752 | ||
1753 | /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here | |
1754 | void prune_pg_waiters(OSDMapRef osdmap, int whoami); | |
1755 | ||
1756 | /// clear cached PGRef on pg deletion | |
1757 | void clear_pg_pointer(spg_t pgid); | |
1758 | ||
1759 | /// clear pg_slots on shutdown | |
1760 | void clear_pg_slots(); | |
1761 | ||
1762 | /// try to do some work | |
1763 | void _process(uint32_t thread_index, heartbeat_handle_d *hb) override; | |
1764 | ||
1765 | /// enqueue a new item | |
1766 | void _enqueue(pair <spg_t, PGQueueable> item) override; | |
1767 | ||
1768 | /// requeue an old item (at the front of the line) | |
1769 | void _enqueue_front(pair <spg_t, PGQueueable> item) override; | |
1770 | ||
1771 | void return_waiting_threads() override { | |
1772 | for(uint32_t i = 0; i < num_shards; i++) { | |
1773 | ShardData* sdata = shard_list[i]; | |
1774 | assert (NULL != sdata); | |
1775 | sdata->sdata_lock.Lock(); | |
1776 | sdata->sdata_cond.Signal(); | |
1777 | sdata->sdata_lock.Unlock(); | |
1778 | } | |
1779 | } | |
1780 | ||
1781 | void dump(Formatter *f) { | |
1782 | for(uint32_t i = 0; i < num_shards; i++) { | |
1783 | ShardData* sdata = shard_list[i]; | |
1784 | char lock_name[32] = {0}; | |
1785 | snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i); | |
1786 | assert (NULL != sdata); | |
1787 | sdata->sdata_op_ordering_lock.Lock(); | |
1788 | f->open_object_section(lock_name); | |
1789 | sdata->pqueue->dump(f); | |
1790 | f->close_section(); | |
1791 | sdata->sdata_op_ordering_lock.Unlock(); | |
1792 | } | |
1793 | } | |
1794 | ||
1795 | /// Must be called on ops queued back to front | |
1796 | struct Pred { | |
1797 | spg_t pgid; | |
1798 | list<OpRequestRef> *out_ops; | |
1799 | uint64_t reserved_pushes_to_free; | |
1800 | Pred(spg_t pg, list<OpRequestRef> *out_ops = 0) | |
1801 | : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {} | |
1802 | void accumulate(const PGQueueable &op) { | |
1803 | reserved_pushes_to_free += op.get_reserved_pushes(); | |
1804 | if (out_ops) { | |
1805 | boost::optional<OpRequestRef> mop = op.maybe_get_op(); | |
1806 | if (mop) | |
1807 | out_ops->push_front(*mop); | |
1808 | } | |
1809 | } | |
1810 | bool operator()(const pair<spg_t, PGQueueable> &op) { | |
1811 | if (op.first == pgid) { | |
1812 | accumulate(op.second); | |
1813 | return true; | |
1814 | } else { | |
1815 | return false; | |
1816 | } | |
1817 | } | |
1818 | uint64_t get_reserved_pushes_to_free() const { | |
1819 | return reserved_pushes_to_free; | |
1820 | } | |
1821 | }; | |
1822 | ||
1823 | bool is_shard_empty(uint32_t thread_index) override { | |
1824 | uint32_t shard_index = thread_index % num_shards; | |
1825 | ShardData* sdata = shard_list[shard_index]; | |
1826 | assert(NULL != sdata); | |
1827 | Mutex::Locker l(sdata->sdata_op_ordering_lock); | |
1828 | return sdata->pqueue->empty(); | |
1829 | } | |
1830 | } op_shardedwq; | |
1831 | ||
1832 | ||
/// Queue op for pg (presumably onto op_shardedwq -- confirm in OSD.cc).
void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
/// Process one op against pg; handle is the calling thread-pool handle
/// (presumably invoked from the sharded work queue -- confirm).
void dequeue_op(
  PGRef pg, OpRequestRef op,
  ThreadPool::TPHandle &handle);
1837 | ||
1838 | // -- peering queue -- | |
1839 | struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> { | |
1840 | list<PG*> peering_queue; | |
1841 | OSD *osd; | |
1842 | set<PG*> in_use; | |
1843 | PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) | |
1844 | : ThreadPool::BatchWorkQueue<PG>( | |
1845 | "OSD::PeeringWQ", ti, si, tp), osd(o) {} | |
1846 | ||
1847 | void _dequeue(PG *pg) override { | |
1848 | for (list<PG*>::iterator i = peering_queue.begin(); | |
1849 | i != peering_queue.end(); | |
1850 | ) { | |
1851 | if (*i == pg) { | |
1852 | peering_queue.erase(i++); | |
1853 | pg->put("PeeringWQ"); | |
1854 | } else { | |
1855 | ++i; | |
1856 | } | |
1857 | } | |
1858 | } | |
1859 | bool _enqueue(PG *pg) override { | |
1860 | pg->get("PeeringWQ"); | |
1861 | peering_queue.push_back(pg); | |
1862 | return true; | |
1863 | } | |
1864 | bool _empty() override { | |
1865 | return peering_queue.empty(); | |
1866 | } | |
1867 | void _dequeue(list<PG*> *out) override; | |
1868 | void _process( | |
1869 | const list<PG *> &pgs, | |
1870 | ThreadPool::TPHandle &handle) override { | |
1871 | assert(!pgs.empty()); | |
1872 | osd->process_peering_events(pgs, handle); | |
1873 | for (list<PG *>::const_iterator i = pgs.begin(); | |
1874 | i != pgs.end(); | |
1875 | ++i) { | |
1876 | (*i)->put("PeeringWQ"); | |
1877 | } | |
1878 | } | |
1879 | void _process_finish(const list<PG *> &pgs) override { | |
1880 | for (list<PG*>::const_iterator i = pgs.begin(); | |
1881 | i != pgs.end(); | |
1882 | ++i) { | |
1883 | in_use.erase(*i); | |
1884 | } | |
1885 | } | |
1886 | void _clear() override { | |
1887 | assert(peering_queue.empty()); | |
1888 | } | |
1889 | } peering_wq; | |
1890 | ||
/// Handle a batch of PGs; called by PeeringWQ::_process with the batch it
/// dequeued.
void process_peering_events(
  const list<PG*> &pg,
  ThreadPool::TPHandle &handle);

// PG and PrimaryLogPG are granted access to OSD's non-public members.
friend class PG;
friend class PrimaryLogPG;
1897 | ||
1898 | ||
1899 | protected: | |
1900 | ||
1901 | // -- osd map -- | |
1902 | OSDMapRef osdmap; | |
1903 | OSDMapRef get_osdmap() { | |
1904 | return osdmap; | |
1905 | } | |
224ce89b | 1906 | epoch_t get_osdmap_epoch() const { |
7c673cae FG |
1907 | return osdmap ? osdmap->get_epoch() : 0; |
1908 | } | |
1909 | ||
utime_t had_map_since;
RWLock map_lock;
// ops parked until a newer map arrives (presumably filled by
// wait_for_new_map -- confirm in OSD.cc)
list<OpRequestRef> waiting_for_osdmap;
// timestamps of times this OSD was marked down -- TODO confirm usage
deque<utime_t> osd_markdown_log;

friend struct send_map_on_destruct;

void wait_for_new_map(OpRequestRef op);
void handle_osd_map(class MOSDMap *m);
void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
void note_down_osd(int osd);
void note_up_osd(int osd);
friend class C_OnMapCommit;

/// Advance pg toward epoch advance_to; any PGs produced by splits are
/// returned through split_pgs.
bool advance_pg(
  epoch_t advance_to, PG *pg,
  ThreadPool::TPHandle &handle,
  PG::RecoveryCtx *rctx,
  set<PGRef> *split_pgs
);
void consume_map();
void activate_map();
1933 | ||
1934 | // osd map cache (past osd maps) | |
1935 | OSDMapRef get_map(epoch_t e) { | |
1936 | return service.get_map(e); | |
1937 | } | |
1938 | OSDMapRef add_map(OSDMap *o) { | |
1939 | return service.add_map(o); | |
1940 | } | |
1941 | void add_map_bl(epoch_t e, bufferlist& bl) { | |
1942 | return service.add_map_bl(e, bl); | |
1943 | } | |
1944 | void pin_map_bl(epoch_t e, bufferlist &bl) { | |
1945 | return service.pin_map_bl(e, bl); | |
1946 | } | |
1947 | bool get_map_bl(epoch_t e, bufferlist& bl) { | |
1948 | return service.get_map_bl(e, bl); | |
1949 | } | |
1950 | void add_map_inc_bl(epoch_t e, bufferlist& bl) { | |
1951 | return service.add_map_inc_bl(e, bl); | |
1952 | } | |
1953 | void pin_map_inc_bl(epoch_t e, bufferlist &bl) { | |
1954 | return service.pin_map_inc_bl(e, bl); | |
1955 | } | |
1956 | ||
protected:
// -- placement groups --
RWLock pg_map_lock; // this lock orders *above* individual PG _locks
ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map_lock

// presumably guards the pending_creates_* members below -- confirm
std::mutex pending_creates_lock;
using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
std::set<create_from_osd_t> pending_creates_from_osd;
unsigned pending_creates_from_mon = 0;

map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
PGRecoveryStats pg_recovery_stats;

PGPool _get_pool(int id, OSDMapRef createmap);

PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
PG *_lookup_lock_pg(spg_t pgid);
31f18b77 FG |
1974 | |
1975 | public: | |
1976 | PG *lookup_lock_pg(spg_t pgid); | |
1977 | ||
35e4c445 FG |
1978 | int get_num_pgs() { |
1979 | RWLock::RLocker l(pg_map_lock); | |
1980 | return pg_map.size(); | |
1981 | } | |
1982 | ||
protected:
PG *_open_lock_pg(OSDMapRef createmap,
                  spg_t pg, bool no_lockdep_check=false);
/// outcome of _try_resurrect_pg
enum res_result {
  RES_PARENT, // resurrected a parent
  RES_SELF, // resurrected self
  RES_NONE // nothing relevant deleting
};
res_result _try_resurrect_pg(
  OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);

PG *_create_lock_pg(
  OSDMapRef createmap,
  spg_t pgid,
  bool hold_map_lock,
  bool backfill,
  int role,
  vector<int>& up, int up_primary,
  vector<int>& acting, int acting_primary,
  pg_history_t history,
  const PastIntervals& pi,
  ObjectStore::Transaction& t);

PG* _make_pg(OSDMapRef createmap, spg_t pgid);
void add_newly_split_pg(PG *pg,
                        PG::RecoveryCtx *rctx);

int handle_pg_peering_evt(
  spg_t pgid,
  const pg_history_t& orig_history,
  const PastIntervals& pi,
  epoch_t epoch,
  PG::CephPeeringEvtRef evt);
bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
void resume_creating_pg();

void load_pgs();
void build_past_intervals_parallel();

/// build initial pg history and intervals on create
void build_initial_pg_history(
  spg_t pgid,
  epoch_t created,
  utime_t created_stamp,
  pg_history_t *h,
  PastIntervals *pi);

/// project pg history from epoch `from` up to the current map
bool project_pg_history(
  spg_t pgid, pg_history_t& h, epoch_t from,
  const vector<int>& lastup,
  int lastupprimary,
  const vector<int>& lastacting,
  int lastactingprimary
  ); ///< @return false if there was a map gap between from and now
2038 | ||
2039 | // this must be called with pg->lock held on any pg addition to pg_map | |
2040 | void wake_pg_waiters(PGRef pg) { | |
2041 | assert(pg->is_locked()); | |
2042 | op_shardedwq.wake_pg_waiters(pg->info.pgid); | |
2043 | } | |
epoch_t last_pg_create_epoch;

void handle_pg_create(OpRequestRef op);

/// Split parent into the children listed in childpgids while moving from
/// curmap to nextmap; the resulting PGs are returned through out_pgs.
void split_pgs(
  PG *parent,
  const set<spg_t> &childpgids, set<PGRef> *out_pgs,
  OSDMapRef curmap,
  OSDMapRef nextmap,
  PG::RecoveryCtx *rctx);
2054 | ||
// == monitor interaction ==
Mutex mon_report_lock;
utime_t last_mon_report;
utime_t last_pg_stats_sent;

/* if our monitor dies, we want to notice it and reconnect.
 *  So we keep track of when it last acked our stat updates,
 *  and if too much time passes (and we've been sending
 *  more updates) then we can call it dead and reconnect
 *  elsewhere.
 */
utime_t last_pg_stats_ack;
float stats_ack_timeout;
set<uint64_t> outstanding_pg_stats; // ids (presumably pg_stat tids) of stat updates not yet acked
2069 | ||
// -- boot --
void start_boot();
void _got_mon_epochs(epoch_t oldest, epoch_t newest);
void _preboot(epoch_t oldest, epoch_t newest);
void _send_boot();
void _collect_metadata(map<string,string> *pmeta);

void start_waiting_for_healthy();
bool _is_healthy();

void send_full_update();

friend struct C_OSD_GetVersion;

// -- alive --
epoch_t up_thru_wanted;

void queue_want_up_thru(epoch_t want);
void send_alive();

// -- full map requests --
// Range of the outstanding full-map request; rerequest_full_maps() resets
// both to 0 and passes the old values back to request_full_map().
epoch_t requested_full_first, requested_full_last;

void request_full_map(epoch_t first, epoch_t last);
2094 | void rerequest_full_maps() { | |
2095 | epoch_t first = requested_full_first; | |
2096 | epoch_t last = requested_full_last; | |
2097 | requested_full_first = 0; | |
2098 | requested_full_last = 0; | |
2099 | request_full_map(first, last); | |
2100 | } | |
void got_full_map(epoch_t e);

// -- failures --
map<int,utime_t> failure_queue;
map<int,pair<utime_t,entity_inst_t> > failure_pending;

void requeue_failures();
void send_failures();
void send_still_alive(epoch_t epoch, const entity_inst_t &i);

// -- pg stats --
// Guards pg_stat_queue and osd_stat_updated; outstanding_pg_stats is also
// cleared under it (see clear_outstanding_pg_stats).
Mutex pg_stat_queue_lock;
Cond pg_stat_queue_cond;
// PGs queued for the next stats report; each holds a "pg_stat_queue" ref.
xlist<PG*> pg_stat_queue;
bool osd_stat_updated;
uint64_t pg_stat_tid, pg_stat_tid_flushed;

void send_pg_stats(const utime_t &now);
void handle_pg_stats_ack(class MPGStatsAck *ack);
void flush_pg_stats();

ceph::coarse_mono_clock::time_point last_sent_beacon;
Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
epoch_t min_last_epoch_clean = 0;
// which pgs were scanned for min_lec
std::vector<pg_t> min_last_epoch_clean_pgs;
void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2128 | ||
2129 | void pg_stat_queue_enqueue(PG *pg) { | |
2130 | pg_stat_queue_lock.Lock(); | |
2131 | if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) { | |
2132 | pg->get("pg_stat_queue"); | |
2133 | pg_stat_queue.push_back(&pg->stat_queue_item); | |
2134 | } | |
2135 | osd_stat_updated = true; | |
2136 | pg_stat_queue_lock.Unlock(); | |
2137 | } | |
2138 | void pg_stat_queue_dequeue(PG *pg) { | |
2139 | pg_stat_queue_lock.Lock(); | |
2140 | if (pg->stat_queue_item.remove_myself()) | |
2141 | pg->put("pg_stat_queue"); | |
2142 | pg_stat_queue_lock.Unlock(); | |
2143 | } | |
2144 | void clear_pg_stat_queue() { | |
2145 | pg_stat_queue_lock.Lock(); | |
2146 | while (!pg_stat_queue.empty()) { | |
2147 | PG *pg = pg_stat_queue.front(); | |
2148 | pg_stat_queue.pop_front(); | |
2149 | pg->put("pg_stat_queue"); | |
2150 | } | |
2151 | pg_stat_queue_lock.Unlock(); | |
2152 | } | |
224ce89b WB |
2153 | void clear_outstanding_pg_stats(){ |
2154 | Mutex::Locker l(pg_stat_queue_lock); | |
2155 | outstanding_pg_stats.clear(); | |
2156 | } | |
7c673cae FG |
2157 | |
2158 | ceph_tid_t get_tid() { | |
2159 | return service.get_tid(); | |
2160 | } | |
2161 | ||
// -- generic pg peering --
PG::RecoveryCtx create_context();
void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
                      ThreadPool::TPHandle *handle = NULL);
void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
                                  ThreadPool::TPHandle *handle = NULL);
// The three helpers below take maps keyed by int (presumably peer osd id
// -- confirm in OSD.cc).
void do_notifies(map<int,
                 vector<pair<pg_notify_t, PastIntervals> > >&
                 notify_list,
                 OSDMapRef map);
void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
                OSDMapRef map);
void do_infos(map<int,
              vector<pair<pg_notify_t, PastIntervals> > >& info_map,
              OSDMapRef map);

// Sender/state checks; each returns bool (presumably false when the check
// fails -- confirm in OSD.cc).
bool require_mon_peer(const Message *m);
bool require_mon_or_mgr_peer(const Message *m);
bool require_osd_peer(const Message *m);
/***
 * Verifies that we were alive in the given epoch, and that
 * we still are.
 */
bool require_self_aliveness(const Message *m, epoch_t alive_since);
/**
 * Verifies that the OSD who sent the given op has the same
 * address as in the given map.
 * @pre op was sent by an OSD using the cluster messenger
 */
bool require_same_peer_instance(const Message *m, OSDMapRef& map,
                                bool is_fast_dispatch);

bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
                               bool is_fast_dispatch);

void handle_pg_query(OpRequestRef op);
void handle_pg_notify(OpRequestRef op);
void handle_pg_log(OpRequestRef op);
void handle_pg_info(OpRequestRef op);
void handle_pg_trim(OpRequestRef op);

void handle_pg_backfill_reserve(OpRequestRef op);
void handle_pg_recovery_reserve(OpRequestRef op);

void handle_force_recovery(Message *m);

void handle_pg_remove(OpRequestRef op);
void _remove_pg(PG *pg);
2210 | ||
// -- commands --
/// A queued admin command: the command words, its tid, an input payload and
/// the connection that CommandWQ::_process passes to do_command().
struct Command {
  vector<string> cmd;
  ceph_tid_t tid;
  bufferlist indata;
  ConnectionRef con;

  Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
    : cmd(c), tid(t), indata(bl), con(co) {}
};
/// Pending commands, owned as raw pointers; CommandWQ deletes each one after
/// processing it or when clearing the queue.
list<Command*> command_queue;
2222 | struct CommandWQ : public ThreadPool::WorkQueue<Command> { | |
2223 | OSD *osd; | |
2224 | CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) | |
2225 | : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {} | |
2226 | ||
2227 | bool _empty() override { | |
2228 | return osd->command_queue.empty(); | |
2229 | } | |
2230 | bool _enqueue(Command *c) override { | |
2231 | osd->command_queue.push_back(c); | |
2232 | return true; | |
2233 | } | |
2234 | void _dequeue(Command *pg) override { | |
2235 | ceph_abort(); | |
2236 | } | |
2237 | Command *_dequeue() override { | |
2238 | if (osd->command_queue.empty()) | |
2239 | return NULL; | |
2240 | Command *c = osd->command_queue.front(); | |
2241 | osd->command_queue.pop_front(); | |
2242 | return c; | |
2243 | } | |
2244 | void _process(Command *c, ThreadPool::TPHandle &) override { | |
2245 | osd->osd_lock.Lock(); | |
2246 | if (osd->is_stopping()) { | |
2247 | osd->osd_lock.Unlock(); | |
2248 | delete c; | |
2249 | return; | |
2250 | } | |
2251 | osd->do_command(c->con.get(), c->tid, c->cmd, c->indata); | |
2252 | osd->osd_lock.Unlock(); | |
2253 | delete c; | |
2254 | } | |
2255 | void _clear() override { | |
2256 | while (!osd->command_queue.empty()) { | |
2257 | Command *c = osd->command_queue.front(); | |
2258 | osd->command_queue.pop_front(); | |
2259 | delete c; | |
2260 | } | |
2261 | } | |
2262 | } command_wq; | |
2263 | ||
void handle_command(class MMonCommand *m);
void handle_command(class MCommand *m);
/// Execute a command; CommandWQ::_process calls this with osd_lock held.
void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);

// -- pg recovery --
void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
                 ThreadPool::TPHandle &handle);


// -- scrubbing --
void sched_scrub();
bool scrub_random_backoff();
bool scrub_load_below_threshold();
bool scrub_time_permit(utime_t now);
2278 | ||
2279 | // -- removing -- | |
2280 | struct RemoveWQ : | |
2281 | public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > { | |
2282 | CephContext* cct; | |
2283 | ObjectStore *&store; | |
2284 | list<pair<PGRef, DeletingStateRef> > remove_queue; | |
2285 | RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si, | |
2286 | ThreadPool *tp) | |
2287 | : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >( | |
2288 | "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {} | |
2289 | ||
2290 | bool _empty() override { | |
2291 | return remove_queue.empty(); | |
2292 | } | |
2293 | void _enqueue(pair<PGRef, DeletingStateRef> item) override { | |
2294 | remove_queue.push_back(item); | |
2295 | } | |
2296 | void _enqueue_front(pair<PGRef, DeletingStateRef> item) override { | |
2297 | remove_queue.push_front(item); | |
2298 | } | |
2299 | bool _dequeue(pair<PGRef, DeletingStateRef> item) { | |
2300 | ceph_abort(); | |
2301 | } | |
2302 | pair<PGRef, DeletingStateRef> _dequeue() override { | |
2303 | assert(!remove_queue.empty()); | |
2304 | pair<PGRef, DeletingStateRef> item = remove_queue.front(); | |
2305 | remove_queue.pop_front(); | |
2306 | return item; | |
2307 | } | |
2308 | void _process(pair<PGRef, DeletingStateRef>, | |
2309 | ThreadPool::TPHandle &) override; | |
2310 | void _clear() override { | |
2311 | remove_queue.clear(); | |
2312 | } | |
94b18763 FG |
2313 | int get_remove_queue_len() { |
2314 | lock(); | |
2315 | int r = remove_queue.size(); | |
2316 | unlock(); | |
2317 | return r; | |
2318 | } | |
7c673cae FG |
2319 | } remove_wq; |
2320 | ||
b32b8144 FG |
// -- status reporting --
// Returns a raw message pointer (ownership presumably passes to the caller
// -- confirm in OSD.cc).
MPGStats *collect_pg_stats();
std::vector<OSDHealthMetric> get_health_metrics();
2324 | ||
224ce89b | 2325 | private: |
7c673cae FG |
2326 | bool ms_can_fast_dispatch_any() const override { return true; } |
2327 | bool ms_can_fast_dispatch(const Message *m) const override { | |
2328 | switch (m->get_type()) { | |
2329 | case CEPH_MSG_OSD_OP: | |
2330 | case CEPH_MSG_OSD_BACKOFF: | |
2331 | case MSG_OSD_SUBOP: | |
2332 | case MSG_OSD_REPOP: | |
2333 | case MSG_OSD_SUBOPREPLY: | |
2334 | case MSG_OSD_REPOPREPLY: | |
2335 | case MSG_OSD_PG_PUSH: | |
2336 | case MSG_OSD_PG_PULL: | |
2337 | case MSG_OSD_PG_PUSH_REPLY: | |
2338 | case MSG_OSD_PG_SCAN: | |
2339 | case MSG_OSD_PG_BACKFILL: | |
2340 | case MSG_OSD_PG_BACKFILL_REMOVE: | |
2341 | case MSG_OSD_EC_WRITE: | |
2342 | case MSG_OSD_EC_WRITE_REPLY: | |
2343 | case MSG_OSD_EC_READ: | |
2344 | case MSG_OSD_EC_READ_REPLY: | |
2345 | case MSG_OSD_SCRUB_RESERVE: | |
2346 | case MSG_OSD_REP_SCRUB: | |
2347 | case MSG_OSD_REP_SCRUBMAP: | |
2348 | case MSG_OSD_PG_UPDATE_LOG_MISSING: | |
2349 | case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: | |
c07f9fc5 FG |
2350 | case MSG_OSD_PG_RECOVERY_DELETE: |
2351 | case MSG_OSD_PG_RECOVERY_DELETE_REPLY: | |
7c673cae FG |
2352 | return true; |
2353 | default: | |
2354 | return false; | |
2355 | } | |
2356 | } | |
// Dispatcher interface overrides (defined out of line unless shown here).
void ms_fast_dispatch(Message *m) override;
void ms_fast_preprocess(Message *m) override;
bool ms_dispatch(Message *m) override;
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
bool ms_verify_authorizer(Connection *con, int peer_type,
                          int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
                          bool& isvalid, CryptoKey& session_key,
                          std::unique_ptr<AuthAuthorizerChallenge> *challenge) override;
void ms_handle_connect(Connection *con) override;
void ms_handle_fast_connect(Connection *con) override;
void ms_handle_fast_accept(Connection *con) override;
bool ms_handle_reset(Connection *con) override;
// remote resets are ignored
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
2371 | ||
2372 | io_queue get_io_queue() const { | |
2373 | if (cct->_conf->osd_op_queue == "debug_random") { | |
224ce89b WB |
2374 | static io_queue index_lookup[] = { io_queue::prioritized, |
2375 | io_queue::weightedpriority, | |
2376 | io_queue::mclock_opclass, | |
2377 | io_queue::mclock_client }; | |
7c673cae | 2378 | srand(time(NULL)); |
224ce89b WB |
2379 | unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); |
2380 | return index_lookup[which]; | |
d2e6a577 FG |
2381 | } else if (cct->_conf->osd_op_queue == "prioritized") { |
2382 | return io_queue::prioritized; | |
224ce89b WB |
2383 | } else if (cct->_conf->osd_op_queue == "mclock_opclass") { |
2384 | return io_queue::mclock_opclass; | |
2385 | } else if (cct->_conf->osd_op_queue == "mclock_client") { | |
2386 | return io_queue::mclock_client; | |
7c673cae | 2387 | } else { |
d2e6a577 FG |
2388 | // default / catch-all is 'wpq' |
2389 | return io_queue::weightedpriority; | |
7c673cae FG |
2390 | } |
2391 | } | |
2392 | ||
2393 | unsigned int get_io_prio_cut() const { | |
2394 | if (cct->_conf->osd_op_queue_cut_off == "debug_random") { | |
2395 | srand(time(NULL)); | |
2396 | return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; | |
d2e6a577 | 2397 | } else if (cct->_conf->osd_op_queue_cut_off == "high") { |
7c673cae | 2398 | return CEPH_MSG_PRIO_HIGH; |
d2e6a577 FG |
2399 | } else { |
2400 | // default / catch-all is 'low' | |
2401 | return CEPH_MSG_PRIO_LOW; | |
7c673cae FG |
2402 | } |
2403 | } | |
2404 | ||
public:
/* internal and external can point to the same messenger; they will still
 * be cleaned up properly. */
OSD(CephContext *cct_,
    ObjectStore *store_,
    int id,
    Messenger *internal,
    Messenger *external,
    Messenger *hb_front_client,
    Messenger *hb_back_client,
    Messenger *hb_front_server,
    Messenger *hb_back_server,
    Messenger *osdc_messenger,
    MonClient *mc, const std::string &dev, const std::string &jdev);
~OSD() override;
2420 | ||
// static bits
// Create a new OSD on store; presumably returns 0 on success or a negative
// errno -- confirm in OSD.cc.
static int mkfs(CephContext *cct, ObjectStore *store,
                const string& dev,
                uuid_d fsid, int whoami);
/* remove any non-user xattrs from a map of them */
/// Keep only user xattrs, i.e. keys of the form "_<name>" with a non-empty
/// name.  Every other key, including "_" on its own and the empty key, is
/// erased.
///
/// Templated on the mapped type, so existing map<string, bufferptr> callers
/// are unaffected.
template <typename Value>
void filter_xattrs(std::map<std::string, Value>& attrs) {
  for (auto iter = attrs.begin(); iter != attrs.end(); ) {
    const std::string& key = iter->first;
    // Check the length before reading key[0], so an empty key is erased
    // instead of making .at(0) throw std::out_of_range.
    const bool is_user_xattr = key.size() > 1 && key[0] == '_';
    if (is_user_xattr)
      ++iter;
    else
      iter = attrs.erase(iter);
  }
}
2435 | ||
private:
int mon_cmd_maybe_osd_create(string &cmd);
int update_crush_device_class();
int update_crush_location();

// Persist the OSD's identity (cluster fsid, osd fsid, id) to store --
// presumably read back by peek_meta(); confirm in OSD.cc.
static int write_meta(CephContext *cct,
                      ObjectStore *store,
                      uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);

void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
void handle_scrub(struct MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);

int init_op_flags(OpRequestRef& op);

int get_num_op_shards();
int get_num_op_threads();

float get_osd_recovery_sleep();
2455 | ||
7c673cae FG |
public:
// Read the stored identity back out of store through the out-parameters.
static int peek_meta(ObjectStore *store, string& magic,
                     uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);


// startup/shutdown
int pre_init();
int init();
void final_init();

int enable_disable_fuse(bool stop);

void suicide(int exitcode);
int shutdown();

void handle_signal(int signum);

/// check if we can throw out op from a disconnected client
static bool op_is_discardable(const MOSDOp *m);

public:
OSDService service;
friend class OSDService;
2479 | }; | |
2480 | ||
224ce89b WB |
2481 | |
/// Print an OSD::io_queue value (defined out of line).
std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);


//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];
2489 | ||
224ce89b | 2490 | #endif // CEPH_OSD_H |