git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/Mirror.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <boost/range/adaptor/map.hpp>
5
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "librbd/ImageCtx.h"
11 #include "Mirror.h"
12 #include "ServiceDaemon.h"
13 #include "Threads.h"
14
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_rbd_mirror
17 #undef dout_prefix
18 #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
19 << __func__ << ": "
20
21 using std::list;
22 using std::map;
23 using std::set;
24 using std::string;
25 using std::unique_ptr;
26 using std::vector;
27
28 using librados::Rados;
29 using librados::IoCtx;
30 using librbd::mirror_peer_t;
31
32 namespace rbd {
33 namespace mirror {
34
35 namespace {
36
37 class MirrorAdminSocketCommand {
38 public:
39 virtual ~MirrorAdminSocketCommand() {}
40 virtual bool call(Formatter *f, stringstream *ss) = 0;
41 };
42
43 class StatusCommand : public MirrorAdminSocketCommand {
44 public:
45 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
46
47 bool call(Formatter *f, stringstream *ss) override {
48 mirror->print_status(f, ss);
49 return true;
50 }
51
52 private:
53 Mirror *mirror;
54 };
55
56 class StartCommand : public MirrorAdminSocketCommand {
57 public:
58 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
59
60 bool call(Formatter *f, stringstream *ss) override {
61 mirror->start();
62 return true;
63 }
64
65 private:
66 Mirror *mirror;
67 };
68
69 class StopCommand : public MirrorAdminSocketCommand {
70 public:
71 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
72
73 bool call(Formatter *f, stringstream *ss) override {
74 mirror->stop();
75 return true;
76 }
77
78 private:
79 Mirror *mirror;
80 };
81
82 class RestartCommand : public MirrorAdminSocketCommand {
83 public:
84 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
85
86 bool call(Formatter *f, stringstream *ss) override {
87 mirror->restart();
88 return true;
89 }
90
91 private:
92 Mirror *mirror;
93 };
94
95 class FlushCommand : public MirrorAdminSocketCommand {
96 public:
97 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
98
99 bool call(Formatter *f, stringstream *ss) override {
100 mirror->flush();
101 return true;
102 }
103
104 private:
105 Mirror *mirror;
106 };
107
108 class LeaderReleaseCommand : public MirrorAdminSocketCommand {
109 public:
110 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
111
112 bool call(Formatter *f, stringstream *ss) override {
113 mirror->release_leader();
114 return true;
115 }
116
117 private:
118 Mirror *mirror;
119 };
120
121 } // anonymous namespace
122
123 class MirrorAdminSocketHook : public AdminSocketHook {
124 public:
125 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
126 admin_socket(cct->get_admin_socket()) {
127 std::string command;
128 int r;
129
130 command = "rbd mirror status";
131 r = admin_socket->register_command(command, command, this,
132 "get status for rbd mirror");
133 if (r == 0) {
134 commands[command] = new StatusCommand(mirror);
135 }
136
137 command = "rbd mirror start";
138 r = admin_socket->register_command(command, command, this,
139 "start rbd mirror");
140 if (r == 0) {
141 commands[command] = new StartCommand(mirror);
142 }
143
144 command = "rbd mirror stop";
145 r = admin_socket->register_command(command, command, this,
146 "stop rbd mirror");
147 if (r == 0) {
148 commands[command] = new StopCommand(mirror);
149 }
150
151 command = "rbd mirror restart";
152 r = admin_socket->register_command(command, command, this,
153 "restart rbd mirror");
154 if (r == 0) {
155 commands[command] = new RestartCommand(mirror);
156 }
157
158 command = "rbd mirror flush";
159 r = admin_socket->register_command(command, command, this,
160 "flush rbd mirror");
161 if (r == 0) {
162 commands[command] = new FlushCommand(mirror);
163 }
164
165 command = "rbd mirror leader release";
166 r = admin_socket->register_command(command, command, this,
167 "release rbd mirror leader");
168 if (r == 0) {
169 commands[command] = new LeaderReleaseCommand(mirror);
170 }
171 }
172
173 ~MirrorAdminSocketHook() override {
174 for (Commands::const_iterator i = commands.begin(); i != commands.end();
175 ++i) {
176 (void)admin_socket->unregister_command(i->first);
177 delete i->second;
178 }
179 }
180
181 bool call(std::string_view command, const cmdmap_t& cmdmap,
182 std::string_view format, bufferlist& out) override {
183 Commands::const_iterator i = commands.find(command);
184 ceph_assert(i != commands.end());
185 Formatter *f = Formatter::create(format);
186 stringstream ss;
187 bool r = i->second->call(f, &ss);
188 delete f;
189 out.append(ss);
190 return r;
191 }
192
193 private:
194 typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
195
196 AdminSocket *admin_socket;
197 Commands commands;
198 };
199
200 Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
201 m_cct(cct),
202 m_args(args),
203 m_lock("rbd::mirror::Mirror"),
204 m_local(new librados::Rados()),
205 m_asok_hook(new MirrorAdminSocketHook(cct, this))
206 {
207 m_threads =
208 &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
209 "rbd_mirror::threads", false, cct));
210 m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
211 }
212
213 Mirror::~Mirror()
214 {
215 delete m_asok_hook;
216 }
217
218 void Mirror::handle_signal(int signum)
219 {
220 m_stopping = true;
221 {
222 Mutex::Locker l(m_lock);
223 m_cond.Signal();
224 }
225 }
226
227 int Mirror::init()
228 {
229 int r = m_local->init_with_context(m_cct);
230 if (r < 0) {
231 derr << "could not initialize rados handle" << dendl;
232 return r;
233 }
234
235 r = m_local->connect();
236 if (r < 0) {
237 derr << "error connecting to local cluster" << dendl;
238 return r;
239 }
240
241 r = m_service_daemon->init();
242 if (r < 0) {
243 derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
244 return r;
245 }
246
247 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
248 m_service_daemon.get()));
249 return r;
250 }
251
252 void Mirror::run()
253 {
254 dout(20) << "enter" << dendl;
255 while (!m_stopping) {
256 m_local_cluster_watcher->refresh_pools();
257 Mutex::Locker l(m_lock);
258 if (!m_manual_stop) {
259 update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
260 }
261 m_cond.WaitInterval(
262 m_lock,
263 utime_t(m_cct->_conf.get_val<uint64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
264 }
265
266 // stop all pool replayers in parallel
267 Mutex::Locker locker(m_lock);
268 for (auto &pool_replayer : m_pool_replayers) {
269 pool_replayer.second->stop(false);
270 }
271 dout(20) << "return" << dendl;
272 }
273
274 void Mirror::print_status(Formatter *f, stringstream *ss)
275 {
276 dout(20) << "enter" << dendl;
277
278 Mutex::Locker l(m_lock);
279
280 if (m_stopping) {
281 return;
282 }
283
284 if (f) {
285 f->open_object_section("mirror_status");
286 f->open_array_section("pool_replayers");
287 };
288
289 for (auto &pool_replayer : m_pool_replayers) {
290 pool_replayer.second->print_status(f, ss);
291 }
292
293 if (f) {
294 f->close_section();
295 }
296 }
297
298 void Mirror::start()
299 {
300 dout(20) << "enter" << dendl;
301 Mutex::Locker l(m_lock);
302
303 if (m_stopping) {
304 return;
305 }
306
307 m_manual_stop = false;
308
309 for (auto &pool_replayer : m_pool_replayers) {
310 pool_replayer.second->start();
311 }
312 }
313
314 void Mirror::stop()
315 {
316 dout(20) << "enter" << dendl;
317 Mutex::Locker l(m_lock);
318
319 if (m_stopping) {
320 return;
321 }
322
323 m_manual_stop = true;
324
325 for (auto &pool_replayer : m_pool_replayers) {
326 pool_replayer.second->stop(true);
327 }
328 }
329
330 void Mirror::restart()
331 {
332 dout(20) << "enter" << dendl;
333 Mutex::Locker l(m_lock);
334
335 if (m_stopping) {
336 return;
337 }
338
339 m_manual_stop = false;
340
341 for (auto &pool_replayer : m_pool_replayers) {
342 pool_replayer.second->restart();
343 }
344 }
345
346 void Mirror::flush()
347 {
348 dout(20) << "enter" << dendl;
349 Mutex::Locker l(m_lock);
350
351 if (m_stopping || m_manual_stop) {
352 return;
353 }
354
355 for (auto &pool_replayer : m_pool_replayers) {
356 pool_replayer.second->flush();
357 }
358 }
359
360 void Mirror::release_leader()
361 {
362 dout(20) << "enter" << dendl;
363 Mutex::Locker l(m_lock);
364
365 if (m_stopping) {
366 return;
367 }
368
369 for (auto &pool_replayer : m_pool_replayers) {
370 pool_replayer.second->release_leader();
371 }
372 }
373
374 void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
375 {
376 dout(20) << "enter" << dendl;
377 ceph_assert(m_lock.is_locked());
378
379 // remove stale pool replayers before creating new pool replayers
380 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
381 auto &peer = it->first.second;
382 auto pool_peer_it = pool_peers.find(it->first.first);
383 if (pool_peer_it == pool_peers.end() ||
384 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
385 dout(20) << "removing pool replayer for " << peer << dendl;
386 // TODO: make async
387 it->second->shut_down();
388 it = m_pool_replayers.erase(it);
389 } else {
390 ++it;
391 }
392 }
393
394 for (auto &kv : pool_peers) {
395 for (auto &peer : kv.second) {
396 PoolPeer pool_peer(kv.first, peer);
397
398 auto pool_replayers_it = m_pool_replayers.find(pool_peer);
399 if (pool_replayers_it != m_pool_replayers.end()) {
400 auto& pool_replayer = pool_replayers_it->second;
401 if (pool_replayer->is_blacklisted()) {
402 derr << "restarting blacklisted pool replayer for " << peer << dendl;
403 // TODO: make async
404 pool_replayer->shut_down();
405 pool_replayer->init();
406 } else if (!pool_replayer->is_running()) {
407 derr << "restarting failed pool replayer for " << peer << dendl;
408 // TODO: make async
409 pool_replayer->shut_down();
410 pool_replayer->init();
411 }
412 } else {
413 dout(20) << "starting pool replayer for " << peer << dendl;
414 unique_ptr<PoolReplayer<>> pool_replayer(new PoolReplayer<>(
415 m_threads, m_service_daemon.get(), kv.first, peer, m_args));
416
417 // TODO: make async
418 pool_replayer->init();
419 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
420 }
421 }
422
423 // TODO currently only support a single peer
424 }
425 }
426
427 } // namespace mirror
428 } // namespace rbd