]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #include "acconfig.h" | |
15 | ||
16 | #ifdef HAVE_SYS_MOUNT_H | |
17 | #include <sys/mount.h> | |
18 | #endif | |
19 | ||
20 | #ifdef HAVE_SYS_PARAM_H | |
21 | #include <sys/param.h> | |
22 | #endif | |
23 | ||
11fdf7f2 TL |
24 | #ifdef HAVE_SYS_VFS_H |
25 | #include <sys/vfs.h> | |
26 | #endif | |
27 | ||
7c673cae FG |
28 | #include <iostream> |
29 | #include <string> | |
30 | #include <map> | |
31 | ||
32 | #include <boost/scoped_ptr.hpp> | |
33 | #include <boost/random/mersenne_twister.hpp> | |
34 | #include <boost/random/uniform_int.hpp> | |
35 | ||
36 | ||
37 | #include "osd/osd_types.h" | |
7c673cae FG |
38 | #include "osdc/Objecter.h" |
39 | #include "mon/MonClient.h" | |
40 | #include "msg/Dispatcher.h" | |
41 | #include "msg/Messenger.h" | |
f67539c2 | 42 | #include "common/async/context_pool.h" |
7c673cae FG |
43 | #include "common/Timer.h" |
44 | #include "common/ceph_argparse.h" | |
45 | #include "global/global_init.h" | |
46 | #include "global/signal_handler.h" | |
47 | #include "common/config.h" | |
48 | #include "common/debug.h" | |
49 | #include "common/errno.h" | |
9f95a23c | 50 | #include "common/ceph_mutex.h" |
7c673cae FG |
51 | #include "common/strtol.h" |
52 | #include "common/LogEntry.h" | |
53 | #include "auth/KeyRing.h" | |
54 | #include "auth/AuthAuthorizeHandler.h" | |
55 | #include "include/uuid.h" | |
11fdf7f2 | 56 | #include "include/ceph_assert.h" |
7c673cae FG |
57 | |
58 | #include "messages/MOSDBoot.h" | |
59 | #include "messages/MOSDAlive.h" | |
60 | #include "messages/MOSDPGCreate.h" | |
61 | #include "messages/MOSDPGRemove.h" | |
62 | #include "messages/MOSDMap.h" | |
63 | #include "messages/MPGStats.h" | |
64 | #include "messages/MLog.h" | |
65 | #include "messages/MOSDPGTemp.h" | |
66 | ||
67 | using namespace std; | |
68 | ||
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_
#undef dout_prefix
#define dout_prefix _prefix(_dout, get_name())

