]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/PGMonitor.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / PGMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "json_spirit/json_spirit.h"
17 #include "common/debug.h" // undo damage
18 #include "PGMonitor.h"
19 #include "Monitor.h"
20 #include "OSDMonitor.h"
21 #include "MonitorDBStore.h"
22 #include "PGStatService.h"
23
24 #include "messages/MPGStats.h"
25 #include "messages/MPGStatsAck.h"
26
27 #include "messages/MOSDPGCreate.h"
28 #include "messages/MMonCommand.h"
29 #include "messages/MOSDScrub.h"
30
31 #include "common/Formatter.h"
32 #include "common/config.h"
33
34 #include "include/stringify.h"
35
36 #include "osd/osd_types.h"
37
38 #include "common/config.h"
39 #include "common/errno.h"
40 #include "common/strtol.h"
41 #include "include/str_list.h"
42 #include <sstream>
43
44 #define dout_subsys ceph_subsys_mon
45 #undef dout_prefix
46 #define dout_prefix _prefix(_dout, mon, pg_map)
47 static ostream& _prefix(std::ostream *_dout, const Monitor *mon, const PGMap& pg_map) {
48 return *_dout << "mon." << mon->name << "@" << mon->rank
49 << "(" << mon->get_state_name()
50 << ").pg v" << pg_map.version << " ";
51 }
52
53 /*
54 Tick function to update the map based on performance every N seconds
55 */
56
57 void PGMonitor::on_restart()
58 {
59 // clear leader state
60 last_osd_report.clear();
61 }
62
63 void PGMonitor::on_active()
64 {
65 if (mon->is_leader()) {
66 check_all_pgs = true;
67 check_osd_map(mon->osdmon()->osdmap.get_epoch());
68 }
69
70 update_logger();
71
72 if (mon->is_leader() &&
73 mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
74 mon->clog->info() << "pgmap " << pg_map;
75 }
76 }
77
78 void PGMonitor::update_logger()
79 {
80 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
81 return;
82 }
83 dout(10) << "update_logger" << dendl;
84
85 mon->cluster_logger->set(l_cluster_osd_bytes, pg_map.osd_sum.kb * 1024ull);
86 mon->cluster_logger->set(l_cluster_osd_bytes_used,
87 pg_map.osd_sum.kb_used * 1024ull);
88 mon->cluster_logger->set(l_cluster_osd_bytes_avail,
89 pg_map.osd_sum.kb_avail * 1024ull);
90
91 mon->cluster_logger->set(l_cluster_num_pool, pg_map.pg_pool_sum.size());
92 mon->cluster_logger->set(l_cluster_num_pg, pg_map.pg_stat.size());
93
94 unsigned active = 0, active_clean = 0, peering = 0;
95 for (ceph::unordered_map<int,int>::iterator p = pg_map.num_pg_by_state.begin();
96 p != pg_map.num_pg_by_state.end();
97 ++p) {
98 if (p->first & PG_STATE_ACTIVE) {
99 active += p->second;
100 if (p->first & PG_STATE_CLEAN)
101 active_clean += p->second;
102 }
103 if (p->first & PG_STATE_PEERING)
104 peering += p->second;
105 }
106 mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
107 mon->cluster_logger->set(l_cluster_num_pg_active, active);
108 mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
109
110 mon->cluster_logger->set(l_cluster_num_object, pg_map.pg_sum.stats.sum.num_objects);
111 mon->cluster_logger->set(l_cluster_num_object_degraded, pg_map.pg_sum.stats.sum.num_objects_degraded);
112 mon->cluster_logger->set(l_cluster_num_object_misplaced, pg_map.pg_sum.stats.sum.num_objects_misplaced);
113 mon->cluster_logger->set(l_cluster_num_object_unfound, pg_map.pg_sum.stats.sum.num_objects_unfound);
114 mon->cluster_logger->set(l_cluster_num_bytes, pg_map.pg_sum.stats.sum.num_bytes);
115 }
116
117 void PGMonitor::tick()
118 {
119 if (!is_active()) return;
120 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
121 return;
122 }
123
124 handle_osd_timeouts();
125
126 if (!pg_map.pg_sum_deltas.empty()) {
127 utime_t age = ceph_clock_now() - pg_map.stamp;
128 if (age > 2 * g_conf->mon_delta_reset_interval) {
129 dout(10) << " clearing pg_map delta (" << age << " > " << g_conf->mon_delta_reset_interval << " seconds old)" << dendl;
130 pg_map.clear_delta();
131 }
132 }
133
134 /* If we have deltas for pools, run through pgmap's 'per_pool_sum_delta' and
135 * clear any deltas that are old enough.
136 *
137 * Note that 'per_pool_sum_delta' keeps a pool id as key, and a pair containing
138 * the calc'ed stats delta and an absolute timestamp from when those stats were
139 * obtained -- the timestamp IS NOT a delta itself.
140 */
141 if (!pg_map.per_pool_sum_deltas.empty()) {
142 ceph::unordered_map<uint64_t,pair<pool_stat_t,utime_t> >::iterator it;
143 for (it = pg_map.per_pool_sum_delta.begin();
144 it != pg_map.per_pool_sum_delta.end(); ) {
145 utime_t age = ceph_clock_now() - it->second.second;
146 if (age > 2*g_conf->mon_delta_reset_interval) {
147 dout(10) << " clearing pg_map delta for pool " << it->first
148 << " (" << age << " > " << g_conf->mon_delta_reset_interval
149 << " seconds old)" << dendl;
150 pg_map.per_pool_sum_deltas.erase(it->first);
151 pg_map.per_pool_sum_deltas_stamps.erase(it->first);
152 pg_map.per_pool_sum_delta.erase((it++)->first);
153 } else {
154 ++it;
155 }
156 }
157 }
158
159 dout(10) << pg_map << dendl;
160 }
161
162 void PGMonitor::create_initial()
163 {
164 dout(10) << "create_initial -- creating initial map" << dendl;
165 format_version = 1;
166 }
167
168 void PGMonitor::update_from_paxos(bool *need_bootstrap)
169 {
170 if (did_delete)
171 return;
172
173 if (get_value("deleted")) {
174 did_delete = true;
175 dout(10) << __func__ << " deleted, clearing in-memory PGMap" << dendl;
176 pg_map = PGMap();
177 pending_inc = PGMap::Incremental();
178 pgservice.reset();
179 last_osd_report.clear();
180 return;
181 }
182
183 version_t version = get_last_committed();
184 if (version == pg_map.version)
185 return;
186
187 assert(version >= pg_map.version);
188 if (format_version < 1) {
189 derr << __func__ << "unsupported monitor protocol: "
190 << get_service_name() << ".format_version = "
191 << format_version << dendl;
192 }
193 assert(format_version >= 1);
194
195 // pg/osd keys in leveldb
196 // read meta
197 while (version > pg_map.version) {
198 // load full state?
199 if (pg_map.version == 0) {
200 dout(10) << __func__ << " v0, read_full" << dendl;
201 read_pgmap_full();
202 goto out;
203 }
204
205 // incremental state?
206 dout(10) << __func__ << " read_incremental" << dendl;
207 bufferlist bl;
208 int r = get_version(pg_map.version + 1, bl);
209 if (r == -ENOENT) {
210 dout(10) << __func__ << " failed to read_incremental, read_full" << dendl;
211 // reset pg map
212 pg_map = PGMap();
213 read_pgmap_full();
214 goto out;
215 }
216 assert(r == 0);
217 apply_pgmap_delta(bl);
218 }
219
220 read_pgmap_meta();
221
222 out:
223 assert(version == pg_map.version);
224
225 update_logger();
226 }
227
228 void PGMonitor::on_upgrade()
229 {
230 dout(1) << __func__ << " discarding in-core PGMap" << dendl;
231 pg_map = PGMap();
232 }
233
234 void PGMonitor::upgrade_format()
235 {
236 unsigned current = 1;
237 assert(format_version == current);
238 }
239
240 void PGMonitor::post_paxos_update()
241 {
242 if (did_delete)
243 return;
244 dout(10) << __func__ << dendl;
245 OSDMap& osdmap = mon->osdmon()->osdmap;
246 if (mon->monmap->get_required_features().contains_all(
247 ceph::features::mon::FEATURE_LUMINOUS)) {
248 // let OSDMonitor take care of the pg-creates subscriptions.
249 return;
250 }
251 if (osdmap.get_epoch()) {
252 if (osdmap.get_num_up_osds() > 0) {
253 assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
254 check_subs();
255 }
256 }
257 }
258
259 void PGMonitor::handle_osd_timeouts()
260 {
261 if (!mon->is_leader())
262 return;
263 if (did_delete)
264 return;
265
266 utime_t now(ceph_clock_now());
267 utime_t timeo(g_conf->mon_osd_report_timeout, 0);
268 if (now - mon->get_leader_since() < timeo) {
269 // We haven't been the leader for long enough to consider OSD timeouts
270 return;
271 }
272
273 if (mon->osdmon()->is_writeable())
274 mon->osdmon()->handle_osd_timeouts(now, last_osd_report);
275 }
276
277 void PGMonitor::create_pending()
278 {
279 if (did_delete)
280 return;
281 do_delete = false;
282 pending_inc = PGMap::Incremental();
283 pending_inc.version = pg_map.version + 1;
284 if (pg_map.version == 0) {
285 // pull initial values from first leader mon's config
286 pending_inc.full_ratio = g_conf->mon_osd_full_ratio;
287 if (pending_inc.full_ratio > 1.0)
288 pending_inc.full_ratio /= 100.0;
289 pending_inc.nearfull_ratio = g_conf->mon_osd_nearfull_ratio;
290 if (pending_inc.nearfull_ratio > 1.0)
291 pending_inc.nearfull_ratio /= 100.0;
292 } else {
293 pending_inc.full_ratio = pg_map.full_ratio;
294 pending_inc.nearfull_ratio = pg_map.nearfull_ratio;
295 }
296 dout(10) << "create_pending v " << pending_inc.version << dendl;
297 }
298
299 void PGMonitor::read_pgmap_meta()
300 {
301 dout(10) << __func__ << dendl;
302
303 string prefix = pgmap_meta_prefix;
304
305 version_t version = mon->store->get(prefix, "version");
306 epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch");
307 epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan");
308 pg_map.set_version(version);
309 pg_map.set_last_osdmap_epoch(last_osdmap_epoch);
310
311 if (last_pg_scan != pg_map.get_last_pg_scan()) {
312 pg_map.set_last_pg_scan(last_pg_scan);
313 }
314
315 float full_ratio, nearfull_ratio;
316 {
317 bufferlist bl;
318 mon->store->get(prefix, "full_ratio", bl);
319 bufferlist::iterator p = bl.begin();
320 ::decode(full_ratio, p);
321 }
322 {
323 bufferlist bl;
324 mon->store->get(prefix, "nearfull_ratio", bl);
325 bufferlist::iterator p = bl.begin();
326 ::decode(nearfull_ratio, p);
327 }
328 pg_map.set_full_ratios(full_ratio, nearfull_ratio);
329 {
330 bufferlist bl;
331 mon->store->get(prefix, "stamp", bl);
332 bufferlist::iterator p = bl.begin();
333 utime_t stamp;
334 ::decode(stamp, p);
335 pg_map.set_stamp(stamp);
336 }
337 }
338
339 void PGMonitor::read_pgmap_full()
340 {
341 read_pgmap_meta();
342
343 string prefix = pgmap_pg_prefix;
344 for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
345 string key = i->key();
346 pg_t pgid;
347 if (!pgid.parse(key.c_str())) {
348 dout(0) << "unable to parse key " << key << dendl;
349 continue;
350 }
351 bufferlist bl = i->value();
352 pg_map.update_pg(pgid, bl);
353 dout(20) << " got " << pgid << dendl;
354 }
355
356 prefix = pgmap_osd_prefix;
357 for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
358 string key = i->key();
359 int osd = atoi(key.c_str());
360 bufferlist bl = i->value();
361 pg_map.update_osd(osd, bl);
362 dout(20) << " got osd." << osd << dendl;
363 }
364 }
365
366 void PGMonitor::apply_pgmap_delta(bufferlist& bl)
367 {
368 version_t v = pg_map.version + 1;
369
370 utime_t inc_stamp;
371 bufferlist dirty_pgs, dirty_osds;
372 {
373 bufferlist::iterator p = bl.begin();
374 ::decode(inc_stamp, p);
375 ::decode(dirty_pgs, p);
376 ::decode(dirty_osds, p);
377 }
378
379 pool_stat_t pg_sum_old = pg_map.pg_sum;
380 mempool::pgmap::unordered_map<uint64_t, pool_stat_t> pg_pool_sum_old;
381
382 // pgs
383 set<int64_t> deleted_pools;
384 bufferlist::iterator p = dirty_pgs.begin();
385 while (!p.end()) {
386 pg_t pgid;
387 ::decode(pgid, p);
388
389 int r;
390 bufferlist pgbl;
391 if (deleted_pools.count(pgid.pool())) {
392 r = -ENOENT;
393 } else {
394 r = mon->store->get(pgmap_pg_prefix, stringify(pgid), pgbl);
395 if (pg_pool_sum_old.count(pgid.pool()) == 0)
396 pg_pool_sum_old[pgid.pool()] = pg_map.pg_pool_sum[pgid.pool()];
397 }
398
399 if (r >= 0) {
400 pg_map.update_pg(pgid, pgbl);
401 dout(20) << " refreshing pg " << pgid
402 << " " << pg_map.pg_stat[pgid].reported_epoch
403 << ":" << pg_map.pg_stat[pgid].reported_seq
404 << " " << pg_state_string(pg_map.pg_stat[pgid].state)
405 << dendl;
406 } else {
407 dout(20) << " removing pg " << pgid << dendl;
408 pg_map.remove_pg(pgid);
409 if (pgid.ps() == 0)
410 deleted_pools.insert(pgid.pool());
411 }
412 }
413
414 // osds
415 p = dirty_osds.begin();
416 while (!p.end()) {
417 int32_t osd;
418 ::decode(osd, p);
419 dout(20) << " refreshing osd." << osd << dendl;
420 bufferlist bl;
421 int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl);
422 if (r >= 0) {
423 pg_map.update_osd(osd, bl);
424 } else {
425 pg_map.remove_osd(osd);
426 }
427 }
428
429 pg_map.update_global_delta(g_ceph_context, inc_stamp, pg_sum_old);
430 pg_map.update_pool_deltas(g_ceph_context, inc_stamp, pg_pool_sum_old);
431
432 // clean up deleted pools after updating the deltas
433 for (set<int64_t>::iterator p = deleted_pools.begin();
434 p != deleted_pools.end();
435 ++p) {
436 dout(20) << " deleted pool " << *p << dendl;
437 pg_map.deleted_pool(*p);
438 }
439
440 // ok, we're now on the new version
441 pg_map.version = v;
442 }
443
444
445 void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t)
446 {
447 if (did_delete)
448 return;
449
450 string prefix = pgmap_meta_prefix;
451 if (do_delete) {
452 dout(1) << __func__ << " clearing pgmap data at v" << pending_inc.version
453 << dendl;
454 do_delete = false;
455 for (auto key : { "version", "stamp", "last_osdmap_epoch",
456 "last_pg_scan", "full_ratio", "nearfull_ratio" }) {
457 t->erase(prefix, key);
458 }
459 for (auto& p : pg_map.pg_stat) {
460 t->erase(prefix, stringify(p.first));
461 }
462 for (auto& p : pg_map.osd_stat) {
463 t->erase(prefix, stringify(p.first));
464 }
465 put_last_committed(t, pending_inc.version);
466 put_value(t, "deleted", 1);
467 return;
468 }
469
470 assert(mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS ||
471 pending_inc.version == 1 /* rebuild-mondb.yaml case */);
472
473 version_t version = pending_inc.version;
474 dout(10) << __func__ << " v " << version << dendl;
475 assert(get_last_committed() + 1 == version);
476 pending_inc.stamp = ceph_clock_now();
477
478 uint64_t features = mon->get_quorum_con_features();
479
480 t->put(prefix, "version", pending_inc.version);
481 {
482 bufferlist bl;
483 ::encode(pending_inc.stamp, bl);
484 t->put(prefix, "stamp", bl);
485 }
486
487 if (pending_inc.osdmap_epoch)
488 t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch);
489 if (pending_inc.pg_scan)
490 t->put(prefix, "last_pg_scan", pending_inc.pg_scan);
491 if (pending_inc.full_ratio > 0) {
492 bufferlist bl;
493 ::encode(pending_inc.full_ratio, bl);
494 t->put(prefix, "full_ratio", bl);
495 }
496 if (pending_inc.nearfull_ratio > 0) {
497 bufferlist bl;
498 ::encode(pending_inc.nearfull_ratio, bl);
499 t->put(prefix, "nearfull_ratio", bl);
500 }
501
502 bufferlist incbl;
503 ::encode(pending_inc.stamp, incbl);
504 {
505 bufferlist dirty;
506 string prefix = pgmap_pg_prefix;
507 for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin();
508 p != pending_inc.pg_stat_updates.end();
509 ++p) {
510 ::encode(p->first, dirty);
511 bufferlist bl;
512 ::encode(p->second, bl, features);
513 t->put(prefix, stringify(p->first), bl);
514 }
515 for (set<pg_t>::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) {
516 ::encode(*p, dirty);
517 t->erase(prefix, stringify(*p));
518 }
519 ::encode(dirty, incbl);
520 }
521 {
522 bufferlist dirty;
523 string prefix = pgmap_osd_prefix;
524 for (map<int32_t,osd_stat_t>::const_iterator p =
525 pending_inc.get_osd_stat_updates().begin();
526 p != pending_inc.get_osd_stat_updates().end();
527 ++p) {
528 ::encode(p->first, dirty);
529 bufferlist bl;
530 ::encode(p->second, bl, features);
531 ::encode(pending_inc.get_osd_epochs().find(p->first)->second, bl);
532 t->put(prefix, stringify(p->first), bl);
533 }
534 for (set<int32_t>::const_iterator p =
535 pending_inc.get_osd_stat_rm().begin();
536 p != pending_inc.get_osd_stat_rm().end();
537 ++p) {
538 ::encode(*p, dirty);
539 t->erase(prefix, stringify(*p));
540 }
541 ::encode(dirty, incbl);
542 }
543
544 put_version(t, version, incbl);
545
546 put_last_committed(t, version);
547 }
548
549 version_t PGMonitor::get_trim_to()
550 {
551 unsigned max = g_conf->mon_max_pgmap_epochs;
552 version_t version = get_last_committed();
553 if (mon->is_leader() && (version > max))
554 return version - max;
555
556 return 0;
557 }
558
559 bool PGMonitor::preprocess_query(MonOpRequestRef op)
560 {
561 op->mark_pgmon_event(__func__);
562 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
563 dout(10) << "preprocess_query " << *m
564 << " from " << m->get_orig_source_inst() << dendl;
565 switch (m->get_type()) {
566 case MSG_PGSTATS:
567 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
568 return true;
569 }
570 return preprocess_pg_stats(op);
571
572 case MSG_MON_COMMAND:
573 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
574 bufferlist rdata;
575 mon->reply_command(op, -EOPNOTSUPP, "this command is obsolete", rdata,
576 get_last_committed());
577 return true;
578 }
579 return preprocess_command(op);
580
581 default:
582 ceph_abort();
583 return true;
584 }
585 }
586
587 bool PGMonitor::prepare_update(MonOpRequestRef op)
588 {
589 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
590 return false;
591 }
592
593 op->mark_pgmon_event(__func__);
594 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
595 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
596 switch (m->get_type()) {
597 case MSG_PGSTATS:
598 return prepare_pg_stats(op);
599
600 case MSG_MON_COMMAND:
601 return prepare_command(op);
602
603 default:
604 ceph_abort();
605 return false;
606 }
607 }
608
609 bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op)
610 {
611 op->mark_pgmon_event(__func__);
612 MPGStats *stats = static_cast<MPGStats*>(op->get_req());
613 // check caps
614 MonSession *session = stats->get_session();
615 if (!session) {
616 dout(10) << "PGMonitor::preprocess_pg_stats: no monitor session!" << dendl;
617 return true;
618 }
619 if (!session->is_capable("pg", MON_CAP_R)) {
620 derr << "PGMonitor::preprocess_pg_stats: MPGStats received from entity "
621 << "with insufficient privileges " << session->caps << dendl;
622 return true;
623 }
624
625 if (stats->fsid != mon->monmap->fsid) {
626 dout(0) << __func__ << " drop message on fsid " << stats->fsid << " != "
627 << mon->monmap->fsid << " for " << *stats << dendl;
628 return true;
629 }
630
631 // First, just see if they need a new osdmap. But
632 // only if they've had the map for a while.
633 if (stats->had_map_for > 30.0 &&
634 mon->osdmon()->is_readable() &&
635 stats->epoch < mon->osdmon()->osdmap.get_epoch() &&
636 !session->proxy_con)
637 mon->osdmon()->send_latest_now_nodelete(op, stats->epoch+1);
638
639 // Always forward the PGStats to the leader, even if they are the same as
640 // the old PGStats. The leader will mark as down osds that haven't sent
641 // PGStats for a few minutes.
642 return false;
643 }
644
645 bool PGMonitor::pg_stats_have_changed(int from, const MPGStats *stats) const
646 {
647 // any new osd info?
648 ceph::unordered_map<int,osd_stat_t>::const_iterator s = pg_map.osd_stat.find(from);
649 if (s == pg_map.osd_stat.end())
650 return true;
651
652 if (s->second != stats->osd_stat)
653 return true;
654
655 // any new pg info?
656 for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
657 p != stats->pg_stat.end(); ++p) {
658 ceph::unordered_map<pg_t,pg_stat_t>::const_iterator t = pg_map.pg_stat.find(p->first);
659 if (t == pg_map.pg_stat.end())
660 return true;
661
662 if (t->second.reported_epoch != p->second.reported_epoch ||
663 t->second.reported_seq != p->second.reported_seq)
664 return true;
665 }
666
667 return false;
668 }
669
670 struct PGMonitor::C_Stats : public C_MonOp {
671 PGMonitor *pgmon;
672 MonOpRequestRef stats_op_ack;
673 entity_inst_t who;
674 C_Stats(PGMonitor *p,
675 MonOpRequestRef op,
676 MonOpRequestRef op_ack)
677 : C_MonOp(op), pgmon(p), stats_op_ack(op_ack) {}
678 void _finish(int r) override {
679 if (r >= 0) {
680 pgmon->_updated_stats(op, stats_op_ack);
681 } else if (r == -ECANCELED) {
682 return;
683 } else if (r == -EAGAIN) {
684 pgmon->dispatch(op);
685 } else {
686 assert(0 == "bad C_Stats return value");
687 }
688 }
689 };
690
691 bool PGMonitor::prepare_pg_stats(MonOpRequestRef op)
692 {
693 op->mark_pgmon_event(__func__);
694 MPGStats *stats = static_cast<MPGStats*>(op->get_req());
695 dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl;
696 int from = stats->get_orig_source().num();
697
698 if (stats->fsid != mon->monmap->fsid) {
699 dout(0) << "prepare_pg_stats on fsid " << stats->fsid << " != " << mon->monmap->fsid << dendl;
700 return false;
701 }
702
703 if (!stats->get_orig_source().is_osd() ||
704 !mon->osdmon()->osdmap.is_up(from) ||
705 stats->get_orig_source_inst() != mon->osdmon()->osdmap.get_inst(from)) {
706 dout(1) << " ignoring stats from non-active osd." << dendl;
707 return false;
708 }
709
710 last_osd_report[from] = ceph_clock_now();
711
712 if (!pg_stats_have_changed(from, stats)) {
713 dout(10) << " message contains no new osd|pg stats" << dendl;
714 MPGStatsAck *ack = new MPGStatsAck;
715 ack->set_tid(stats->get_tid());
716 for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
717 p != stats->pg_stat.end();
718 ++p) {
719 ack->pg_stat[p->first] = make_pair(p->second.reported_seq, p->second.reported_epoch);
720 }
721 mon->send_reply(op, ack);
722 return false;
723 }
724
725 // osd stat
726 if (mon->osdmon()->osdmap.is_in(from)) {
727 pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
728 } else {
729 pending_inc.update_stat(from, stats->epoch, osd_stat_t());
730 }
731
732 if (pg_map.osd_stat.count(from))
733 dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl;
734 else
735 dout(10) << " got osd." << from << " " << stats->osd_stat << " (first report)" << dendl;
736
737 // pg stats
738 MPGStatsAck *ack = new MPGStatsAck;
739 MonOpRequestRef ack_op = mon->op_tracker.create_request<MonOpRequest>(ack);
740 ack->set_tid(stats->get_tid());
741 for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
742 p != stats->pg_stat.end();
743 ++p) {
744 pg_t pgid = p->first;
745 ack->pg_stat[pgid] = make_pair(p->second.reported_seq, p->second.reported_epoch);
746
747 if (pg_map.pg_stat.count(pgid) &&
748 pg_map.pg_stat[pgid].get_version_pair() > p->second.get_version_pair()) {
749 dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
750 << pg_map.pg_stat[pgid].reported_seq << dendl;
751 continue;
752 }
753 if (pending_inc.pg_stat_updates.count(pgid) &&
754 pending_inc.pg_stat_updates[pgid].get_version_pair() > p->second.get_version_pair()) {
755 dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported_epoch << ":"
756 << pending_inc.pg_stat_updates[pgid].reported_seq << " (pending)" << dendl;
757 continue;
758 }
759
760 if (pg_map.pg_stat.count(pgid) == 0) {
761 dout(15) << " got " << pgid << " reported at " << p->second.reported_epoch << ":"
762 << p->second.reported_seq
763 << " state " << pg_state_string(p->second.state)
764 << " but DNE in pg_map; pool was probably deleted."
765 << dendl;
766 continue;
767 }
768
769 dout(15) << " got " << pgid
770 << " reported at " << p->second.reported_epoch << ":" << p->second.reported_seq
771 << " state " << pg_state_string(pg_map.pg_stat[pgid].state)
772 << " -> " << pg_state_string(p->second.state)
773 << dendl;
774 pending_inc.pg_stat_updates[pgid] = p->second;
775 }
776
777 wait_for_finished_proposal(op, new C_Stats(this, op, ack_op));
778 return true;
779 }
780
781 void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op)
782 {
783 op->mark_pgmon_event(__func__);
784 ack_op->mark_pgmon_event(__func__);
785 MPGStats *ack = static_cast<MPGStats*>(ack_op->get_req());
786 ack->get(); // MonOpRequestRef owns one ref; give the other to send_reply.
787 dout(7) << "_updated_stats for "
788 << op->get_req()->get_orig_source_inst() << dendl;
789 mon->send_reply(op, ack);
790 }
791
792
793 // ------------------------
794
795 struct RetryCheckOSDMap : public Context {
796 PGMonitor *pgmon;
797 epoch_t epoch;
798 RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {
799 }
800 void finish(int r) override {
801 if (r == -ECANCELED)
802 return;
803
804 pgmon->check_osd_map(epoch);
805 }
806 };
807
808 void PGMonitor::check_osd_map(epoch_t epoch)
809 {
810 if (mon->is_peon())
811 return; // whatever.
812
813 if (did_delete)
814 return;
815
816 if (pg_map.last_osdmap_epoch >= epoch) {
817 dout(10) << __func__ << " already seen " << pg_map.last_osdmap_epoch
818 << " >= " << epoch << dendl;
819 return;
820 }
821
822 if (!mon->osdmon()->is_readable()) {
823 dout(10) << __func__ << " -- osdmap not readable, waiting" << dendl;
824 mon->osdmon()->wait_for_readable_ctx(new RetryCheckOSDMap(this, epoch));
825 return;
826 }
827
828 if (!is_writeable()) {
829 dout(10) << __func__ << " -- pgmap not writeable, waiting" << dendl;
830 wait_for_writeable_ctx(new RetryCheckOSDMap(this, epoch));
831 return;
832 }
833
834 const OSDMap& osdmap = mon->osdmon()->osdmap;
835 if (!did_delete && osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
836 // delete all my data
837 dout(1) << __func__ << " will clear pg_map data" << dendl;
838 do_delete = true;
839 propose_pending();
840 return;
841 }
842
843 // osds that went up or down
844 set<int> need_check_down_pg_osds;
845
846 // apply latest map(s)
847 epoch = std::max(epoch, osdmap.get_epoch());
848 for (epoch_t e = pg_map.last_osdmap_epoch+1;
849 e <= epoch;
850 e++) {
851 dout(10) << __func__ << " applying osdmap e" << e << " to pg_map" << dendl;
852 bufferlist bl;
853 int err = mon->osdmon()->get_version(e, bl);
854 assert(err == 0);
855
856 assert(bl.length());
857 OSDMap::Incremental inc(bl);
858
859 PGMapUpdater::check_osd_map(inc, &need_check_down_pg_osds,
860 &last_osd_report, &pg_map, &pending_inc);
861 }
862
863 assert(pg_map.last_osdmap_epoch < epoch);
864 pending_inc.osdmap_epoch = epoch;
865 PGMapUpdater::update_creating_pgs(osdmap, pg_map, &pending_inc);
866 PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc);
867
868 PGMapUpdater::check_down_pgs(osdmap, pg_map, check_all_pgs,
869 need_check_down_pg_osds, &pending_inc);
870 check_all_pgs = false;
871
872 propose_pending();
873 }
874
875 epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
876 {
877 dout(30) << __func__ << " " << pg_map.creating_pgs_by_osd_epoch << dendl;
878 map<int, map<epoch_t, set<pg_t> > >::iterator p =
879 pg_map.creating_pgs_by_osd_epoch.find(osd);
880 if (p == pg_map.creating_pgs_by_osd_epoch.end())
881 return next;
882
883 assert(p->second.size() > 0);
884
885 MOSDPGCreate *m = NULL;
886 epoch_t last = 0;
887 for (map<epoch_t, set<pg_t> >::iterator q = p->second.lower_bound(next);
888 q != p->second.end();
889 ++q) {
890 dout(20) << __func__ << " osd." << osd << " from " << next
891 << " : epoch " << q->first << " " << q->second.size() << " pgs"
892 << dendl;
893 last = q->first;
894 for (set<pg_t>::iterator r = q->second.begin(); r != q->second.end(); ++r) {
895 pg_stat_t &st = pg_map.pg_stat[*r];
896 if (!m)
897 m = new MOSDPGCreate(pg_map.last_osdmap_epoch);
898 m->mkpg[*r] = pg_create_t(st.created,
899 st.parent,
900 st.parent_split_bits);
901 // Need the create time from the monitor using its clock to set
902 // last_scrub_stamp upon pg creation.
903 m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp;
904 }
905 }
906 if (!m) {
907 dout(20) << "send_pg_creates osd." << osd << " from " << next
908 << " has nothing to send" << dendl;
909 return next;
910 }
911
912 con->send_message(m);
913
914 // sub is current through last + 1
915 return last + 1;
916 }
917
918 bool PGMonitor::preprocess_command(MonOpRequestRef op)
919 {
920 op->mark_pgmon_event(__func__);
921 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
922 int r = -1;
923 bufferlist rdata;
924 stringstream ss, ds;
925
926 if (m->fsid != mon->monmap->fsid) {
927 dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
928 << mon->monmap->fsid << " for " << *m << dendl;
929 return true;
930 }
931
932 map<string, cmd_vartype> cmdmap;
933 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
934 // ss has reason for failure
935 string rs = ss.str();
936 mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
937 return true;
938 }
939
940 string prefix;
941 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
942
943 MonSession *session = m->get_session();
944 if (!session) {
945 mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
946 return true;
947 }
948
949 string format;
950 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
951 boost::scoped_ptr<Formatter> f(Formatter::create(format));
952
953 if (prefix == "pg scrub" ||
954 prefix == "pg repair" ||
955 prefix == "pg deep-scrub") {
956 string scrubop = prefix.substr(3, string::npos);
957 pg_t pgid;
958 string pgidstr;
959 cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
960 if (!pgid.parse(pgidstr.c_str())) {
961 ss << "invalid pgid '" << pgidstr << "'";
962 r = -EINVAL;
963 goto reply;
964 }
965 if (!pg_map.pg_stat.count(pgid)) {
966 ss << "pg " << pgid << " dne";
967 r = -ENOENT;
968 goto reply;
969 }
970 int osd = pg_map.pg_stat[pgid].acting_primary;
971 if (osd == -1) {
972 ss << "pg " << pgid << " has no primary osd";
973 r = -EAGAIN;
974 goto reply;
975 }
976 if (!mon->osdmon()->osdmap.is_up(osd)) {
977 ss << "pg " << pgid << " primary osd." << osd << " not up";
978 r = -EAGAIN;
979 goto reply;
980 }
981 vector<pg_t> pgs(1);
982 pgs[0] = pgid;
983 mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs,
984 scrubop == "repair",
985 scrubop == "deep-scrub"),
986 mon->osdmon()->osdmap.get_inst(osd));
987 ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop;
988 r = 0;
989 } else {
990 r = process_pg_map_command(prefix, cmdmap, pg_map, mon->osdmon()->osdmap,
991 f.get(), &ss, &rdata);
992 }
993
994 if (r == -EOPNOTSUPP)
995 return false;
996
997 reply:
998 string rs;
999 getline(ss, rs);
1000 rdata.append(ds);
1001 mon->reply_command(op, r, rs, rdata, get_last_committed());
1002 return true;
1003 }
1004
1005 bool PGMonitor::prepare_command(MonOpRequestRef op)
1006 {
1007 op->mark_pgmon_event(__func__);
1008 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
1009 if (m->fsid != mon->monmap->fsid) {
1010 dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
1011 << mon->monmap->fsid << " for " << *m << dendl;
1012 return true;
1013 }
1014 stringstream ss;
1015 pg_t pgid;
1016 epoch_t epoch = mon->osdmon()->osdmap.get_epoch();
1017 int r = 0;
1018 string rs;
1019
1020 map<string, cmd_vartype> cmdmap;
1021 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1022 // ss has reason for failure
1023 string rs = ss.str();
1024 mon->reply_command(op, -EINVAL, rs, get_last_committed());
1025 return true;
1026 }
1027
1028 string prefix;
1029 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
1030
1031 MonSession *session = m->get_session();
1032 if (!session) {
1033 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
1034 return true;
1035 }
1036
1037 if (prefix == "pg force_create_pg") {
1038 string pgidstr;
1039 cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
1040 if (!pgid.parse(pgidstr.c_str())) {
1041 ss << "pg " << pgidstr << " invalid";
1042 r = -EINVAL;
1043 goto reply;
1044 }
1045 if (!pg_map.pg_stat.count(pgid)) {
1046 ss << "pg " << pgid << " dne";
1047 r = -ENOENT;
1048 goto reply;
1049 }
1050 if (pg_map.creating_pgs.count(pgid)) {
1051 ss << "pg " << pgid << " already creating";
1052 r = 0;
1053 goto reply;
1054 }
1055 {
1056 PGMapUpdater::register_pg(
1057 mon->osdmon()->osdmap,
1058 pgid,
1059 epoch,
1060 true,
1061 pg_map,
1062 &pending_inc);
1063 }
1064 ss << "pg " << pgidstr << " now creating, ok";
1065 goto update;
1066 } else if (prefix == "pg set_full_ratio" ||
1067 prefix == "pg set_nearfull_ratio") {
1068 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1069 ss << "please use the new luminous interfaces"
1070 << " ('osd set-full-ratio' and 'osd set-nearfull-ratio')";
1071 r = -EPERM;
1072 goto reply;
1073 }
1074 double n;
1075 if (!cmd_getval(g_ceph_context, cmdmap, "ratio", n)) {
1076 ss << "unable to parse 'ratio' value '"
1077 << cmd_vartype_stringify(cmdmap["who"]) << "'";
1078 r = -EINVAL;
1079 goto reply;
1080 }
1081 string op = prefix.substr(3, string::npos);
1082 if (op == "set_full_ratio")
1083 pending_inc.full_ratio = n;
1084 else if (op == "set_nearfull_ratio")
1085 pending_inc.nearfull_ratio = n;
1086 goto update;
1087 } else {
1088 r = -EINVAL;
1089 goto reply;
1090 }
1091
1092 reply:
1093 getline(ss, rs);
1094 if (r < 0 && rs.length() == 0)
1095 rs = cpp_strerror(r);
1096 mon->reply_command(op, r, rs, get_last_committed());
1097 return false;
1098
1099 update:
1100 getline(ss, rs);
1101 wait_for_finished_proposal(op, new Monitor::C_Command(
1102 mon, op, r, rs, get_last_committed() + 1));
1103 return true;
1104 }
1105
1106 void PGMonitor::get_health(list<pair<health_status_t,string> >& summary,
1107 list<pair<health_status_t,string> > *detail,
1108 CephContext *cct) const
1109 {
1110 // legacy pre-luminous full/nearfull
1111 if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
1112 check_full_osd_health(summary, detail, pg_map.full_osds, "full",
1113 HEALTH_ERR);
1114 check_full_osd_health(summary, detail, pg_map.nearfull_osds, "near full",
1115 HEALTH_WARN);
1116 pg_map.get_health(cct, mon->osdmon()->osdmap, summary, detail);
1117 }
1118 }
1119
1120 void PGMonitor::check_full_osd_health(list<pair<health_status_t,string> >& summary,
1121 list<pair<health_status_t,string> > *detail,
1122 const mempool::pgmap::set<int>& s, const char *desc,
1123 health_status_t sev) const
1124 {
1125 if (!s.empty()) {
1126 ostringstream ss;
1127 ss << s.size() << " " << desc << " osd(s)";
1128 summary.push_back(make_pair(sev, ss.str()));
1129 if (detail) {
1130 for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p) {
1131 ostringstream ss;
1132 const osd_stat_t& os = pg_map.osd_stat.find(*p)->second;
1133 int ratio = (int)(((float)os.kb_used) / (float) os.kb * 100.0);
1134 ss << "osd." << *p << " is " << desc << " at " << ratio << "%";
1135 detail->push_back(make_pair(sev, ss.str()));
1136 }
1137 }
1138 }
1139 }
1140
1141 void PGMonitor::check_subs()
1142 {
1143 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1144 return;
1145 }
1146
1147 dout(10) << __func__ << dendl;
1148 const string type = "osd_pg_creates";
1149
1150 mon->with_session_map([this, &type](const MonSessionMap& session_map) {
1151 if (mon->session_map.subs.count(type) == 0)
1152 return;
1153
1154 auto p = mon->session_map.subs[type]->begin();
1155 while (!p.end()) {
1156 Subscription *sub = *p;
1157 ++p;
1158 dout(20) << __func__ << " .. " << sub->session->inst << dendl;
1159 check_sub(sub);
1160 }
1161 });
1162 }
1163
1164 bool PGMonitor::check_sub(Subscription *sub)
1165 {
1166 OSDMap& osdmap = mon->osdmon()->osdmap;
1167 if (sub->type == "osd_pg_creates") {
1168 // only send these if the OSD is up. we will check_subs() when they do
1169 // come up so they will get the creates then.
1170 if (sub->session->inst.name.is_osd() &&
1171 osdmap.is_up(sub->session->inst.name.num())) {
1172 sub->next = send_pg_creates(sub->session->inst.name.num(),
1173 sub->session->con.get(),
1174 sub->next);
1175 }
1176 }
1177 return true;
1178 }
1179
1180 class PGMonStatService : public MonPGStatService, public PGMapStatService {
1181 PGMonitor *pgmon;
1182 public:
1183 PGMonStatService(const PGMap& o, PGMonitor *pgm)
1184 : MonPGStatService(), PGMapStatService(o), pgmon(pgm) {}
1185
1186
1187 bool is_readable() const override { return pgmon->is_readable(); }
1188
1189 unsigned maybe_add_creating_pgs(epoch_t scan_epoch,
1190 const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
1191 creating_pgs_t *pending_creates) const override
1192 {
1193 if (pgmap.last_pg_scan < scan_epoch) {
1194 return 0;
1195 }
1196 unsigned added = 0;
1197 for (auto& pgid : pgmap.creating_pgs) {
1198 if (!pools.count(pgid.pool())) {
1199 continue;
1200 }
1201 auto st = pgmap.pg_stat.find(pgid);
1202 assert(st != pgmap.pg_stat.end());
1203 auto created = make_pair(st->second.created,
1204 st->second.last_scrub_stamp);
1205 // no need to add the pg, if it already exists in creating_pgs
1206 if (pending_creates->pgs.emplace(pgid, created).second) {
1207 added++;
1208 }
1209 }
1210 return added;
1211 }
1212 void maybe_trim_creating_pgs(creating_pgs_t *creates) const override {
1213 auto p = creates->pgs.begin();
1214 while (p != creates->pgs.end()) {
1215 auto q = pgmap.pg_stat.find(p->first);
1216 if (q != pgmap.pg_stat.end() &&
1217 !(q->second.state & PG_STATE_CREATING)) {
1218 p = creates->pgs.erase(p);
1219 creates->created_pools.insert(q->first.pool());
1220 } else {
1221 ++p;
1222 }
1223 }
1224 }
1225 void dump_info(Formatter *f) const override {
1226 f->dump_object("pgmap", pgmap);
1227 f->dump_unsigned("pgmap_first_committed", pgmon->get_first_committed());
1228 f->dump_unsigned("pgmap_last_committed", pgmon->get_last_committed());
1229 }
1230 int process_pg_command(const string& prefix,
1231 const map<string,cmd_vartype>& cmdmap,
1232 const OSDMap& osdmap,
1233 Formatter *f,
1234 stringstream *ss,
1235 bufferlist *odata) const override {
1236 return process_pg_map_command(prefix, cmdmap, pgmap, osdmap, f, ss, odata);
1237 }
1238
1239 int reweight_by_utilization(const OSDMap &osd_map,
1240 int oload,
1241 double max_changef,
1242 int max_osds,
1243 bool by_pg, const set<int64_t> *pools,
1244 bool no_increasing,
1245 mempool::osdmap::map<int32_t, uint32_t>* new_weights,
1246 std::stringstream *ss,
1247 std::string *out_str,
1248 Formatter *f) const override {
1249 return reweight::by_utilization(osd_map, pgmap, oload, max_changef,
1250 max_osds, by_pg, pools, no_increasing,
1251 new_weights, ss, out_str, f);
1252 }
1253 };
1254
1255 MonPGStatService *PGMonitor::get_pg_stat_service()
1256 {
1257 if (!pgservice) {
1258 pgservice.reset(new PGMonStatService(pg_map, this));
1259 }
1260 return pgservice.get();
1261 }
1262
1263 PGMonitor::PGMonitor(Monitor *mn, Paxos *p, const string& service_name)
1264 : PaxosService(mn, p, service_name),
1265 pgmap_meta_prefix("pgmap_meta"),
1266 pgmap_pg_prefix("pgmap_pg"),
1267 pgmap_osd_prefix("pgmap_osd")
1268 {}
1269
// Defaulted out of line, after PGMonStatService has been defined above.
// NOTE(review): presumably so that the pgservice member can be destroyed
// with a complete type — confirm against the declaration in PGMonitor.h.
PGMonitor::~PGMonitor() = default;