]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/mon/test-mon-msg.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / mon / test-mon-msg.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #include <stdio.h>
14 #include <string.h>
15 #include <iostream>
16 #include <sstream>
17 #include <time.h>
18 #include <stdlib.h>
19 #include <map>
20
21 #include "global/global_init.h"
22 #include "global/global_context.h"
23 #include "common/async/context_pool.h"
24 #include "common/ceph_argparse.h"
25 #include "common/version.h"
26 #include "common/dout.h"
27 #include "common/debug.h"
28 #include "common/ceph_mutex.h"
29 #include "common/Timer.h"
30 #include "common/errno.h"
31 #include "mon/MonClient.h"
32 #include "msg/Dispatcher.h"
33 #include "include/err.h"
34 #include <boost/scoped_ptr.hpp>
35
36 #include "gtest/gtest.h"
37
38 #include "common/config.h"
39 #include "include/ceph_assert.h"
40
41 #include "messages/MMonProbe.h"
42 #include "messages/MRoute.h"
43 #include "messages/MGenericMessage.h"
44 #include "messages/MMonJoin.h"
45
// Logging configuration consumed by the dout()/derr macros used below.
// Every line logged from this file is prefixed with "test-mon-msg ".
#undef dout_prefix
#define dout_prefix *_dout << "test-mon-msg "
// NOTE(review): ceph_subsys_ looks like the default (unnamed) debug
// subsystem -- confirm against the subsystem list.
#define dout_subsys ceph_subsys_
#define dout_context g_ceph_context
50
51 class MonClientHelper : public Dispatcher
52 {
53 protected:
54 CephContext *cct;
55 ceph::async::io_context_pool poolctx;
56 Messenger *msg;
57 MonClient monc;
58
59 ceph::mutex lock = ceph::make_mutex("mon-msg-test::lock");
60
61 set<int> wanted;
62
63 public:
64
65 explicit MonClientHelper(CephContext *cct_)
66 : Dispatcher(cct_),
67 cct(cct_),
68 poolctx(1),
69 msg(NULL),
70 monc(cct_, poolctx)
71 { }
72
73
74 int post_init() {
75 dout(1) << __func__ << dendl;
76 if (!msg)
77 return -EINVAL;
78 msg->add_dispatcher_tail(this);
79 return 0;
80 }
81
82 int init_messenger() {
83 dout(1) << __func__ << dendl;
84
85 std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
86 msg = Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(-1),
87 "test-mon-msg", 0);
88 ceph_assert(msg != NULL);
89 msg->set_default_policy(Messenger::Policy::lossy_client(0));
90 dout(0) << __func__ << " starting messenger at "
91 << msg->get_myaddrs() << dendl;
92 msg->start();
93 return 0;
94 }
95
96 int init_monc() {
97 dout(1) << __func__ << dendl;
98 ceph_assert(msg != NULL);
99 int err = monc.build_initial_monmap();
100 if (err < 0) {
101 derr << __func__ << " error building monmap: "
102 << cpp_strerror(err) << dendl;
103 return err;
104 }
105
106 monc.set_messenger(msg);
107 msg->add_dispatcher_head(&monc);
108
109 monc.set_want_keys(CEPH_ENTITY_TYPE_MON);
110 err = monc.init();
111 if (err < 0) {
112 derr << __func__ << " monc init failed: "
113 << cpp_strerror(err) << dendl;
114 goto fail;
115 }
116
117 err = monc.authenticate();
118 if (err < 0) {
119 derr << __func__ << " monc auth failed: "
120 << cpp_strerror(err) << dendl;
121 goto fail_monc;
122 }
123 monc.wait_auth_rotating(30.0);
124 monc.renew_subs();
125 dout(0) << __func__ << " finished" << dendl;
126 return 0;
127
128 fail_monc:
129 derr << __func__ << " failing monc" << dendl;
130 monc.shutdown();
131 fail:
132 return err;
133 }
134
135 void shutdown_messenger() {
136 dout(0) << __func__ << dendl;
137 msg->shutdown();
138 msg->wait();
139 }
140
141 void shutdown_monc() {
142 dout(0) << __func__ << dendl;
143 monc.shutdown();
144 }
145
146 void shutdown() {
147 dout(0) << __func__ << dendl;
148 shutdown_monc();
149 shutdown_messenger();
150 }
151
152 MonMap *get_monmap() {
153 return &monc.monmap;
154 }
155
156 int init() {
157 int err = init_messenger();
158 if (err < 0)
159 goto fail;
160 err = init_monc();
161 if (err < 0)
162 goto fail_msgr;
163 err = post_init();
164 if (err < 0)
165 goto fail_monc;
166 return 0;
167 fail_monc:
168 shutdown_monc();
169 fail_msgr:
170 shutdown_messenger();
171 fail:
172 return err;
173 }
174
175 virtual void handle_wanted(Message *m) { }
176
177 bool handle_message(Message *m) {
178 dout(1) << __func__ << " " << *m << dendl;
179 if (!is_wanted(m)) {
180 dout(10) << __func__ << " not wanted" << dendl;
181 return false;
182 }
183 handle_wanted(m);
184 m->put();
185
186 return true;
187 }
188
189 bool ms_dispatch(Message *m) override {
190 return handle_message(m);
191 }
192 void ms_handle_connect(Connection *con) override { }
193 void ms_handle_remote_reset(Connection *con) override { }
194 bool ms_handle_reset(Connection *con) override { return false; }
195 bool ms_handle_refused(Connection *con) override { return false; }
196
197 bool is_wanted(Message *m) {
198 dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl;
199 return (wanted.find(m->get_type()) != wanted.end());
200 }
201
202 void add_wanted(int t) {
203 dout(20) << __func__ << " type " << t << dendl;
204 wanted.insert(t);
205 }
206
207 void rm_wanted(int t) {
208 dout(20) << __func__ << " type " << t << dendl;
209 wanted.erase(t);
210 }
211
212 void send_message(Message *m) {
213 dout(15) << __func__ << " " << *m << dendl;
214 monc.send_mon_message(m);
215 }
216
217 void wait() { msg->wait(); }
218 };
219
220 class MonMsgTest : public MonClientHelper,
221 public ::testing::Test
222 {
223 protected:
224 int reply_type = 0;
225 Message *reply_msg = nullptr;
226 ceph::mutex lock = ceph::make_mutex("lock");
227 ceph::condition_variable cond;
228
229 MonMsgTest() :
230 MonClientHelper(g_ceph_context) { }
231
232 public:
233 void SetUp() override {
234 reply_type = -1;
235 if (reply_msg) {
236 reply_msg->put();
237 reply_msg = nullptr;
238 }
239 ASSERT_EQ(init(), 0);
240 }
241
242 void TearDown() override {
243 shutdown();
244 if (reply_msg) {
245 reply_msg->put();
246 reply_msg = nullptr;
247 }
248 }
249
250 void handle_wanted(Message *m) override {
251 std::lock_guard l{lock};
252 // caller will put() after they call us, so hold on to a ref
253 m->get();
254 reply_msg = m;
255 cond.notify_all();
256 }
257
258 Message *send_wait_reply(Message *m, int t, double timeout=30.0) {
259 std::unique_lock l{lock};
260 reply_type = t;
261 add_wanted(t);
262 send_message(m);
263
264 std::cv_status status = std::cv_status::no_timeout;
265 if (timeout > 0) {
266 utime_t s = ceph_clock_now();
267 status = cond.wait_for(l, ceph::make_timespan(timeout));
268 utime_t e = ceph_clock_now();
269 dout(20) << __func__ << " took " << (e-s) << " seconds" << dendl;
270 } else {
271 cond.wait(l);
272 }
273 rm_wanted(t);
274 l.unlock();
275 if (status == std::cv_status::timeout) {
276 dout(20) << __func__ << " error: " << cpp_strerror(ETIMEDOUT) << dendl;
277 return (Message*)((long)-ETIMEDOUT);
278 }
279
280 if (!reply_msg)
281 dout(20) << __func__ << " reply_msg is nullptr" << dendl;
282 else
283 dout(20) << __func__ << " reply_msg " << *reply_msg << dendl;
284 return reply_msg;
285 }
286 };
287
288 TEST_F(MonMsgTest, MMonProbeTest)
289 {
290 Message *m = new MMonProbe(get_monmap()->fsid,
291 MMonProbe::OP_PROBE, "b", false,
292 ceph_release());
293 Message *r = send_wait_reply(m, MSG_MON_PROBE);
294 ASSERT_NE(IS_ERR(r), 0);
295 ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT);
296 }
297
298 TEST_F(MonMsgTest, MRouteTest)
299 {
300 Message *payload = new MGenericMessage(CEPH_MSG_SHUTDOWN);
301 MRoute *m = new MRoute;
302 m->msg = payload;
303 Message *r = send_wait_reply(m, CEPH_MSG_SHUTDOWN);
304 // we want an error
305 ASSERT_NE(IS_ERR(r), 0);
306 ASSERT_EQ(PTR_ERR(r), -ETIMEDOUT);
307 }
308
/* MMonScrub and MMonSync have other safeguards in place: even if the
 * monitor handled such a message because of missing cap checking, we
 * would still never actually receive a reply.
 */
313 TEST_F(MonMsgTest, MMonJoin)
314 {
315 Message *m = new MMonJoin(get_monmap()->fsid, string("client"),
316 msg->get_myaddrs());
317 send_wait_reply(m, MSG_MON_PAXOS, 10.0);
318
319 int r = monc.get_monmap();
320 ASSERT_EQ(r, 0);
321 ASSERT_FALSE(monc.monmap.contains("client"));
322 }
323
324 int main(int argc, char *argv[])
325 {
326 vector<const char*> args;
327 argv_to_vec(argc, (const char **)argv, args);
328
329 auto cct = global_init(nullptr, args,
330 CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY,
331 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
332 common_init_finish(g_ceph_context);
333 g_ceph_context->_conf.apply_changes(nullptr);
334 ::testing::InitGoogleTest(&argc, argv);
335
336 return RUN_ALL_TESTS();
337 }
338