1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifdef HAVE_SYS_MOUNT_H
17 #include <sys/mount.h>
20 #ifdef HAVE_SYS_PARAM_H
21 #include <sys/param.h>
32 #include <boost/scoped_ptr.hpp>
33 #include <boost/random/mersenne_twister.hpp>
34 #include <boost/random/uniform_int.hpp>
37 #include "osd/osd_types.h"
38 #include "osdc/Objecter.h"
39 #include "mon/MonClient.h"
40 #include "msg/Dispatcher.h"
41 #include "msg/Messenger.h"
42 #include "common/async/context_pool.h"
43 #include "common/Timer.h"
44 #include "common/ceph_argparse.h"
45 #include "global/global_init.h"
46 #include "global/signal_handler.h"
47 #include "common/config.h"
48 #include "common/debug.h"
49 #include "common/errno.h"
50 #include "common/ceph_mutex.h"
51 #include "common/strtol.h"
52 #include "common/LogEntry.h"
53 #include "auth/KeyRing.h"
54 #include "auth/AuthAuthorizeHandler.h"
55 #include "include/uuid.h"
56 #include "include/ceph_assert.h"
58 #include "messages/MOSDBoot.h"
59 #include "messages/MOSDAlive.h"
60 #include "messages/MOSDPGCreate.h"
61 #include "messages/MOSDPGRemove.h"
62 #include "messages/MOSDMap.h"
63 #include "messages/MPGStats.h"
64 #include "messages/MLog.h"
65 #include "messages/MOSDPGTemp.h"
69 #define dout_context g_ceph_context
70 #define dout_subsys ceph_subsys_
72 #define dout_prefix _prefix(_dout, get_name())
73 static ostream
& _prefix(std::ostream
*_dout
, const string
&n
) {
74 return *_dout
<< " stub(" << n
<< ") ";
78 typedef boost::mt11213b rngen_t
;
79 typedef boost::scoped_ptr
<Messenger
> MessengerRef
;
80 typedef boost::scoped_ptr
<Objecter
> ObjecterRef
;
82 class TestStub
: public Dispatcher
85 MessengerRef messenger
;
86 ceph::async::io_context_pool poolctx
;
90 ceph::condition_variable cond
;
96 struct C_Tick
: public Context
{
98 explicit C_Tick(TestStub
*stub
) : s(stub
) {}
99 void finish(int r
) override
{
100 generic_dout(20) << "C_Tick::" << __func__
<< dendl
;
101 if (r
== -ECANCELED
) {
102 generic_dout(20) << "C_Tick::" << __func__
103 << " shutdown" << dendl
;
110 bool ms_dispatch(Message
*m
) override
= 0;
111 void ms_handle_connect(Connection
*con
) override
= 0;
112 void ms_handle_remote_reset(Connection
*con
) override
= 0;
113 virtual int _shutdown() = 0;
114 // courtesy method to be implemented by the stubs at their
116 virtual void _tick() { }
117 // different stubs may have different needs; if a stub needs
118 // to tick, then it must call this function.
119 void start_ticking(double t
=1.0) {
125 dout(20) << __func__
<< " adding tick timer" << dendl
;
126 timer
.add_event_after(tick_seconds
, new C_Tick(this));
128 // If we have a function to start ticking that the stubs can
129 // use at their own discretion, then we should also have a
130 // function to disable said ticking to be used the same way.
132 // For simplicity's sake, we don't cancel the tick right off
133 // the bat; instead, we wait for the next tick to kick in and
135 void stop_ticking() {
136 dout(20) << __func__
<< " disable tick" << dendl
;
142 std::cout
<< __func__
<< std::endl
;
143 if (do_shutdown
|| (tick_seconds
<= 0)) {
144 std::cout
<< __func__
<< " "
145 << (do_shutdown
? "shutdown" : "stop ticking")
150 timer
.add_event_after(tick_seconds
, new C_Tick(this));
153 virtual const string
get_name() = 0;
154 virtual int init() = 0;
156 virtual int shutdown() {
157 std::lock_guard l
{lock
};
161 dout(10) << __func__
<< " error shutting down: "
162 << cpp_strerror(-r
) << dendl
;
167 messenger
->shutdown();
172 virtual void print(ostream
&out
) {
173 out
<< "stub(" << get_name() << ")";
177 if (messenger
!= NULL
)
181 TestStub(CephContext
*cct
, string who
)
184 lock(ceph::make_mutex(who
.append("::lock"))),
187 tick_seconds(0.0) { }
190 class ClientStub
: public TestStub
192 ObjecterRef objecter
;
196 bool ms_dispatch(Message
*m
) override
{
197 std::lock_guard l
{lock
};
198 dout(1) << "client::" << __func__
<< " " << *m
<< dendl
;
199 switch (m
->get_type()) {
200 case CEPH_MSG_OSD_MAP
:
201 objecter
->handle_osd_map((MOSDMap
*)m
);
208 void ms_handle_connect(Connection
*con
) override
{
209 dout(1) << "client::" << __func__
<< " " << con
<< dendl
;
210 std::lock_guard l
{lock
};
211 objecter
->ms_handle_connect(con
);
214 void ms_handle_remote_reset(Connection
*con
) override
{
215 dout(1) << "client::" << __func__
<< " " << con
<< dendl
;
216 std::lock_guard l
{lock
};
217 objecter
->ms_handle_remote_reset(con
);
220 bool ms_handle_reset(Connection
*con
) override
{
221 dout(1) << "client::" << __func__
<< dendl
;
222 std::lock_guard l
{lock
};
223 objecter
->ms_handle_reset(con
);
227 bool ms_handle_refused(Connection
*con
) override
{
231 const string
get_name() override
{
235 int _shutdown() override
{
237 objecter
->shutdown();
243 explicit ClientStub(CephContext
*cct
)
244 : TestStub(cct
, "client"),
245 gen((int) time(NULL
))
248 int init() override
{
251 err
= monc
.build_initial_monmap();
253 derr
<< "ClientStub::" << __func__
<< " ERROR: build initial monmap: "
254 << cpp_strerror(err
) << dendl
;
258 messenger
.reset(Messenger::create_client_messenger(cct
, "stubclient"));
259 ceph_assert(messenger
.get() != NULL
);
261 messenger
->set_default_policy(
262 Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX
));
263 dout(10) << "ClientStub::" << __func__
<< " starting messenger at "
264 << messenger
->get_myaddrs() << dendl
;
266 objecter
.reset(new Objecter(cct
, messenger
.get(), &monc
, poolctx
));
267 ceph_assert(objecter
.get() != NULL
);
268 objecter
->set_balanced_budget();
270 monc
.set_messenger(messenger
.get());
272 messenger
->add_dispatcher_head(this);
274 monc
.set_want_keys(CEPH_ENTITY_TYPE_MON
|CEPH_ENTITY_TYPE_OSD
);
278 derr
<< "ClientStub::" << __func__
<< " monc init error: "
279 << cpp_strerror(-err
) << dendl
;
283 err
= monc
.authenticate();
285 derr
<< "ClientStub::" << __func__
<< " monc authenticate error: "
286 << cpp_strerror(-err
) << dendl
;
290 monc
.wait_auth_rotating(30.0);
292 objecter
->set_client_incarnation(0);
301 objecter
->wait_for_osd_map();
303 dout(10) << "ClientStub::" << __func__
<< " done" << dendl
;
308 class OSDStub
: public TestStub
315 map
<pg_t
,pg_stat_t
> pgs
;
316 set
<pg_t
> pgs_changes
;
319 boost::uniform_int
<> mon_osd_rng
;
321 utime_t last_boot_attempt
;
322 static const double STUB_BOOT_INTERVAL
;
328 STUB_MON_OSD_ALIVE
= 1,
329 STUB_MON_OSD_PGTEMP
= 2,
330 STUB_MON_OSD_FAILURE
= 3,
331 STUB_MON_OSD_PGSTATS
= 4,
334 STUB_MON_OSD_FIRST
= STUB_MON_OSD_ALIVE
,
335 STUB_MON_OSD_LAST
= STUB_MON_LOG
,
338 struct C_CreatePGs
: public Context
{
340 explicit C_CreatePGs(OSDStub
*stub
) : s(stub
) {}
341 void finish(int r
) override
{
342 if (r
== -ECANCELED
) {
343 generic_dout(20) << "C_CreatePGs::" << __func__
344 << " shutdown" << dendl
;
347 generic_dout(20) << "C_CreatePGs::" << __func__
<< dendl
;
348 s
->auto_create_pgs();
353 OSDStub(int _whoami
, CephContext
*cct
)
354 : TestStub(cct
, "osd"),
357 mon_osd_rng(STUB_MON_OSD_FIRST
, STUB_MON_OSD_LAST
)
359 dout(20) << __func__
<< " auth supported: "
360 << cct
->_conf
->auth_supported
<< dendl
;
362 ss
<< "client-osd" << whoami
;
363 std::string public_msgr_type
= cct
->_conf
->ms_public_type
.empty() ? cct
->_conf
.get_val
<std::string
>("ms_type") : cct
->_conf
->ms_public_type
;
364 messenger
.reset(Messenger::create(cct
, public_msgr_type
, entity_name_t::OSD(whoami
),
365 ss
.str().c_str(), getpid()));
367 Throttle
throttler(g_ceph_context
, "osd_client_bytes",
368 g_conf()->osd_client_message_size_cap
);
370 messenger
->set_default_policy(
371 Messenger::Policy::stateless_server(0));
372 messenger
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
374 messenger
->set_policy(entity_name_t::TYPE_MON
,
375 Messenger::Policy::lossy_client(
377 CEPH_FEATURE_PGID64
|
378 CEPH_FEATURE_OSDENC
));
379 messenger
->set_policy(entity_name_t::TYPE_OSD
,
380 Messenger::Policy::stateless_server(0));
382 dout(10) << __func__
<< " public addr " << g_conf()->public_addr
<< dendl
;
383 int err
= messenger
->bind(g_conf()->public_addr
);
387 if (monc
.build_initial_monmap() < 0)
391 monc
.set_messenger(messenger
.get());
394 int init() override
{
395 dout(10) << __func__
<< dendl
;
396 std::lock_guard l
{lock
};
398 dout(1) << __func__
<< " fsid " << monc
.monmap
.fsid
399 << " osd_fsid " << g_conf()->osd_uuid
<< dendl
;
400 dout(1) << __func__
<< " name " << g_conf()->name
<< dendl
;
403 messenger
->add_dispatcher_head(this);
404 monc
.set_want_keys(CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
);
406 int err
= monc
.init();
408 derr
<< __func__
<< " monc init error: "
409 << cpp_strerror(-err
) << dendl
;
413 err
= monc
.authenticate();
415 derr
<< __func__
<< " monc authenticate error: "
416 << cpp_strerror(-err
) << dendl
;
420 ceph_assert(!monc
.get_fsid().is_zero());
422 monc
.wait_auth_rotating(30.0);
425 dout(10) << __func__
<< " creating osd superblock" << dendl
;
426 sb
.cluster_fsid
= monc
.monmap
.fsid
;
427 sb
.osd_fsid
.generate_random();
429 sb
.compat_features
= CompatSet();
430 dout(20) << __func__
<< " " << sb
<< dendl
;
431 dout(20) << __func__
<< " osdmap " << osdmap
<< dendl
;
436 // give a chance to the mons to inform us of what PGs we should create
437 timer
.add_event_after(30.0, new C_CreatePGs(this));
442 int _shutdown() override
{
448 dout(1) << __func__
<< " boot?" << dendl
;
450 utime_t now
= ceph_clock_now();
451 if ((last_boot_attempt
> 0.0)
452 && ((now
- last_boot_attempt
)) <= STUB_BOOT_INTERVAL
) {
453 dout(1) << __func__
<< " backoff and try again later." << dendl
;
457 dout(1) << __func__
<< " boot!" << dendl
;
458 MOSDBoot
*mboot
= new MOSDBoot
;
460 last_boot_attempt
= now
;
461 monc
.send_mon_message(mboot
);
464 void add_pg(pg_t pgid
, epoch_t epoch
, pg_t parent
) {
466 utime_t now
= ceph_clock_now();
470 s
.last_epoch_clean
= epoch
;
472 s
.state
|= PG_STATE_CLEAN
| PG_STATE_ACTIVE
;
477 s
.last_unstale
= now
;
480 pgs_changes
.insert(pgid
);
483 void auto_create_pgs() {
484 bool has_pgs
= !pgs
.empty();
486 << ": " << (has_pgs
? "has pgs; ignore" : "create pgs") << dendl
;
490 if (!osdmap
.get_epoch()) {
492 << " still don't have osdmap; reschedule pg creation" << dendl
;
493 timer
.add_event_after(10.0, new C_CreatePGs(this));
497 auto& osdmap_pools
= osdmap
.get_pools();
498 for (auto pit
= osdmap_pools
.begin(); pit
!= osdmap_pools
.end(); ++pit
) {
499 const int64_t pool_id
= pit
->first
;
500 const pg_pool_t
&pool
= pit
->second
;
501 int ruleno
= pool
.get_crush_rule();
503 if (!osdmap
.crush
->rule_exists(ruleno
)) {
505 << " no crush rule for pool id " << pool_id
506 << " rule no " << ruleno
<< dendl
;
510 epoch_t pool_epoch
= pool
.get_last_change();
512 << " pool num pgs " << pool
.get_pg_num()
513 << " epoch " << pool_epoch
<< dendl
;
515 for (ps_t ps
= 0; ps
< pool
.get_pg_num(); ++ps
) {
516 pg_t
pgid(ps
, pool_id
);
519 << " pgid " << pgid
<< " parent " << parent
<< dendl
;
520 add_pg(pgid
, pool_epoch
, parent
);
525 void update_osd_stat() {
527 int ret
= statfs(".", &stbuf
);
531 << " cannot statfs ." << cpp_strerror(ret
) << dendl
;
535 osd_stat
.statfs
.total
= stbuf
.f_blocks
* stbuf
.f_bsize
;
536 osd_stat
.statfs
.available
= stbuf
.f_bavail
* stbuf
.f_bsize
;
537 osd_stat
.statfs
.internally_reserved
= 0;
540 void send_pg_stats() {
542 << " pgs " << pgs
.size() << " osdmap " << osdmap
<< dendl
;
543 MPGStats
*mstats
= new MPGStats(monc
.get_fsid(), osdmap
.get_epoch());
546 mstats
->osd_stat
= osd_stat
;
548 set
<pg_t
>::iterator it
;
549 for (it
= pgs_changes
.begin(); it
!= pgs_changes
.end(); ++it
) {
551 if (pgs
.count(pgid
) == 0) {
553 << " pgid " << pgid
<< " not on our map" << dendl
;
554 ceph_abort_msg("pgid not on our map");
556 pg_stat_t
&s
= pgs
[pgid
];
557 mstats
->pg_stat
[pgid
] = s
;
559 JSONFormatter
f(true);
562 << " pg " << pgid
<< " stats:\n";
567 dout(10) << __func__
<< " send " << *mstats
<< dendl
;
568 monc
.send_mon_message(mstats
);
571 void modify_pg(pg_t pgid
) {
572 dout(10) << __func__
<< " pg " << pgid
<< dendl
;
573 ceph_assert(pgs
.count(pgid
) > 0);
575 pg_stat_t
&s
= pgs
[pgid
];
576 utime_t now
= ceph_clock_now();
578 if (now
- s
.last_change
< 10.0) {
580 << " pg " << pgid
<< " changed in the last 10s" << dendl
;
584 s
.state
^= PG_STATE_CLEAN
;
585 if (s
.state
& PG_STATE_CLEAN
)
590 pgs_changes
.insert(pgid
);
594 dout(10) << __func__
<< dendl
;
598 << " no pgs available! don't attempt to modify." << dendl
;
602 boost::uniform_int
<> pg_rng(0, pgs
.size()-1);
605 int num_pgs
= pg_rng(gen
);
606 while ((int)pgs_pos
.size() < num_pgs
)
607 pgs_pos
.insert(pg_rng(gen
));
609 map
<pg_t
,pg_stat_t
>::iterator it
= pgs
.begin();
610 set
<int>::iterator pos_it
= pgs_pos
.begin();
613 while (pos_it
!= pgs_pos
.end()) {
615 dout(20) << __func__
<< " pg at pos " << at
<< dendl
;
616 while ((pgs_at
!= at
) && (it
!= pgs
.end())) {
620 ceph_assert(it
!= pgs
.end());
622 << " pg at pos " << at
<< ": " << it
->first
<< dendl
;
623 modify_pg(it
->first
);
629 dout(10) << __func__
<< dendl
;
630 if (!osdmap
.exists(whoami
)) {
631 dout(0) << __func__
<< " I'm not in the osdmap!!\n";
632 JSONFormatter
f(true);
637 if (osdmap
.get_epoch() == 0) {
638 dout(1) << __func__
<< " wait for osdmap" << dendl
;
641 epoch_t up_thru
= osdmap
.get_up_thru(whoami
);
642 dout(10) << __func__
<< "up_thru: " << osdmap
.get_up_thru(whoami
) << dendl
;
644 monc
.send_mon_message(new MOSDAlive(osdmap
.get_epoch(), up_thru
));
648 if (osdmap
.get_epoch() == 0) {
649 dout(1) << __func__
<< " wait for osdmap" << dendl
;
652 dout(10) << __func__
<< dendl
;
653 MOSDPGTemp
*m
= new MOSDPGTemp(osdmap
.get_epoch());
654 monc
.send_mon_message(m
);
658 dout(10) << __func__
<< dendl
;
662 dout(10) << __func__
<< dendl
;
665 if (!pgs_changes
.empty())
667 monc
.sub_want("osd_pg_creates", 0, CEPH_SUBSCRIBE_ONETIME
);
670 dout(20) << __func__
<< " pg pools:\n";
672 JSONFormatter
f(true);
673 f
.open_array_section("pools");
674 auto& osdmap_pools
= osdmap
.get_pools();
675 for (auto pit
= osdmap_pools
.begin(); pit
!= osdmap_pools
.end(); ++pit
) {
676 const int64_t pool_id
= pit
->first
;
677 const pg_pool_t
&pool
= pit
->second
;
678 f
.open_object_section("pool");
679 f
.dump_int("pool_id", pool_id
);
680 f
.open_object_section("pool_dump");
691 dout(10) << __func__
<< dendl
;
693 MLog
*m
= new MLog(monc
.get_fsid());
695 boost::uniform_int
<> log_rng(1, 10);
696 size_t num_entries
= log_rng(gen
);
698 << " send " << num_entries
<< " log messages" << dendl
;
700 utime_t now
= ceph_clock_now();
702 for (; num_entries
> 0; --num_entries
) {
704 e
.rank
= messenger
->get_myname();
705 e
.addrs
= messenger
->get_myaddrs();
709 e
.msg
= "OSDStub::op_log";
710 m
->entries
.push_back(e
);
713 monc
.send_mon_message(m
);
716 void _tick() override
{
717 if (!osdmap
.exists(whoami
)) {
718 std::cout
<< __func__
<< " not in the cluster; boot!" << std::endl
;
725 boost::uniform_int
<> op_rng(STUB_MON_OSD_FIRST
, STUB_MON_OSD_LAST
);
726 int op
= op_rng(gen
);
728 case STUB_MON_OSD_ALIVE
:
731 case STUB_MON_OSD_PGTEMP
:
734 case STUB_MON_OSD_FAILURE
:
737 case STUB_MON_OSD_PGSTATS
:
746 void handle_pg_create(MOSDPGCreate
*m
) {
747 ceph_assert(m
!= NULL
);
748 if (m
->epoch
< osdmap
.get_epoch()) {
749 std::cout
<< __func__
<< " epoch " << m
->epoch
<< " < "
750 << osdmap
.get_epoch() << "; dropping" << std::endl
;
755 for (map
<pg_t
,pg_create_t
>::iterator it
= m
->mkpg
.begin();
756 it
!= m
->mkpg
.end(); ++it
) {
757 pg_create_t
&c
= it
->second
;
758 std::cout
<< __func__
<< " pg " << it
->first
759 << " created " << c
.created
760 << " parent " << c
.parent
<< std::endl
;
761 if (pgs
.count(it
->first
)) {
762 std::cout
<< __func__
<< " pg " << it
->first
763 << " exists; skipping" << std::endl
;
767 pg_t pgid
= it
->first
;
768 add_pg(pgid
, c
.created
, c
.parent
);
773 void handle_osd_map(MOSDMap
*m
) {
774 dout(1) << __func__
<< dendl
;
775 if (m
->fsid
!= monc
.get_fsid()) {
777 << " message fsid " << m
->fsid
<< " != " << monc
.get_fsid()
779 dout(0) << __func__
<< " " << m
780 << " from " << m
->get_source_inst()
782 dout(0) << monc
.get_monmap() << dendl
;
784 ceph_assert(m
->fsid
== monc
.get_fsid());
786 epoch_t first
= m
->get_first();
787 epoch_t last
= m
->get_last();
789 << " epochs [" << first
<< "," << last
<< "]"
790 << " current " << osdmap
.get_epoch() << dendl
;
792 if (last
<= osdmap
.get_epoch()) {
793 dout(5) << __func__
<< " no new maps here; dropping" << dendl
;
798 if (first
> osdmap
.get_epoch() + 1) {
800 << osdmap
.get_epoch() + 1 << ".." << (first
-1) << dendl
;
801 if ((m
->oldest_map
< first
&& osdmap
.get_epoch() == 0) ||
802 m
->oldest_map
<= osdmap
.get_epoch()) {
803 monc
.sub_want("osdmap", osdmap
.get_epoch()+1,
804 CEPH_SUBSCRIBE_ONETIME
);
811 epoch_t start_full
= std::max(osdmap
.get_epoch() + 1, first
);
813 if (m
->maps
.size() > 0) {
814 map
<epoch_t
,bufferlist
>::reverse_iterator rit
;
815 rit
= m
->maps
.rbegin();
816 if (start_full
<= rit
->first
) {
817 start_full
= rit
->first
;
819 << " full epoch " << start_full
<< dendl
;
820 bufferlist
&bl
= rit
->second
;
821 auto p
= bl
.cbegin();
826 for (epoch_t e
= start_full
; e
<= last
; e
++) {
827 map
<epoch_t
,bufferlist
>::iterator it
;
828 it
= m
->incremental_maps
.find(e
);
829 if (it
== m
->incremental_maps
.end())
833 << " incremental epoch " << e
834 << " on full epoch " << start_full
<< dendl
;
835 OSDMap::Incremental inc
;
836 bufferlist
&bl
= it
->second
;
837 auto p
= bl
.cbegin();
840 int err
= osdmap
.apply_incremental(inc
);
842 derr
<< "osd." << whoami
<< "::" << __func__
843 << "** ERROR: applying incremental: "
844 << cpp_strerror(err
) << dendl
;
845 ceph_abort_msg("error applying incremental");
848 dout(30) << __func__
<< "\nosdmap:\n";
849 JSONFormatter
f(true);
854 if (osdmap
.is_up(whoami
) &&
855 osdmap
.get_addrs(whoami
) == messenger
->get_myaddrs()) {
857 << " got into the osdmap and we're up!" << dendl
;
860 if (m
->newest_map
&& m
->newest_map
> last
) {
862 << " they have more maps; requesting them!" << dendl
;
863 monc
.sub_want("osdmap", osdmap
.get_epoch()+1, CEPH_SUBSCRIBE_ONETIME
);
867 dout(10) << __func__
<< " done" << dendl
;
871 bool ms_dispatch(Message
*m
) override
{
872 dout(1) << __func__
<< " " << *m
<< dendl
;
874 switch (m
->get_type()) {
875 case MSG_OSD_PG_CREATE
:
876 handle_pg_create((MOSDPGCreate
*)m
);
878 case CEPH_MSG_OSD_MAP
:
879 handle_osd_map((MOSDMap
*)m
);
888 void ms_handle_connect(Connection
*con
) override
{
889 dout(1) << __func__
<< " " << con
<< dendl
;
890 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
891 dout(10) << __func__
<< " on mon" << dendl
;
895 void ms_handle_remote_reset(Connection
*con
) override
{}
897 bool ms_handle_reset(Connection
*con
) override
{
898 dout(1) << __func__
<< dendl
;
899 return con
->get_priv().get();
902 bool ms_handle_refused(Connection
*con
) override
{
906 const string
get_name() override
{
908 ss
<< "osd." << whoami
;
913 double const OSDStub::STUB_BOOT_INTERVAL
= 10.0;
916 #define dout_prefix *_dout << "main "
918 const char *our_name
= NULL
;
919 vector
<TestStub
*> stubs
;
920 ceph::mutex shutdown_lock
= ceph::make_mutex("main::shutdown_lock");
921 ceph::condition_variable shutdown_cond
;
922 Context
*shutdown_cb
= NULL
;
923 SafeTimer
*shutdown_timer
= NULL
;
925 struct C_Shutdown
: public Context
927 void finish(int r
) override
{
928 generic_dout(10) << "main::shutdown time has ran out" << dendl
;
929 shutdown_cond
.notify_all();
933 void handle_test_signal(int signum
)
935 if ((signum
!= SIGINT
) && (signum
!= SIGTERM
))
938 std::cerr
<< "*** Got signal " << sig_str(signum
) << " ***" << std::endl
;
939 std::lock_guard l
{shutdown_lock
};
940 if (shutdown_timer
) {
941 shutdown_timer
->cancel_all_events();
942 shutdown_cond
.notify_all();
947 ceph_assert(our_name
!= NULL
);
949 std::cout
<< "usage: " << our_name
950 << " <--stub-id ID> [--stub-id ID...]"
954 -c FILE Read configuration from FILE\n\
955 --keyring FILE Read keyring from FILE\n\
956 --help This message\n\
958 Test-specific Options:\n\
959 --stub-id ID1..ID2 Interval of OSD ids for multiple stubs to mimic.\n\
960 --stub-id ID OSD id a stub will mimic to be\n\
961 (same as --stub-id ID..ID)\n\
965 int get_id_interval(int &first
, int &last
, string
&str
)
967 size_t found
= str
.find("..");
968 string first_str
, last_str
;
969 if (found
== string::npos
) {
970 first_str
= last_str
= str
;
972 first_str
= str
.substr(0, found
);
973 last_str
= str
.substr(found
+2);
977 first
= strict_strtol(first_str
.c_str(), 10, &err
);
978 if ((first
== 0) && (!err
.empty())) {
979 std::cerr
<< err
<< std::endl
;
983 last
= strict_strtol(last_str
.c_str(), 10, &err
);
984 if ((last
== 0) && (!err
.empty())) {
985 std::cerr
<< err
<< std::endl
;
991 int main(int argc
, const char *argv
[])
994 auto args
= argv_to_vec(argc
, argv
);
996 auto cct
= global_init(nullptr, args
,
997 CEPH_ENTITY_TYPE_OSD
, CODE_ENVIRONMENT_UTILITY
,
998 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
1000 common_init_finish(g_ceph_context
);
1001 g_ceph_context
->_conf
.apply_changes(nullptr);
1004 double duration
= 300.0;
1006 for (std::vector
<const char*>::iterator i
= args
.begin(); i
!= args
.end();) {
1009 if (ceph_argparse_double_dash(args
, i
)) {
1011 } else if (ceph_argparse_witharg(args
, i
, &val
,
1012 "--stub-id", (char*) NULL
)) {
1013 int first
= -1, last
= -1;
1014 if (get_id_interval(first
, last
, val
) < 0) {
1015 std::cerr
<< "** error parsing stub id '" << val
<< "'" << std::endl
;
1019 for (; first
<= last
; ++first
)
1020 stub_ids
.insert(first
);
1021 } else if (ceph_argparse_witharg(args
, i
, &val
,
1022 "--duration", (char*) NULL
)) {
1024 duration
= (double) strict_strtol(val
.c_str(), 10, &err
);
1025 if ((duration
== 0) && (!err
.empty())) {
1026 std::cerr
<< "** error parsing '--duration " << val
<< "': '"
1027 << err
<< std::endl
;
1030 } else if (ceph_argparse_flag(args
, i
, "--help", (char*) NULL
)) {
1034 std::cerr
<< "unknown argument '" << *i
<< "'" << std::endl
;
1039 if (stub_ids
.empty()) {
1040 std::cerr
<< "** error: must specify at least one '--stub-id <ID>'"
1046 for (set
<int>::iterator i
= stub_ids
.begin(); i
!= stub_ids
.end(); ++i
) {
1049 std::cout
<< __func__
<< " starting stub." << whoami
<< std::endl
;
1050 OSDStub
*stub
= new OSDStub(whoami
, g_ceph_context
);
1051 int err
= stub
->init();
1053 std::cerr
<< "** osd stub error: " << cpp_strerror(-err
) << std::endl
;
1056 stubs
.push_back(stub
);
1059 std::cout
<< __func__
<< " starting client stub" << std::endl
;
1060 ClientStub
*cstub
= new ClientStub(g_ceph_context
);
1061 int err
= cstub
->init();
1063 std::cerr
<< "** client stub error: " << cpp_strerror(-err
) << std::endl
;
1066 stubs
.push_back(cstub
);
1068 init_async_signal_handler();
1069 register_async_signal_handler_oneshot(SIGINT
, handle_test_signal
);
1070 register_async_signal_handler_oneshot(SIGTERM
, handle_test_signal
);
1073 unique_lock locker
{shutdown_lock
};
1074 shutdown_timer
= new SafeTimer(g_ceph_context
, shutdown_lock
);
1075 shutdown_timer
->init();
1076 if (duration
!= 0) {
1077 std::cout
<< __func__
1078 << " run test for " << duration
<< " seconds" << std::endl
;
1079 shutdown_timer
->add_event_after((double) duration
, new C_Shutdown
);
1081 shutdown_cond
.wait(locker
);
1082 shutdown_timer
->shutdown();
1083 delete shutdown_timer
;
1084 shutdown_timer
= NULL
;
1086 unregister_async_signal_handler(SIGINT
, handle_test_signal
);
1087 unregister_async_signal_handler(SIGTERM
, handle_test_signal
);
1089 std::cout
<< __func__
<< " waiting for stubs to finish" << std::endl
;
1090 vector
<TestStub
*>::iterator it
;
1092 for (i
= 0, it
= stubs
.begin(); it
!= stubs
.end(); ++it
, ++i
) {
1096 std::cout
<< __func__
<< " finished " << (*it
)->get_name() << std::endl
;