]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/PGMonitor.cc
7fdb5bd6357a786afbc30ed4acb8625d5c78078d
[ceph.git] / ceph / src / mon / PGMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "json_spirit/json_spirit.h"
17 #include "common/debug.h" // undo damage
18 #include "PGMonitor.h"
19 #include "Monitor.h"
20 #include "OSDMonitor.h"
21 #include "MonitorDBStore.h"
22 #include "PGStatService.h"
23
24 #include "messages/MPGStats.h"
25 #include "messages/MPGStatsAck.h"
26
27 #include "messages/MOSDPGCreate.h"
28 #include "messages/MMonCommand.h"
29 #include "messages/MOSDScrub.h"
30
31 #include "common/Formatter.h"
32 #include "common/config.h"
33
34 #include "include/stringify.h"
35
36 #include "osd/osd_types.h"
37
38 #include "common/config.h"
39 #include "common/errno.h"
40 #include "common/strtol.h"
41 #include "include/str_list.h"
42 #include <sstream>
43
44 #define dout_subsys ceph_subsys_mon
45 #undef dout_prefix
46 #define dout_prefix _prefix(_dout, mon, pg_map)
47 static ostream& _prefix(std::ostream *_dout, const Monitor *mon, const PGMap& pg_map) {
48 return *_dout << "mon." << mon->name << "@" << mon->rank
49 << "(" << mon->get_state_name()
50 << ").pg v" << pg_map.version << " ";
51 }
52
53 /*
54 Tick function to update the map based on performance every N seconds
55 */
56
57 void PGMonitor::on_restart()
58 {
59 // clear leader state
60 last_osd_report.clear();
61 }
62
63 void PGMonitor::on_active()
64 {
65 if (mon->is_leader()) {
66 check_all_pgs = true;
67 check_osd_map(mon->osdmon()->osdmap.get_epoch());
68 }
69
70 update_logger();
71
72 if (mon->is_leader() &&
73 mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
74 mon->clog->info() << "pgmap " << pg_map;
75 }
76 }
77
78 void PGMonitor::update_logger()
79 {
80 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
81 return;
82 }
83 dout(10) << "update_logger" << dendl;
84
85 mon->cluster_logger->set(l_cluster_osd_bytes, pg_map.osd_sum.kb * 1024ull);
86 mon->cluster_logger->set(l_cluster_osd_bytes_used,
87 pg_map.osd_sum.kb_used * 1024ull);
88 mon->cluster_logger->set(l_cluster_osd_bytes_avail,
89 pg_map.osd_sum.kb_avail * 1024ull);
90
91 mon->cluster_logger->set(l_cluster_num_pool, pg_map.pg_pool_sum.size());
92 mon->cluster_logger->set(l_cluster_num_pg, pg_map.pg_stat.size());
93
94 unsigned active = 0, active_clean = 0, peering = 0;
95 for (ceph::unordered_map<int,int>::iterator p = pg_map.num_pg_by_state.begin();
96 p != pg_map.num_pg_by_state.end();
97 ++p) {
98 if (p->first & PG_STATE_ACTIVE) {
99 active += p->second;
100 if (p->first & PG_STATE_CLEAN)
101 active_clean += p->second;
102 }
103 if (p->first & PG_STATE_PEERING)
104 peering += p->second;
105 }
106 mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
107 mon->cluster_logger->set(l_cluster_num_pg_active, active);
108 mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
109
110 mon->cluster_logger->set(l_cluster_num_object, pg_map.pg_sum.stats.sum.num_objects);
111 mon->cluster_logger->set(l_cluster_num_object_degraded, pg_map.pg_sum.stats.sum.num_objects_degraded);
112 mon->cluster_logger->set(l_cluster_num_object_misplaced, pg_map.pg_sum.stats.sum.num_objects_misplaced);
113 mon->cluster_logger->set(l_cluster_num_object_unfound, pg_map.pg_sum.stats.sum.num_objects_unfound);
114 mon->cluster_logger->set(l_cluster_num_bytes, pg_map.pg_sum.stats.sum.num_bytes);
115 }
116
117 void PGMonitor::tick()
118 {
119 if (!is_active()) return;
120 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
121 return;
122 }
123
124 handle_osd_timeouts();
125
126 if (!pg_map.pg_sum_deltas.empty()) {
127 utime_t age = ceph_clock_now() - pg_map.stamp;
128 if (age > 2 * g_conf->mon_delta_reset_interval) {
129 dout(10) << " clearing pg_map delta (" << age << " > " << g_conf->mon_delta_reset_interval << " seconds old)" << dendl;
130 pg_map.clear_delta();
131 }
132 }
133
134 /* If we have deltas for pools, run through pgmap's 'per_pool_sum_delta' and
135 * clear any deltas that are old enough.
136 *
137 * Note that 'per_pool_sum_delta' keeps a pool id as key, and a pair containing
138 * the calc'ed stats delta and an absolute timestamp from when those stats were
139 * obtained -- the timestamp IS NOT a delta itself.
140 */
141 if (!pg_map.per_pool_sum_deltas.empty()) {
142 ceph::unordered_map<uint64_t,pair<pool_stat_t,utime_t> >::iterator it;
143 for (it = pg_map.per_pool_sum_delta.begin();
144 it != pg_map.per_pool_sum_delta.end(); ) {
145 utime_t age = ceph_clock_now() - it->second.second;
146 if (age > 2*g_conf->mon_delta_reset_interval) {
147 dout(10) << " clearing pg_map delta for pool " << it->first
148 << " (" << age << " > " << g_conf->mon_delta_reset_interval
149 << " seconds old)" << dendl;
150 pg_map.per_pool_sum_deltas.erase(it->first);
151 pg_map.per_pool_sum_deltas_stamps.erase(it->first);
152 pg_map.per_pool_sum_delta.erase((it++)->first);
153 } else {
154 ++it;
155 }
156 }
157 }
158
159 dout(10) << pg_map << dendl;
160 }
161
162 void PGMonitor::create_initial()
163 {
164 dout(10) << "create_initial -- creating initial map" << dendl;
165 format_version = 1;
166 }
167
168 void PGMonitor::update_from_paxos(bool *need_bootstrap)
169 {
170 if (did_delete)
171 return;
172
173 if (get_value("deleted")) {
174 did_delete = true;
175 dout(10) << __func__ << " deleted, clearing in-memory PGMap" << dendl;
176 pg_map = PGMap();
177 pending_inc = PGMap::Incremental();
178 pgservice.reset();
179 last_osd_report.clear();
180 return;
181 }
182
183 version_t version = get_last_committed();
184 if (version == pg_map.version)
185 return;
186
187 assert(version >= pg_map.version);
188 if (format_version < 1) {
189 derr << __func__ << "unsupported monitor protocol: "
190 << get_service_name() << ".format_version = "
191 << format_version << dendl;
192 }
193 assert(format_version >= 1);
194
195 // pg/osd keys in leveldb
196 // read meta
197 while (version > pg_map.version) {
198 // load full state?
199 if (pg_map.version == 0) {
200 dout(10) << __func__ << " v0, read_full" << dendl;
201 read_pgmap_full();
202 goto out;
203 }
204
205 // incremental state?
206 dout(10) << __func__ << " read_incremental" << dendl;
207 bufferlist bl;
208 int r = get_version(pg_map.version + 1, bl);
209 if (r == -ENOENT) {
210 dout(10) << __func__ << " failed to read_incremental, read_full" << dendl;
211 // reset pg map
212 pg_map = PGMap();
213 read_pgmap_full();
214 goto out;
215 }
216 assert(r == 0);
217 apply_pgmap_delta(bl);
218 }
219
220 read_pgmap_meta();
221
222 out:
223 assert(version == pg_map.version);
224
225 update_logger();
226 }
227
228 void PGMonitor::on_upgrade()
229 {
230 dout(1) << __func__ << " discarding in-core PGMap" << dendl;
231 pg_map = PGMap();
232 }
233
234 void PGMonitor::upgrade_format()
235 {
236 unsigned current = 1;
237 assert(format_version == current);
238 }
239
240 void PGMonitor::post_paxos_update()
241 {
242 if (did_delete)
243 return;
244 dout(10) << __func__ << dendl;
245 OSDMap& osdmap = mon->osdmon()->osdmap;
246 if (mon->monmap->get_required_features().contains_all(
247 ceph::features::mon::FEATURE_LUMINOUS)) {
248 // let OSDMonitor take care of the pg-creates subscriptions.
249 return;
250 }
251 if (osdmap.get_epoch()) {
252 if (osdmap.get_num_up_osds() > 0) {
253 assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
254 check_subs();
255 }
256 }
257 }
258
259 void PGMonitor::handle_osd_timeouts()
260 {
261 if (!mon->is_leader())
262 return;
263 if (did_delete)
264 return;
265
266 utime_t now(ceph_clock_now());
267 utime_t timeo(g_conf->mon_osd_report_timeout, 0);
268 if (now - mon->get_leader_since() < timeo) {
269 // We haven't been the leader for long enough to consider OSD timeouts
270 return;
271 }
272
273 if (mon->osdmon()->is_writeable())
274 mon->osdmon()->handle_osd_timeouts(now, last_osd_report);
275 }
276
277 void PGMonitor::create_pending()
278 {
279 if (did_delete)
280 return;
281 do_delete = false;
282 pending_inc = PGMap::Incremental();
283 pending_inc.version = pg_map.version + 1;
284 if (pg_map.version == 0) {
285 // pull initial values from first leader mon's config
286 pending_inc.full_ratio = g_conf->mon_osd_full_ratio;
287 if (pending_inc.full_ratio > 1.0)
288 pending_inc.full_ratio /= 100.0;
289 pending_inc.nearfull_ratio = g_conf->mon_osd_nearfull_ratio;
290 if (pending_inc.nearfull_ratio > 1.0)
291 pending_inc.nearfull_ratio /= 100.0;
292 } else {
293 pending_inc.full_ratio = pg_map.full_ratio;
294 pending_inc.nearfull_ratio = pg_map.nearfull_ratio;
295 }
296 dout(10) << "create_pending v " << pending_inc.version << dendl;
297 }
298
299 void PGMonitor::read_pgmap_meta()
300 {
301 dout(10) << __func__ << dendl;
302
303 string prefix = pgmap_meta_prefix;
304
305 version_t version = mon->store->get(prefix, "version");
306 epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch");
307 epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan");
308 pg_map.set_version(version);
309 pg_map.set_last_osdmap_epoch(last_osdmap_epoch);
310
311 if (last_pg_scan != pg_map.get_last_pg_scan()) {
312 pg_map.set_last_pg_scan(last_pg_scan);
313 }
314
315 float full_ratio, nearfull_ratio;
316 {
317 bufferlist bl;
318 mon->store->get(prefix, "full_ratio", bl);
319 bufferlist::iterator p = bl.begin();
320 ::decode(full_ratio, p);
321 }
322 {
323 bufferlist bl;
324 mon->store->get(prefix, "nearfull_ratio", bl);
325 bufferlist::iterator p = bl.begin();
326 ::decode(nearfull_ratio, p);
327 }
328 pg_map.set_full_ratios(full_ratio, nearfull_ratio);
329 {
330 bufferlist bl;
331 mon->store->get(prefix, "stamp", bl);
332 bufferlist::iterator p = bl.begin();
333 utime_t stamp;
334 ::decode(stamp, p);
335 pg_map.set_stamp(stamp);
336 }
337 }
338
339 void PGMonitor::read_pgmap_full()
340 {
341 read_pgmap_meta();
342
343 string prefix = pgmap_pg_prefix;
344 for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
345 string key = i->key();
346 pg_t pgid;
347 if (!pgid.parse(key.c_str())) {
348 dout(0) << "unable to parse key " << key << dendl;
349 continue;
350 }
351 bufferlist bl = i->value();
352 pg_map.update_pg(pgid, bl);
353 dout(20) << " got " << pgid << dendl;
354 }
355
356 prefix = pgmap_osd_prefix;
357 for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
358 string key = i->key();
359 int osd = atoi(key.c_str());
360 bufferlist bl = i->value();
361 pg_map.update_osd(osd, bl);
362 dout(20) << " got osd." << osd << dendl;
363 }
364 }
365
366 void PGMonitor::apply_pgmap_delta(bufferlist& bl)
367 {
368 version_t v = pg_map.version + 1;
369
370 utime_t inc_stamp;
371 bufferlist dirty_pgs, dirty_osds;
372 {
373 bufferlist::iterator p = bl.begin();
374 ::decode(inc_stamp, p);
375 ::decode(dirty_pgs, p);
376 ::decode(dirty_osds, p);
377 }
378
379 pool_stat_t pg_sum_old = pg_map.pg_sum;
380 mempool::pgmap::unordered_map<uint64_t, pool_stat_t> pg_pool_sum_old;
381
382 // pgs
383 set<int64_t> deleted_pools;
384 bufferlist::iterator p = dirty_pgs.begin();
385 while (!p.end()) {
386 pg_t pgid;
387 ::decode(pgid, p);
388
389 int r;
390 bufferlist pgbl;
391 if (deleted_pools.count(pgid.pool())) {
392 r = -ENOENT;
393 } else {
394 r = mon->store->get(pgmap_pg_prefix, stringify(pgid), pgbl);
395 if (pg_pool_sum_old.count(pgid.pool()) == 0)
396 pg_pool_sum_old[pgid.pool()] = pg_map.pg_pool_sum[pgid.pool()];
397 }
398
399 if (r >= 0) {
400 pg_map.update_pg(pgid, pgbl);
401 dout(20) << " refreshing pg " << pgid
402 << " " << pg_map.pg_stat[pgid].reported_epoch
403 << ":" << pg_map.pg_stat[pgid].reported_seq
404 << " " << pg_state_string(pg_map.pg_stat[pgid].state)
405 << dendl;
406 } else {
407 dout(20) << " removing pg " << pgid << dendl;
408 pg_map.remove_pg(pgid);
409 if (pgid.ps() == 0)
410 deleted_pools.insert(pgid.pool());
411 }
412 }
413
414 // osds
415 p = dirty_osds.begin();
416 while (!p.end()) {
417 int32_t osd;
418 ::decode(osd, p);
419 dout(20) << " refreshing osd." << osd << dendl;
420 bufferlist bl;
421 int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl);
422 if (r >= 0) {
423 pg_map.update_osd(osd, bl);
424 } else {
425 pg_map.remove_osd(osd);
426 }
427 }
428
429 pg_map.update_global_delta(g_ceph_context, inc_stamp, pg_sum_old);
430 pg_map.update_pool_deltas(g_ceph_context, inc_stamp, pg_pool_sum_old);
431
432 // clean up deleted pools after updating the deltas
433 for (set<int64_t>::iterator p = deleted_pools.begin();
434 p != deleted_pools.end();
435 ++p) {
436 dout(20) << " deleted pool " << *p << dendl;
437 pg_map.deleted_pool(*p);
438 }
439
440 // ok, we're now on the new version
441 pg_map.version = v;
442 }
443
444
445 void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t)
446 {
447 if (did_delete)
448 return;
449
450 string prefix = pgmap_meta_prefix;
451 if (do_delete) {
452 dout(1) << __func__ << " clearing pgmap data at v" << pending_inc.version
453 << dendl;
454 do_delete = false;
455 for (auto key : { "version", "stamp", "last_osdmap_epoch",
456 "last_pg_scan", "full_ratio", "nearfull_ratio" }) {
457 t->erase(prefix, key);
458 }
459 for (auto& p : pg_map.pg_stat) {
460 t->erase(prefix, stringify(p.first));
461 }
462 for (auto& p : pg_map.osd_stat) {
463 t->erase(prefix, stringify(p.first));
464 }
465 put_last_committed(t, pending_inc.version);
466 put_value(t, "deleted", 1);
467 return;
468 }
469
470 assert(mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS ||
471 pending_inc.version == 1 /* rebuild-mondb.yaml case */);
472
473 version_t version = pending_inc.version;
474 dout(10) << __func__ << " v " << version << dendl;
475 assert(get_last_committed() + 1 == version);
476 pending_inc.stamp = ceph_clock_now();
477
478 uint64_t features = mon->get_quorum_con_features();
479
480 t->put(prefix, "version", pending_inc.version);
481 {
482 bufferlist bl;
483 ::encode(pending_inc.stamp, bl);
484 t->put(prefix, "stamp", bl);
485 }
486
487 if (pending_inc.osdmap_epoch)
488 t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch);
489 if (pending_inc.pg_scan)
490 t->put(prefix, "last_pg_scan", pending_inc.pg_scan);
491 if (pending_inc.full_ratio > 0) {
492 bufferlist bl;
493 ::encode(pending_inc.full_ratio, bl);
494 t->put(prefix, "full_ratio", bl);
495 }
496 if (pending_inc.nearfull_ratio > 0) {
497 bufferlist bl;
498 ::encode(pending_inc.nearfull_ratio, bl);
499 t->put(prefix, "nearfull_ratio", bl);
500 }
501
502 bufferlist incbl;
503 ::encode(pending_inc.stamp, incbl);
504 {
505 bufferlist dirty;
506 string prefix = pgmap_pg_prefix;
507 for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin();
508 p != pending_inc.pg_stat_updates.end();
509 ++p) {
510 ::encode(p->first, dirty);
511 bufferlist bl;
512 ::encode(p->second, bl, features);
513 t->put(prefix, stringify(p->first), bl);
514 }
515 for (set<pg_t>::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) {
516 ::encode(*p, dirty);
517 t->erase(prefix, stringify(*p));
518 }
519 ::encode(dirty, incbl);
520 }
521 {
522 bufferlist dirty;
523 string prefix = pgmap_osd_prefix;
524 for (map<int32_t,osd_stat_t>::const_iterator p =
525 pending_inc.get_osd_stat_updates().begin();
526 p != pending_inc.get_osd_stat_updates().end();
527 ++p) {
528 ::encode(p->first, dirty);
529 bufferlist bl;
530 ::encode(p->second, bl, features);
531 ::encode(pending_inc.get_osd_epochs().find(p->first)->second, bl);
532 t->put(prefix, stringify(p->first), bl);
533 }
534 for (set<int32_t>::const_iterator p =
535 pending_inc.get_osd_stat_rm().begin();
536 p != pending_inc.get_osd_stat_rm().end();
537 ++p) {
538 ::encode(*p, dirty);
539 t->erase(prefix, stringify(*p));
540 }
541 ::encode(dirty, incbl);
542 }
543
544 put_version(t, version, incbl);
545
546 put_last_committed(t, version);
547 }
548
549 version_t PGMonitor::get_trim_to()
550 {
551 unsigned max = g_conf->mon_max_pgmap_epochs;
552 version_t version = get_last_committed();
553 if (mon->is_leader() && (version > max))
554 return version - max;
555
556 return 0;
557 }
558
559 bool PGMonitor::preprocess_query(MonOpRequestRef op)
560 {
561 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
562 return false;
563 }
564
565 op->mark_pgmon_event(__func__);
566 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
567 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
568 switch (m->get_type()) {
569 case MSG_PGSTATS:
570 return preprocess_pg_stats(op);
571
572 case MSG_MON_COMMAND:
573 return preprocess_command(op);
574
575
576 default:
577 ceph_abort();
578 return true;
579 }
580 }
581
582 bool PGMonitor::prepare_update(MonOpRequestRef op)
583 {
584 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
585 return false;
586 }
587
588 op->mark_pgmon_event(__func__);
589 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
590 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
591 switch (m->get_type()) {
592 case MSG_PGSTATS:
593 return prepare_pg_stats(op);
594
595 case MSG_MON_COMMAND:
596 return prepare_command(op);
597
598 default:
599 ceph_abort();
600 return false;
601 }
602 }
603
604 bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op)
605 {
606 op->mark_pgmon_event(__func__);
607 MPGStats *stats = static_cast<MPGStats*>(op->get_req());
608 // check caps
609 MonSession *session = stats->get_session();
610 if (!session) {
611 dout(10) << "PGMonitor::preprocess_pg_stats: no monitor session!" << dendl;
612 return true;
613 }
614 if (!session->is_capable("pg", MON_CAP_R)) {
615 derr << "PGMonitor::preprocess_pg_stats: MPGStats received from entity "
616 << "with insufficient privileges " << session->caps << dendl;
617 return true;
618 }
619
620 if (stats->fsid != mon->monmap->fsid) {
621 dout(0) << __func__ << " drop message on fsid " << stats->fsid << " != "
622 << mon->monmap->fsid << " for " << *stats << dendl;
623 return true;
624 }
625
626 // First, just see if they need a new osdmap. But
627 // only if they've had the map for a while.
628 if (stats->had_map_for > 30.0 &&
629 mon->osdmon()->is_readable() &&
630 stats->epoch < mon->osdmon()->osdmap.get_epoch() &&
631 !session->proxy_con)
632 mon->osdmon()->send_latest_now_nodelete(op, stats->epoch+1);
633
634 // Always forward the PGStats to the leader, even if they are the same as
635 // the old PGStats. The leader will mark as down osds that haven't sent
636 // PGStats for a few minutes.
637 return false;
638 }
639
640 bool PGMonitor::pg_stats_have_changed(int from, const MPGStats *stats) const
641 {
642 // any new osd info?
643 ceph::unordered_map<int,osd_stat_t>::const_iterator s = pg_map.osd_stat.find(from);
644 if (s == pg_map.osd_stat.end())
645 return true;
646
647 if (s->second != stats->osd_stat)
648 return true;
649
650 // any new pg info?
651 for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
652 p != stats->pg_stat.end(); ++p) {
653 ceph::unordered_map<pg_t,pg_stat_t>::const_iterator t = pg_map.pg_stat.find(p->first);
654 if (t == pg_map.pg_stat.end())
655 return true;
656
657 if (t->second.reported_epoch != p->second.reported_epoch ||
658 t->second.reported_seq != p->second.reported_seq)
659 return true;
660 }
661
662 return false;
663 }
664
665 struct PGMonitor::C_Stats : public C_MonOp {
666 PGMonitor *pgmon;
667 MonOpRequestRef stats_op_ack;
668 entity_inst_t who;
669 C_Stats(PGMonitor *p,
670 MonOpRequestRef op,
671 MonOpRequestRef op_ack)
672 : C_MonOp(op), pgmon(p), stats_op_ack(op_ack) {}
673 void _finish(int r) override {
674 if (r >= 0) {
675 pgmon->_updated_stats(op, stats_op_ack);
676 } else if (r == -ECANCELED) {
677 return;
678 } else if (r == -EAGAIN) {
679 pgmon->dispatch(op);
680 } else {
681 assert(0 == "bad C_Stats return value");
682 }
683 }
684 };
685
686 bool PGMonitor::prepare_pg_stats(MonOpRequestRef op)
687 {
688 op->mark_pgmon_event(__func__);
689 MPGStats *stats = static_cast<MPGStats*>(op->get_req());
690 dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl;
691 int from = stats->get_orig_source().num();
692
693 if (stats->fsid != mon->monmap->fsid) {
694 dout(0) << "prepare_pg_stats on fsid " << stats->fsid << " != " << mon->monmap->fsid << dendl;
695 return false;
696 }
697
698 if (!stats->get_orig_source().is_osd() ||
699 !mon->osdmon()->osdmap.is_up(from) ||
700 stats->get_orig_source_inst() != mon->osdmon()->osdmap.get_inst(from)) {
701 dout(1) << " ignoring stats from non-active osd." << dendl;
702 return false;
703 }
704
705 last_osd_report[from] = ceph_clock_now();
706
707 if (!pg_stats_have_changed(from, stats)) {
708 dout(10) << " message contains no new osd|pg stats" << dendl;
709 MPGStatsAck *ack = new MPGStatsAck;
710 ack->set_tid(stats->get_tid());
711 for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
712 p != stats->pg_stat.end();
713 ++p) {
714 ack->pg_stat[p->first] = make_pair(p->second.reported_seq, p->second.reported_epoch);
715 }
716 mon->send_reply(op, ack);
717 return false;
718 }
719
720 // osd stat
721 if (mon->osdmon()->osdmap.is_in(from)) {
722 pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
723 } else {
724 pending_inc.update_stat(from, stats->epoch, osd_stat_t());
725 }
726
727 if (pg_map.osd_stat.count(from))
728 dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl;
729 else
730 dout(10) << " got osd." << from << " " << stats->osd_stat << " (first report)" << dendl;
731
732 // pg stats
733 MPGStatsAck *ack = new MPGStatsAck;
734 MonOpRequestRef ack_op = mon->op_tracker.create_request<MonOpRequest>(ack);
735 ack->set_tid(stats->get_tid());
736 for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
737 p != stats->pg_stat.end();
738 ++p) {
739 pg_t pgid = p->first;
740 ack->pg_stat[pgid] = make_pair(p->second.reported_seq, p->second.reported_epoch);
741
742 if (pg_map.pg_stat.count(pgid) &&
743 pg_map.pg_stat[pgid].get_version_pair() > p->second.get_version_pair()) {
744 dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
745 << pg_map.pg_stat[pgid].reported_seq << dendl;
746 continue;
747 }
748 if (pending_inc.pg_stat_updates.count(pgid) &&
749 pending_inc.pg_stat_updates[pgid].get_version_pair() > p->second.get_version_pair()) {
750 dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported_epoch << ":"
751 << pending_inc.pg_stat_updates[pgid].reported_seq << " (pending)" << dendl;
752 continue;
753 }
754
755 if (pg_map.pg_stat.count(pgid) == 0) {
756 dout(15) << " got " << pgid << " reported at " << p->second.reported_epoch << ":"
757 << p->second.reported_seq
758 << " state " << pg_state_string(p->second.state)
759 << " but DNE in pg_map; pool was probably deleted."
760 << dendl;
761 continue;
762 }
763
764 dout(15) << " got " << pgid
765 << " reported at " << p->second.reported_epoch << ":" << p->second.reported_seq
766 << " state " << pg_state_string(pg_map.pg_stat[pgid].state)
767 << " -> " << pg_state_string(p->second.state)
768 << dendl;
769 pending_inc.pg_stat_updates[pgid] = p->second;
770 }
771
772 wait_for_finished_proposal(op, new C_Stats(this, op, ack_op));
773 return true;
774 }
775
776 void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op)
777 {
778 op->mark_pgmon_event(__func__);
779 ack_op->mark_pgmon_event(__func__);
780 MPGStats *ack = static_cast<MPGStats*>(ack_op->get_req());
781 ack->get(); // MonOpRequestRef owns one ref; give the other to send_reply.
782 dout(7) << "_updated_stats for "
783 << op->get_req()->get_orig_source_inst() << dendl;
784 mon->send_reply(op, ack);
785 }
786
787
788 // ------------------------
789
790 struct RetryCheckOSDMap : public Context {
791 PGMonitor *pgmon;
792 epoch_t epoch;
793 RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {
794 }
795 void finish(int r) override {
796 if (r == -ECANCELED)
797 return;
798
799 pgmon->check_osd_map(epoch);
800 }
801 };
802
803 void PGMonitor::check_osd_map(epoch_t epoch)
804 {
805 if (mon->is_peon())
806 return; // whatever.
807
808 if (did_delete)
809 return;
810
811 if (pg_map.last_osdmap_epoch >= epoch) {
812 dout(10) << __func__ << " already seen " << pg_map.last_osdmap_epoch
813 << " >= " << epoch << dendl;
814 return;
815 }
816
817 if (!mon->osdmon()->is_readable()) {
818 dout(10) << __func__ << " -- osdmap not readable, waiting" << dendl;
819 mon->osdmon()->wait_for_readable_ctx(new RetryCheckOSDMap(this, epoch));
820 return;
821 }
822
823 if (!is_writeable()) {
824 dout(10) << __func__ << " -- pgmap not writeable, waiting" << dendl;
825 wait_for_writeable_ctx(new RetryCheckOSDMap(this, epoch));
826 return;
827 }
828
829 const OSDMap& osdmap = mon->osdmon()->osdmap;
830 if (!did_delete && osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
831 // delete all my data
832 dout(1) << __func__ << " will clear pg_map data" << dendl;
833 do_delete = true;
834 propose_pending();
835 return;
836 }
837
838 // osds that went up or down
839 set<int> need_check_down_pg_osds;
840
841 // apply latest map(s)
842 epoch = std::max(epoch, osdmap.get_epoch());
843 for (epoch_t e = pg_map.last_osdmap_epoch+1;
844 e <= epoch;
845 e++) {
846 dout(10) << __func__ << " applying osdmap e" << e << " to pg_map" << dendl;
847 bufferlist bl;
848 int err = mon->osdmon()->get_version(e, bl);
849 assert(err == 0);
850
851 assert(bl.length());
852 OSDMap::Incremental inc(bl);
853
854 PGMapUpdater::check_osd_map(inc, &need_check_down_pg_osds,
855 &last_osd_report, &pg_map, &pending_inc);
856 }
857
858 assert(pg_map.last_osdmap_epoch < epoch);
859 pending_inc.osdmap_epoch = epoch;
860 PGMapUpdater::update_creating_pgs(osdmap, pg_map, &pending_inc);
861 PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc);
862
863 PGMapUpdater::check_down_pgs(osdmap, pg_map, check_all_pgs,
864 need_check_down_pg_osds, &pending_inc);
865 check_all_pgs = false;
866
867 propose_pending();
868 }
869
870 epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
871 {
872 dout(30) << __func__ << " " << pg_map.creating_pgs_by_osd_epoch << dendl;
873 map<int, map<epoch_t, set<pg_t> > >::iterator p =
874 pg_map.creating_pgs_by_osd_epoch.find(osd);
875 if (p == pg_map.creating_pgs_by_osd_epoch.end())
876 return next;
877
878 assert(p->second.size() > 0);
879
880 MOSDPGCreate *m = NULL;
881 epoch_t last = 0;
882 for (map<epoch_t, set<pg_t> >::iterator q = p->second.lower_bound(next);
883 q != p->second.end();
884 ++q) {
885 dout(20) << __func__ << " osd." << osd << " from " << next
886 << " : epoch " << q->first << " " << q->second.size() << " pgs"
887 << dendl;
888 last = q->first;
889 for (set<pg_t>::iterator r = q->second.begin(); r != q->second.end(); ++r) {
890 pg_stat_t &st = pg_map.pg_stat[*r];
891 if (!m)
892 m = new MOSDPGCreate(pg_map.last_osdmap_epoch);
893 m->mkpg[*r] = pg_create_t(st.created,
894 st.parent,
895 st.parent_split_bits);
896 // Need the create time from the monitor using its clock to set
897 // last_scrub_stamp upon pg creation.
898 m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp;
899 }
900 }
901 if (!m) {
902 dout(20) << "send_pg_creates osd." << osd << " from " << next
903 << " has nothing to send" << dendl;
904 return next;
905 }
906
907 con->send_message(m);
908
909 // sub is current through last + 1
910 return last + 1;
911 }
912
913 bool PGMonitor::preprocess_command(MonOpRequestRef op)
914 {
915 op->mark_pgmon_event(__func__);
916 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
917 int r = -1;
918 bufferlist rdata;
919 stringstream ss, ds;
920
921 if (m->fsid != mon->monmap->fsid) {
922 dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
923 << mon->monmap->fsid << " for " << *m << dendl;
924 return true;
925 }
926
927 map<string, cmd_vartype> cmdmap;
928 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
929 // ss has reason for failure
930 string rs = ss.str();
931 mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
932 return true;
933 }
934
935 string prefix;
936 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
937
938 MonSession *session = m->get_session();
939 if (!session) {
940 mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
941 return true;
942 }
943
944 string format;
945 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
946 boost::scoped_ptr<Formatter> f(Formatter::create(format));
947
948 if (prefix == "pg scrub" ||
949 prefix == "pg repair" ||
950 prefix == "pg deep-scrub") {
951 string scrubop = prefix.substr(3, string::npos);
952 pg_t pgid;
953 string pgidstr;
954 cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
955 if (!pgid.parse(pgidstr.c_str())) {
956 ss << "invalid pgid '" << pgidstr << "'";
957 r = -EINVAL;
958 goto reply;
959 }
960 if (!pg_map.pg_stat.count(pgid)) {
961 ss << "pg " << pgid << " dne";
962 r = -ENOENT;
963 goto reply;
964 }
965 int osd = pg_map.pg_stat[pgid].acting_primary;
966 if (osd == -1) {
967 ss << "pg " << pgid << " has no primary osd";
968 r = -EAGAIN;
969 goto reply;
970 }
971 if (!mon->osdmon()->osdmap.is_up(osd)) {
972 ss << "pg " << pgid << " primary osd." << osd << " not up";
973 r = -EAGAIN;
974 goto reply;
975 }
976 vector<pg_t> pgs(1);
977 pgs[0] = pgid;
978 mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs,
979 scrubop == "repair",
980 scrubop == "deep-scrub"),
981 mon->osdmon()->osdmap.get_inst(osd));
982 ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop;
983 r = 0;
984 } else {
985 r = process_pg_map_command(prefix, cmdmap, pg_map, mon->osdmon()->osdmap,
986 f.get(), &ss, &rdata);
987 }
988
989 if (r == -EOPNOTSUPP)
990 return false;
991
992 reply:
993 string rs;
994 getline(ss, rs);
995 rdata.append(ds);
996 mon->reply_command(op, r, rs, rdata, get_last_committed());
997 return true;
998 }
999
1000 bool PGMonitor::prepare_command(MonOpRequestRef op)
1001 {
1002 op->mark_pgmon_event(__func__);
1003 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
1004 if (m->fsid != mon->monmap->fsid) {
1005 dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
1006 << mon->monmap->fsid << " for " << *m << dendl;
1007 return true;
1008 }
1009 stringstream ss;
1010 pg_t pgid;
1011 epoch_t epoch = mon->osdmon()->osdmap.get_epoch();
1012 int r = 0;
1013 string rs;
1014
1015 map<string, cmd_vartype> cmdmap;
1016 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1017 // ss has reason for failure
1018 string rs = ss.str();
1019 mon->reply_command(op, -EINVAL, rs, get_last_committed());
1020 return true;
1021 }
1022
1023 string prefix;
1024 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
1025
1026 MonSession *session = m->get_session();
1027 if (!session) {
1028 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
1029 return true;
1030 }
1031
1032 if (prefix == "pg force_create_pg") {
1033 string pgidstr;
1034 cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
1035 if (!pgid.parse(pgidstr.c_str())) {
1036 ss << "pg " << pgidstr << " invalid";
1037 r = -EINVAL;
1038 goto reply;
1039 }
1040 if (!pg_map.pg_stat.count(pgid)) {
1041 ss << "pg " << pgid << " dne";
1042 r = -ENOENT;
1043 goto reply;
1044 }
1045 if (pg_map.creating_pgs.count(pgid)) {
1046 ss << "pg " << pgid << " already creating";
1047 r = 0;
1048 goto reply;
1049 }
1050 {
1051 PGMapUpdater::register_pg(
1052 mon->osdmon()->osdmap,
1053 pgid,
1054 epoch,
1055 true,
1056 pg_map,
1057 &pending_inc);
1058 }
1059 ss << "pg " << pgidstr << " now creating, ok";
1060 goto update;
1061 } else if (prefix == "pg set_full_ratio" ||
1062 prefix == "pg set_nearfull_ratio") {
1063 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1064 ss << "please use the new luminous interfaces"
1065 << " ('osd set-full-ratio' and 'osd set-nearfull-ratio')";
1066 r = -EPERM;
1067 goto reply;
1068 }
1069 double n;
1070 if (!cmd_getval(g_ceph_context, cmdmap, "ratio", n)) {
1071 ss << "unable to parse 'ratio' value '"
1072 << cmd_vartype_stringify(cmdmap["who"]) << "'";
1073 r = -EINVAL;
1074 goto reply;
1075 }
1076 string op = prefix.substr(3, string::npos);
1077 if (op == "set_full_ratio")
1078 pending_inc.full_ratio = n;
1079 else if (op == "set_nearfull_ratio")
1080 pending_inc.nearfull_ratio = n;
1081 goto update;
1082 } else {
1083 r = -EINVAL;
1084 goto reply;
1085 }
1086
1087 reply:
1088 getline(ss, rs);
1089 if (r < 0 && rs.length() == 0)
1090 rs = cpp_strerror(r);
1091 mon->reply_command(op, r, rs, get_last_committed());
1092 return false;
1093
1094 update:
1095 getline(ss, rs);
1096 wait_for_finished_proposal(op, new Monitor::C_Command(
1097 mon, op, r, rs, get_last_committed() + 1));
1098 return true;
1099 }
1100
1101 void PGMonitor::get_health(list<pair<health_status_t,string> >& summary,
1102 list<pair<health_status_t,string> > *detail,
1103 CephContext *cct) const
1104 {
1105 // legacy pre-luminous full/nearfull
1106 if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
1107 check_full_osd_health(summary, detail, pg_map.full_osds, "full",
1108 HEALTH_ERR);
1109 check_full_osd_health(summary, detail, pg_map.nearfull_osds, "near full",
1110 HEALTH_WARN);
1111 pg_map.get_health(cct, mon->osdmon()->osdmap, summary, detail);
1112 }
1113 }
1114
1115 void PGMonitor::check_full_osd_health(list<pair<health_status_t,string> >& summary,
1116 list<pair<health_status_t,string> > *detail,
1117 const mempool::pgmap::set<int>& s, const char *desc,
1118 health_status_t sev) const
1119 {
1120 if (!s.empty()) {
1121 ostringstream ss;
1122 ss << s.size() << " " << desc << " osd(s)";
1123 summary.push_back(make_pair(sev, ss.str()));
1124 if (detail) {
1125 for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p) {
1126 ostringstream ss;
1127 const osd_stat_t& os = pg_map.osd_stat.find(*p)->second;
1128 int ratio = (int)(((float)os.kb_used) / (float) os.kb * 100.0);
1129 ss << "osd." << *p << " is " << desc << " at " << ratio << "%";
1130 detail->push_back(make_pair(sev, ss.str()));
1131 }
1132 }
1133 }
1134 }
1135
1136 void PGMonitor::check_subs()
1137 {
1138 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1139 return;
1140 }
1141
1142 dout(10) << __func__ << dendl;
1143 const string type = "osd_pg_creates";
1144
1145 mon->with_session_map([this, &type](const MonSessionMap& session_map) {
1146 if (mon->session_map.subs.count(type) == 0)
1147 return;
1148
1149 auto p = mon->session_map.subs[type]->begin();
1150 while (!p.end()) {
1151 Subscription *sub = *p;
1152 ++p;
1153 dout(20) << __func__ << " .. " << sub->session->inst << dendl;
1154 check_sub(sub);
1155 }
1156 });
1157 }
1158
1159 bool PGMonitor::check_sub(Subscription *sub)
1160 {
1161 OSDMap& osdmap = mon->osdmon()->osdmap;
1162 if (sub->type == "osd_pg_creates") {
1163 // only send these if the OSD is up. we will check_subs() when they do
1164 // come up so they will get the creates then.
1165 if (sub->session->inst.name.is_osd() &&
1166 osdmap.is_up(sub->session->inst.name.num())) {
1167 sub->next = send_pg_creates(sub->session->inst.name.num(),
1168 sub->session->con.get(),
1169 sub->next);
1170 }
1171 }
1172 return true;
1173 }
1174
1175 class PGMonStatService : public MonPGStatService, public PGMapStatService {
1176 PGMonitor *pgmon;
1177 public:
1178 PGMonStatService(const PGMap& o, PGMonitor *pgm)
1179 : MonPGStatService(), PGMapStatService(o), pgmon(pgm) {}
1180
1181
1182 bool is_readable() const override { return pgmon->is_readable(); }
1183
1184 unsigned maybe_add_creating_pgs(epoch_t scan_epoch,
1185 const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
1186 creating_pgs_t *pending_creates) const override
1187 {
1188 if (pgmap.last_pg_scan < scan_epoch) {
1189 return 0;
1190 }
1191 unsigned added = 0;
1192 for (auto& pgid : pgmap.creating_pgs) {
1193 if (!pools.count(pgid.pool())) {
1194 continue;
1195 }
1196 auto st = pgmap.pg_stat.find(pgid);
1197 assert(st != pgmap.pg_stat.end());
1198 auto created = make_pair(st->second.created,
1199 st->second.last_scrub_stamp);
1200 // no need to add the pg, if it already exists in creating_pgs
1201 if (pending_creates->pgs.emplace(pgid, created).second) {
1202 added++;
1203 }
1204 }
1205 return added;
1206 }
1207 void maybe_trim_creating_pgs(creating_pgs_t *creates) const override {
1208 auto p = creates->pgs.begin();
1209 while (p != creates->pgs.end()) {
1210 auto q = pgmap.pg_stat.find(p->first);
1211 if (q != pgmap.pg_stat.end() &&
1212 !(q->second.state & PG_STATE_CREATING)) {
1213 p = creates->pgs.erase(p);
1214 creates->created_pools.insert(q->first.pool());
1215 } else {
1216 ++p;
1217 }
1218 }
1219 }
1220 void dump_info(Formatter *f) const override {
1221 f->dump_object("pgmap", pgmap);
1222 f->dump_unsigned("pgmap_first_committed", pgmon->get_first_committed());
1223 f->dump_unsigned("pgmap_last_committed", pgmon->get_last_committed());
1224 }
1225 int process_pg_command(const string& prefix,
1226 const map<string,cmd_vartype>& cmdmap,
1227 const OSDMap& osdmap,
1228 Formatter *f,
1229 stringstream *ss,
1230 bufferlist *odata) const override {
1231 return process_pg_map_command(prefix, cmdmap, pgmap, osdmap, f, ss, odata);
1232 }
1233
1234 int reweight_by_utilization(const OSDMap &osd_map,
1235 int oload,
1236 double max_changef,
1237 int max_osds,
1238 bool by_pg, const set<int64_t> *pools,
1239 bool no_increasing,
1240 mempool::osdmap::map<int32_t, uint32_t>* new_weights,
1241 std::stringstream *ss,
1242 std::string *out_str,
1243 Formatter *f) const override {
1244 return reweight::by_utilization(osd_map, pgmap, oload, max_changef,
1245 max_osds, by_pg, pools, no_increasing,
1246 new_weights, ss, out_str, f);
1247 }
1248 };
1249
1250 MonPGStatService *PGMonitor::get_pg_stat_service()
1251 {
1252 if (!pgservice) {
1253 pgservice.reset(new PGMonStatService(pg_map, this));
1254 }
1255 return pgservice.get();
1256 }
1257
1258 PGMonitor::PGMonitor(Monitor *mn, Paxos *p, const string& service_name)
1259 : PaxosService(mn, p, service_name),
1260 pgmap_meta_prefix("pgmap_meta"),
1261 pgmap_pg_prefix("pgmap_pg"),
1262 pgmap_osd_prefix("pgmap_osd")
1263 {}
1264
1265 PGMonitor::~PGMonitor() = default;