]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/mon/test-mon-msg.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / mon / test-mon-msg.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #include <stdio.h>
14 #include <string.h>
15 #include <iostream>
16 #include <sstream>
17 #include <time.h>
18 #include <stdlib.h>
19 #include <map>
20
21 #include "global/global_init.h"
22 #include "global/global_context.h"
23 #include "common/ceph_argparse.h"
24 #include "common/version.h"
25 #include "common/dout.h"
26 #include "common/debug.h"
27 #include "common/ceph_mutex.h"
28 #include "common/Timer.h"
29 #include "common/errno.h"
30 #include "mon/MonClient.h"
31 #include "msg/Dispatcher.h"
32 #include "include/err.h"
33 #include <boost/scoped_ptr.hpp>
34
35 #include "gtest/gtest.h"
36
37 #include "common/config.h"
38 #include "include/ceph_assert.h"
39
40 #include "messages/MMonProbe.h"
41 #include "messages/MRoute.h"
42 #include "messages/MGenericMessage.h"
43 #include "messages/MMonJoin.h"
44
45 #define dout_context g_ceph_context
46 #define dout_subsys ceph_subsys_
47 #undef dout_prefix
48 #define dout_prefix *_dout << "test-mon-msg "
49
50 class MonClientHelper : public Dispatcher
51 {
52 protected:
53 CephContext *cct;
54 Messenger *msg;
55 MonClient monc;
56
57 ceph::mutex lock = ceph::make_mutex("mon-msg-test::lock");
58
59 set<int> wanted;
60
61 public:
62
63 explicit MonClientHelper(CephContext *cct_)
64 : Dispatcher(cct_),
65 cct(cct_),
66 msg(NULL),
67 monc(cct_)
68 { }
69
70
71 int post_init() {
72 dout(1) << __func__ << dendl;
73 if (!msg)
74 return -EINVAL;
75 msg->add_dispatcher_tail(this);
76 return 0;
77 }
78
79 int init_messenger() {
80 dout(1) << __func__ << dendl;
81
82 std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
83 msg = Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(-1),
84 "test-mon-msg", 0, 0);
85 ceph_assert(msg != NULL);
86 msg->set_default_policy(Messenger::Policy::lossy_client(0));
87 dout(0) << __func__ << " starting messenger at "
88 << msg->get_myaddrs() << dendl;
89 msg->start();
90 return 0;
91 }
92
93 int init_monc() {
94 dout(1) << __func__ << dendl;
95 ceph_assert(msg != NULL);
96 int err = monc.build_initial_monmap();
97 if (err < 0) {
98 derr << __func__ << " error building monmap: "
99 << cpp_strerror(err) << dendl;
100 return err;
101 }
102
103 monc.set_messenger(msg);
104 msg->add_dispatcher_head(&monc);
105
106 monc.set_want_keys(CEPH_ENTITY_TYPE_MON);
107 err = monc.init();
108 if (err < 0) {
109 derr << __func__ << " monc init failed: "
110 << cpp_strerror(err) << dendl;
111 goto fail;
112 }
113
114 err = monc.authenticate();
115 if (err < 0) {
116 derr << __func__ << " monc auth failed: "
117 << cpp_strerror(err) << dendl;
118 goto fail_monc;
119 }
120 monc.wait_auth_rotating(30.0);
121 monc.renew_subs();
122 dout(0) << __func__ << " finished" << dendl;
123 return 0;
124
125 fail_monc:
126 derr << __func__ << " failing monc" << dendl;
127 monc.shutdown();
128 fail:
129 return err;
130 }
131
132 void shutdown_messenger() {
133 dout(0) << __func__ << dendl;
134 msg->shutdown();
135 msg->wait();
136 }
137
138 void shutdown_monc() {
139 dout(0) << __func__ << dendl;
140 monc.shutdown();
141 }
142
143 void shutdown() {
144 dout(0) << __func__ << dendl;
145 shutdown_monc();
146 shutdown_messenger();
147 }
148
149 MonMap *get_monmap() {
150 return &monc.monmap;
151 }
152
153 int init() {
154 int err = init_messenger();
155 if (err < 0)
156 goto fail;
157 err = init_monc();
158 if (err < 0)
159 goto fail_msgr;
160 err = post_init();
161 if (err < 0)
162 goto fail_monc;
163 return 0;
164 fail_monc:
165 shutdown_monc();
166 fail_msgr:
167 shutdown_messenger();
168 fail:
169 return err;
170 }
171
172 virtual void handle_wanted(Message *m) { }
173
174 bool handle_message(Message *m) {
175 dout(1) << __func__ << " " << *m << dendl;
176 if (!is_wanted(m)) {
177 dout(10) << __func__ << " not wanted" << dendl;
178 return false;
179 }
180 handle_wanted(m);
181 m->put();
182
183 return true;
184 }
185
186 bool ms_dispatch(Message *m) override {
187 return handle_message(m);
188 }
189 void ms_handle_connect(Connection *con) override { }
190 void ms_handle_remote_reset(Connection *con) override { }
191 bool ms_handle_reset(Connection *con) override { return false; }
192 bool ms_handle_refused(Connection *con) override { return false; }
193
194 bool is_wanted(Message *m) {
195 dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl;
196 return (wanted.find(m->get_type()) != wanted.end());
197 }
198
199 void add_wanted(int t) {
200 dout(20) << __func__ << " type " << t << dendl;
201 wanted.insert(t);
202 }
203
204 void rm_wanted(int t) {
205 dout(20) << __func__ << " type " << t << dendl;
206 wanted.erase(t);
207 }
208
209 void send_message(Message *m) {
210 dout(15) << __func__ << " " << *m << dendl;
211 monc.send_mon_message(m);
212 }
213
214 void wait() { msg->wait(); }
215 };
216
217 class MonMsgTest : public MonClientHelper,
218 public ::testing::Test
219 {
220 protected:
221 int reply_type = 0;
222 Message *reply_msg = nullptr;
223 ceph::mutex lock = ceph::make_mutex("lock");
224 ceph::condition_variable cond;
225
226 MonMsgTest() :
227 MonClientHelper(g_ceph_context) { }
228
229 public:
230 void SetUp() override {
231 reply_type = -1;
232 if (reply_msg) {
233 reply_msg->put();
234 reply_msg = nullptr;
235 }
236 ASSERT_EQ(init(), 0);
237 }
238
239 void TearDown() override {
240 shutdown();
241 if (reply_msg) {
242 reply_msg->put();
243 reply_msg = nullptr;
244 }
245 }
246
247 void handle_wanted(Message *m) override {
248 std::lock_guard l{lock};
249 // caller will put() after they call us, so hold on to a ref
250 m->get();
251 reply_msg = m;
252 cond.notify_all();
253 }
254
255 Message *send_wait_reply(Message *m, int t, double timeout=30.0) {
256 std::unique_lock l{lock};
257 reply_type = t;
258 add_wanted(t);
259 send_message(m);
260
261 std::cv_status status = std::cv_status::no_timeout;
262 if (timeout > 0) {
263 utime_t s = ceph_clock_now();
264 status = cond.wait_for(l, ceph::make_timespan(timeout));
265 utime_t e = ceph_clock_now();
266 dout(20) << __func__ << " took " << (e-s) << " seconds" << dendl;
267 } else {
268 cond.wait(l);
269 }
270 rm_wanted(t);
271 l.unlock();
272 if (status == std::cv_status::timeout) {
273 dout(20) << __func__ << " error: " << cpp_strerror(ETIMEDOUT) << dendl;
274 return (Message*)((long)-ETIMEDOUT);
275 }
276
277 if (!reply_msg)
278 dout(20) << __func__ << " reply_msg is nullptr" << dendl;
279 else
280 dout(20) << __func__ << " reply_msg " << *reply_msg << dendl;
281 return reply_msg;
282 }
283 };
284
285 TEST_F(MonMsgTest, MMonProbeTest)
286 {
287 Message *m = new MMonProbe(get_monmap()->fsid,
288 MMonProbe::OP_PROBE, "b", false,
289 ceph_release());
290 Message *r = send_wait_reply(m, MSG_MON_PROBE);
291 ASSERT_NE(IS_ERR(r), 0);
292 ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT);
293 }
294
295 TEST_F(MonMsgTest, MRouteTest)
296 {
297 Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN);
298 MRoute *m = new MRoute;
299 m->msg = payload;
300 Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN);
301 // we want an error
302 ASSERT_NE(IS_ERR(r), 0);
303 ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT);
304 }
305
306 /* MMonScrub and MMonSync have other safeguards in place that prevent
307 * us from actually receiving a reply even if the message is handled
308 * by the monitor due to lack of cap checking.
309 */
310 TEST_F(MonMsgTest, MMonJoin)
311 {
312 Message *m = new MMonJoin(get_monmap()->fsid, string("client"),
313 msg->get_myaddrs());
314 send_wait_reply(m, MSG_MON_PAXOS, 10.0);
315
316 int r = monc.get_monmap();
317 ASSERT_EQ(r, 0);
318 ASSERT_FALSE(monc.monmap.contains("client"));
319 }
320
321 int main(int argc, char *argv[])
322 {
323 vector<const char*> args;
324 argv_to_vec(argc, (const char **)argv, args);
325
326 auto cct = global_init(nullptr, args,
327 CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY,
328 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
329 common_init_finish(g_ceph_context);
330 g_ceph_context->_conf.apply_changes(nullptr);
331 ::testing::InitGoogleTest(&argc, argv);
332
333 return RUN_ALL_TESTS();
334 }
335