1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
25 #include "common/ceph_argparse.h"
26 #include "common/debug.h"
27 #include "global/global_init.h"
28 #include "msg/Messenger.h"
29 #include "messages/MOSDOp.h"
30 #include "messages/MOSDOpReply.h"
// Dispatcher subclass used by this messenger benchmark server.
// NOTE(review): this listing appears to be a lossy extraction -- several
// statements, member declarations and closing braces are not visible here.
// The comments below describe only what the visible lines show; confirm
// against the complete file.
32 class ServerDispatcher
: public Dispatcher
{
// ThreadPool work queue of Message objects, backed by the `messages` list.
35 class OpWQ
: public ThreadPool::WorkQueue
<Message
> {
36 list
<Message
*> messages
;
// Passes the fixed queue name, both timeouts and the owning pool to the
// WorkQueue base class.
39 OpWQ(time_t timeout
, time_t suicide_timeout
, ThreadPool
*tp
)
40 : ThreadPool::WorkQueue
<Message
>("ServerDispatcher::OpWQ", timeout
, suicide_timeout
, tp
) {}
// Appends the message to the back of the pending list.
42 bool _enqueue(Message
*m
) override
{
43 messages
.push_back(m
);
// Remove-specific-item overload; its body is not visible in this listing.
46 void _dequeue(Message
*m
) override
{
// True when no messages are pending.
49 bool _empty() override
{
50 return messages
.empty();
// Pop-style overload: reads the message at the front of the list.
52 Message
*_dequeue() override
{
55 Message
*m
= messages
.front();
// Treats the message as an MOSDOp, allocates an MOSDOpReply for it and sends
// the reply on the connection the message arrived on.
59 void _process(Message
*m
, ThreadPool::TPHandle
&handle
) override
{
// static_cast: no runtime type check is performed on the message here.
60 MOSDOp
*osd_op
= static_cast<MOSDOp
*>(m
);
61 MOSDOpReply
*reply
= new MOSDOpReply(osd_op
, 0, 0, 0, false);
62 m
->get_connection()->send_message(reply
);
// Intentionally empty.
65 void _process_finish(Message
*m
) override
{ }
// Asserts that the pending list is already empty.
66 void _clear() override
{
67 assert(messages
.empty());
// Stores `delay` as think_time, builds the op thread pool with `threads`
// workers and attaches the work queue to it with both timeouts set to 30.
72 ServerDispatcher(int threads
, uint64_t delay
): Dispatcher(g_ceph_context
), think_time(delay
),
73 op_tp(g_ceph_context
, "ServerDispatcher::op_tp", "tp_serv_disp", threads
, "serverdispatcher_op_threads"),
74 op_wq(30, 30, &op_tp
) {
77 ~ServerDispatcher() override
{
// Always returns true.
80 bool ms_can_fast_dispatch_any() const override
{ return true; }
// Decides per message type; the switch cases are not visible in this listing.
81 bool ms_can_fast_dispatch(const Message
*m
) const override
{
82 switch (m
->get_type()) {
// The connection/reset hooks below are no-ops apart from their fixed return
// values.
90 void ms_handle_fast_connect(Connection
*con
) override
{}
91 void ms_handle_fast_accept(Connection
*con
) override
{}
// Returns true without inspecting the message.
92 bool ms_dispatch(Message
*m
) override
{ return true; }
93 bool ms_handle_reset(Connection
*con
) override
{ return true; }
94 void ms_handle_remote_reset(Connection
*con
) override
{}
95 bool ms_handle_refused(Connection
*con
) override
{ return false; }
// Fast-dispatch entry point; only a commented-out debug print is visible here.
96 void ms_fast_dispatch(Message
*m
) override
{
98 //cerr << __func__ << " reply message=" << m << std::endl;
// Authorizer verification hook; its body is not visible in this listing.
101 bool ms_verify_authorizer(Connection
*con
, int peer_type
, int protocol
,
102 bufferlist
& authorizer
, bufferlist
& authorizer_reply
,
103 bool& isvalid
, CryptoKey
& session_key
) override
{
// Holds a ServerDispatcher by value together with the Messenger it is
// registered on.
// NOTE(review): member declarations and at least one method header are not
// visible in this listing; confirm against the complete file.
109 class MessengerServer
{
113 ServerDispatcher dispatcher
;
// Stores the messenger type and bind address, constructs the dispatcher with
// `threads` and `delay`, then creates the messenger as entity OSD(0) named
// "server" and sets Policy::stateless_server(0) as its default policy.
116 MessengerServer(string t
, string addr
, int threads
, int delay
):
117 msgr(NULL
), type(t
), bindaddr(addr
), dispatcher(threads
, delay
) {
118 msgr
= Messenger::create(g_ceph_context
, type
, entity_name_t::OSD(0), "server", 0, 0);
119 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
// The statements below parse the stored bind-address string and pass the
// dispatcher to add_dispatcher_head(); the header of the method that contains
// them is not visible here.
127 addr
.parse(bindaddr
.c_str());
129 msgr
->add_dispatcher_head(&dispatcher
);
/**
 * Print the command-line help text to stderr.
 *
 * Emits one "Usage:" line followed by one explanatory line per positional
 * argument (bind address, worker thread count, think time).
 *
 * @param name  program name shown in the "Usage:" line.
 */
void usage(const std::string &name) {
  // std::cerr is unit-buffered, so '\n' is flushed just like std::endl was.
  std::cerr << "Usage: " << name
            << " [bind ip:port] [server worker threads] [thinktime us]" << '\n';
  std::cerr << " [bind ip:port]: The ip:port pair to bind, client need to specify this pair to connect" << '\n';
  std::cerr << " [server worker threads]: threads will process incoming messages and reply(matching pg threads)" << '\n';
  std::cerr << " [thinktime]: sleep time when do dispatching(match fast dispatch logic in OSD.cc)" << '\n';
}
142 int main(int argc
, char **argv
)
144 vector
<const char*> args
;
145 argv_to_vec(argc
, (const char **)argv
, args
);
147 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
148 CODE_ENVIRONMENT_UTILITY
, 0);
149 common_init_finish(g_ceph_context
);
150 g_ceph_context
->_conf
->apply_changes(NULL
);
152 if (args
.size() < 3) {
157 int worker_threads
= atoi(args
[1]);
158 int think_time
= atoi(args
[2]);
159 std::string public_msgr_type
= g_ceph_context
->_conf
->ms_public_type
.empty() ? g_ceph_context
->_conf
->get_val
<std::string
>("ms_type") : g_ceph_context
->_conf
->ms_public_type
;
161 cerr
<< " This tool won't handle connection error alike things, " << std::endl
;
162 cerr
<< "please ensure the proper network environment to test." << std::endl
;
163 cerr
<< " Or ctrl+c when meeting error and restart tests" << std::endl
;
164 cerr
<< " using ms-public-type " << public_msgr_type
<< std::endl
;
165 cerr
<< " bind ip:port " << args
[0] << std::endl
;
166 cerr
<< " worker threads " << worker_threads
<< std::endl
;
167 cerr
<< " thinktime(us) " << think_time
<< std::endl
;
169 MessengerServer
server(public_msgr_type
, args
[0], worker_threads
, think_time
);