// Log-line prefix used by every stub's dout(): writes " stub(<n>) " to the
// given stream and hands the stream back so further output can be chained.
static ostream& _prefix(std::ostream *_dout, const string &n) {
  std::ostream &out = *_dout;
  out << " stub(" << n << ") ";
  return out;
}
76 | ||
77 | ||
78 | typedef boost::mt11213b rngen_t; | |
79 | typedef boost::scoped_ptr<Messenger> MessengerRef; | |
80 | typedef boost::scoped_ptr<Objecter> ObjecterRef; | |
81 | ||
82 | class TestStub : public Dispatcher | |
83 | { | |
84 | protected: | |
85 | MessengerRef messenger; | |
f67539c2 | 86 | ceph::async::io_context_pool poolctx; |
7c673cae FG |
87 | MonClient monc; |
88 | ||
9f95a23c TL |
89 | ceph::mutex lock; |
90 | ceph::condition_variable cond; | |
7c673cae FG |
91 | SafeTimer timer; |
92 | ||
93 | bool do_shutdown; | |
94 | double tick_seconds; | |
95 | ||
96 | struct C_Tick : public Context { | |
97 | TestStub *s; | |
98 | explicit C_Tick(TestStub *stub) : s(stub) {} | |
99 | void finish(int r) override { | |
100 | generic_dout(20) << "C_Tick::" << __func__ << dendl; | |
101 | if (r == -ECANCELED) { | |
102 | generic_dout(20) << "C_Tick::" << __func__ | |
103 | << " shutdown" << dendl; | |
104 | return; | |
105 | } | |
106 | s->tick(); | |
107 | } | |
108 | }; | |
109 | ||
110 | bool ms_dispatch(Message *m) override = 0; | |
111 | void ms_handle_connect(Connection *con) override = 0; | |
112 | void ms_handle_remote_reset(Connection *con) override = 0; | |
113 | virtual int _shutdown() = 0; | |
114 | // courtesy method to be implemented by the stubs at their | |
115 | // own discretion | |
116 | virtual void _tick() { } | |
117 | // different stubs may have different needs; if a stub needs | |
118 | // to tick, then it must call this function. | |
119 | void start_ticking(double t=1.0) { | |
120 | tick_seconds = t; | |
121 | if (t <= 0) { | |
122 | stop_ticking(); | |
123 | return; | |
124 | } | |
125 | dout(20) << __func__ << " adding tick timer" << dendl; | |
126 | timer.add_event_after(tick_seconds, new C_Tick(this)); | |
127 | } | |
128 | // If we have a function to start ticking that the stubs can | |
129 | // use at their own discretion, then we should also have a | |
130 | // function to disable said ticking to be used the same way. | |
131 | // Just in case. | |
132 | // For simplicity's sake, we don't cancel the tick right off | |
133 | // the bat; instead, we wait for the next tick to kick in and | |
134 | // disable itself. | |
135 | void stop_ticking() { | |
136 | dout(20) << __func__ << " disable tick" << dendl; | |
137 | tick_seconds = 0; | |
138 | } | |
139 | ||
140 | public: | |
141 | void tick() { | |
142 | std::cout << __func__ << std::endl; | |
143 | if (do_shutdown || (tick_seconds <= 0)) { | |
144 | std::cout << __func__ << " " | |
145 | << (do_shutdown ? "shutdown" : "stop ticking") | |
146 | << std::endl; | |
147 | return; | |
148 | } | |
149 | _tick(); | |
150 | timer.add_event_after(tick_seconds, new C_Tick(this)); | |
151 | } | |
152 | ||
153 | virtual const string get_name() = 0; | |
154 | virtual int init() = 0; | |
155 | ||
156 | virtual int shutdown() { | |
9f95a23c | 157 | std::lock_guard l{lock}; |
7c673cae FG |
158 | do_shutdown = true; |
159 | int r = _shutdown(); | |
160 | if (r < 0) { | |
161 | dout(10) << __func__ << " error shutting down: " | |
162 | << cpp_strerror(-r) << dendl; | |
163 | return r; | |
164 | } | |
165 | monc.shutdown(); | |
166 | timer.shutdown(); | |
167 | messenger->shutdown(); | |
f67539c2 | 168 | poolctx.finish(); |
7c673cae FG |
169 | return 0; |
170 | } | |
171 | ||
172 | virtual void print(ostream &out) { | |
173 | out << "stub(" << get_name() << ")"; | |
174 | } | |
175 | ||
176 | void wait() { | |
177 | if (messenger != NULL) | |
178 | messenger->wait(); | |
179 | } | |
180 | ||
181 | TestStub(CephContext *cct, string who) | |
182 | : Dispatcher(cct), | |
f67539c2 | 183 | monc(cct, poolctx), |
9f95a23c | 184 | lock(ceph::make_mutex(who.append("::lock"))), |
7c673cae FG |
185 | timer(cct, lock), |
186 | do_shutdown(false), | |
187 | tick_seconds(0.0) { } | |
188 | }; | |
189 | ||
190 | class ClientStub : public TestStub | |
191 | { | |
192 | ObjecterRef objecter; | |
193 | rngen_t gen; | |
194 | ||
195 | protected: | |
196 | bool ms_dispatch(Message *m) override { | |
9f95a23c | 197 | std::lock_guard l{lock}; |
7c673cae FG |
198 | dout(1) << "client::" << __func__ << " " << *m << dendl; |
199 | switch (m->get_type()) { | |
200 | case CEPH_MSG_OSD_MAP: | |
201 | objecter->handle_osd_map((MOSDMap*)m); | |
9f95a23c | 202 | cond.notify_all(); |
7c673cae FG |
203 | break; |
204 | } | |
205 | return true; | |
206 | } | |
207 | ||
208 | void ms_handle_connect(Connection *con) override { | |
209 | dout(1) << "client::" << __func__ << " " << con << dendl; | |
9f95a23c | 210 | std::lock_guard l{lock}; |
7c673cae FG |
211 | objecter->ms_handle_connect(con); |
212 | } | |
213 | ||
214 | void ms_handle_remote_reset(Connection *con) override { | |
215 | dout(1) << "client::" << __func__ << " " << con << dendl; | |
9f95a23c | 216 | std::lock_guard l{lock}; |
7c673cae FG |
217 | objecter->ms_handle_remote_reset(con); |
218 | } | |
219 | ||
220 | bool ms_handle_reset(Connection *con) override { | |
221 | dout(1) << "client::" << __func__ << dendl; | |
9f95a23c | 222 | std::lock_guard l{lock}; |
7c673cae FG |
223 | objecter->ms_handle_reset(con); |
224 | return false; | |
225 | } | |
226 | ||
227 | bool ms_handle_refused(Connection *con) override { | |
228 | return false; | |
229 | } | |
230 | ||
231 | const string get_name() override { | |
232 | return "client"; | |
233 | } | |
234 | ||
235 | int _shutdown() override { | |
236 | if (objecter) { | |
237 | objecter->shutdown(); | |
238 | } | |
239 | return 0; | |
240 | } | |
241 | ||
242 | public: | |
243 | explicit ClientStub(CephContext *cct) | |
244 | : TestStub(cct, "client"), | |
245 | gen((int) time(NULL)) | |
246 | { } | |
247 | ||
248 | int init() override { | |
249 | int err; | |
f67539c2 | 250 | poolctx.start(1); |
7c673cae FG |
251 | err = monc.build_initial_monmap(); |
252 | if (err < 0) { | |
253 | derr << "ClientStub::" << __func__ << " ERROR: build initial monmap: " | |
254 | << cpp_strerror(err) << dendl; | |
255 | return err; | |
256 | } | |
257 | ||
258 | messenger.reset(Messenger::create_client_messenger(cct, "stubclient")); | |
11fdf7f2 | 259 | ceph_assert(messenger.get() != NULL); |
7c673cae FG |
260 | |
261 | messenger->set_default_policy( | |
262 | Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX)); | |
263 | dout(10) << "ClientStub::" << __func__ << " starting messenger at " | |
11fdf7f2 | 264 | << messenger->get_myaddrs() << dendl; |
7c673cae | 265 | |
f67539c2 | 266 | objecter.reset(new Objecter(cct, messenger.get(), &monc, poolctx)); |
11fdf7f2 | 267 | ceph_assert(objecter.get() != NULL); |
7c673cae FG |
268 | objecter->set_balanced_budget(); |
269 | ||
270 | monc.set_messenger(messenger.get()); | |
271 | objecter->init(); | |
272 | messenger->add_dispatcher_head(this); | |
273 | messenger->start(); | |
274 | monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD); | |
275 | ||
276 | err = monc.init(); | |
277 | if (err < 0) { | |
278 | derr << "ClientStub::" << __func__ << " monc init error: " | |
279 | << cpp_strerror(-err) << dendl; | |
280 | return err; | |
281 | } | |
282 | ||
283 | err = monc.authenticate(); | |
284 | if (err < 0) { | |
285 | derr << "ClientStub::" << __func__ << " monc authenticate error: " | |
286 | << cpp_strerror(-err) << dendl; | |
287 | monc.shutdown(); | |
288 | return err; | |
289 | } | |
290 | monc.wait_auth_rotating(30.0); | |
291 | ||
292 | objecter->set_client_incarnation(0); | |
293 | objecter->start(); | |
294 | ||
9f95a23c | 295 | lock.lock(); |
7c673cae FG |
296 | timer.init(); |
297 | monc.renew_subs(); | |
298 | ||
9f95a23c | 299 | lock.unlock(); |
7c673cae FG |
300 | |
301 | objecter->wait_for_osd_map(); | |
302 | ||
303 | dout(10) << "ClientStub::" << __func__ << " done" << dendl; | |
304 | return 0; | |
305 | } | |
306 | }; | |
307 | ||
7c673cae FG |
308 | class OSDStub : public TestStub |
309 | { | |
7c673cae FG |
310 | int whoami; |
311 | OSDSuperblock sb; | |
312 | OSDMap osdmap; | |
313 | osd_stat_t osd_stat; | |
314 | ||
315 | map<pg_t,pg_stat_t> pgs; | |
316 | set<pg_t> pgs_changes; | |
317 | ||
318 | rngen_t gen; | |
319 | boost::uniform_int<> mon_osd_rng; | |
320 | ||
321 | utime_t last_boot_attempt; | |
322 | static const double STUB_BOOT_INTERVAL; | |
323 | ||
324 | ||
325 | public: | |
326 | ||
327 | enum { | |
328 | STUB_MON_OSD_ALIVE = 1, | |
329 | STUB_MON_OSD_PGTEMP = 2, | |
330 | STUB_MON_OSD_FAILURE = 3, | |
331 | STUB_MON_OSD_PGSTATS = 4, | |
332 | STUB_MON_LOG = 5, | |
333 | ||
334 | STUB_MON_OSD_FIRST = STUB_MON_OSD_ALIVE, | |
335 | STUB_MON_OSD_LAST = STUB_MON_LOG, | |
336 | }; | |
337 | ||
338 | struct C_CreatePGs : public Context { | |
339 | OSDStub *s; | |
340 | explicit C_CreatePGs(OSDStub *stub) : s(stub) {} | |
341 | void finish(int r) override { | |
342 | if (r == -ECANCELED) { | |
343 | generic_dout(20) << "C_CreatePGs::" << __func__ | |
344 | << " shutdown" << dendl; | |
345 | return; | |
346 | } | |
347 | generic_dout(20) << "C_CreatePGs::" << __func__ << dendl; | |
348 | s->auto_create_pgs(); | |
349 | } | |
350 | }; | |
351 | ||
352 | ||
353 | OSDStub(int _whoami, CephContext *cct) | |
354 | : TestStub(cct, "osd"), | |
7c673cae FG |
355 | whoami(_whoami), |
356 | gen(whoami), | |
357 | mon_osd_rng(STUB_MON_OSD_FIRST, STUB_MON_OSD_LAST) | |
358 | { | |
359 | dout(20) << __func__ << " auth supported: " | |
360 | << cct->_conf->auth_supported << dendl; | |
361 | stringstream ss; | |
362 | ss << "client-osd" << whoami; | |
11fdf7f2 | 363 | std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type; |
7c673cae | 364 | messenger.reset(Messenger::create(cct, public_msgr_type, entity_name_t::OSD(whoami), |
f67539c2 | 365 | ss.str().c_str(), getpid())); |
7c673cae FG |
366 | |
367 | Throttle throttler(g_ceph_context, "osd_client_bytes", | |
11fdf7f2 | 368 | g_conf()->osd_client_message_size_cap); |
7c673cae FG |
369 | |
370 | messenger->set_default_policy( | |
371 | Messenger::Policy::stateless_server(0)); | |
372 | messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, | |
373 | &throttler, NULL); | |
374 | messenger->set_policy(entity_name_t::TYPE_MON, | |
375 | Messenger::Policy::lossy_client( | |
376 | CEPH_FEATURE_UID | | |
377 | CEPH_FEATURE_PGID64 | | |
378 | CEPH_FEATURE_OSDENC)); | |
379 | messenger->set_policy(entity_name_t::TYPE_OSD, | |
380 | Messenger::Policy::stateless_server(0)); | |
381 | ||
11fdf7f2 TL |
382 | dout(10) << __func__ << " public addr " << g_conf()->public_addr << dendl; |
383 | int err = messenger->bind(g_conf()->public_addr); | |
7c673cae FG |
384 | if (err < 0) |
385 | exit(1); | |
386 | ||
387 | if (monc.build_initial_monmap() < 0) | |
388 | exit(1); | |
389 | ||
390 | messenger->start(); | |
391 | monc.set_messenger(messenger.get()); | |
392 | } | |
393 | ||
394 | int init() override { | |
395 | dout(10) << __func__ << dendl; | |
9f95a23c | 396 | std::lock_guard l{lock}; |
7c673cae FG |
397 | |
398 | dout(1) << __func__ << " fsid " << monc.monmap.fsid | |
11fdf7f2 TL |
399 | << " osd_fsid " << g_conf()->osd_uuid << dendl; |
400 | dout(1) << __func__ << " name " << g_conf()->name << dendl; | |
7c673cae FG |
401 | |
402 | timer.init(); | |
403 | messenger->add_dispatcher_head(this); | |
404 | monc.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); | |
405 | ||
406 | int err = monc.init(); | |
407 | if (err < 0) { | |
408 | derr << __func__ << " monc init error: " | |
409 | << cpp_strerror(-err) << dendl; | |
410 | return err; | |
411 | } | |
412 | ||
413 | err = monc.authenticate(); | |
414 | if (err < 0) { | |
415 | derr << __func__ << " monc authenticate error: " | |
416 | << cpp_strerror(-err) << dendl; | |
417 | monc.shutdown(); | |
418 | return err; | |
419 | } | |
11fdf7f2 | 420 | ceph_assert(!monc.get_fsid().is_zero()); |
7c673cae FG |
421 | |
422 | monc.wait_auth_rotating(30.0); | |
423 | ||
424 | ||
425 | dout(10) << __func__ << " creating osd superblock" << dendl; | |
426 | sb.cluster_fsid = monc.monmap.fsid; | |
427 | sb.osd_fsid.generate_random(); | |
428 | sb.whoami = whoami; | |
429 | sb.compat_features = CompatSet(); | |
430 | dout(20) << __func__ << " " << sb << dendl; | |
431 | dout(20) << __func__ << " osdmap " << osdmap << dendl; | |
432 | ||
433 | update_osd_stat(); | |
434 | ||
435 | start_ticking(); | |
436 | // give a chance to the mons to inform us of what PGs we should create | |
437 | timer.add_event_after(30.0, new C_CreatePGs(this)); | |
438 | ||
439 | return 0; | |
440 | } | |
441 | ||
442 | int _shutdown() override { | |
443 | ||
444 | return 0; | |
445 | } | |
446 | ||
447 | void boot() { | |
448 | dout(1) << __func__ << " boot?" << dendl; | |
449 | ||
450 | utime_t now = ceph_clock_now(); | |
451 | if ((last_boot_attempt > 0.0) | |
452 | && ((now - last_boot_attempt)) <= STUB_BOOT_INTERVAL) { | |
453 | dout(1) << __func__ << " backoff and try again later." << dendl; | |
454 | return; | |
455 | } | |
456 | ||
457 | dout(1) << __func__ << " boot!" << dendl; | |
458 | MOSDBoot *mboot = new MOSDBoot; | |
459 | mboot->sb = sb; | |
460 | last_boot_attempt = now; | |
461 | monc.send_mon_message(mboot); | |
462 | } | |
463 | ||
464 | void add_pg(pg_t pgid, epoch_t epoch, pg_t parent) { | |
465 | ||
466 | utime_t now = ceph_clock_now(); | |
467 | ||
468 | pg_stat_t s; | |
469 | s.created = epoch; | |
470 | s.last_epoch_clean = epoch; | |
471 | s.parent = parent; | |
472 | s.state |= PG_STATE_CLEAN | PG_STATE_ACTIVE; | |
473 | s.last_fresh = now; | |
474 | s.last_change = now; | |
475 | s.last_clean = now; | |
476 | s.last_active = now; | |
477 | s.last_unstale = now; | |
478 | ||
479 | pgs[pgid] = s; | |
480 | pgs_changes.insert(pgid); | |
481 | } | |
482 | ||
483 | void auto_create_pgs() { | |
484 | bool has_pgs = !pgs.empty(); | |
485 | dout(10) << __func__ | |
486 | << ": " << (has_pgs ? "has pgs; ignore" : "create pgs") << dendl; | |
487 | if (has_pgs) | |
488 | return; | |
489 | ||
490 | if (!osdmap.get_epoch()) { | |
491 | dout(1) << __func__ | |
492 | << " still don't have osdmap; reschedule pg creation" << dendl; | |
493 | timer.add_event_after(10.0, new C_CreatePGs(this)); | |
494 | return; | |
495 | } | |
496 | ||
497 | auto& osdmap_pools = osdmap.get_pools(); | |
498 | for (auto pit = osdmap_pools.begin(); pit != osdmap_pools.end(); ++pit) { | |
499 | const int64_t pool_id = pit->first; | |
500 | const pg_pool_t &pool = pit->second; | |
31f18b77 | 501 | int ruleno = pool.get_crush_rule(); |
7c673cae FG |
502 | |
503 | if (!osdmap.crush->rule_exists(ruleno)) { | |
504 | dout(20) << __func__ | |
505 | << " no crush rule for pool id " << pool_id | |
506 | << " rule no " << ruleno << dendl; | |
507 | continue; | |
508 | } | |
509 | ||
510 | epoch_t pool_epoch = pool.get_last_change(); | |
511 | dout(20) << __func__ | |
512 | << " pool num pgs " << pool.get_pg_num() | |
513 | << " epoch " << pool_epoch << dendl; | |
514 | ||
515 | for (ps_t ps = 0; ps < pool.get_pg_num(); ++ps) { | |
11fdf7f2 | 516 | pg_t pgid(ps, pool_id); |
7c673cae FG |
517 | pg_t parent; |
518 | dout(20) << __func__ | |
519 | << " pgid " << pgid << " parent " << parent << dendl; | |
520 | add_pg(pgid, pool_epoch, parent); | |
521 | } | |
522 | } | |
523 | } | |
524 | ||
525 | void update_osd_stat() { | |
526 | struct statfs stbuf; | |
527 | int ret = statfs(".", &stbuf); | |
528 | if (ret < 0) { | |
529 | ret = -errno; | |
530 | dout(0) << __func__ | |
531 | << " cannot statfs ." << cpp_strerror(ret) << dendl; | |
532 | return; | |
533 | } | |
534 | ||
11fdf7f2 TL |
535 | osd_stat.statfs.total = stbuf.f_blocks * stbuf.f_bsize; |
536 | osd_stat.statfs.available = stbuf.f_bavail * stbuf.f_bsize; | |
537 | osd_stat.statfs.internally_reserved = 0; | |
7c673cae FG |
538 | } |
539 | ||
540 | void send_pg_stats() { | |
541 | dout(10) << __func__ | |
542 | << " pgs " << pgs.size() << " osdmap " << osdmap << dendl; | |
9f95a23c | 543 | MPGStats *mstats = new MPGStats(monc.get_fsid(), osdmap.get_epoch()); |
7c673cae FG |
544 | |
545 | mstats->set_tid(1); | |
546 | mstats->osd_stat = osd_stat; | |
547 | ||
548 | set<pg_t>::iterator it; | |
549 | for (it = pgs_changes.begin(); it != pgs_changes.end(); ++it) { | |
550 | pg_t pgid = (*it); | |
551 | if (pgs.count(pgid) == 0) { | |
552 | derr << __func__ | |
553 | << " pgid " << pgid << " not on our map" << dendl; | |
11fdf7f2 | 554 | ceph_abort_msg("pgid not on our map"); |
7c673cae FG |
555 | } |
556 | pg_stat_t &s = pgs[pgid]; | |
557 | mstats->pg_stat[pgid] = s; | |
558 | ||
559 | JSONFormatter f(true); | |
560 | s.dump(&f); | |
561 | dout(20) << __func__ | |
562 | << " pg " << pgid << " stats:\n"; | |
563 | f.flush(*_dout); | |
564 | *_dout << dendl; | |
565 | ||
566 | } | |
567 | dout(10) << __func__ << " send " << *mstats << dendl; | |
568 | monc.send_mon_message(mstats); | |
569 | } | |
570 | ||
571 | void modify_pg(pg_t pgid) { | |
572 | dout(10) << __func__ << " pg " << pgid << dendl; | |
11fdf7f2 | 573 | ceph_assert(pgs.count(pgid) > 0); |
7c673cae FG |
574 | |
575 | pg_stat_t &s = pgs[pgid]; | |
576 | utime_t now = ceph_clock_now(); | |
577 | ||
578 | if (now - s.last_change < 10.0) { | |
579 | dout(10) << __func__ | |
580 | << " pg " << pgid << " changed in the last 10s" << dendl; | |
581 | return; | |
582 | } | |
583 | ||
584 | s.state ^= PG_STATE_CLEAN; | |
585 | if (s.state & PG_STATE_CLEAN) | |
586 | s.last_clean = now; | |
587 | s.last_change = now; | |
588 | s.reported_seq++; | |
589 | ||
590 | pgs_changes.insert(pgid); | |
591 | } | |
592 | ||
593 | void modify_pgs() { | |
594 | dout(10) << __func__ << dendl; | |
595 | ||
596 | if (pgs.empty()) { | |
597 | dout(1) << __func__ | |
598 | << " no pgs available! don't attempt to modify." << dendl; | |
599 | return; | |
600 | } | |
601 | ||
602 | boost::uniform_int<> pg_rng(0, pgs.size()-1); | |
603 | set<int> pgs_pos; | |
604 | ||
605 | int num_pgs = pg_rng(gen); | |
606 | while ((int)pgs_pos.size() < num_pgs) | |
607 | pgs_pos.insert(pg_rng(gen)); | |
608 | ||
609 | map<pg_t,pg_stat_t>::iterator it = pgs.begin(); | |
610 | set<int>::iterator pos_it = pgs_pos.begin(); | |
611 | ||
612 | int pgs_at = 0; | |
613 | while (pos_it != pgs_pos.end()) { | |
614 | int at = *pos_it; | |
615 | dout(20) << __func__ << " pg at pos " << at << dendl; | |
616 | while ((pgs_at != at) && (it != pgs.end())) { | |
617 | ++it; | |
618 | ++pgs_at; | |
619 | } | |
11fdf7f2 | 620 | ceph_assert(it != pgs.end()); |
7c673cae FG |
621 | dout(20) << __func__ |
622 | << " pg at pos " << at << ": " << it->first << dendl; | |
623 | modify_pg(it->first); | |
624 | ++pos_it; | |
625 | } | |
626 | } | |
627 | ||
628 | void op_alive() { | |
629 | dout(10) << __func__ << dendl; | |
630 | if (!osdmap.exists(whoami)) { | |
631 | dout(0) << __func__ << " I'm not in the osdmap!!\n"; | |
632 | JSONFormatter f(true); | |
633 | osdmap.dump(&f); | |
634 | f.flush(*_dout); | |
635 | *_dout << dendl; | |
636 | } | |
637 | if (osdmap.get_epoch() == 0) { | |
638 | dout(1) << __func__ << " wait for osdmap" << dendl; | |
639 | return; | |
640 | } | |
641 | epoch_t up_thru = osdmap.get_up_thru(whoami); | |
642 | dout(10) << __func__ << "up_thru: " << osdmap.get_up_thru(whoami) << dendl; | |
643 | ||
644 | monc.send_mon_message(new MOSDAlive(osdmap.get_epoch(), up_thru)); | |
645 | } | |
646 | ||
647 | void op_pgtemp() { | |
648 | if (osdmap.get_epoch() == 0) { | |
649 | dout(1) << __func__ << " wait for osdmap" << dendl; | |
650 | return; | |
651 | } | |
652 | dout(10) << __func__ << dendl; | |
653 | MOSDPGTemp *m = new MOSDPGTemp(osdmap.get_epoch()); | |
654 | monc.send_mon_message(m); | |
655 | } | |
656 | ||
657 | void op_failure() { | |
658 | dout(10) << __func__ << dendl; | |
659 | } | |
660 | ||
661 | void op_pgstats() { | |
662 | dout(10) << __func__ << dendl; | |
663 | ||
664 | modify_pgs(); | |
665 | if (!pgs_changes.empty()) | |
666 | send_pg_stats(); | |
667 | monc.sub_want("osd_pg_creates", 0, CEPH_SUBSCRIBE_ONETIME); | |
668 | monc.renew_subs(); | |
669 | ||
670 | dout(20) << __func__ << " pg pools:\n"; | |
671 | ||
672 | JSONFormatter f(true); | |
673 | f.open_array_section("pools"); | |
674 | auto& osdmap_pools = osdmap.get_pools(); | |
675 | for (auto pit = osdmap_pools.begin(); pit != osdmap_pools.end(); ++pit) { | |
676 | const int64_t pool_id = pit->first; | |
677 | const pg_pool_t &pool = pit->second; | |
678 | f.open_object_section("pool"); | |
679 | f.dump_int("pool_id", pool_id); | |
680 | f.open_object_section("pool_dump"); | |
681 | pool.dump(&f); | |
682 | f.close_section(); | |
683 | f.close_section(); | |
684 | } | |
685 | f.close_section(); | |
686 | f.flush(*_dout); | |
687 | *_dout << dendl; | |
688 | } | |
689 | ||
690 | void op_log() { | |
691 | dout(10) << __func__ << dendl; | |
692 | ||
693 | MLog *m = new MLog(monc.get_fsid()); | |
694 | ||
695 | boost::uniform_int<> log_rng(1, 10); | |
696 | size_t num_entries = log_rng(gen); | |
697 | dout(10) << __func__ | |
698 | << " send " << num_entries << " log messages" << dendl; | |
699 | ||
700 | utime_t now = ceph_clock_now(); | |
701 | int seq = 0; | |
702 | for (; num_entries > 0; --num_entries) { | |
703 | LogEntry e; | |
11fdf7f2 TL |
704 | e.rank = messenger->get_myname(); |
705 | e.addrs = messenger->get_myaddrs(); | |
7c673cae FG |
706 | e.stamp = now; |
707 | e.seq = seq++; | |
708 | e.prio = CLOG_DEBUG; | |
709 | e.msg = "OSDStub::op_log"; | |
710 | m->entries.push_back(e); | |
711 | } | |
712 | ||
713 | monc.send_mon_message(m); | |
714 | } | |
715 | ||
716 | void _tick() override { | |
717 | if (!osdmap.exists(whoami)) { | |
718 | std::cout << __func__ << " not in the cluster; boot!" << std::endl; | |
719 | boot(); | |
720 | return; | |
721 | } | |
722 | ||
723 | update_osd_stat(); | |
724 | ||
725 | boost::uniform_int<> op_rng(STUB_MON_OSD_FIRST, STUB_MON_OSD_LAST); | |
726 | int op = op_rng(gen); | |
727 | switch (op) { | |
728 | case STUB_MON_OSD_ALIVE: | |
729 | op_alive(); | |
730 | break; | |
731 | case STUB_MON_OSD_PGTEMP: | |
732 | op_pgtemp(); | |
733 | break; | |
734 | case STUB_MON_OSD_FAILURE: | |
735 | op_failure(); | |
736 | break; | |
737 | case STUB_MON_OSD_PGSTATS: | |
738 | op_pgstats(); | |
739 | break; | |
740 | case STUB_MON_LOG: | |
741 | op_log(); | |
742 | break; | |
743 | } | |
744 | } | |
745 | ||
746 | void handle_pg_create(MOSDPGCreate *m) { | |
11fdf7f2 | 747 | ceph_assert(m != NULL); |
7c673cae FG |
748 | if (m->epoch < osdmap.get_epoch()) { |
749 | std::cout << __func__ << " epoch " << m->epoch << " < " | |
750 | << osdmap.get_epoch() << "; dropping" << std::endl; | |
751 | m->put(); | |
752 | return; | |
753 | } | |
754 | ||
755 | for (map<pg_t,pg_create_t>::iterator it = m->mkpg.begin(); | |
756 | it != m->mkpg.end(); ++it) { | |
757 | pg_create_t &c = it->second; | |
758 | std::cout << __func__ << " pg " << it->first | |
759 | << " created " << c.created | |
760 | << " parent " << c.parent << std::endl; | |
761 | if (pgs.count(it->first)) { | |
762 | std::cout << __func__ << " pg " << it->first | |
763 | << " exists; skipping" << std::endl; | |
764 | continue; | |
765 | } | |
766 | ||
767 | pg_t pgid = it->first; | |
768 | add_pg(pgid, c.created, c.parent); | |
769 | } | |
770 | send_pg_stats(); | |
771 | } | |
772 | ||
773 | void handle_osd_map(MOSDMap *m) { | |
774 | dout(1) << __func__ << dendl; | |
775 | if (m->fsid != monc.get_fsid()) { | |
776 | dout(0) << __func__ | |
777 | << " message fsid " << m->fsid << " != " << monc.get_fsid() | |
778 | << dendl; | |
779 | dout(0) << __func__ << " " << m | |
780 | << " from " << m->get_source_inst() | |
781 | << dendl; | |
782 | dout(0) << monc.get_monmap() << dendl; | |
783 | } | |
11fdf7f2 | 784 | ceph_assert(m->fsid == monc.get_fsid()); |
7c673cae FG |
785 | |
786 | epoch_t first = m->get_first(); | |
787 | epoch_t last = m->get_last(); | |
788 | dout(5) << __func__ | |
789 | << " epochs [" << first << "," << last << "]" | |
790 | << " current " << osdmap.get_epoch() << dendl; | |
791 | ||
792 | if (last <= osdmap.get_epoch()) { | |
793 | dout(5) << __func__ << " no new maps here; dropping" << dendl; | |
794 | m->put(); | |
795 | return; | |
796 | } | |
797 | ||
798 | if (first > osdmap.get_epoch() + 1) { | |
799 | dout(5) << __func__ | |
800 | << osdmap.get_epoch() + 1 << ".." << (first-1) << dendl; | |
801 | if ((m->oldest_map < first && osdmap.get_epoch() == 0) || | |
802 | m->oldest_map <= osdmap.get_epoch()) { | |
803 | monc.sub_want("osdmap", osdmap.get_epoch()+1, | |
804 | CEPH_SUBSCRIBE_ONETIME); | |
805 | monc.renew_subs(); | |
806 | m->put(); | |
807 | return; | |
808 | } | |
809 | } | |
810 | ||
11fdf7f2 | 811 | epoch_t start_full = std::max(osdmap.get_epoch() + 1, first); |
7c673cae FG |
812 | |
813 | if (m->maps.size() > 0) { | |
814 | map<epoch_t,bufferlist>::reverse_iterator rit; | |
815 | rit = m->maps.rbegin(); | |
816 | if (start_full <= rit->first) { | |
817 | start_full = rit->first; | |
818 | dout(5) << __func__ | |
819 | << " full epoch " << start_full << dendl; | |
820 | bufferlist &bl = rit->second; | |
11fdf7f2 | 821 | auto p = bl.cbegin(); |
7c673cae FG |
822 | osdmap.decode(p); |
823 | } | |
824 | } | |
825 | ||
826 | for (epoch_t e = start_full; e <= last; e++) { | |
827 | map<epoch_t,bufferlist>::iterator it; | |
828 | it = m->incremental_maps.find(e); | |
829 | if (it == m->incremental_maps.end()) | |
830 | continue; | |
831 | ||
832 | dout(20) << __func__ | |
833 | << " incremental epoch " << e | |
834 | << " on full epoch " << start_full << dendl; | |
835 | OSDMap::Incremental inc; | |
836 | bufferlist &bl = it->second; | |
11fdf7f2 | 837 | auto p = bl.cbegin(); |
7c673cae FG |
838 | inc.decode(p); |
839 | ||
840 | int err = osdmap.apply_incremental(inc); | |
841 | if (err < 0) { | |
842 | derr << "osd." << whoami << "::" << __func__ | |
843 | << "** ERROR: applying incremental: " | |
844 | << cpp_strerror(err) << dendl; | |
11fdf7f2 | 845 | ceph_abort_msg("error applying incremental"); |
7c673cae FG |
846 | } |
847 | } | |
848 | dout(30) << __func__ << "\nosdmap:\n"; | |
849 | JSONFormatter f(true); | |
850 | osdmap.dump(&f); | |
851 | f.flush(*_dout); | |
852 | *_dout << dendl; | |
853 | ||
854 | if (osdmap.is_up(whoami) && | |
11fdf7f2 | 855 | osdmap.get_addrs(whoami) == messenger->get_myaddrs()) { |
7c673cae FG |
856 | dout(1) << __func__ |
857 | << " got into the osdmap and we're up!" << dendl; | |
858 | } | |
859 | ||
860 | if (m->newest_map && m->newest_map > last) { | |
861 | dout(1) << __func__ | |
862 | << " they have more maps; requesting them!" << dendl; | |
863 | monc.sub_want("osdmap", osdmap.get_epoch()+1, CEPH_SUBSCRIBE_ONETIME); | |
864 | monc.renew_subs(); | |
865 | } | |
866 | ||
867 | dout(10) << __func__ << " done" << dendl; | |
868 | m->put(); | |
869 | } | |
870 | ||
871 | bool ms_dispatch(Message *m) override { | |
872 | dout(1) << __func__ << " " << *m << dendl; | |
873 | ||
874 | switch (m->get_type()) { | |
875 | case MSG_OSD_PG_CREATE: | |
876 | handle_pg_create((MOSDPGCreate*)m); | |
877 | break; | |
878 | case CEPH_MSG_OSD_MAP: | |
879 | handle_osd_map((MOSDMap*)m); | |
880 | break; | |
881 | default: | |
882 | m->put(); | |
883 | break; | |
884 | } | |
885 | return true; | |
886 | } | |
887 | ||
888 | void ms_handle_connect(Connection *con) override { | |
889 | dout(1) << __func__ << " " << con << dendl; | |
890 | if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { | |
891 | dout(10) << __func__ << " on mon" << dendl; | |
892 | } | |
893 | } | |
894 | ||
895 | void ms_handle_remote_reset(Connection *con) override {} | |
896 | ||
897 | bool ms_handle_reset(Connection *con) override { | |
898 | dout(1) << __func__ << dendl; | |
11fdf7f2 | 899 | return con->get_priv().get(); |
7c673cae FG |
900 | } |
901 | ||
902 | bool ms_handle_refused(Connection *con) override { | |
903 | return false; | |
904 | } | |
905 | ||
906 | const string get_name() override { | |
907 | stringstream ss; | |
908 | ss << "osd." << whoami; | |
909 | return ss.str(); | |
910 | } | |
911 | }; | |
912 | ||
913 | double const OSDStub::STUB_BOOT_INTERVAL = 10.0; | |
914 | ||
915 | #undef dout_prefix | |
916 | #define dout_prefix *_dout << "main " | |
917 | ||
918 | const char *our_name = NULL; | |
919 | vector<TestStub*> stubs; | |
9f95a23c TL |
920 | ceph::mutex shutdown_lock = ceph::make_mutex("main::shutdown_lock"); |
921 | ceph::condition_variable shutdown_cond; | |
7c673cae FG |
922 | Context *shutdown_cb = NULL; |
923 | SafeTimer *shutdown_timer = NULL; | |
924 | ||
925 | struct C_Shutdown : public Context | |
926 | { | |
927 | void finish(int r) override { | |
928 | generic_dout(10) << "main::shutdown time has ran out" << dendl; | |
9f95a23c | 929 | shutdown_cond.notify_all(); |
7c673cae FG |
930 | } |
931 | }; | |
932 | ||
933 | void handle_test_signal(int signum) | |
934 | { | |
935 | if ((signum != SIGINT) && (signum != SIGTERM)) | |
936 | return; | |
937 | ||
938 | std::cerr << "*** Got signal " << sig_str(signum) << " ***" << std::endl; | |
9f95a23c | 939 | std::lock_guard l{shutdown_lock}; |
7c673cae FG |
940 | if (shutdown_timer) { |
941 | shutdown_timer->cancel_all_events(); | |
9f95a23c | 942 | shutdown_cond.notify_all(); |
7c673cae FG |
943 | } |
944 | } | |
945 | ||
946 | void usage() { | |
11fdf7f2 | 947 | ceph_assert(our_name != NULL); |
7c673cae FG |
948 | |
949 | std::cout << "usage: " << our_name | |
950 | << " <--stub-id ID> [--stub-id ID...]" | |
951 | << std::endl; | |
952 | std::cout << "\n\ | |
953 | Global Options:\n\ | |
954 | -c FILE Read configuration from FILE\n\ | |
955 | --keyring FILE Read keyring from FILE\n\ | |
956 | --help This message\n\ | |
957 | \n\ | |
958 | Test-specific Options:\n\ | |
959 | --stub-id ID1..ID2 Interval of OSD ids for multiple stubs to mimic.\n\ | |
960 | --stub-id ID OSD id a stub will mimic to be\n\ | |
961 | (same as --stub-id ID..ID)\n\ | |
962 | " << std::endl; | |
963 | } | |
964 | ||
965 | int get_id_interval(int &first, int &last, string &str) | |
966 | { | |
967 | size_t found = str.find(".."); | |
968 | string first_str, last_str; | |
969 | if (found == string::npos) { | |
970 | first_str = last_str = str; | |
971 | } else { | |
972 | first_str = str.substr(0, found); | |
973 | last_str = str.substr(found+2); | |
974 | } | |
975 | ||
976 | string err; | |
977 | first = strict_strtol(first_str.c_str(), 10, &err); | |
978 | if ((first == 0) && (!err.empty())) { | |
979 | std::cerr << err << std::endl; | |
980 | return -1; | |
981 | } | |
982 | ||
983 | last = strict_strtol(last_str.c_str(), 10, &err); | |
984 | if ((last == 0) && (!err.empty())) { | |
985 | std::cerr << err << std::endl; | |
986 | return -1; | |
987 | } | |
988 | return 0; | |
989 | } | |
990 | ||
991 | int main(int argc, const char *argv[]) | |
992 | { | |
7c673cae | 993 | our_name = argv[0]; |
20effc67 | 994 | auto args = argv_to_vec(argc, argv); |
7c673cae | 995 | |
20effc67 | 996 | auto cct = global_init(nullptr, args, |
7c673cae | 997 | CEPH_ENTITY_TYPE_OSD, CODE_ENVIRONMENT_UTILITY, |
11fdf7f2 | 998 | CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); |
7c673cae FG |
999 | |
1000 | common_init_finish(g_ceph_context); | |
11fdf7f2 | 1001 | g_ceph_context->_conf.apply_changes(nullptr); |
7c673cae FG |
1002 | |
1003 | set<int> stub_ids; | |
1004 | double duration = 300.0; | |
1005 | ||
1006 | for (std::vector<const char*>::iterator i = args.begin(); i != args.end();) { | |
1007 | string val; | |
1008 | ||
1009 | if (ceph_argparse_double_dash(args, i)) { | |
1010 | break; | |
1011 | } else if (ceph_argparse_witharg(args, i, &val, | |
1012 | "--stub-id", (char*) NULL)) { | |
1013 | int first = -1, last = -1; | |
1014 | if (get_id_interval(first, last, val) < 0) { | |
1015 | std::cerr << "** error parsing stub id '" << val << "'" << std::endl; | |
1016 | exit(1); | |
1017 | } | |
1018 | ||
1019 | for (; first <= last; ++first) | |
1020 | stub_ids.insert(first); | |
1021 | } else if (ceph_argparse_witharg(args, i, &val, | |
1022 | "--duration", (char*) NULL)) { | |
1023 | string err; | |
1024 | duration = (double) strict_strtol(val.c_str(), 10, &err); | |
1025 | if ((duration == 0) && (!err.empty())) { | |
1026 | std::cerr << "** error parsing '--duration " << val << "': '" | |
1027 | << err << std::endl; | |
1028 | exit(1); | |
1029 | } | |
1030 | } else if (ceph_argparse_flag(args, i, "--help", (char*) NULL)) { | |
1031 | usage(); | |
1032 | exit(0); | |
1033 | } else { | |
1034 | std::cerr << "unknown argument '" << *i << "'" << std::endl; | |
1035 | return 1; | |
1036 | } | |
1037 | } | |
1038 | ||
1039 | if (stub_ids.empty()) { | |
1040 | std::cerr << "** error: must specify at least one '--stub-id <ID>'" | |
1041 | << std::endl; | |
1042 | usage(); | |
1043 | return 1; | |
1044 | } | |
1045 | ||
1046 | for (set<int>::iterator i = stub_ids.begin(); i != stub_ids.end(); ++i) { | |
1047 | int whoami = *i; | |
1048 | ||
1049 | std::cout << __func__ << " starting stub." << whoami << std::endl; | |
1050 | OSDStub *stub = new OSDStub(whoami, g_ceph_context); | |
1051 | int err = stub->init(); | |
1052 | if (err < 0) { | |
1053 | std::cerr << "** osd stub error: " << cpp_strerror(-err) << std::endl; | |
1054 | return 1; | |
1055 | } | |
1056 | stubs.push_back(stub); | |
1057 | } | |
1058 | ||
1059 | std::cout << __func__ << " starting client stub" << std::endl; | |
1060 | ClientStub *cstub = new ClientStub(g_ceph_context); | |
1061 | int err = cstub->init(); | |
1062 | if (err < 0) { | |
1063 | std::cerr << "** client stub error: " << cpp_strerror(-err) << std::endl; | |
1064 | return 1; | |
1065 | } | |
1066 | stubs.push_back(cstub); | |
1067 | ||
1068 | init_async_signal_handler(); | |
1069 | register_async_signal_handler_oneshot(SIGINT, handle_test_signal); | |
1070 | register_async_signal_handler_oneshot(SIGTERM, handle_test_signal); | |
1071 | ||
9f95a23c TL |
1072 | { |
1073 | unique_lock locker{shutdown_lock}; | |
1074 | shutdown_timer = new SafeTimer(g_ceph_context, shutdown_lock); | |
1075 | shutdown_timer->init(); | |
1076 | if (duration != 0) { | |
1077 | std::cout << __func__ | |
1078 | << " run test for " << duration << " seconds" << std::endl; | |
1079 | shutdown_timer->add_event_after((double) duration, new C_Shutdown); | |
1080 | } | |
1081 | shutdown_cond.wait(locker); | |
1082 | shutdown_timer->shutdown(); | |
1083 | delete shutdown_timer; | |
1084 | shutdown_timer = NULL; | |
7c673cae | 1085 | } |
7c673cae FG |
1086 | unregister_async_signal_handler(SIGINT, handle_test_signal); |
1087 | unregister_async_signal_handler(SIGTERM, handle_test_signal); | |
1088 | ||
1089 | std::cout << __func__ << " waiting for stubs to finish" << std::endl; | |
1090 | vector<TestStub*>::iterator it; | |
1091 | int i; | |
1092 | for (i = 0, it = stubs.begin(); it != stubs.end(); ++it, ++i) { | |
1093 | if (*it != NULL) { | |
1094 | (*it)->shutdown(); | |
1095 | (*it)->wait(); | |
1096 | std::cout << __func__ << " finished " << (*it)->get_name() << std::endl; | |
1097 | delete (*it); | |
1098 | (*it) = NULL; | |
1099 | } | |
1100 | } | |
1101 | ||
1102 | return 0; | |
1103 | } |