]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/Mirror.cc
update sources to v12.1.0
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <boost/range/adaptor/map.hpp>
5
6#include "common/Formatter.h"
7#include "common/admin_socket.h"
8#include "common/debug.h"
9#include "common/errno.h"
10#include "librbd/ImageCtx.h"
11#include "Mirror.h"
12#include "Threads.h"
13#include "ImageSync.h"
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_rbd_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
19 << __func__ << ": "
20
21using std::list;
22using std::map;
23using std::set;
24using std::string;
25using std::unique_ptr;
26using std::vector;
27
28using librados::Rados;
29using librados::IoCtx;
30using librbd::mirror_peer_t;
31
32namespace rbd {
33namespace mirror {
34
35namespace {
36
37class MirrorAdminSocketCommand {
38public:
39 virtual ~MirrorAdminSocketCommand() {}
40 virtual bool call(Formatter *f, stringstream *ss) = 0;
41};
42
43class StatusCommand : public MirrorAdminSocketCommand {
44public:
45 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
46
47 bool call(Formatter *f, stringstream *ss) override {
48 mirror->print_status(f, ss);
49 return true;
50 }
51
52private:
53 Mirror *mirror;
54};
55
56class StartCommand : public MirrorAdminSocketCommand {
57public:
58 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
59
60 bool call(Formatter *f, stringstream *ss) override {
61 mirror->start();
62 return true;
63 }
64
65private:
66 Mirror *mirror;
67};
68
69class StopCommand : public MirrorAdminSocketCommand {
70public:
71 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
72
73 bool call(Formatter *f, stringstream *ss) override {
74 mirror->stop();
75 return true;
76 }
77
78private:
79 Mirror *mirror;
80};
81
82class RestartCommand : public MirrorAdminSocketCommand {
83public:
84 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
85
86 bool call(Formatter *f, stringstream *ss) override {
87 mirror->restart();
88 return true;
89 }
90
91private:
92 Mirror *mirror;
93};
94
95class FlushCommand : public MirrorAdminSocketCommand {
96public:
97 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
98
99 bool call(Formatter *f, stringstream *ss) override {
100 mirror->flush();
101 return true;
102 }
103
104private:
105 Mirror *mirror;
106};
107
108class LeaderReleaseCommand : public MirrorAdminSocketCommand {
109public:
110 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
111
112 bool call(Formatter *f, stringstream *ss) override {
113 mirror->release_leader();
114 return true;
115 }
116
117private:
118 Mirror *mirror;
119};
120
121} // anonymous namespace
122
123class MirrorAdminSocketHook : public AdminSocketHook {
124public:
125 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
126 admin_socket(cct->get_admin_socket()) {
127 std::string command;
128 int r;
129
130 command = "rbd mirror status";
131 r = admin_socket->register_command(command, command, this,
132 "get status for rbd mirror");
133 if (r == 0) {
134 commands[command] = new StatusCommand(mirror);
135 }
136
137 command = "rbd mirror start";
138 r = admin_socket->register_command(command, command, this,
139 "start rbd mirror");
140 if (r == 0) {
141 commands[command] = new StartCommand(mirror);
142 }
143
144 command = "rbd mirror stop";
145 r = admin_socket->register_command(command, command, this,
146 "stop rbd mirror");
147 if (r == 0) {
148 commands[command] = new StopCommand(mirror);
149 }
150
151 command = "rbd mirror restart";
152 r = admin_socket->register_command(command, command, this,
153 "restart rbd mirror");
154 if (r == 0) {
155 commands[command] = new RestartCommand(mirror);
156 }
157
158 command = "rbd mirror flush";
159 r = admin_socket->register_command(command, command, this,
160 "flush rbd mirror");
161 if (r == 0) {
162 commands[command] = new FlushCommand(mirror);
163 }
164
165 command = "rbd mirror leader release";
166 r = admin_socket->register_command(command, command, this,
167 "release rbd mirror leader");
168 if (r == 0) {
169 commands[command] = new LeaderReleaseCommand(mirror);
170 }
171 }
172
173 ~MirrorAdminSocketHook() override {
174 for (Commands::const_iterator i = commands.begin(); i != commands.end();
175 ++i) {
176 (void)admin_socket->unregister_command(i->first);
177 delete i->second;
178 }
179 }
180
181 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
182 bufferlist& out) override {
183 Commands::const_iterator i = commands.find(command);
184 assert(i != commands.end());
185 Formatter *f = Formatter::create(format);
186 stringstream ss;
187 bool r = i->second->call(f, &ss);
188 delete f;
189 out.append(ss);
190 return r;
191 }
192
193private:
194 typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
195
196 AdminSocket *admin_socket;
197 Commands commands;
198};
199
200Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
201 m_cct(cct),
202 m_args(args),
203 m_lock("rbd::mirror::Mirror"),
204 m_local(new librados::Rados()),
205 m_asok_hook(new MirrorAdminSocketHook(cct, this))
206{
207 cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
208 m_threads, "rbd_mirror::threads");
209}
210
211Mirror::~Mirror()
212{
213 delete m_asok_hook;
214}
215
216void Mirror::handle_signal(int signum)
217{
218 m_stopping = true;
219 {
220 Mutex::Locker l(m_lock);
221 m_cond.Signal();
222 }
223}
224
225int Mirror::init()
226{
227 int r = m_local->init_with_context(m_cct);
228 if (r < 0) {
229 derr << "could not initialize rados handle" << dendl;
230 return r;
231 }
232
233 r = m_local->connect();
234 if (r < 0) {
235 derr << "error connecting to local cluster" << dendl;
236 return r;
237 }
238
239 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock));
240
241 m_image_deleter.reset(new ImageDeleter(m_threads->work_queue,
242 m_threads->timer,
243 &m_threads->timer_lock));
244
7c673cae
FG
245 return r;
246}
247
248void Mirror::run()
249{
250 dout(20) << "enter" << dendl;
251 while (!m_stopping) {
252 m_local_cluster_watcher->refresh_pools();
253 Mutex::Locker l(m_lock);
254 if (!m_manual_stop) {
255 update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
256 }
257 m_cond.WaitInterval(
258 m_lock,
259 utime_t(m_cct->_conf->rbd_mirror_pool_replayers_refresh_interval, 0));
260 }
261
262 // stop all pool replayers in parallel
263 Mutex::Locker locker(m_lock);
264 for (auto &pool_replayer : m_pool_replayers) {
265 pool_replayer.second->stop(false);
266 }
267 dout(20) << "return" << dendl;
268}
269
270void Mirror::print_status(Formatter *f, stringstream *ss)
271{
272 dout(20) << "enter" << dendl;
273
274 Mutex::Locker l(m_lock);
275
276 if (m_stopping) {
277 return;
278 }
279
280 if (f) {
281 f->open_object_section("mirror_status");
282 f->open_array_section("pool_replayers");
283 };
284
285 for (auto &pool_replayer : m_pool_replayers) {
286 pool_replayer.second->print_status(f, ss);
287 }
288
289 if (f) {
290 f->close_section();
291 f->open_object_section("image_deleter");
292 }
293
294 m_image_deleter->print_status(f, ss);
7c673cae
FG
295}
296
297void Mirror::start()
298{
299 dout(20) << "enter" << dendl;
300 Mutex::Locker l(m_lock);
301
302 if (m_stopping) {
303 return;
304 }
305
306 m_manual_stop = false;
307
308 for (auto &pool_replayer : m_pool_replayers) {
309 pool_replayer.second->start();
310 }
311}
312
313void Mirror::stop()
314{
315 dout(20) << "enter" << dendl;
316 Mutex::Locker l(m_lock);
317
318 if (m_stopping) {
319 return;
320 }
321
322 m_manual_stop = true;
323
324 for (auto &pool_replayer : m_pool_replayers) {
325 pool_replayer.second->stop(true);
326 }
327}
328
329void Mirror::restart()
330{
331 dout(20) << "enter" << dendl;
332 Mutex::Locker l(m_lock);
333
334 if (m_stopping) {
335 return;
336 }
337
338 m_manual_stop = false;
339
340 for (auto &pool_replayer : m_pool_replayers) {
341 pool_replayer.second->restart();
342 }
343}
344
345void Mirror::flush()
346{
347 dout(20) << "enter" << dendl;
348 Mutex::Locker l(m_lock);
349
350 if (m_stopping || m_manual_stop) {
351 return;
352 }
353
354 for (auto &pool_replayer : m_pool_replayers) {
355 pool_replayer.second->flush();
356 }
357}
358
359void Mirror::release_leader()
360{
361 dout(20) << "enter" << dendl;
362 Mutex::Locker l(m_lock);
363
364 if (m_stopping) {
365 return;
366 }
367
368 for (auto &pool_replayer : m_pool_replayers) {
369 pool_replayer.second->release_leader();
370 }
371}
372
373void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
374{
375 dout(20) << "enter" << dendl;
376 assert(m_lock.is_locked());
377
378 // remove stale pool replayers before creating new pool replayers
379 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
380 auto &peer = it->first.second;
381 auto pool_peer_it = pool_peers.find(it->first.first);
382 if (it->second->is_blacklisted()) {
383 derr << "removing blacklisted pool replayer for " << peer << dendl;
384 // TODO: make async
385 it = m_pool_replayers.erase(it);
386 } else if (pool_peer_it == pool_peers.end() ||
387 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
388 dout(20) << "removing pool replayer for " << peer << dendl;
389 // TODO: make async
390 it = m_pool_replayers.erase(it);
391 } else {
392 ++it;
393 }
394 }
395
396 for (auto &kv : pool_peers) {
397 for (auto &peer : kv.second) {
398 PoolPeer pool_peer(kv.first, peer);
399 if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
400 dout(20) << "starting pool replayer for " << peer << dendl;
401 unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
31f18b77 402 m_threads, m_image_deleter, kv.first, peer, m_args));
7c673cae
FG
403
404 // TODO: make async, and retry connecting within pool replayer
405 int r = pool_replayer->init();
406 if (r < 0) {
407 continue;
408 }
409 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
410 }
411 }
412
413 // TODO currently only support a single peer
414 }
415}
416
417} // namespace mirror
418} // namespace rbd