]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/Mirror.cc
update sources to v12.2.1
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <boost/range/adaptor/map.hpp>
5
6#include "common/Formatter.h"
7#include "common/admin_socket.h"
8#include "common/debug.h"
9#include "common/errno.h"
10#include "librbd/ImageCtx.h"
11#include "Mirror.h"
c07f9fc5 12#include "ServiceDaemon.h"
7c673cae 13#include "Threads.h"
7c673cae
FG
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_rbd_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
19 << __func__ << ": "
20
21using std::list;
22using std::map;
23using std::set;
24using std::string;
25using std::unique_ptr;
26using std::vector;
27
28using librados::Rados;
29using librados::IoCtx;
30using librbd::mirror_peer_t;
31
32namespace rbd {
33namespace mirror {
34
35namespace {
36
37class MirrorAdminSocketCommand {
38public:
39 virtual ~MirrorAdminSocketCommand() {}
40 virtual bool call(Formatter *f, stringstream *ss) = 0;
41};
42
43class StatusCommand : public MirrorAdminSocketCommand {
44public:
45 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
46
47 bool call(Formatter *f, stringstream *ss) override {
48 mirror->print_status(f, ss);
49 return true;
50 }
51
52private:
53 Mirror *mirror;
54};
55
56class StartCommand : public MirrorAdminSocketCommand {
57public:
58 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
59
60 bool call(Formatter *f, stringstream *ss) override {
61 mirror->start();
62 return true;
63 }
64
65private:
66 Mirror *mirror;
67};
68
69class StopCommand : public MirrorAdminSocketCommand {
70public:
71 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
72
73 bool call(Formatter *f, stringstream *ss) override {
74 mirror->stop();
75 return true;
76 }
77
78private:
79 Mirror *mirror;
80};
81
82class RestartCommand : public MirrorAdminSocketCommand {
83public:
84 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
85
86 bool call(Formatter *f, stringstream *ss) override {
87 mirror->restart();
88 return true;
89 }
90
91private:
92 Mirror *mirror;
93};
94
95class FlushCommand : public MirrorAdminSocketCommand {
96public:
97 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
98
99 bool call(Formatter *f, stringstream *ss) override {
100 mirror->flush();
101 return true;
102 }
103
104private:
105 Mirror *mirror;
106};
107
108class LeaderReleaseCommand : public MirrorAdminSocketCommand {
109public:
110 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
111
112 bool call(Formatter *f, stringstream *ss) override {
113 mirror->release_leader();
114 return true;
115 }
116
117private:
118 Mirror *mirror;
119};
120
121} // anonymous namespace
122
123class MirrorAdminSocketHook : public AdminSocketHook {
124public:
125 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
126 admin_socket(cct->get_admin_socket()) {
127 std::string command;
128 int r;
129
130 command = "rbd mirror status";
131 r = admin_socket->register_command(command, command, this,
132 "get status for rbd mirror");
133 if (r == 0) {
134 commands[command] = new StatusCommand(mirror);
135 }
136
137 command = "rbd mirror start";
138 r = admin_socket->register_command(command, command, this,
139 "start rbd mirror");
140 if (r == 0) {
141 commands[command] = new StartCommand(mirror);
142 }
143
144 command = "rbd mirror stop";
145 r = admin_socket->register_command(command, command, this,
146 "stop rbd mirror");
147 if (r == 0) {
148 commands[command] = new StopCommand(mirror);
149 }
150
151 command = "rbd mirror restart";
152 r = admin_socket->register_command(command, command, this,
153 "restart rbd mirror");
154 if (r == 0) {
155 commands[command] = new RestartCommand(mirror);
156 }
157
158 command = "rbd mirror flush";
159 r = admin_socket->register_command(command, command, this,
160 "flush rbd mirror");
161 if (r == 0) {
162 commands[command] = new FlushCommand(mirror);
163 }
164
165 command = "rbd mirror leader release";
166 r = admin_socket->register_command(command, command, this,
167 "release rbd mirror leader");
168 if (r == 0) {
169 commands[command] = new LeaderReleaseCommand(mirror);
170 }
171 }
172
173 ~MirrorAdminSocketHook() override {
174 for (Commands::const_iterator i = commands.begin(); i != commands.end();
175 ++i) {
176 (void)admin_socket->unregister_command(i->first);
177 delete i->second;
178 }
179 }
180
181 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
182 bufferlist& out) override {
183 Commands::const_iterator i = commands.find(command);
184 assert(i != commands.end());
185 Formatter *f = Formatter::create(format);
186 stringstream ss;
187 bool r = i->second->call(f, &ss);
188 delete f;
189 out.append(ss);
190 return r;
191 }
192
193private:
194 typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
195
196 AdminSocket *admin_socket;
197 Commands commands;
198};
199
200Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
201 m_cct(cct),
202 m_args(args),
203 m_lock("rbd::mirror::Mirror"),
204 m_local(new librados::Rados()),
205 m_asok_hook(new MirrorAdminSocketHook(cct, this))
206{
207 cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
208 m_threads, "rbd_mirror::threads");
c07f9fc5 209 m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
7c673cae
FG
210}
211
212Mirror::~Mirror()
213{
214 delete m_asok_hook;
215}
216
217void Mirror::handle_signal(int signum)
218{
219 m_stopping = true;
220 {
221 Mutex::Locker l(m_lock);
222 m_cond.Signal();
223 }
224}
225
226int Mirror::init()
227{
228 int r = m_local->init_with_context(m_cct);
229 if (r < 0) {
230 derr << "could not initialize rados handle" << dendl;
231 return r;
232 }
233
234 r = m_local->connect();
235 if (r < 0) {
236 derr << "error connecting to local cluster" << dendl;
237 return r;
238 }
239
c07f9fc5
FG
240 r = m_service_daemon->init();
241 if (r < 0) {
242 derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
243 return r;
244 }
7c673cae 245
c07f9fc5
FG
246 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
247 m_service_daemon.get()));
7c673cae 248
c07f9fc5
FG
249 m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue,
250 m_threads->timer,
251 &m_threads->timer_lock,
252 m_service_daemon.get()));
7c673cae
FG
253 return r;
254}
255
256void Mirror::run()
257{
258 dout(20) << "enter" << dendl;
259 while (!m_stopping) {
260 m_local_cluster_watcher->refresh_pools();
261 Mutex::Locker l(m_lock);
262 if (!m_manual_stop) {
263 update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
264 }
265 m_cond.WaitInterval(
266 m_lock,
181888fb 267 utime_t(m_cct->_conf->get_val<int64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
7c673cae
FG
268 }
269
270 // stop all pool replayers in parallel
271 Mutex::Locker locker(m_lock);
272 for (auto &pool_replayer : m_pool_replayers) {
273 pool_replayer.second->stop(false);
274 }
275 dout(20) << "return" << dendl;
276}
277
278void Mirror::print_status(Formatter *f, stringstream *ss)
279{
280 dout(20) << "enter" << dendl;
281
282 Mutex::Locker l(m_lock);
283
284 if (m_stopping) {
285 return;
286 }
287
288 if (f) {
289 f->open_object_section("mirror_status");
290 f->open_array_section("pool_replayers");
291 };
292
293 for (auto &pool_replayer : m_pool_replayers) {
294 pool_replayer.second->print_status(f, ss);
295 }
296
297 if (f) {
298 f->close_section();
299 f->open_object_section("image_deleter");
300 }
301
302 m_image_deleter->print_status(f, ss);
7c673cae
FG
303}
304
305void Mirror::start()
306{
307 dout(20) << "enter" << dendl;
308 Mutex::Locker l(m_lock);
309
310 if (m_stopping) {
311 return;
312 }
313
314 m_manual_stop = false;
315
316 for (auto &pool_replayer : m_pool_replayers) {
317 pool_replayer.second->start();
318 }
319}
320
321void Mirror::stop()
322{
323 dout(20) << "enter" << dendl;
324 Mutex::Locker l(m_lock);
325
326 if (m_stopping) {
327 return;
328 }
329
330 m_manual_stop = true;
331
332 for (auto &pool_replayer : m_pool_replayers) {
333 pool_replayer.second->stop(true);
334 }
335}
336
337void Mirror::restart()
338{
339 dout(20) << "enter" << dendl;
340 Mutex::Locker l(m_lock);
341
342 if (m_stopping) {
343 return;
344 }
345
346 m_manual_stop = false;
347
348 for (auto &pool_replayer : m_pool_replayers) {
349 pool_replayer.second->restart();
350 }
351}
352
353void Mirror::flush()
354{
355 dout(20) << "enter" << dendl;
356 Mutex::Locker l(m_lock);
357
358 if (m_stopping || m_manual_stop) {
359 return;
360 }
361
362 for (auto &pool_replayer : m_pool_replayers) {
363 pool_replayer.second->flush();
364 }
365}
366
367void Mirror::release_leader()
368{
369 dout(20) << "enter" << dendl;
370 Mutex::Locker l(m_lock);
371
372 if (m_stopping) {
373 return;
374 }
375
376 for (auto &pool_replayer : m_pool_replayers) {
377 pool_replayer.second->release_leader();
378 }
379}
380
381void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
382{
383 dout(20) << "enter" << dendl;
384 assert(m_lock.is_locked());
385
386 // remove stale pool replayers before creating new pool replayers
387 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
388 auto &peer = it->first.second;
389 auto pool_peer_it = pool_peers.find(it->first.first);
c07f9fc5
FG
390 if (pool_peer_it == pool_peers.end() ||
391 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
7c673cae
FG
392 dout(20) << "removing pool replayer for " << peer << dendl;
393 // TODO: make async
c07f9fc5 394 it->second->shut_down();
7c673cae
FG
395 it = m_pool_replayers.erase(it);
396 } else {
397 ++it;
398 }
399 }
400
401 for (auto &kv : pool_peers) {
402 for (auto &peer : kv.second) {
403 PoolPeer pool_peer(kv.first, peer);
c07f9fc5
FG
404
405 auto pool_replayers_it = m_pool_replayers.find(pool_peer);
406 if (pool_replayers_it != m_pool_replayers.end()) {
407 auto& pool_replayer = pool_replayers_it->second;
408 if (pool_replayer->is_blacklisted()) {
409 derr << "restarting blacklisted pool replayer for " << peer << dendl;
410 // TODO: make async
411 pool_replayer->shut_down();
412 pool_replayer->init();
413 } else if (!pool_replayer->is_running()) {
414 derr << "restarting failed pool replayer for " << peer << dendl;
415 // TODO: make async
416 pool_replayer->shut_down();
417 pool_replayer->init();
418 }
419 } else {
7c673cae
FG
420 dout(20) << "starting pool replayer for " << peer << dendl;
421 unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
c07f9fc5
FG
422 m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first,
423 peer, m_args));
7c673cae 424
c07f9fc5
FG
425 // TODO: make async
426 pool_replayer->init();
7c673cae
FG
427 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
428 }
429 }
430
431 // TODO currently only support a single peer
432 }
433}
434
435} // namespace mirror
436} // namespace rbd