1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
25 #include "common/ceph_argparse.h"
26 #include "common/debug.h"
27 #include "common/WorkQueue.h"
28 #include "global/global_init.h"
29 #include "msg/Messenger.h"
30 #include "messages/MOSDOp.h"
31 #include "messages/MOSDOpReply.h"
32 #include "auth/DummyAuth.h"
34 class ServerDispatcher
: public Dispatcher
{
37 class OpWQ
: public ThreadPool::WorkQueue
<Message
> {
// Pending messages for this work queue. _enqueue() appends at the back and
// the no-argument _dequeue() reads from the front.
38 list
<Message
*> messages
;
41 OpWQ(ceph::timespan timeout
, ceph::timespan suicide_timeout
, ThreadPool
*tp
)
42 : ThreadPool::WorkQueue
<Message
>("ServerDispatcher::OpWQ", timeout
, suicide_timeout
, tp
) {}
// Queue-insert hook: appends m to the back of the pending `messages` list.
// NOTE(review): the return statement and closing brace are not visible in
// this excerpt; confirm the returned value against the full file.
44 bool _enqueue(Message
*m
) override
{
45 messages
.push_back(m
);
48 void _dequeue(Message
*m
) override
{
// Reports whether the pending `messages` list currently holds no entries.
51 bool _empty() override
{
52 return messages
.empty();
// Dequeue hook (no-argument overload). The visible part reads the message
// at the head of `messages` into m.
// NOTE(review): the rest of the body (any empty-list guard, removal of the
// head element, and the return) is not visible in this excerpt; confirm
// against the full file. Calling front() on an empty list is undefined.
54 Message
*_dequeue() override
{
57 Message
*m
= messages
.front();
// Worker-side handler for one queued message: builds an MOSDOpReply for it
// and sends that reply on the connection the message came from.
// `handle` is not referenced in the visible lines.
// NOTE(review): any code after send_message() (e.g. releasing m) is not
// visible in this excerpt; confirm against the full file.
61 void _process(Message
*m
, ThreadPool::TPHandle
&handle
) override
{
// Unchecked downcast: assumes every queued message is an MOSDOp
// (TODO confirm against the fast-dispatch type filter).
62 MOSDOp
*osd_op
= static_cast<MOSDOp
*>(m
);
// Reply built from the request with constant arguments (0, 0, 0, false).
63 MOSDOpReply
*reply
= new MOSDOpReply(osd_op
, 0, 0, 0, false);
// The reply pointer is handed to the connection; it is not deleted here.
64 m
->get_connection()->send_message(reply
);
67 void _process_finish(Message
*m
) override
{ }
// Clear hook: the visible body only asserts that `messages` is already
// empty; it does not remove anything itself.
68 void _clear() override
{
69 ceph_assert(messages
.empty());
// Constructs the dispatcher on top of g_ceph_context.
//   threads - forwarded to the op_tp thread pool constructor.
//   delay   - stored in think_time.
// op_wq is attached to op_tp and gets make_timespan(30) for both its
// timeout and suicide timeout (presumably seconds; confirm).
// NOTE(review): the constructor body is not visible in this excerpt.
74 ServerDispatcher(int threads
, uint64_t delay
): Dispatcher(g_ceph_context
), think_time(delay
),
75 op_tp(g_ceph_context
, "ServerDispatcher::op_tp", "tp_serv_disp", threads
, "serverdispatcher_op_threads"),
76 op_wq(ceph::make_timespan(30), ceph::make_timespan(30), &op_tp
) {
79 ~ServerDispatcher() override
{
82 bool ms_can_fast_dispatch_any() const override
{ return true; }
83 bool ms_can_fast_dispatch(const Message
*m
) const override
{
84 switch (m
->get_type()) {
92 void ms_handle_fast_connect(Connection
*con
) override
{}
93 void ms_handle_fast_accept(Connection
*con
) override
{}
94 bool ms_dispatch(Message
*m
) override
{ return true; }
95 bool ms_handle_reset(Connection
*con
) override
{ return true; }
96 void ms_handle_remote_reset(Connection
*con
) override
{}
97 bool ms_handle_refused(Connection
*con
) override
{ return false; }
98 void ms_fast_dispatch(Message
*m
) override
{
100 //cerr << __func__ << " reply message=" << m << std::endl;
103 int ms_handle_authentication(Connection
*con
) override
{
108 class MessengerServer
{
// Dispatcher built from the (threads, delay) constructor arguments and
// registered on the messenger via add_dispatcher_head().
112 ServerDispatcher dispatcher
;
// Auth object built on g_ceph_context; installed as the messenger's auth
// server in the constructor.
113 DummyAuthClientServer dummy_auth
;
// Creates and configures the server-side messenger.
//   t       - messenger type string, stored in `type` and passed to
//             Messenger::create().
//   addr    - bind address string, stored in `bindaddr`.
//   threads - forwarded to the ServerDispatcher constructor.
//   delay   - forwarded to the ServerDispatcher constructor.
// NOTE(review): delay is an int here but ServerDispatcher takes uint64_t,
// so a negative value converts to a very large unsigned one.
// NOTE(review): lines after set_auth_server() are not visible in this
// excerpt; confirm against the full file.
116 MessengerServer(const string
&t
, const string
&addr
, int threads
, int delay
):
117 msgr(NULL
), type(t
), bindaddr(addr
), dispatcher(threads
, delay
),
118 dummy_auth(g_ceph_context
) {
// Messenger is created as entity OSD(0) with the name "server"; the
// trailing argument is 0.
119 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0), "server", 0);
// Default connection policy: stateless_server(0).
120 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
// Refresh the auth registry's configuration before installing dummy_auth
// as the messenger's auth server.
121 dummy_auth
.auth_registry
.refresh_config();
122 msgr
->set_auth_server(&dummy_auth
);
130 addr
.parse(bindaddr
.c_str());
132 msgr
->add_dispatcher_head(&dispatcher
);
// Prints the command-line synopsis to cerr, followed by one line describing
// each of the three positional arguments.
//   name - program name shown in the "Usage:" line.
138 void usage(const string
&name
) {
139 cerr
<< "Usage: " << name
<< " [bind ip:port] [server worker threads] [thinktime us]" << std::endl
;
140 cerr
<< " [bind ip:port]: The ip:port pair to bind, client need to specify this pair to connect" << std::endl
;
141 cerr
<< " [server worker threads]: threads will process incoming messages and reply(matching pg threads)" << std::endl
;
142 cerr
<< " [thinktime]: sleep time when do dispatching(match fast dispatch logic in OSD.cc)" << std::endl
;
// Entry point of the messenger perf server. The visible part initialises
// the Ceph context, reads three positional arguments, prints a banner with
// the effective settings to cerr and constructs a MessengerServer.
// Positional arguments as used below:
//   args[0] - bind ip:port
//   args[1] - worker thread count
//   args[2] - think time (printed as microseconds)
// NOTE(review): several lines of this function, including the body of the
// argument-count check and everything after the server is constructed, are
// not visible in this excerpt.
145 int main(int argc
, char **argv
)
147 auto args
= argv_to_vec(argc
, argv
);
// Initialise as a client-type utility without loading a default config file.
149 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
150 CODE_ENVIRONMENT_UTILITY
,
151 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
152 common_init_finish(g_ceph_context
);
153 g_ceph_context
->_conf
.apply_changes(nullptr);
// All three positional arguments are required.
155 if (args
.size() < 3) {
// NOTE(review): atoi() does no error reporting; non-numeric input yields 0
// and negative values are accepted as-is.
160 int worker_threads
= atoi(args
[1]);
161 int think_time
= atoi(args
[2]);
// Use ms_public_type when it is set; otherwise fall back to the "ms_type"
// config value.
162 std::string public_msgr_type
= g_ceph_context
->_conf
->ms_public_type
.empty() ? g_ceph_context
->_conf
.get_val
<std::string
>("ms_type") : g_ceph_context
->_conf
->ms_public_type
;
// Banner: caveats for the operator, then the effective settings.
164 cerr
<< " This tool won't handle connection error alike things, " << std::endl
;
165 cerr
<< "please ensure the proper network environment to test." << std::endl
;
166 cerr
<< " Or ctrl+c when meeting error and restart tests" << std::endl
;
167 cerr
<< " using ms-public-type " << public_msgr_type
<< std::endl
;
168 cerr
<< " bind ip:port " << args
[0] << std::endl
;
169 cerr
<< " worker threads " << worker_threads
<< std::endl
;
170 cerr
<< " thinktime(us) " << think_time
<< std::endl
;
172 MessengerServer
server(public_msgr_type
, args
[0], worker_threads
, think_time
);