]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/Mirror.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
92f5a8d4
TL
4#include <signal.h>
5
7c673cae
FG
6#include <boost/range/adaptor/map.hpp>
7
8#include "common/Formatter.h"
9#include "common/admin_socket.h"
10#include "common/debug.h"
11#include "common/errno.h"
12#include "librbd/ImageCtx.h"
13#include "Mirror.h"
c07f9fc5 14#include "ServiceDaemon.h"
7c673cae 15#include "Threads.h"
7c673cae
FG
16
17#define dout_context g_ceph_context
18#define dout_subsys ceph_subsys_rbd_mirror
19#undef dout_prefix
20#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
21 << __func__ << ": "
22
23using std::list;
24using std::map;
25using std::set;
26using std::string;
27using std::unique_ptr;
28using std::vector;
29
30using librados::Rados;
31using librados::IoCtx;
32using librbd::mirror_peer_t;
33
34namespace rbd {
35namespace mirror {
36
37namespace {
38
39class MirrorAdminSocketCommand {
40public:
41 virtual ~MirrorAdminSocketCommand() {}
42 virtual bool call(Formatter *f, stringstream *ss) = 0;
43};
44
45class StatusCommand : public MirrorAdminSocketCommand {
46public:
47 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
48
49 bool call(Formatter *f, stringstream *ss) override {
50 mirror->print_status(f, ss);
51 return true;
52 }
53
54private:
55 Mirror *mirror;
56};
57
58class StartCommand : public MirrorAdminSocketCommand {
59public:
60 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
61
62 bool call(Formatter *f, stringstream *ss) override {
63 mirror->start();
64 return true;
65 }
66
67private:
68 Mirror *mirror;
69};
70
71class StopCommand : public MirrorAdminSocketCommand {
72public:
73 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
74
75 bool call(Formatter *f, stringstream *ss) override {
76 mirror->stop();
77 return true;
78 }
79
80private:
81 Mirror *mirror;
82};
83
84class RestartCommand : public MirrorAdminSocketCommand {
85public:
86 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
87
88 bool call(Formatter *f, stringstream *ss) override {
89 mirror->restart();
90 return true;
91 }
92
93private:
94 Mirror *mirror;
95};
96
97class FlushCommand : public MirrorAdminSocketCommand {
98public:
99 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
100
101 bool call(Formatter *f, stringstream *ss) override {
102 mirror->flush();
103 return true;
104 }
105
106private:
107 Mirror *mirror;
108};
109
110class LeaderReleaseCommand : public MirrorAdminSocketCommand {
111public:
112 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
113
114 bool call(Formatter *f, stringstream *ss) override {
115 mirror->release_leader();
116 return true;
117 }
118
119private:
120 Mirror *mirror;
121};
122
123} // anonymous namespace
124
125class MirrorAdminSocketHook : public AdminSocketHook {
126public:
127 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
128 admin_socket(cct->get_admin_socket()) {
129 std::string command;
130 int r;
131
132 command = "rbd mirror status";
133 r = admin_socket->register_command(command, command, this,
134 "get status for rbd mirror");
135 if (r == 0) {
136 commands[command] = new StatusCommand(mirror);
137 }
138
139 command = "rbd mirror start";
140 r = admin_socket->register_command(command, command, this,
141 "start rbd mirror");
142 if (r == 0) {
143 commands[command] = new StartCommand(mirror);
144 }
145
146 command = "rbd mirror stop";
147 r = admin_socket->register_command(command, command, this,
148 "stop rbd mirror");
149 if (r == 0) {
150 commands[command] = new StopCommand(mirror);
151 }
152
153 command = "rbd mirror restart";
154 r = admin_socket->register_command(command, command, this,
155 "restart rbd mirror");
156 if (r == 0) {
157 commands[command] = new RestartCommand(mirror);
158 }
159
160 command = "rbd mirror flush";
161 r = admin_socket->register_command(command, command, this,
162 "flush rbd mirror");
163 if (r == 0) {
164 commands[command] = new FlushCommand(mirror);
165 }
166
167 command = "rbd mirror leader release";
168 r = admin_socket->register_command(command, command, this,
169 "release rbd mirror leader");
170 if (r == 0) {
171 commands[command] = new LeaderReleaseCommand(mirror);
172 }
173 }
174
175 ~MirrorAdminSocketHook() override {
176 for (Commands::const_iterator i = commands.begin(); i != commands.end();
177 ++i) {
178 (void)admin_socket->unregister_command(i->first);
179 delete i->second;
180 }
181 }
182
11fdf7f2
TL
183 bool call(std::string_view command, const cmdmap_t& cmdmap,
184 std::string_view format, bufferlist& out) override {
7c673cae 185 Commands::const_iterator i = commands.find(command);
11fdf7f2 186 ceph_assert(i != commands.end());
7c673cae
FG
187 Formatter *f = Formatter::create(format);
188 stringstream ss;
189 bool r = i->second->call(f, &ss);
190 delete f;
191 out.append(ss);
192 return r;
193 }
194
195private:
11fdf7f2 196 typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
7c673cae
FG
197
198 AdminSocket *admin_socket;
199 Commands commands;
200};
201
202Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
203 m_cct(cct),
204 m_args(args),
205 m_lock("rbd::mirror::Mirror"),
206 m_local(new librados::Rados()),
207 m_asok_hook(new MirrorAdminSocketHook(cct, this))
208{
11fdf7f2
TL
209 m_threads =
210 &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
211 "rbd_mirror::threads", false, cct));
c07f9fc5 212 m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
7c673cae
FG
213}
214
215Mirror::~Mirror()
216{
217 delete m_asok_hook;
218}
219
220void Mirror::handle_signal(int signum)
221{
92f5a8d4
TL
222 dout(20) << signum << dendl;
223
224 Mutex::Locker l(m_lock);
225
226 switch (signum) {
227 case SIGHUP:
228 for (auto &it : m_pool_replayers) {
229 it.second->reopen_logs();
230 }
231 g_ceph_context->reopen_logs();
232 break;
233
234 case SIGINT:
235 case SIGTERM:
236 m_stopping = true;
7c673cae 237 m_cond.Signal();
92f5a8d4
TL
238 break;
239
240 default:
241 ceph_abort_msgf("unexpected signal %d", signum);
7c673cae
FG
242 }
243}
244
245int Mirror::init()
246{
247 int r = m_local->init_with_context(m_cct);
248 if (r < 0) {
249 derr << "could not initialize rados handle" << dendl;
250 return r;
251 }
252
253 r = m_local->connect();
254 if (r < 0) {
255 derr << "error connecting to local cluster" << dendl;
256 return r;
257 }
258
c07f9fc5
FG
259 r = m_service_daemon->init();
260 if (r < 0) {
261 derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
262 return r;
263 }
7c673cae 264
c07f9fc5
FG
265 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
266 m_service_daemon.get()));
7c673cae
FG
267 return r;
268}
269
270void Mirror::run()
271{
272 dout(20) << "enter" << dendl;
273 while (!m_stopping) {
274 m_local_cluster_watcher->refresh_pools();
275 Mutex::Locker l(m_lock);
276 if (!m_manual_stop) {
277 update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
278 }
279 m_cond.WaitInterval(
280 m_lock,
11fdf7f2 281 utime_t(m_cct->_conf.get_val<uint64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
7c673cae
FG
282 }
283
284 // stop all pool replayers in parallel
285 Mutex::Locker locker(m_lock);
286 for (auto &pool_replayer : m_pool_replayers) {
287 pool_replayer.second->stop(false);
288 }
289 dout(20) << "return" << dendl;
290}
291
292void Mirror::print_status(Formatter *f, stringstream *ss)
293{
294 dout(20) << "enter" << dendl;
295
296 Mutex::Locker l(m_lock);
297
298 if (m_stopping) {
299 return;
300 }
301
302 if (f) {
303 f->open_object_section("mirror_status");
304 f->open_array_section("pool_replayers");
305 };
306
307 for (auto &pool_replayer : m_pool_replayers) {
308 pool_replayer.second->print_status(f, ss);
309 }
310
311 if (f) {
312 f->close_section();
92f5a8d4
TL
313 f->close_section();
314 f->flush(*ss);
7c673cae 315 }
7c673cae
FG
316}
317
318void Mirror::start()
319{
320 dout(20) << "enter" << dendl;
321 Mutex::Locker l(m_lock);
322
323 if (m_stopping) {
324 return;
325 }
326
327 m_manual_stop = false;
328
329 for (auto &pool_replayer : m_pool_replayers) {
330 pool_replayer.second->start();
331 }
332}
333
334void Mirror::stop()
335{
336 dout(20) << "enter" << dendl;
337 Mutex::Locker l(m_lock);
338
339 if (m_stopping) {
340 return;
341 }
342
343 m_manual_stop = true;
344
345 for (auto &pool_replayer : m_pool_replayers) {
346 pool_replayer.second->stop(true);
347 }
348}
349
350void Mirror::restart()
351{
352 dout(20) << "enter" << dendl;
353 Mutex::Locker l(m_lock);
354
355 if (m_stopping) {
356 return;
357 }
358
359 m_manual_stop = false;
360
361 for (auto &pool_replayer : m_pool_replayers) {
362 pool_replayer.second->restart();
363 }
364}
365
366void Mirror::flush()
367{
368 dout(20) << "enter" << dendl;
369 Mutex::Locker l(m_lock);
370
371 if (m_stopping || m_manual_stop) {
372 return;
373 }
374
375 for (auto &pool_replayer : m_pool_replayers) {
376 pool_replayer.second->flush();
377 }
378}
379
380void Mirror::release_leader()
381{
382 dout(20) << "enter" << dendl;
383 Mutex::Locker l(m_lock);
384
385 if (m_stopping) {
386 return;
387 }
388
389 for (auto &pool_replayer : m_pool_replayers) {
390 pool_replayer.second->release_leader();
391 }
392}
393
394void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
395{
396 dout(20) << "enter" << dendl;
11fdf7f2 397 ceph_assert(m_lock.is_locked());
7c673cae
FG
398
399 // remove stale pool replayers before creating new pool replayers
400 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
401 auto &peer = it->first.second;
402 auto pool_peer_it = pool_peers.find(it->first.first);
c07f9fc5
FG
403 if (pool_peer_it == pool_peers.end() ||
404 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
7c673cae
FG
405 dout(20) << "removing pool replayer for " << peer << dendl;
406 // TODO: make async
c07f9fc5 407 it->second->shut_down();
7c673cae
FG
408 it = m_pool_replayers.erase(it);
409 } else {
410 ++it;
411 }
412 }
413
414 for (auto &kv : pool_peers) {
415 for (auto &peer : kv.second) {
416 PoolPeer pool_peer(kv.first, peer);
c07f9fc5
FG
417
418 auto pool_replayers_it = m_pool_replayers.find(pool_peer);
419 if (pool_replayers_it != m_pool_replayers.end()) {
420 auto& pool_replayer = pool_replayers_it->second;
421 if (pool_replayer->is_blacklisted()) {
422 derr << "restarting blacklisted pool replayer for " << peer << dendl;
423 // TODO: make async
424 pool_replayer->shut_down();
425 pool_replayer->init();
426 } else if (!pool_replayer->is_running()) {
427 derr << "restarting failed pool replayer for " << peer << dendl;
428 // TODO: make async
429 pool_replayer->shut_down();
430 pool_replayer->init();
431 }
432 } else {
7c673cae 433 dout(20) << "starting pool replayer for " << peer << dendl;
11fdf7f2
TL
434 unique_ptr<PoolReplayer<>> pool_replayer(new PoolReplayer<>(
435 m_threads, m_service_daemon.get(), kv.first, peer, m_args));
7c673cae 436
c07f9fc5
FG
437 // TODO: make async
438 pool_replayer->init();
7c673cae
FG
439 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
440 }
441 }
442
443 // TODO currently only support a single peer
444 }
445}
446
447} // namespace mirror
448} // namespace rbd