// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OSD_H
#define CEPH_OSD_H

#include "PG.h"

#include "msg/Dispatcher.h"

#include "common/Mutex.h"
#include "common/RWLock.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/zipkin_trace.h"

#include "mgr/MgrClient.h"

#include "os/ObjectStore.h"
#include "OSDCap.h"

#include "auth/KeyRing.h"
#include "osd/ClassHandler.h"

#include "include/CompatSet.h"

#include "OpRequest.h"
#include "Session.h"

#include <atomic>
#include <map>
#include <memory>
#include "include/memory.h"
using namespace std;

#include "include/unordered_map.h"

#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
#include "messages/MOSDOp.h"
#include "include/Spinlock.h"
#include "common/EventTrace.h"

#define CEPH_OSD_PROTOCOL 10 /* cluster internal */


enum {
  l_osd_first = 10000,
  l_osd_op_wip,
  l_osd_op,
  l_osd_op_inb,
  l_osd_op_outb,
  l_osd_op_lat,
  l_osd_op_process_lat,
  l_osd_op_prepare_lat,
  l_osd_op_r,
  l_osd_op_r_outb,
  l_osd_op_r_lat,
  l_osd_op_r_lat_outb_hist,
  l_osd_op_r_process_lat,
  l_osd_op_r_prepare_lat,
  l_osd_op_w,
  l_osd_op_w_inb,
  l_osd_op_w_lat,
  l_osd_op_w_lat_inb_hist,
  l_osd_op_w_process_lat,
  l_osd_op_w_prepare_lat,
  l_osd_op_rw,
  l_osd_op_rw_inb,
  l_osd_op_rw_outb,
  l_osd_op_rw_lat,
  l_osd_op_rw_lat_inb_hist,
  l_osd_op_rw_lat_outb_hist,
  l_osd_op_rw_process_lat,
  l_osd_op_rw_prepare_lat,

  l_osd_sop,
  l_osd_sop_inb,
  l_osd_sop_lat,
  l_osd_sop_w,
  l_osd_sop_w_inb,
  l_osd_sop_w_lat,
  l_osd_sop_pull,
  l_osd_sop_pull_lat,
  l_osd_sop_push,
  l_osd_sop_push_inb,
  l_osd_sop_push_lat,

  l_osd_pull,
  l_osd_push,
  l_osd_push_outb,

  l_osd_rop,

  l_osd_loadavg,
  l_osd_buf,
  l_osd_history_alloc_bytes,
  l_osd_history_alloc_num,
  l_osd_cached_crc,
  l_osd_cached_crc_adjusted,
  l_osd_missed_crc,

  l_osd_pg,
  l_osd_pg_primary,
  l_osd_pg_replica,
  l_osd_pg_stray,
  l_osd_hb_to,
  l_osd_map,
  l_osd_mape,
  l_osd_mape_dup,

  l_osd_waiting_for_map,

  l_osd_map_cache_hit,
  l_osd_map_cache_miss,
  l_osd_map_cache_miss_low,
  l_osd_map_cache_miss_low_avg,

  l_osd_stat_bytes,
  l_osd_stat_bytes_used,
  l_osd_stat_bytes_avail,

  l_osd_copyfrom,

  l_osd_tier_promote,
  l_osd_tier_flush,
  l_osd_tier_flush_fail,
  l_osd_tier_try_flush,
  l_osd_tier_try_flush_fail,
  l_osd_tier_evict,
  l_osd_tier_whiteout,
  l_osd_tier_dirty,
  l_osd_tier_clean,
  l_osd_tier_delay,
  l_osd_tier_proxy_read,
  l_osd_tier_proxy_write,

  l_osd_agent_wake,
  l_osd_agent_skip,
  l_osd_agent_flush,
  l_osd_agent_evict,

  l_osd_object_ctx_cache_hit,
  l_osd_object_ctx_cache_total,

  l_osd_op_cache_hit,
  l_osd_tier_flush_lat,
  l_osd_tier_promote_lat,
  l_osd_tier_r_lat,

  l_osd_pg_info,
  l_osd_pg_fastinfo,
  l_osd_pg_biginfo,

  l_osd_last,
};

// RecoveryState perf counters
enum {
  rs_first = 20000,
  rs_initial_latency,
  rs_started_latency,
  rs_reset_latency,
  rs_start_latency,
  rs_primary_latency,
  rs_peering_latency,
  rs_backfilling_latency,
  rs_waitremotebackfillreserved_latency,
  rs_waitlocalbackfillreserved_latency,
  rs_notbackfilling_latency,
  rs_repnotrecovering_latency,
  rs_repwaitrecoveryreserved_latency,
  rs_repwaitbackfillreserved_latency,
  rs_reprecovering_latency,
  rs_activating_latency,
  rs_waitlocalrecoveryreserved_latency,
  rs_waitremoterecoveryreserved_latency,
  rs_recovering_latency,
  rs_recovered_latency,
  rs_clean_latency,
  rs_active_latency,
  rs_replicaactive_latency,
  rs_stray_latency,
  rs_getinfo_latency,
  rs_getlog_latency,
  rs_waitactingchange_latency,
  rs_incomplete_latency,
  rs_down_latency,
  rs_getmissing_latency,
  rs_waitupthru_latency,
  rs_notrecovering_latency,
  rs_last,
};

class Messenger;
class Message;
class MonClient;
class PerfCounters;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;

class Watch;
class PrimaryLogPG;

class AuthAuthorizeHandlerRegistry;

class TestOpsSocketHook;
struct C_CompleteSplits;
struct C_OpenPGs;
class LogChannel;
class CephContext;
typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
class MOSDOp;

class DeletingState {
  Mutex lock;
  Cond cond;
  enum {
    QUEUED,
    CLEARING_DIR,
    CLEARING_WAITING,
    DELETING_DIR,
    DELETED_DIR,
    CANCELED,
  } status;
  bool stop_deleting;
public:
  const spg_t pgid;
  const PGRef old_pg_state;
  explicit DeletingState(const pair<spg_t, PGRef> &in) :
    lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
    pgid(in.first), old_pg_state(in.second) {
  }

  /// transition status to CLEARING_WAITING
  bool pause_clearing() {
    Mutex::Locker l(lock);
    assert(status == CLEARING_DIR);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = CLEARING_WAITING;
    return true;
  } ///< @return false if we should cancel deletion

  /// start or resume the clearing - transition the status to CLEARING_DIR
  bool start_or_resume_clearing() {
    Mutex::Locker l(lock);
    assert(
      status == QUEUED ||
      status == DELETED_DIR ||
      status == CLEARING_WAITING);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = CLEARING_DIR;
    return true;
  } ///< @return false if we should cancel the deletion

  /// transition status to CLEARING_DIR
  bool resume_clearing() {
    Mutex::Locker l(lock);
    assert(status == CLEARING_WAITING);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = CLEARING_DIR;
    return true;
  } ///< @return false if we should cancel deletion

  /// transition status to DELETING_DIR
  bool start_deleting() {
    Mutex::Locker l(lock);
    assert(status == CLEARING_DIR);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = DELETING_DIR;
    return true;
  } ///< @return false if we should cancel deletion

  /// signal that collection removal has been queued
  void finish_deleting() {
    Mutex::Locker l(lock);
    assert(status == DELETING_DIR);
    status = DELETED_DIR;
    cond.Signal();
  }

  /// try to halt the deletion
  bool try_stop_deletion() {
    Mutex::Locker l(lock);
    stop_deleting = true;
    /**
     * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
     * operations we have to wait for before continuing on.  States
     * CLEARING_WAITING and QUEUED indicate that the remover will check
     * stop_deleting before queueing any further operations.  CANCELED
     * indicates that the remover has already halted.  DELETED_DIR
     * indicates that the deletion has been fully queued.
     */
    while (status == DELETING_DIR || status == CLEARING_DIR)
      cond.Wait(lock);
    return status != DELETED_DIR;
  } ///< @return true if we don't need to recreate the collection
};
typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
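
// A minimal sketch of how the state machine above is expected to be driven
// (the real call sites are in OSD.cc and the removal work queue; this
// ordering is illustrative):
//
//   DeletingStateRef d = ...;            // from the deleting_pgs registry
//   if (!d->start_or_resume_clearing())  // QUEUED -> CLEARING_DIR
//     return;                            // canceled
//   /* remove a batch of objects; to yield, call pause_clearing() and then
//      resume_clearing() when rescheduled */
//   if (!d->start_deleting())            // CLEARING_DIR -> DELETING_DIR
//     return;                            // canceled
//   /* queue removal of the collection itself */
//   d->finish_deleting();                // DELETING_DIR -> DELETED_DIR
//   // try_stop_deletion() may be called from another thread at any point.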

class OSD;

struct PGScrub {
  epoch_t epoch_queued;
  explicit PGScrub(epoch_t e) : epoch_queued(e) {}
  ostream &operator<<(ostream &rhs) {
    return rhs << "PGScrub";
  }
};

struct PGSnapTrim {
  epoch_t epoch_queued;
  explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {}
  ostream &operator<<(ostream &rhs) {
    return rhs << "PGSnapTrim";
  }
};

struct PGRecovery {
  epoch_t epoch_queued;
  uint64_t reserved_pushes;
  PGRecovery(epoch_t e, uint64_t reserved_pushes)
    : epoch_queued(e), reserved_pushes(reserved_pushes) {}
  ostream &operator<<(ostream &rhs) {
    return rhs << "PGRecovery(epoch=" << epoch_queued
               << ", reserved_pushes: " << reserved_pushes << ")";
  }
};

class PGQueueable {
  typedef boost::variant<
    OpRequestRef,
    PGSnapTrim,
    PGScrub,
    PGRecovery
    > QVariant;
  QVariant qvariant;
  int cost;
  unsigned priority;
  utime_t start_time;
  entity_inst_t owner;
  epoch_t map_epoch;  ///< an epoch we expect the PG to exist in

  struct RunVis : public boost::static_visitor<> {
    OSD *osd;
    PGRef &pg;
    ThreadPool::TPHandle &handle;
    RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
      : osd(osd), pg(pg), handle(handle) {}
    void operator()(const OpRequestRef &op);
    void operator()(const PGSnapTrim &op);
    void operator()(const PGScrub &op);
    void operator()(const PGRecovery &op);
  };

  struct StringifyVis : public boost::static_visitor<std::string> {
    std::string operator()(const OpRequestRef &op) {
      return stringify(op);
    }
    std::string operator()(const PGSnapTrim &op) {
      return "PGSnapTrim";
    }
    std::string operator()(const PGScrub &op) {
      return "PGScrub";
    }
    std::string operator()(const PGRecovery &op) {
      return "PGRecovery";
    }
  };
  friend ostream& operator<<(ostream& out, const PGQueueable& q) {
    StringifyVis v;
    return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
               << " prio " << q.priority << " cost " << q.cost
               << " e" << q.map_epoch << ")";
  }

public:
  // cppcheck-suppress noExplicitConstructor
  PGQueueable(OpRequestRef op, epoch_t e)
    : qvariant(op), cost(op->get_req()->get_cost()),
      priority(op->get_req()->get_priority()),
      start_time(op->get_req()->get_recv_stamp()),
      owner(op->get_req()->get_source_inst()),
      map_epoch(e)
    {}
  PGQueueable(
    const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
    const entity_inst_t &owner, epoch_t e)
    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
      owner(owner), map_epoch(e) {}
  PGQueueable(
    const PGScrub &op, int cost, unsigned priority, utime_t start_time,
    const entity_inst_t &owner, epoch_t e)
    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
      owner(owner), map_epoch(e) {}
  PGQueueable(
    const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
    const entity_inst_t &owner, epoch_t e)
    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
      owner(owner), map_epoch(e) {}
  const boost::optional<OpRequestRef> maybe_get_op() const {
    const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
    return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
  }
  uint64_t get_reserved_pushes() const {
    const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
    return op ? op->reserved_pushes : 0;
  }
  void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
    RunVis v(osd, pg, handle);
    boost::apply_visitor(v, qvariant);
  }
  unsigned get_priority() const { return priority; }
  int get_cost() const { return cost; }
  utime_t get_start_time() const { return start_time; }
  entity_inst_t get_owner() const { return owner; }
  epoch_t get_map_epoch() const { return map_epoch; }
};
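
// A PGQueueable bundles one of the four work-item kinds with the metadata
// (cost, priority, start time, owner, epoch) the op queues schedule on.
// Illustrative sketch of queueing and running an item (the real call sites
// are OSDService::queue_for_scrub() and friends below, plus the shard
// worker threads):
//
//   PGQueueable item(PGScrub(epoch), scrub_cost, scrub_prio,
//                    ceph_clock_now(), entity_inst_t(), epoch);
//   service.enqueue_back(pgid, item);
//   ...
//   item.run(osd, pg, handle);  // RunVis dispatches to the PGScrub overload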

class OSDService {
public:
  OSD *osd;
  CephContext *cct;
  SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
  ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
  SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
  const int whoami;
  ObjectStore *&store;
  LogClient &log_client;
  LogChannelRef clog;
  PGRecoveryStats &pg_recovery_stats;
private:
  Messenger *&cluster_messenger;
  Messenger *&client_messenger;
public:
  PerfCounters *&logger;
  PerfCounters *&recoverystate_perf;
  MonClient *&monc;
  ThreadPool::BatchWorkQueue<PG> &peering_wq;
  GenContextWQ recovery_gen_wq;
  ClassHandler *&class_handler;

  void enqueue_back(spg_t pgid, PGQueueable qi);
  void enqueue_front(spg_t pgid, PGQueueable qi);

  void maybe_inject_dispatch_delay() {
    if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
      if (rand() % 10000 <
          g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
        utime_t t;
        t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
        t.sleep();
      }
    }
  }
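  // The injected-delay check above maps the configured probability onto a
  // 0..9999 roll: e.g. with osd_debug_inject_dispatch_delay_probability at
  // 0.01, about 100 of every 10000 dispatches sleep for
  // osd_debug_inject_dispatch_delay_duration seconds.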

private:
  // -- map epoch lower bound --
  Mutex pg_epoch_lock;
  multiset<epoch_t> pg_epochs;
  map<spg_t,epoch_t> pg_epoch;

public:
  void pg_add_epoch(spg_t pgid, epoch_t epoch) {
    Mutex::Locker l(pg_epoch_lock);
    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
    assert(t == pg_epoch.end());
    pg_epoch[pgid] = epoch;
    pg_epochs.insert(epoch);
  }
  void pg_update_epoch(spg_t pgid, epoch_t epoch) {
    Mutex::Locker l(pg_epoch_lock);
    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
    assert(t != pg_epoch.end());
    pg_epochs.erase(pg_epochs.find(t->second));
    t->second = epoch;
    pg_epochs.insert(epoch);
  }
  void pg_remove_epoch(spg_t pgid) {
    Mutex::Locker l(pg_epoch_lock);
    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
    if (t != pg_epoch.end()) {
      pg_epochs.erase(pg_epochs.find(t->second));
      pg_epoch.erase(t);
    }
  }
  epoch_t get_min_pg_epoch() {
    Mutex::Locker l(pg_epoch_lock);
    if (pg_epochs.empty())
      return 0;
    else
      return *pg_epochs.begin();
  }
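  // The multiset shadows the per-PG map so the lower bound is a cheap
  // *pg_epochs.begin().  Illustrative sequence (hypothetical values):
  //   pg_add_epoch(pg1, 10); pg_add_epoch(pg2, 12);  // min epoch is 10
  //   pg_update_epoch(pg1, 15);                      // min becomes 12
  //   pg_remove_epoch(pg2);                          // min becomes 15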

private:
  // -- superblock --
  Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
  OSDSuperblock superblock;

public:
  OSDSuperblock get_superblock() {
    Mutex::Locker l(publish_lock);
    return superblock;
  }
  void publish_superblock(const OSDSuperblock &block) {
    Mutex::Locker l(publish_lock);
    superblock = block;
  }

  int get_nodeid() const { return whoami; }

  std::atomic<epoch_t> max_oldest_map;
private:
  OSDMapRef osdmap;

public:
  OSDMapRef get_osdmap() {
    Mutex::Locker l(publish_lock);
    return osdmap;
  }
  epoch_t get_osdmap_epoch() {
    Mutex::Locker l(publish_lock);
    return osdmap ? osdmap->get_epoch() : 0;
  }
  void publish_map(OSDMapRef map) {
    Mutex::Locker l(publish_lock);
    osdmap = map;
  }

  /*
   * osdmap - current published map
   * next_osdmap - pre_published map that is about to be published.
   *
   * We use the next_osdmap to send messages and initiate connections,
   * but only if the target is the same instance as the one in the map
   * epoch the current user is working from (i.e., the result is
   * equivalent to what is in next_osdmap).
   *
   * This allows the helpers to start ignoring osds that are about to
   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
   * down, without worrying about reopening connections from threads
   * working from old maps.
   */
private:
  OSDMapRef next_osdmap;
  Cond pre_publish_cond;

public:
  void pre_publish_map(OSDMapRef map) {
    Mutex::Locker l(pre_publish_lock);
    next_osdmap = std::move(map);
  }

  void activate_map();
  /// map epochs reserved below
  map<epoch_t, unsigned> map_reservations;

  /// gets ref to next_osdmap and registers the epoch as reserved
  OSDMapRef get_nextmap_reserved() {
    Mutex::Locker l(pre_publish_lock);
    if (!next_osdmap)
      return OSDMapRef();
    epoch_t e = next_osdmap->get_epoch();
    map<epoch_t, unsigned>::iterator i =
      map_reservations.insert(make_pair(e, 0)).first;
    i->second++;
    return next_osdmap;
  }
  /// releases reservation on map
  void release_map(OSDMapRef osdmap) {
    Mutex::Locker l(pre_publish_lock);
    map<epoch_t, unsigned>::iterator i =
      map_reservations.find(osdmap->get_epoch());
    assert(i != map_reservations.end());
    assert(i->second > 0);
    if (--(i->second) == 0) {
      map_reservations.erase(i);
    }
    pre_publish_cond.Signal();
  }
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    Mutex::Locker l(pre_publish_lock);
    assert(next_osdmap);
    while (true) {
      map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
      if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
        break;
      } else {
        pre_publish_cond.Wait(pre_publish_lock);
      }
    }
  }
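  // Reservation protocol sketch: a sender takes a reserved ref on the
  // pre-published map, uses it, then releases it; a publisher can block in
  // await_reserved_maps() until reservations on older epochs drain.
  //
  //   OSDMapRef m = service.get_nextmap_reserved();
  //   if (m) {
  //     service.send_message_osd_cluster(peer, msg, m->get_epoch());
  //     service.release_map(m);
  //   }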

private:
  Mutex peer_map_epoch_lock;
  map<int, epoch_t> peer_map_epoch;
public:
  epoch_t get_peer_epoch(int p);
  epoch_t note_peer_epoch(int p, epoch_t e);
  void forget_peer_epoch(int p, epoch_t e);

  void send_map(class MOSDMap *m, Connection *con);
  void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                     OSDSuperblock& superblock);
  bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
                        const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
  void share_map(entity_name_t name, Connection *con, epoch_t epoch,
                 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
  void share_map_peer(int peer, Connection *con,
                      OSDMapRef map = OSDMapRef());

  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
  void send_message_osd_cluster(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  entity_name_t get_cluster_msgr_name() {
    return cluster_messenger->get_myname();
  }

private:
  // -- scrub scheduling --
  Mutex sched_scrub_lock;
  int scrubs_pending;
  int scrubs_active;

public:
  struct ScrubJob {
    CephContext* cct;
    /// pg to be scrubbed
    spg_t pgid;
    /// the time the scrub is scheduled for; the scrub may be delayed if the
    /// system load is too high or it falls outside the configured scrub hours
    utime_t sched_time;
    /// the hard upper bound of scrub time
    utime_t deadline;
    ScrubJob() : cct(nullptr) {}
    explicit ScrubJob(CephContext* cct, const spg_t& pg,
                      const utime_t& timestamp,
                      double pool_scrub_min_interval = 0,
                      double pool_scrub_max_interval = 0, bool must = true);
    /// order the jobs by sched_time
    bool operator<(const ScrubJob& rhs) const;
  };
  set<ScrubJob> sched_scrub_pg;

  /// @returns the scrub_reg_stamp used to unregister the scrub job
  utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
                       double pool_scrub_max_interval, bool must) {
    ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
                   must);
    Mutex::Locker l(sched_scrub_lock);
    sched_scrub_pg.insert(scrub);
    return scrub.sched_time;
  }
  void unreg_pg_scrub(spg_t pgid, utime_t t) {
    Mutex::Locker l(sched_scrub_lock);
    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
    assert(removed);
  }
  bool first_scrub_stamp(ScrubJob *out) {
    Mutex::Locker l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
    *out = *iter;
    return true;
  }
  bool next_scrub_stamp(const ScrubJob& next,
                        ScrubJob *out) {
    Mutex::Locker l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
    if (iter == sched_scrub_pg.cend())
      return false;
    ++iter;
    if (iter == sched_scrub_pg.cend())
      return false;
    *out = *iter;
    return true;
  }
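  // Iteration sketch: callers walk the jobs in sched_time order by seeding
  // with first_scrub_stamp() and stepping with next_scrub_stamp(), which
  // lower_bound()s the current job and advances past it:
  //
  //   OSDService::ScrubJob job;
  //   for (bool ok = first_scrub_stamp(&job); ok;
  //        ok = next_scrub_stamp(job, &job)) {
  //     /* scrub job.pgid if job.sched_time has passed */
  //   }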

  void dumps_scrub(Formatter *f) {
    assert(f != nullptr);
    Mutex::Locker l(sched_scrub_lock);

    f->open_array_section("scrubs");
    for (const auto &i: sched_scrub_pg) {
      f->open_object_section("scrub");
      f->dump_stream("pgid") << i.pgid;
      f->dump_stream("sched_time") << i.sched_time;
      f->dump_stream("deadline") << i.deadline;
      f->dump_bool("forced", i.sched_time == i.deadline);
      f->close_section();
    }
    f->close_section();
  }

  bool can_inc_scrubs_pending();
  bool inc_scrubs_pending();
  void inc_scrubs_active(bool reserved);
  void dec_scrubs_pending();
  void dec_scrubs_active();

  void reply_op_error(OpRequestRef op, int err);
  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
  void handle_misdirected_op(PG *pg, OpRequestRef op);


private:
  // -- agent shared state --
  Mutex agent_lock;
  Cond agent_cond;
  map<uint64_t, set<PGRef> > agent_queue;
  set<PGRef>::iterator agent_queue_pos;
  bool agent_valid_iterator;
  int agent_ops;
  int flush_mode_high_count; ///< once any pg is in FLUSH_MODE_HIGH, flush objects at high speed
  set<hobject_t> agent_oids;
  bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override {
      osd->agent_entry();
      return NULL;
    }
  } agent_thread;
  bool agent_stop_flag;
  Mutex agent_timer_lock;
  SafeTimer agent_timer;

public:
  void agent_entry();
  void agent_stop();

  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false;  // inserting higher-priority queue
    set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.Signal();
    nq.insert(pg);
  }

  void _dequeue(PG *pg, uint64_t old_priority) {
    set<PGRef>& oq = agent_queue[old_priority];
    set<PGRef>::iterator p = oq.find(pg);
    assert(p != oq.end());
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }

  /// enable agent for a pg
  void agent_enable_pg(PG *pg, uint64_t priority) {
    Mutex::Locker l(agent_lock);
    _enqueue(pg, priority);
  }

  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    Mutex::Locker l(agent_lock);
    assert(new_priority != old_priority);
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }

  /// disable agent for a pg
  void agent_disable_pg(PG *pg, uint64_t old_priority) {
    Mutex::Locker l(agent_lock);
    _dequeue(pg, old_priority);
  }

  /// note start of an async (evict) op
  void agent_start_evict_op() {
    Mutex::Locker l(agent_lock);
    ++agent_ops;
  }

  /// note finish or cancellation of an async (evict) op
  void agent_finish_evict_op() {
    Mutex::Locker l(agent_lock);
    assert(agent_ops > 0);
    --agent_ops;
    agent_cond.Signal();
  }

  /// note start of an async (flush) op
  void agent_start_op(const hobject_t& oid) {
    Mutex::Locker l(agent_lock);
    ++agent_ops;
    assert(agent_oids.count(oid) == 0);
    agent_oids.insert(oid);
  }

  /// note finish or cancellation of an async (flush) op
  void agent_finish_op(const hobject_t& oid) {
    Mutex::Locker l(agent_lock);
    assert(agent_ops > 0);
    --agent_ops;
    assert(agent_oids.count(oid) == 1);
    agent_oids.erase(oid);
    agent_cond.Signal();
  }

  /// check if we are operating on an object
  bool agent_is_active_oid(const hobject_t& oid) {
    Mutex::Locker l(agent_lock);
    return agent_oids.count(oid);
  }

  /// get count of active agent ops
  int agent_get_num_ops() {
    Mutex::Locker l(agent_lock);
    return agent_ops;
  }

  void agent_inc_high_count() {
    Mutex::Locker l(agent_lock);
    flush_mode_high_count++;
  }

  void agent_dec_high_count() {
    Mutex::Locker l(agent_lock);
    flush_mode_high_count--;
  }

private:
  /// throttle promotion attempts
  std::atomic_uint promote_probability_millis{1000}; ///< promote probability in thousandths; a single word, so reads are lockless
  PromoteCounter promote_counter;
  utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  bool promote_throttle() {
    // NOTE: lockless!  we rely on the probability being a single word.
    promote_counter.attempt();
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    if (promote_max_objects &&
        promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
        promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   // no throttle (promote)
  }
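  // With the default promote_probability_millis of 1000, every attempt
  // passes the dice roll (rand() % 1000 is always < 1000); recalibration
  // lowers the value so that, e.g., a setting of 250 admits roughly 250 of
  // every 1000 attempts.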
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
  void promote_throttle_recalibrate();

  // -- Objecter, for tiering reads/writes from/to other OSDs --
  Objecter *objecter;
  Finisher objecter_finisher;

  // -- Watch --
  Mutex watch_lock;
  SafeTimer watch_timer;
  uint64_t next_notif_id;
  uint64_t get_next_id(epoch_t cur_epoch) {
    Mutex::Locker l(watch_lock);
    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
  }
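  // Notify ids pack the epoch into the high 32 bits and a per-OSD counter
  // into the low 32 bits; e.g. epoch 5, counter 7 yields
  // (5ULL << 32) | 7 == 0x0000000500000007, keeping ids unique across epochs.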

  // -- Recovery/Backfill Request Scheduling --
  Mutex recovery_request_lock;
  SafeTimer recovery_request_timer;

  // -- tids --
  // for ops i issue
  std::atomic_uint last_tid{0};
  ceph_tid_t get_tid() {
    return (ceph_tid_t)last_tid++;
  }

  // -- backfill_reservation --
  Finisher reserver_finisher;
  AsyncReserver<spg_t> local_reserver;
  AsyncReserver<spg_t> remote_reserver;

  // -- pg_temp --
private:
  Mutex pg_temp_lock;
  map<pg_t, vector<int> > pg_temp_wanted;
  map<pg_t, vector<int> > pg_temp_pending;
  void _sent_pg_temp();
public:
  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  void send_pg_temp();

  void send_pg_created(pg_t pgid);

  void queue_for_peering(PG *pg);

  Mutex snap_sleep_lock;
  SafeTimer snap_sleep_timer;

  AsyncReserver<spg_t> snap_reserver;
  void queue_for_snap_trim(PG *pg);

  void queue_for_scrub(PG *pg) {
    enqueue_back(
      pg->info.pgid,
      PGQueueable(
        PGScrub(pg->get_osdmap()->get_epoch()),
        cct->_conf->osd_scrub_cost,
        pg->scrubber.priority,
        ceph_clock_now(),
        entity_inst_t(),
        pg->get_osdmap()->get_epoch()));
  }

private:
  // -- pg recovery and associated throttling --
  Mutex recovery_lock;
  list<pair<epoch_t, PGRef> > awaiting_throttle;

  utime_t defer_recovery_until;
  uint64_t recovery_ops_active;
  uint64_t recovery_ops_reserved;
  bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
  map<spg_t, set<hobject_t> > recovery_oids;
#endif
  bool _recover_now(uint64_t *available_pushes);
  void _maybe_queue_recovery();
  void _queue_for_recovery(
    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
    assert(recovery_lock.is_locked_by_me());
    enqueue_back(
      p.second->info.pgid,
      PGQueueable(
        PGRecovery(p.first, reserved_pushes),
        cct->_conf->osd_recovery_cost,
        cct->_conf->osd_recovery_priority,
        ceph_clock_now(),
        entity_inst_t(),
        p.first));
  }
public:
  void start_recovery_op(PG *pg, const hobject_t& soid);
  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
  bool is_recovery_active();
  void release_reserved_pushes(uint64_t pushes) {
    Mutex::Locker l(recovery_lock);
    assert(recovery_ops_reserved >= pushes);
    recovery_ops_reserved -= pushes;
    _maybe_queue_recovery();
  }
  void defer_recovery(float defer_for) {
    defer_recovery_until = ceph_clock_now();
    defer_recovery_until += defer_for;
  }
  void pause_recovery() {
    Mutex::Locker l(recovery_lock);
    recovery_paused = true;
  }
  bool recovery_is_paused() {
    Mutex::Locker l(recovery_lock);
    return recovery_paused;
  }
  void unpause_recovery() {
    Mutex::Locker l(recovery_lock);
    recovery_paused = false;
    _maybe_queue_recovery();
  }
  void kick_recovery_queue() {
    Mutex::Locker l(recovery_lock);
    _maybe_queue_recovery();
  }
  void clear_queued_recovery(PG *pg) {
    Mutex::Locker l(recovery_lock);
    for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
         i != awaiting_throttle.end();
      ) {
      if (i->second.get() == pg) {
        awaiting_throttle.erase(i);
        return;
      } else {
        ++i;
      }
    }
  }
  // delayed pg activation
  void queue_for_recovery(PG *pg, bool front = false) {
    Mutex::Locker l(recovery_lock);
    if (front) {
      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
    } else {
      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
    }
    _maybe_queue_recovery();
  }


  // osd map cache (past osd maps)
  Mutex map_cache_lock;
  SharedLRU<epoch_t, const OSDMap> map_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;

  OSDMapRef try_get_map(epoch_t e);
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    assert(ret);
    return ret;
  }
  OSDMapRef add_map(OSDMap *o) {
    Mutex::Locker l(map_cache_lock);
    return _add_map(o);
  }
  OSDMapRef _add_map(OSDMap *o);

  void add_map_bl(epoch_t e, bufferlist& bl) {
    Mutex::Locker l(map_cache_lock);
    return _add_map_bl(e, bl);
  }
  void pin_map_bl(epoch_t e, bufferlist &bl);
  void _add_map_bl(epoch_t e, bufferlist& bl);
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    Mutex::Locker l(map_cache_lock);
    return _get_map_bl(e, bl);
  }
  bool _get_map_bl(epoch_t e, bufferlist& bl);

  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    Mutex::Locker l(map_cache_lock);
    return _add_map_inc_bl(e, bl);
  }
  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
  bool get_inc_map_bl(epoch_t e, bufferlist& bl);

  void clear_map_bl_cache_pins(epoch_t e);

  void need_heartbeat_peer_update();

  void pg_stat_queue_enqueue(PG *pg);
  void pg_stat_queue_dequeue(PG *pg);

  void init();
  void final_init();
  void start_shutdown();
  void shutdown();

private:
  // split
  Mutex in_progress_split_lock;
  map<spg_t, spg_t> pending_splits;            // child -> parent
  map<spg_t, set<spg_t> > rev_pending_splits;  // parent -> [children]
  set<spg_t> in_progress_splits;               // child

public:
  void _start_split(spg_t parent, const set<spg_t> &children);
  void start_split(spg_t parent, const set<spg_t> &children) {
    Mutex::Locker l(in_progress_split_lock);
    return _start_split(parent, children);
  }
  void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
  void complete_split(const set<spg_t> &pgs);
  void cancel_pending_splits_for_parent(spg_t parent);
  void _cancel_pending_splits_for_parent(spg_t parent);
  bool splitting(spg_t pgid);
  void expand_pg_num(OSDMapRef old_map,
                     OSDMapRef new_map);
  void _maybe_split_pgid(OSDMapRef old_map,
                         OSDMapRef new_map,
                         spg_t pgid);
  void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);

  // -- stats --
  Mutex stat_lock;
  osd_stat_t osd_stat;

  void update_osd_stat(vector<int>& hb_peers);
  osd_stat_t get_osd_stat() {
    Mutex::Locker l(stat_lock);
    return osd_stat;
  }

  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable Mutex full_status_lock;
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
  s_names get_full_state(string type) const {
    if (type == "none")
      return NONE;
    else if (type == "failsafe")
      return FAILSAFE;
    else if (type == "full")
      return FULL;
    else if (type == "backfillfull")
      return BACKFILLFULL;
    else if (type == "nearfull")
      return NEARFULL;
    else
      return INVALID;
  }
  double cur_ratio;  ///< current utilization
  mutable int64_t injectfull = 0;
  s_names injectfull_state = NONE;
  float get_failsafe_full_ratio();
  void check_full_status(const osd_stat_t &stat);
  bool _check_full(s_names type, ostream &ss) const;
public:
  bool check_failsafe_full(ostream &ss) const;
  bool check_full(ostream &ss) const;
  bool check_backfill_full(ostream &ss) const;
  bool check_nearfull(ostream &ss) const;
  bool is_failsafe_full() const;
  bool is_full() const;
  bool is_backfillfull() const;
  bool is_nearfull() const;
  bool need_fullness_update();  ///< osdmap state needs update
  void set_injectfull(s_names type, int64_t count);
  bool check_osdmap_full(const set<pg_shard_t> &missing_on);


  // -- epochs --
private:
  mutable Mutex epoch_lock;  // protects access to boot_epoch, up_epoch, bind_epoch
  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
public:
  /**
   * Retrieve the boot_, up_, and bind_ epochs the OSD has set.  The params
   * can be NULL if you don't care about them.
   */
  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                       epoch_t *_bind_epoch) const;
  /**
   * Set the boot, up, and bind epochs.  Any NULL params will not be set.
   */
  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
                  const epoch_t *_bind_epoch);
  epoch_t get_boot_epoch() const {
    epoch_t ret;
    retrieve_epochs(&ret, NULL, NULL);
    return ret;
  }
  epoch_t get_up_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, &ret, NULL);
    return ret;
  }
  epoch_t get_bind_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, NULL, &ret);
    return ret;
  }

  // -- stopping --
  Mutex is_stopping_lock;
  Cond is_stopping_cond;
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING };
  std::atomic_int state{NOT_STOPPING};
  int get_state() {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();
  void got_stop_ack();


#ifdef PG_DEBUG_REFS
  Mutex pgid_lock;
  map<spg_t, int> pgid_tracker;
  map<spg_t, PG*> live_pgs;
  void add_pgid(spg_t pgid, PG *pg) {
    Mutex::Locker l(pgid_lock);
    if (!pgid_tracker.count(pgid)) {
      live_pgs[pgid] = pg;
    }
    pgid_tracker[pgid]++;
  }
  void remove_pgid(spg_t pgid, PG *pg) {
    Mutex::Locker l(pgid_lock);
    assert(pgid_tracker.count(pgid));
    assert(pgid_tracker[pgid] > 0);
    pgid_tracker[pgid]--;
    if (pgid_tracker[pgid] == 0) {
      pgid_tracker.erase(pgid);
      live_pgs.erase(pgid);
    }
  }
  void dump_live_pgids() {
    Mutex::Locker l(pgid_lock);
    derr << "live pgids:" << dendl;
    for (map<spg_t, int>::const_iterator i = pgid_tracker.cbegin();
         i != pgid_tracker.cend();
         ++i) {
      derr << "\t" << *i << dendl;
      live_pgs[i->first]->dump_live_ids();
    }
  }
#endif

  explicit OSDService(OSD *osd);
  ~OSDService();
};

class OSD : public Dispatcher,
            public md_config_obs_t {
  /** OSD **/
  Mutex osd_lock;        // global lock
  SafeTimer tick_timer;  // safe timer (osd_lock)

  // Tick timer for stuff that does not need osd_lock
  Mutex tick_timer_lock;
  SafeTimer tick_timer_without_osd_lock;
public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const struct md_config_t *conf,
                          const std::set <std::string> &changed) override;
  void update_log_config();
  void check_config();

protected:

  static const double OSD_TICK_INTERVAL;  // tick interval for tick_timer and tick_timer_without_osd_lock

  AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
  AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;

  Messenger *cluster_messenger;
  Messenger *client_messenger;
  Messenger *objecter_messenger;
  MonClient *monc;  // check the "monc helpers" list before accessing directly
  MgrClient mgrc;
  PerfCounters *logger;
  PerfCounters *recoverystate_perf;
  ObjectStore *store;
#ifdef HAVE_LIBFUSE
  FuseStore *fuse_store = nullptr;
#endif
  LogClient log_client;
  LogChannelRef clog;

  int whoami;
  std::string dev_path, journal_path;

  ZTracer::Endpoint trace_endpoint;
  void create_logger();
  void create_recoverystate_perf();
  void tick();
  void tick_without_osd_lock();
  void _dispatch(Message *m);
  void dispatch_op(OpRequestRef op);

  void check_osdmap_features(ObjectStore *store);

  // asok
  friend class OSDSocketHook;
  class OSDSocketHook *asok_hook;
  bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);

public:
  ClassHandler *class_handler = nullptr;
  int get_nodeid() { return whoami; }

  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }

  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("snapmapper"),
        0)));
  }

  static ghobject_t make_pg_log_oid(spg_t pg) {
    stringstream ss;
    ss << "pglog_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    stringstream ss;
    ss << "pginfo_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }
  static void recursive_remove_collection(CephContext* cct,
                                          ObjectStore *store,
                                          spg_t pgid,
                                          coll_t tmp);

  /**
   * get_osd_initial_compat_set()
   *
   * Get the initial feature set for this OSD.  Features
   * here are automatically upgraded.
   *
   * Return value: Initial osd CompatSet
   */
  static CompatSet get_osd_initial_compat_set();

  /**
   * get_osd_compat_set()
   *
   * Get all features supported by this OSD
   *
   * Return value: CompatSet of all supported features
   */
  static CompatSet get_osd_compat_set();


private:
  class C_Tick;
  class C_Tick_WithoutOSDLock;

  // -- superblock --
  OSDSuperblock superblock;

  void write_superblock();
  void write_superblock(ObjectStore::Transaction& t);
  int read_superblock();

  void clear_temp_objects();

  CompatSet osd_compat;

  // -- state --
public:
  typedef enum {
    STATE_INITIALIZING = 1,
    STATE_PREBOOT,
    STATE_BOOTING,
    STATE_ACTIVE,
    STATE_STOPPING,
    STATE_WAITING_FOR_HEALTHY
  } osd_state_t;

  static const char *get_state_name(int s) {
    switch (s) {
    case STATE_INITIALIZING: return "initializing";
    case STATE_PREBOOT: return "preboot";
    case STATE_BOOTING: return "booting";
    case STATE_ACTIVE: return "active";
    case STATE_STOPPING: return "stopping";
    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
    default: return "???";
    }
  }

private:
  std::atomic_int state{STATE_INITIALIZING};

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }

private:

  ThreadPool osd_tp;
  ShardedThreadPool osd_op_tp;
  ThreadPool disk_tp;
  ThreadPool command_tp;

  void set_disk_tp_priority();
  void get_latest_osdmap();

  // -- sessions --
private:
  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);

  Mutex session_waiting_lock;
  set<Session*> session_waiting_for_map;

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(set<Session*> *out) {
    Mutex::Locker l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(Session *session) {
    Mutex::Locker l(session_waiting_lock);
    if (session_waiting_for_map.insert(session).second) {
      session->get();
    }
  }
  void clear_session_waiting_on_map(Session *session) {
    Mutex::Locker l(session_waiting_lock);
    set<Session*>::iterator i = session_waiting_for_map.find(session);
    if (i != session_waiting_for_map.end()) {
      (*i)->put();
      session_waiting_for_map.erase(i);
    }
  }
  void dispatch_sessions_waiting_on_map() {
    set<Session*> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    for (set<Session*>::iterator i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
      (*i)->session_dispatch_lock.Lock();
      dispatch_session_waiting(*i, osdmap);
      (*i)->session_dispatch_lock.Unlock();
      (*i)->put();
    }
  }
  void session_handle_reset(Session *session) {
    Mutex::Locker l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }

private:
  /**
   * @defgroup monc helpers
   * @{
   * Right now we only have the one
   */

  /**
   * Ask the Monitors for a sequence of OSDMaps.
   *
   * @param epoch The epoch to start with when replying
   * @param force_request True if this request forces a new subscription to
   * the monitors; false if an outstanding request that encompasses it is
   * sufficient.
   */
  void osdmap_subscribe(version_t epoch, bool force_request);
  /** @} monc helpers */

  // -- heartbeat --
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;                ///< peer
    ConnectionRef con_front; ///< peer connection (front)
    ConnectionRef con_back;  ///< peer connection (back)
    utime_t first_tx;        ///< time we sent our first ping request
    utime_t last_tx;         ///< last time we sent a ping request
    utime_t last_rx_front;   ///< last time we got a ping reply on the front side
    utime_t last_rx_back;    ///< last time we got a ping reply on the back side
    epoch_t epoch;           ///< most recent epoch we wanted this peer

    bool is_unhealthy(utime_t cutoff) const {
      return
        ! ((last_rx_front > cutoff ||
            (last_rx_front == utime_t() && (last_tx == utime_t() ||
                                            first_tx > cutoff))) &&
           (last_rx_back > cutoff ||
            (last_rx_back == utime_t() && (last_tx == utime_t() ||
                                           first_tx > cutoff))));
    }
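    // Reading is_unhealthy(): a side (front or back) counts as OK if we got
    // a reply after the cutoff, or if we have never gotten a reply but also
    // never pinged (last_tx is zero) or only began pinging after the cutoff
    // (first_tx > cutoff).  The peer is unhealthy unless both sides are OK;
    // is_healthy() below is the stricter "both sides replied recently".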
    bool is_healthy(utime_t cutoff) const {
      return last_rx_front > cutoff && last_rx_back > cutoff;
    }

  };
1580 | /// state attached to outgoing heartbeat connections | |
1581 | struct HeartbeatSession : public RefCountedObject { | |
1582 | int peer; | |
1583 | explicit HeartbeatSession(int p) : peer(p) {} | |
1584 | }; | |
1585 | Mutex heartbeat_lock; | |
1586 | map<int, int> debug_heartbeat_drops_remaining; | |
1587 | Cond heartbeat_cond; | |
1588 | bool heartbeat_stop; | |
1589 | std::atomic_bool heartbeat_need_update; | |
1590 | map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo | |
1591 | utime_t last_mon_heartbeat; | |
1592 | Messenger *hb_front_client_messenger; | |
1593 | Messenger *hb_back_client_messenger; | |
1594 | Messenger *hb_front_server_messenger; | |
1595 | Messenger *hb_back_server_messenger; | |
1596 | utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state | |
1597 | double daily_loadavg; | |
1598 | ||
1599 | void _add_heartbeat_peer(int p); | |
1600 | void _remove_heartbeat_peer(int p); | |
1601 | bool heartbeat_reset(Connection *con); | |
1602 | void maybe_update_heartbeat_peers(); | |
1603 | void reset_heartbeat_peers(); | |
1604 | bool heartbeat_peers_need_update() { | |
1605 | return heartbeat_need_update.load(); | |
1606 | } | |
1607 | void heartbeat_set_peers_need_update() { | |
1608 | heartbeat_need_update.store(true); | |
1609 | } | |
1610 | void heartbeat_clear_peers_need_update() { | |
1611 | heartbeat_need_update.store(false); | |
1612 | } | |
1613 | void heartbeat(); | |
1614 | void heartbeat_check(); | |
1615 | void heartbeat_entry(); | |
1616 | void need_heartbeat_peer_update(); | |
1617 | ||
1618 | void heartbeat_kick() { | |
1619 | Mutex::Locker l(heartbeat_lock); | |
1620 | heartbeat_cond.Signal(); | |
1621 | } | |
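  /* heartbeat_kick() pairs with the wait in heartbeat_entry() (defined in
   * OSD.cc).  A simplified sketch of that loop, under the usual Mutex/Cond
   * pattern (illustrative only; the real loop also randomizes the interval):
   *
   *   Mutex::Locker l(heartbeat_lock);
   *   while (!heartbeat_stop) {
   *     heartbeat();                              // send pings, check peers
   *     utime_t w;
   *     w.set_from_double(cct->_conf->osd_heartbeat_interval);
   *     heartbeat_cond.WaitInterval(heartbeat_lock, w);   // or until kicked
   *   }
   */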
1622 | ||
1623 | struct T_Heartbeat : public Thread { | |
1624 | OSD *osd; | |
1625 | explicit T_Heartbeat(OSD *o) : osd(o) {} | |
1626 | void *entry() override { | |
1627 | osd->heartbeat_entry(); | |
1628 | return nullptr; | |
1629 | } | |
1630 | } heartbeat_thread; | |
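  /* The Thread wrapper above follows the usual Ceph pattern: create() during
   * startup, signal + join() during shutdown (call sites hypothetical):
   *
   *   heartbeat_thread.create("osd_srv_heartbt");  // runs heartbeat_entry()
   *   ...
   *   heartbeat_stop = true;
   *   heartbeat_kick();          // wake the loop so it sees heartbeat_stop
   *   heartbeat_thread.join();
   */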
1631 | ||
1632 | public: | |
1633 | bool heartbeat_dispatch(Message *m); | |
1634 | ||
1635 | struct HeartbeatDispatcher : public Dispatcher { | |
1636 | OSD *osd; | |
1637 | explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {} | |
1638 | ||
1639 | bool ms_can_fast_dispatch_any() const override { return true; } | |
1640 | bool ms_can_fast_dispatch(const Message *m) const override { | |
1641 | switch (m->get_type()) { | |
1642 | case CEPH_MSG_PING: | |
1643 | case MSG_OSD_PING: | |
1644 | return true; | |
1645 | default: | |
1646 | return false; | |
1647 | } | |
1648 | } | |
1649 | void ms_fast_dispatch(Message *m) override { | |
1650 | osd->heartbeat_dispatch(m); | |
1651 | } | |
1652 | bool ms_dispatch(Message *m) override { | |
1653 | return osd->heartbeat_dispatch(m); | |
1654 | } | |
1655 | bool ms_handle_reset(Connection *con) override { | |
1656 | return osd->heartbeat_reset(con); | |
1657 | } | |
1658 | void ms_handle_remote_reset(Connection *con) override {} | |
1659 | bool ms_handle_refused(Connection *con) override { | |
1660 | return osd->ms_handle_refused(con); | |
1661 | } | |
1662 | bool ms_verify_authorizer(Connection *con, int peer_type, | |
1663 | int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply, | |
1664 | bool& isvalid, CryptoKey& session_key) override { | |
1665 | isvalid = true; | |
1666 | return true; | |
1667 | } | |
1668 | } heartbeat_dispatcher; | |
1669 | ||
1670 | private: | |
1671 | // -- waiters -- | |
1672 | list<OpRequestRef> finished; | |
1673 | ||
1674 | void take_waiters(list<OpRequestRef>& ls) { | |
1675 | assert(osd_lock.is_locked()); | |
1676 | finished.splice(finished.end(), ls); | |
1677 | } | |
1678 | void do_waiters(); | |
1679 | ||
1680 | // -- op tracking -- | |
1681 | OpTracker op_tracker; | |
1682 | void check_ops_in_flight(); | |
1683 | void test_ops(std::string command, std::string args, ostream& ss); | |
1684 | friend class TestOpsSocketHook; | |
1685 | TestOpsSocketHook *test_ops_hook; | |
1686 | friend struct C_CompleteSplits; | |
1687 | friend struct C_OpenPGs; | |
1688 | ||
1689 | // -- op queue -- | |
1690 | enum io_queue { | |
1691 | prioritized, | |
1692 | weightedpriority | |
1693 | }; | |
1694 | const io_queue op_queue; | |
1695 | const unsigned int op_prio_cutoff; | |
1696 | ||
1697 | /* | |
1698 | * The ordered op delivery chain is: | |
1699 | * | |
1700 | * fast dispatch -> pqueue back | |
1701 | * pqueue front <-> to_process back | |
1702 | * to_process front -> RunVis(item) | |
1703 | * <- queue_front() | |
1704 | * | |
1705 | * The pqueue is per-shard, and to_process is per pg_slot. Items can be | |
1706 | * pushed back up into to_process and/or pqueue while order is preserved. | |
1707 | * | |
1708 | * Multiple worker threads can operate on each shard. | |
1709 | * | |
1710 | * Under normal circumstances, num_running == to_process.size(). There are | |
1711 | * two times when that is not true: (1) when waiting_for_pg == true and | |
1712 | * to_process is accumulating requests that are waiting for the pg to be | |
1713 | * instantiated; in that case they will all get requeued together by | |
1714 | * wake_pg_waiters, and (2) when wake_pg_waiters just ran, cleared | |
1715 | * waiting_for_pg, and already requeued the items. | |
1716 | */ | |
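  /* To make the chain above concrete, here is a stripped-down sketch of one
   * pass through _process() (error, shutdown, and waiting_for_pg paths
   * omitted; see OSD.cc for the real thing):
   *
   *   sdata->sdata_op_ordering_lock.Lock();
   *   pair<spg_t,PGQueueable> item = sdata->pqueue->dequeue();
   *   ShardData::pg_slot& slot = sdata->pg_slots[item.first];
   *   slot.to_process.push_back(item.second); // pqueue front -> to_process back
   *   ++slot.num_running;
   *   sdata->sdata_op_ordering_lock.Unlock();
   *   // ...drop the shard lock, take the pg lock, then run
   *   // to_process.front() so per-slot ordering is preserved...
   */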
1717 | friend class PGQueueable; | |
1718 | class ShardedOpWQ | |
1719 | : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>> | |
1720 | { | |
1721 | struct ShardData { | |
1722 | Mutex sdata_lock; | |
1723 | Cond sdata_cond; | |
1724 | ||
1725 | Mutex sdata_op_ordering_lock; ///< protects all members below | |
1726 | ||
1727 | OSDMapRef waiting_for_pg_osdmap; | |
1728 | struct pg_slot { | |
1729 | PGRef pg; ///< cached pg reference [optional] | |
1730 | list<PGQueueable> to_process; ///< ordered items for this slot | |
1731 | int num_running = 0; ///< _process threads doing pg lookup/lock | |
1732 | ||
1733 | /// true if pg does/did not exist. if so all new items go directly to | |
1734 | /// to_process. cleared by prune_pg_waiters. | |
1735 | bool waiting_for_pg = false; | |
1736 | ||
1737 | /// incremented by wake_pg_waiters; indicates racing _process threads | |
1738 | /// should bail out (their op has been requeued) | |
1739 | uint64_t requeue_seq = 0; | |
1740 | }; | |
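      /* Sketch of the requeue_seq protocol: a _process() thread snapshots
       * the counter before giving up the shard lock to take the pg lock,
       * then re-checks it afterwards:
       *
       *   uint64_t requeue_seq = slot.requeue_seq;
       *   // ...unlock shard, lock pg, relock shard...
       *   if (slot.requeue_seq != requeue_seq)
       *     return;   // wake_pg_waiters requeued our item; bail out
       */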
1741 | ||
1742 | /// map of slots for each spg_t. maintains ordering of items dequeued | |
1743 | /// from pqueue while _process thread drops shard lock to acquire the | |
1744 | /// pg lock. slots are removed only by prune_pg_waiters. | |
1745 | unordered_map<spg_t,pg_slot> pg_slots; | |
1746 | ||
1747 | /// priority queue | |
1748 | std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue; | |
1749 | ||
1750 | void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) { | |
1751 | unsigned priority = item.second.get_priority(); | |
1752 | unsigned cost = item.second.get_cost(); | |
1753 | if (priority >= cutoff) | |
1754 | pqueue->enqueue_strict_front( | |
1755 | item.second.get_owner(), | |
1756 | priority, item); | |
1757 | else | |
1758 | pqueue->enqueue_front( | |
1759 | item.second.get_owner(), | |
1760 | priority, cost, item); | |
1761 | } | |
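      /* Example of the cutoff routing above (priorities illustrative;
       * CEPH_MSG_PRIO_HIGH is 196, CEPH_MSG_PRIO_LOW is 64): a client op at
       * priority 63 takes the fair, cost-aware enqueue_front() path, while a
       * peering message at priority 255 bypasses fairness through
       * enqueue_strict_front().
       */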
1762 | ||
1763 | ShardData( | |
1764 | string lock_name, string ordering_lock, | |
1765 | uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct, | |
1766 | io_queue opqueue) | |
1767 | : sdata_lock(lock_name.c_str(), false, true, false, cct), | |
1768 | sdata_op_ordering_lock(ordering_lock.c_str(), false, true, | |
1769 | false, cct) { | |
1770 | if (opqueue == weightedpriority) { | |
1771 | pqueue = std::unique_ptr | |
1772 | <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>( | |
1773 | new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>( | |
1774 | max_tok_per_prio, min_cost)); | |
1775 | } else if (opqueue == prioritized) { | |
1776 | pqueue = std::unique_ptr | |
1777 | <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>( | |
1778 | new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>( | |
1779 | max_tok_per_prio, min_cost)); | |
1780 | } | |
1781 | } | |
1782 | }; | |
1783 | ||
1784 | vector<ShardData*> shard_list; | |
1785 | OSD *osd; | |
1786 | uint32_t num_shards; | |
1787 | ||
1788 | public: | |
1789 | ShardedOpWQ(uint32_t pnum_shards, | |
1790 | OSD *o, | |
1791 | time_t ti, | |
1792 | time_t si, | |
1793 | ShardedThreadPool* tp) | |
1794 | : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp), | |
1795 | osd(o), | |
1796 | num_shards(pnum_shards) { | |
1797 | for (uint32_t i = 0; i < num_shards; i++) { | |
1798 | char lock_name[32] = {0}; | |
1799 | snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i); | |
1800 | char order_lock[32] = {0}; | |
1801 | snprintf(order_lock, sizeof(order_lock), "%s.%d", | |
1802 | "OSD:ShardedOpWQ:order:", i); | |
1803 | ShardData* one_shard = new ShardData( | |
1804 | lock_name, order_lock, | |
1805 | osd->cct->_conf->osd_op_pq_max_tokens_per_priority, | |
1806 | osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue); | |
1807 | shard_list.push_back(one_shard); | |
1808 | } | |
1809 | } | |
1810 | ~ShardedOpWQ() override { | |
1811 | while (!shard_list.empty()) { | |
1812 | delete shard_list.back(); | |
1813 | shard_list.pop_back(); | |
1814 | } | |
1815 | } | |
1816 | ||
1817 | /// wake any pg waiters after a PG is created/instantiated | |
1818 | void wake_pg_waiters(spg_t pgid); | |
1819 | ||
1820 | /// prune ops (and possibly pg_slots) for pgs that shouldn't be here | |
1821 | void prune_pg_waiters(OSDMapRef osdmap, int whoami); | |
1822 | ||
1823 | /// clear cached PGRef on pg deletion | |
1824 | void clear_pg_pointer(spg_t pgid); | |
1825 | ||
1826 | /// clear pg_slots on shutdown | |
1827 | void clear_pg_slots(); | |
1828 | ||
1829 | /// try to do some work | |
1830 | void _process(uint32_t thread_index, heartbeat_handle_d *hb) override; | |
1831 | ||
1832 | /// enqueue a new item | |
1833 | void _enqueue(pair <spg_t, PGQueueable> item) override; | |
1834 | ||
1835 | /// requeue an old item (at the front of the line) | |
1836 | void _enqueue_front(pair <spg_t, PGQueueable> item) override; | |
1837 | ||
1838 | void return_waiting_threads() override { | |
1839 | for (uint32_t i = 0; i < num_shards; i++) { | |
1840 | ShardData* sdata = shard_list[i]; | |
1841 | assert (NULL != sdata); | |
1842 | sdata->sdata_lock.Lock(); | |
1843 | sdata->sdata_cond.Signal(); | |
1844 | sdata->sdata_lock.Unlock(); | |
1845 | } | |
1846 | } | |
1847 | ||
1848 | void dump(Formatter *f) { | |
1849 | for (uint32_t i = 0; i < num_shards; i++) { | |
1850 | ShardData* sdata = shard_list[i]; | |
1851 | char lock_name[32] = {0}; | |
1852 | snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i); | |
1853 | assert (NULL != sdata); | |
1854 | sdata->sdata_op_ordering_lock.Lock(); | |
1855 | f->open_object_section(lock_name); | |
1856 | sdata->pqueue->dump(f); | |
1857 | f->close_section(); | |
1858 | sdata->sdata_op_ordering_lock.Unlock(); | |
1859 | } | |
1860 | } | |
1861 | ||
1862 | /// match ops for a given pg; must be applied to ops queued back to front | |
1863 | struct Pred { | |
1864 | spg_t pgid; | |
1865 | list<OpRequestRef> *out_ops; | |
1866 | uint64_t reserved_pushes_to_free; | |
1867 | Pred(spg_t pg, list<OpRequestRef> *out_ops = 0) | |
1868 | : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {} | |
1869 | void accumulate(const PGQueueable &op) { | |
1870 | reserved_pushes_to_free += op.get_reserved_pushes(); | |
1871 | if (out_ops) { | |
1872 | boost::optional<OpRequestRef> mop = op.maybe_get_op(); | |
1873 | if (mop) | |
1874 | out_ops->push_front(*mop); | |
1875 | } | |
1876 | } | |
1877 | bool operator()(const pair<spg_t, PGQueueable> &op) { | |
1878 | if (op.first == pgid) { | |
1879 | accumulate(op.second); | |
1880 | return true; | |
1881 | } else { | |
1882 | return false; | |
1883 | } | |
1884 | } | |
1885 | uint64_t get_reserved_pushes_to_free() const { | |
1886 | return reserved_pushes_to_free; | |
1887 | } | |
1888 | }; | |
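    /* Hypothetical use of Pred when tearing one pg's ops out of a shard,
     * relying on the queue presenting removed items back to front so that
     * push_front() leaves out_ops in front-to-back order:
     *
     *   list<OpRequestRef> ops;
     *   Pred pred(pgid, &ops);
     *   sdata->pqueue->remove_by_filter(std::ref(pred));
     *   // pred.get_reserved_pushes_to_free() can now be handed back to the
     *   // recovery reservation accounting
     */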
1889 | ||
1890 | bool is_shard_empty(uint32_t thread_index) override { | |
1891 | uint32_t shard_index = thread_index % num_shards; | |
1892 | ShardData* sdata = shard_list[shard_index]; | |
1893 | assert(NULL != sdata); | |
1894 | Mutex::Locker l(sdata->sdata_op_ordering_lock); | |
1895 | return sdata->pqueue->empty(); | |
1896 | } | |
1897 | } op_shardedwq; | |
1898 | ||
1899 | ||
1900 | void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch); | |
1901 | void dequeue_op( | |
1902 | PGRef pg, OpRequestRef op, | |
1903 | ThreadPool::TPHandle &handle); | |
1904 | ||
1905 | // -- peering queue -- | |
1906 | struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> { | |
1907 | list<PG*> peering_queue; | |
1908 | OSD *osd; | |
1909 | set<PG*> in_use; | |
1910 | PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) | |
1911 | : ThreadPool::BatchWorkQueue<PG>( | |
1912 | "OSD::PeeringWQ", ti, si, tp), osd(o) {} | |
1913 | ||
1914 | void _dequeue(PG *pg) override { | |
1915 | for (list<PG*>::iterator i = peering_queue.begin(); | |
1916 | i != peering_queue.end(); | |
1917 | ) { | |
1918 | if (*i == pg) { | |
1919 | peering_queue.erase(i++); | |
1920 | pg->put("PeeringWQ"); | |
1921 | } else { | |
1922 | ++i; | |
1923 | } | |
1924 | } | |
1925 | } | |
1926 | bool _enqueue(PG *pg) override { | |
1927 | pg->get("PeeringWQ"); | |
1928 | peering_queue.push_back(pg); | |
1929 | return true; | |
1930 | } | |
1931 | bool _empty() override { | |
1932 | return peering_queue.empty(); | |
1933 | } | |
1934 | void _dequeue(list<PG*> *out) override; | |
1935 | void _process( | |
1936 | const list<PG *> &pgs, | |
1937 | ThreadPool::TPHandle &handle) override { | |
1938 | assert(!pgs.empty()); | |
1939 | osd->process_peering_events(pgs, handle); | |
1940 | for (list<PG *>::const_iterator i = pgs.begin(); | |
1941 | i != pgs.end(); | |
1942 | ++i) { | |
1943 | (*i)->put("PeeringWQ"); | |
1944 | } | |
1945 | } | |
1946 | void _process_finish(const list<PG *> &pgs) override { | |
1947 | for (list<PG*>::const_iterator i = pgs.begin(); | |
1948 | i != pgs.end(); | |
1949 | ++i) { | |
1950 | in_use.erase(*i); | |
1951 | } | |
1952 | } | |
1953 | void _clear() override { | |
1954 | assert(peering_queue.empty()); | |
1955 | } | |
1956 | } peering_wq; | |
1957 | ||
1958 | void process_peering_events( | |
1959 | const list<PG*> &pg, | |
1960 | ThreadPool::TPHandle &handle); | |
1961 | ||
1962 | friend class PG; | |
1963 | friend class PrimaryLogPG; | |
1964 | ||
1965 | ||
1966 | protected: | |
1967 | ||
1968 | // -- osd map -- | |
1969 | OSDMapRef osdmap; | |
1970 | OSDMapRef get_osdmap() { | |
1971 | return osdmap; | |
1972 | } | |
1973 | epoch_t get_osdmap_epoch() { | |
1974 | return osdmap ? osdmap->get_epoch() : 0; | |
1975 | } | |
1976 | ||
1977 | utime_t had_map_since; | |
1978 | RWLock map_lock; | |
1979 | list<OpRequestRef> waiting_for_osdmap; | |
1980 | deque<utime_t> osd_markdown_log; | |
1981 | ||
1982 | friend struct send_map_on_destruct; | |
1983 | ||
1984 | void wait_for_new_map(OpRequestRef op); | |
1985 | void handle_osd_map(class MOSDMap *m); | |
1986 | void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m); | |
1987 | void trim_maps(epoch_t oldest, int nreceived, bool skip_maps); | |
1988 | void note_down_osd(int osd); | |
1989 | void note_up_osd(int osd); | |
1990 | friend class C_OnMapCommit; | |
1991 | ||
1992 | bool advance_pg( | |
1993 | epoch_t advance_to, PG *pg, | |
1994 | ThreadPool::TPHandle &handle, | |
1995 | PG::RecoveryCtx *rctx, | |
1996 | set<boost::intrusive_ptr<PG> > *split_pgs | |
1997 | ); | |
1998 | void consume_map(); | |
1999 | void activate_map(); | |
2000 | ||
2001 | // osd map cache (past osd maps) | |
2002 | OSDMapRef get_map(epoch_t e) { | |
2003 | return service.get_map(e); | |
2004 | } | |
2005 | OSDMapRef add_map(OSDMap *o) { | |
2006 | return service.add_map(o); | |
2007 | } | |
2008 | void add_map_bl(epoch_t e, bufferlist& bl) { | |
2009 | return service.add_map_bl(e, bl); | |
2010 | } | |
2011 | void pin_map_bl(epoch_t e, bufferlist &bl) { | |
2012 | return service.pin_map_bl(e, bl); | |
2013 | } | |
2014 | bool get_map_bl(epoch_t e, bufferlist& bl) { | |
2015 | return service.get_map_bl(e, bl); | |
2016 | } | |
2017 | void add_map_inc_bl(epoch_t e, bufferlist& bl) { | |
2018 | return service.add_map_inc_bl(e, bl); | |
2019 | } | |
2020 | void pin_map_inc_bl(epoch_t e, bufferlist &bl) { | |
2021 | return service.pin_map_inc_bl(e, bl); | |
2022 | } | |
2023 | ||
2024 | protected: | |
2025 | // -- placement groups -- | |
2026 | RWLock pg_map_lock; // this lock orders *above* individual PG _locks | |
2027 | ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock | |
2028 | ||
2029 | map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split; | |
2030 | PGRecoveryStats pg_recovery_stats; | |
2031 | ||
2032 | PGPool _get_pool(int id, OSDMapRef createmap); | |
2033 | ||
2034 | PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid); | |
2035 | PG *_lookup_lock_pg(spg_t pgid); | |
2036 | PG *_open_lock_pg(OSDMapRef createmap, | |
2037 | spg_t pg, bool no_lockdep_check=false); | |
2038 | enum res_result { | |
2039 | RES_PARENT, // resurrected a parent | |
2040 | RES_SELF, // resurrected self | |
2041 | RES_NONE // nothing relevant was being deleted | |
2042 | }; | |
2043 | res_result _try_resurrect_pg( | |
2044 | OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state); | |
2045 | ||
2046 | PG *_create_lock_pg( | |
2047 | OSDMapRef createmap, | |
2048 | spg_t pgid, | |
2049 | bool hold_map_lock, | |
2050 | bool backfill, | |
2051 | int role, | |
2052 | vector<int>& up, int up_primary, | |
2053 | vector<int>& acting, int acting_primary, | |
2054 | pg_history_t history, | |
2055 | const PastIntervals& pi, | |
2056 | ObjectStore::Transaction& t); | |
2057 | ||
2058 | PG* _make_pg(OSDMapRef createmap, spg_t pgid); | |
2059 | void add_newly_split_pg(PG *pg, | |
2060 | PG::RecoveryCtx *rctx); | |
2061 | ||
2062 | int handle_pg_peering_evt( | |
2063 | spg_t pgid, | |
2064 | const pg_history_t& orig_history, | |
2065 | const PastIntervals& pi, | |
2066 | epoch_t epoch, | |
2067 | PG::CephPeeringEvtRef evt); | |
2068 | ||
2069 | void load_pgs(); | |
2070 | void build_past_intervals_parallel(); | |
2071 | ||
2072 | /// build initial pg history and intervals on create | |
2073 | void build_initial_pg_history( | |
2074 | spg_t pgid, | |
2075 | epoch_t created, | |
2076 | utime_t created_stamp, | |
2077 | pg_history_t *h, | |
2078 | PastIntervals *pi); | |
2079 | ||
2080 | /// project pg history from epoch 'from' to now | |
2081 | bool project_pg_history( | |
2082 | spg_t pgid, pg_history_t& h, epoch_t from, | |
2083 | const vector<int>& lastup, | |
2084 | int lastupprimary, | |
2085 | const vector<int>& lastacting, | |
2086 | int lastactingprimary | |
2087 | ); ///< @return false if there was a map gap between from and now | |
2088 | ||
2089 | // this must be called with pg->lock held whenever a pg is added to pg_map | |
2090 | void wake_pg_waiters(PGRef pg) { | |
2091 | assert(pg->is_locked()); | |
2092 | op_shardedwq.wake_pg_waiters(pg->info.pgid); | |
2093 | } | |
2094 | epoch_t last_pg_create_epoch; | |
2095 | ||
2096 | void handle_pg_create(OpRequestRef op); | |
2097 | ||
2098 | void split_pgs( | |
2099 | PG *parent, | |
2100 | const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs, | |
2101 | OSDMapRef curmap, | |
2102 | OSDMapRef nextmap, | |
2103 | PG::RecoveryCtx *rctx); | |
2104 | ||
2105 | // == monitor interaction == | |
2106 | Mutex mon_report_lock; | |
2107 | utime_t last_mon_report; | |
2108 | utime_t last_pg_stats_sent; | |
2109 | ||
2110 | /* if our monitor dies, we want to notice it and reconnect. | |
2111 | * So we keep track of when it last acked our stat updates, | |
2112 | * and if too much time passes (and we've been sending | |
2113 | * more updates) then we can call it dead and reconnect | |
2114 | * elsewhere. | |
2115 | */ | |
2116 | utime_t last_pg_stats_ack; | |
2117 | float stats_ack_timeout; | |
2118 | set<uint64_t> outstanding_pg_stats; // tids of the stat updates that haven't been acked yet | |
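  /* Sketch of the liveness check this state enables (hypothetical; the real
   * logic lives in the tick/stats paths in OSD.cc):
   *
   *   utime_t now = ceph_clock_now();
   *   if (!outstanding_pg_stats.empty() &&
   *       now - last_pg_stats_ack > stats_ack_timeout) {
   *     monc->reopen_session();  // monitor looks dead; reconnect elsewhere
   *   }
   */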
2119 | ||
2120 | // -- boot -- | |
2121 | void start_boot(); | |
2122 | void _got_mon_epochs(epoch_t oldest, epoch_t newest); | |
2123 | void _preboot(epoch_t oldest, epoch_t newest); | |
2124 | void _send_boot(); | |
2125 | void _collect_metadata(map<string,string> *pmeta); | |
2126 | ||
2127 | void start_waiting_for_healthy(); | |
2128 | bool _is_healthy(); | |
2129 | ||
2130 | void send_full_update(); | |
2131 | ||
2132 | friend struct C_OSD_GetVersion; | |
2133 | ||
2134 | // -- alive -- | |
2135 | epoch_t up_thru_wanted; | |
2136 | ||
2137 | void queue_want_up_thru(epoch_t want); | |
2138 | void send_alive(); | |
2139 | ||
2140 | // -- full map requests -- | |
2141 | epoch_t requested_full_first, requested_full_last; | |
2142 | ||
2143 | void request_full_map(epoch_t first, epoch_t last); | |
2144 | void rerequest_full_maps() { | |
2145 | epoch_t first = requested_full_first; | |
2146 | epoch_t last = requested_full_last; | |
2147 | requested_full_first = 0; | |
2148 | requested_full_last = 0; | |
2149 | request_full_map(first, last); | |
2150 | } | |
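  /* Note: the range is cleared before calling request_full_map() again,
   * presumably so the fresh request is not suppressed as a duplicate of the
   * apparently still-outstanding previous one.
   */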
2151 | void got_full_map(epoch_t e); | |
2152 | ||
2153 | // -- failures -- | |
2154 | map<int,utime_t> failure_queue; | |
2155 | map<int,pair<utime_t,entity_inst_t> > failure_pending; | |
2156 | ||
2157 | void requeue_failures(); | |
2158 | void send_failures(); | |
2159 | void send_still_alive(epoch_t epoch, const entity_inst_t &i); | |
2160 | ||
2161 | // -- pg stats -- | |
2162 | Mutex pg_stat_queue_lock; | |
2163 | Cond pg_stat_queue_cond; | |
2164 | xlist<PG*> pg_stat_queue; | |
2165 | bool osd_stat_updated; | |
2166 | uint64_t pg_stat_tid, pg_stat_tid_flushed; | |
2167 | ||
2168 | void send_pg_stats(const utime_t &now); | |
2169 | void handle_pg_stats_ack(class MPGStatsAck *ack); | |
2170 | void flush_pg_stats(); | |
2171 | ||
2172 | ceph::coarse_mono_clock::time_point last_sent_beacon; | |
2173 | Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"}; | |
2174 | epoch_t min_last_epoch_clean = 0; | |
2175 | // which pgs were scanned for min_lec | |
2176 | std::vector<pg_t> min_last_epoch_clean_pgs; | |
2177 | void send_beacon(const ceph::coarse_mono_clock::time_point& now); | |
2178 | ||
2179 | void pg_stat_queue_enqueue(PG *pg) { | |
2180 | pg_stat_queue_lock.Lock(); | |
2181 | if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) { | |
2182 | pg->get("pg_stat_queue"); | |
2183 | pg_stat_queue.push_back(&pg->stat_queue_item); | |
2184 | } | |
2185 | osd_stat_updated = true; | |
2186 | pg_stat_queue_lock.Unlock(); | |
2187 | } | |
2188 | void pg_stat_queue_dequeue(PG *pg) { | |
2189 | pg_stat_queue_lock.Lock(); | |
2190 | if (pg->stat_queue_item.remove_myself()) | |
2191 | pg->put("pg_stat_queue"); | |
2192 | pg_stat_queue_lock.Unlock(); | |
2193 | } | |
2194 | void clear_pg_stat_queue() { | |
2195 | pg_stat_queue_lock.Lock(); | |
2196 | while (!pg_stat_queue.empty()) { | |
2197 | PG *pg = pg_stat_queue.front(); | |
2198 | pg_stat_queue.pop_front(); | |
2199 | pg->put("pg_stat_queue"); | |
2200 | } | |
2201 | pg_stat_queue_lock.Unlock(); | |
2202 | } | |
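  /* Note on the pattern above: membership of pg->stat_queue_item in the
   * intrusive xlist is always paired with a named ref (get("pg_stat_queue")
   * / put("pg_stat_queue")), so a queued PG cannot be destroyed while the
   * queue still points at it.  remove_myself() reports whether the item was
   * actually on the list, i.e. whether there is a ref left to drop.
   */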
2203 | ||
2204 | ceph_tid_t get_tid() { | |
2205 | return service.get_tid(); | |
2206 | } | |
2207 | ||
2208 | // -- generic pg peering -- | |
2209 | PG::RecoveryCtx create_context(); | |
2210 | void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, | |
2211 | ThreadPool::TPHandle *handle = NULL); | |
2212 | void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, | |
2213 | ThreadPool::TPHandle *handle = NULL); | |
2214 | void do_notifies(map<int, | |
2215 | vector<pair<pg_notify_t, PastIntervals> > >& | |
2216 | notify_list, | |
2217 | OSDMapRef map); | |
2218 | void do_queries(map<int, map<spg_t,pg_query_t> >& query_map, | |
2219 | OSDMapRef map); | |
2220 | void do_infos(map<int, | |
2221 | vector<pair<pg_notify_t, PastIntervals> > >& info_map, | |
2222 | OSDMapRef map); | |
2223 | ||
2224 | bool require_mon_peer(const Message *m); | |
2225 | bool require_mon_or_mgr_peer(const Message *m); | |
2226 | bool require_osd_peer(const Message *m); | |
2227 | /** | |
2228 | * Verifies that we were alive in the given epoch, and that we | |
2229 | * still are. | |
2230 | */ | |
2231 | bool require_self_aliveness(const Message *m, epoch_t alive_since); | |
2232 | /** | |
2233 | * Verifies that the OSD who sent the given op has the same | |
2234 | * address as in the given map. | |
2235 | * @pre op was sent by an OSD using the cluster messenger | |
2236 | */ | |
2237 | bool require_same_peer_instance(const Message *m, OSDMapRef& map, | |
2238 | bool is_fast_dispatch); | |
2239 | ||
2240 | bool require_same_or_newer_map(OpRequestRef& op, epoch_t e, | |
2241 | bool is_fast_dispatch); | |
2242 | ||
2243 | void handle_pg_query(OpRequestRef op); | |
2244 | void handle_pg_notify(OpRequestRef op); | |
2245 | void handle_pg_log(OpRequestRef op); | |
2246 | void handle_pg_info(OpRequestRef op); | |
2247 | void handle_pg_trim(OpRequestRef op); | |
2248 | ||
2249 | void handle_pg_backfill_reserve(OpRequestRef op); | |
2250 | void handle_pg_recovery_reserve(OpRequestRef op); | |
2251 | ||
2252 | void handle_pg_remove(OpRequestRef op); | |
2253 | void _remove_pg(PG *pg); | |
2254 | ||
2255 | // -- commands -- | |
2256 | struct Command { | |
2257 | vector<string> cmd; | |
2258 | ceph_tid_t tid; | |
2259 | bufferlist indata; | |
2260 | ConnectionRef con; | |
2261 | ||
2262 | Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co) | |
2263 | : cmd(c), tid(t), indata(bl), con(co) {} | |
2264 | }; | |
2265 | list<Command*> command_queue; | |
2266 | struct CommandWQ : public ThreadPool::WorkQueue<Command> { | |
2267 | OSD *osd; | |
2268 | CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) | |
2269 | : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {} | |
2270 | ||
2271 | bool _empty() override { | |
2272 | return osd->command_queue.empty(); | |
2273 | } | |
2274 | bool _enqueue(Command *c) override { | |
2275 | osd->command_queue.push_back(c); | |
2276 | return true; | |
2277 | } | |
2278 | void _dequeue(Command *pg) override { | |
2279 | ceph_abort(); | |
2280 | } | |
2281 | Command *_dequeue() override { | |
2282 | if (osd->command_queue.empty()) | |
2283 | return NULL; | |
2284 | Command *c = osd->command_queue.front(); | |
2285 | osd->command_queue.pop_front(); | |
2286 | return c; | |
2287 | } | |
2288 | void _process(Command *c, ThreadPool::TPHandle &) override { | |
2289 | osd->osd_lock.Lock(); | |
2290 | if (osd->is_stopping()) { | |
2291 | osd->osd_lock.Unlock(); | |
2292 | delete c; | |
2293 | return; | |
2294 | } | |
2295 | osd->do_command(c->con.get(), c->tid, c->cmd, c->indata); | |
2296 | osd->osd_lock.Unlock(); | |
2297 | delete c; | |
2298 | } | |
2299 | void _clear() override { | |
2300 | while (!osd->command_queue.empty()) { | |
2301 | Command *c = osd->command_queue.front(); | |
2302 | osd->command_queue.pop_front(); | |
2303 | delete c; | |
2304 | } | |
2305 | } | |
2306 | } command_wq; | |
2307 | ||
2308 | void handle_command(class MMonCommand *m); | |
2309 | void handle_command(class MCommand *m); | |
2310 | void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data); | |
2311 | ||
2312 | // -- pg recovery -- | |
2313 | void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved, | |
2314 | ThreadPool::TPHandle &handle); | |
2315 | ||
2316 | ||
2317 | // -- scrubbing -- | |
2318 | void sched_scrub(); | |
2319 | bool scrub_random_backoff(); | |
2320 | bool scrub_load_below_threshold(); | |
2321 | bool scrub_time_permit(utime_t now); | |
2322 | ||
2323 | // -- removing -- | |
2324 | struct RemoveWQ : | |
2325 | public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > { | |
2326 | CephContext* cct; | |
2327 | ObjectStore *&store; | |
2328 | list<pair<PGRef, DeletingStateRef> > remove_queue; | |
2329 | RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si, | |
2330 | ThreadPool *tp) | |
2331 | : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >( | |
2332 | "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {} | |
2333 | ||
2334 | bool _empty() override { | |
2335 | return remove_queue.empty(); | |
2336 | } | |
2337 | void _enqueue(pair<PGRef, DeletingStateRef> item) override { | |
2338 | remove_queue.push_back(item); | |
2339 | } | |
2340 | void _enqueue_front(pair<PGRef, DeletingStateRef> item) override { | |
2341 | remove_queue.push_front(item); | |
2342 | } | |
2343 | bool _dequeue(pair<PGRef, DeletingStateRef> item) { | |
2344 | ceph_abort(); | |
2345 | } | |
2346 | pair<PGRef, DeletingStateRef> _dequeue() override { | |
2347 | assert(!remove_queue.empty()); | |
2348 | pair<PGRef, DeletingStateRef> item = remove_queue.front(); | |
2349 | remove_queue.pop_front(); | |
2350 | return item; | |
2351 | } | |
2352 | void _process(pair<PGRef, DeletingStateRef>, | |
2353 | ThreadPool::TPHandle &) override; | |
2354 | void _clear() override { | |
2355 | remove_queue.clear(); | |
2356 | } | |
2357 | } remove_wq; | |
2358 | ||
2359 | private: | |
2360 | bool ms_can_fast_dispatch_any() const override { return true; } | |
2361 | bool ms_can_fast_dispatch(const Message *m) const override { | |
2362 | switch (m->get_type()) { | |
2363 | case CEPH_MSG_OSD_OP: | |
2364 | case CEPH_MSG_OSD_BACKOFF: | |
2365 | case MSG_OSD_SUBOP: | |
2366 | case MSG_OSD_REPOP: | |
2367 | case MSG_OSD_SUBOPREPLY: | |
2368 | case MSG_OSD_REPOPREPLY: | |
2369 | case MSG_OSD_PG_PUSH: | |
2370 | case MSG_OSD_PG_PULL: | |
2371 | case MSG_OSD_PG_PUSH_REPLY: | |
2372 | case MSG_OSD_PG_SCAN: | |
2373 | case MSG_OSD_PG_BACKFILL: | |
2374 | case MSG_OSD_PG_BACKFILL_REMOVE: | |
2375 | case MSG_OSD_EC_WRITE: | |
2376 | case MSG_OSD_EC_WRITE_REPLY: | |
2377 | case MSG_OSD_EC_READ: | |
2378 | case MSG_OSD_EC_READ_REPLY: | |
2379 | case MSG_OSD_SCRUB_RESERVE: | |
2380 | case MSG_OSD_REP_SCRUB: | |
2381 | case MSG_OSD_REP_SCRUBMAP: | |
2382 | case MSG_OSD_PG_UPDATE_LOG_MISSING: | |
2383 | case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: | |
2384 | return true; | |
2385 | default: | |
2386 | return false; | |
2387 | } | |
2388 | } | |
2389 | void ms_fast_dispatch(Message *m) override; | |
2390 | void ms_fast_preprocess(Message *m) override; | |
2391 | bool ms_dispatch(Message *m) override; | |
2392 | bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override; | |
2393 | bool ms_verify_authorizer(Connection *con, int peer_type, | |
2394 | int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, | |
2395 | bool& isvalid, CryptoKey& session_key) override; | |
2396 | void ms_handle_connect(Connection *con) override; | |
2397 | void ms_handle_fast_connect(Connection *con) override; | |
2398 | void ms_handle_fast_accept(Connection *con) override; | |
2399 | bool ms_handle_reset(Connection *con) override; | |
2400 | void ms_handle_remote_reset(Connection *con) override {} | |
2401 | bool ms_handle_refused(Connection *con) override; | |
2402 | ||
2403 | io_queue get_io_queue() const { | |
2404 | if (cct->_conf->osd_op_queue == "debug_random") { | |
2405 | srand(time(NULL)); | |
2406 | return (rand() % 2 < 1) ? prioritized : weightedpriority; | |
2407 | } else if (cct->_conf->osd_op_queue == "wpq") { | |
2408 | return weightedpriority; | |
2409 | } else { | |
2410 | return prioritized; | |
2411 | } | |
2412 | } | |
2413 | ||
2414 | unsigned int get_io_prio_cut() const { | |
2415 | if (cct->_conf->osd_op_queue_cut_off == "debug_random") { | |
2416 | srand(time(NULL)); | |
2417 | return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; | |
2418 | } else if (cct->_conf->osd_op_queue_cut_off == "low") { | |
2419 | return CEPH_MSG_PRIO_LOW; | |
2420 | } else { | |
2421 | return CEPH_MSG_PRIO_HIGH; | |
2422 | } | |
2423 | } | |
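  /* Taken together, the two helpers above map configuration onto queue
   * behavior, roughly (option values assumed from contemporary defaults):
   *
   *   osd_op_queue         = "wpq"   -> weightedpriority pqueue
   *   osd_op_queue         = other   -> prioritized pqueue
   *   osd_op_queue_cut_off = "low"   -> strict queueing above CEPH_MSG_PRIO_LOW
   *   osd_op_queue_cut_off = other   -> strict queueing above CEPH_MSG_PRIO_HIGH
   *
   * "debug_random" flips a coin for each; note both helpers reseed with
   * srand(time(NULL)), so OSDs started within the same second will make the
   * same "random" choice.
   */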
2424 | ||
2425 | public: | |
2426 | /* internal and external can point to the same messenger; they will still | |
2427 | * be cleaned up properly */ | |
2428 | OSD(CephContext *cct_, | |
2429 | ObjectStore *store_, | |
2430 | int id, | |
2431 | Messenger *internal, | |
2432 | Messenger *external, | |
2433 | Messenger *hb_front_client, | |
2434 | Messenger *hb_back_client, | |
2435 | Messenger *hb_front_server, | |
2436 | Messenger *hb_back_server, | |
2437 | Messenger *osdc_messenger, | |
2438 | MonClient *mc, const std::string &dev, const std::string &jdev); | |
2439 | ~OSD() override; | |
2440 | ||
2441 | // static bits | |
2442 | static int mkfs(CephContext *cct, ObjectStore *store, | |
2443 | const string& dev, | |
2444 | uuid_d fsid, int whoami); | |
2445 | /* remove any non-user xattrs from an attr map */ | |
2446 | void filter_xattrs(map<string, bufferptr>& attrs) { | |
2447 | for (map<string, bufferptr>::iterator iter = attrs.begin(); | |
2448 | iter != attrs.end(); | |
2449 | ) { | |
2450 | if (('_' != iter->first.at(0)) || (iter->first.size() == 1)) | |
2451 | attrs.erase(iter++); | |
2452 | else ++iter; | |
2453 | } | |
2454 | } | |
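  /* Example of the rule above: user xattrs are exactly the names that begin
   * with '_' and are longer than one character, so given
   *
   *   { "_mykey" -> ..., "_" -> ..., "snapset" -> ... }
   *
   * only "_mykey" survives; "_" (length one) and "snapset" (no leading
   * underscore) are erased.
   */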
2455 | ||
2456 | private: | |
2457 | int mon_cmd_maybe_osd_create(string &cmd); | |
2458 | int update_crush_device_class(); | |
2459 | int update_crush_location(); | |
2460 | ||
2461 | static int write_meta(ObjectStore *store, | |
2462 | uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami); | |
2463 | ||
2464 | void handle_pg_scrub(struct MOSDScrub *m, PG* pg); | |
2465 | void handle_scrub(struct MOSDScrub *m); | |
2466 | void handle_osd_ping(class MOSDPing *m); | |
2467 | ||
2468 | int init_op_flags(OpRequestRef& op); | |
2469 | ||
2470 | public: | |
2471 | static int peek_meta(ObjectStore *store, string& magic, | |
2472 | uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami); | |
2473 | ||
2474 | ||
2475 | // startup/shutdown | |
2476 | int pre_init(); | |
2477 | int init(); | |
2478 | void final_init(); | |
2479 | ||
2480 | int enable_disable_fuse(bool stop); | |
2481 | ||
2482 | void suicide(int exitcode); | |
2483 | int shutdown(); | |
2484 | ||
2485 | void handle_signal(int signum); | |
2486 | ||
2487 | /// check if we can throw out op from a disconnected client | |
2488 | static bool op_is_discardable(const MOSDOp *m); | |
2489 | ||
2490 | public: | |
2491 | OSDService service; | |
2492 | friend class OSDService; | |
2493 | }; | |
2494 | ||
2495 | // compatibility of the executable | |
2496 | extern const CompatSet::Feature ceph_osd_feature_compat[]; | |
2497 | extern const CompatSet::Feature ceph_osd_feature_ro_compat[]; | |
2498 | extern const CompatSet::Feature ceph_osd_feature_incompat[]; | |
2499 | ||
2500 | #endif |