]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/Mirror.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <boost/range/adaptor/map.hpp>
5
6#include "common/Formatter.h"
7#include "common/admin_socket.h"
8#include "common/debug.h"
9#include "common/errno.h"
10#include "librbd/ImageCtx.h"
11#include "Mirror.h"
12#include "Threads.h"
13#include "ImageSync.h"
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_rbd_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
19 << __func__ << ": "
20
21using std::list;
22using std::map;
23using std::set;
24using std::string;
25using std::unique_ptr;
26using std::vector;
27
28using librados::Rados;
29using librados::IoCtx;
30using librbd::mirror_peer_t;
31
32namespace rbd {
33namespace mirror {
34
35namespace {
36
37class MirrorAdminSocketCommand {
38public:
39 virtual ~MirrorAdminSocketCommand() {}
40 virtual bool call(Formatter *f, stringstream *ss) = 0;
41};
42
43class StatusCommand : public MirrorAdminSocketCommand {
44public:
45 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
46
47 bool call(Formatter *f, stringstream *ss) override {
48 mirror->print_status(f, ss);
49 return true;
50 }
51
52private:
53 Mirror *mirror;
54};
55
56class StartCommand : public MirrorAdminSocketCommand {
57public:
58 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
59
60 bool call(Formatter *f, stringstream *ss) override {
61 mirror->start();
62 return true;
63 }
64
65private:
66 Mirror *mirror;
67};
68
69class StopCommand : public MirrorAdminSocketCommand {
70public:
71 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
72
73 bool call(Formatter *f, stringstream *ss) override {
74 mirror->stop();
75 return true;
76 }
77
78private:
79 Mirror *mirror;
80};
81
82class RestartCommand : public MirrorAdminSocketCommand {
83public:
84 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
85
86 bool call(Formatter *f, stringstream *ss) override {
87 mirror->restart();
88 return true;
89 }
90
91private:
92 Mirror *mirror;
93};
94
95class FlushCommand : public MirrorAdminSocketCommand {
96public:
97 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
98
99 bool call(Formatter *f, stringstream *ss) override {
100 mirror->flush();
101 return true;
102 }
103
104private:
105 Mirror *mirror;
106};
107
108class LeaderReleaseCommand : public MirrorAdminSocketCommand {
109public:
110 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
111
112 bool call(Formatter *f, stringstream *ss) override {
113 mirror->release_leader();
114 return true;
115 }
116
117private:
118 Mirror *mirror;
119};
120
121} // anonymous namespace
122
123class MirrorAdminSocketHook : public AdminSocketHook {
124public:
125 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
126 admin_socket(cct->get_admin_socket()) {
127 std::string command;
128 int r;
129
130 command = "rbd mirror status";
131 r = admin_socket->register_command(command, command, this,
132 "get status for rbd mirror");
133 if (r == 0) {
134 commands[command] = new StatusCommand(mirror);
135 }
136
137 command = "rbd mirror start";
138 r = admin_socket->register_command(command, command, this,
139 "start rbd mirror");
140 if (r == 0) {
141 commands[command] = new StartCommand(mirror);
142 }
143
144 command = "rbd mirror stop";
145 r = admin_socket->register_command(command, command, this,
146 "stop rbd mirror");
147 if (r == 0) {
148 commands[command] = new StopCommand(mirror);
149 }
150
151 command = "rbd mirror restart";
152 r = admin_socket->register_command(command, command, this,
153 "restart rbd mirror");
154 if (r == 0) {
155 commands[command] = new RestartCommand(mirror);
156 }
157
158 command = "rbd mirror flush";
159 r = admin_socket->register_command(command, command, this,
160 "flush rbd mirror");
161 if (r == 0) {
162 commands[command] = new FlushCommand(mirror);
163 }
164
165 command = "rbd mirror leader release";
166 r = admin_socket->register_command(command, command, this,
167 "release rbd mirror leader");
168 if (r == 0) {
169 commands[command] = new LeaderReleaseCommand(mirror);
170 }
171 }
172
173 ~MirrorAdminSocketHook() override {
174 for (Commands::const_iterator i = commands.begin(); i != commands.end();
175 ++i) {
176 (void)admin_socket->unregister_command(i->first);
177 delete i->second;
178 }
179 }
180
181 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
182 bufferlist& out) override {
183 Commands::const_iterator i = commands.find(command);
184 assert(i != commands.end());
185 Formatter *f = Formatter::create(format);
186 stringstream ss;
187 bool r = i->second->call(f, &ss);
188 delete f;
189 out.append(ss);
190 return r;
191 }
192
193private:
194 typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
195
196 AdminSocket *admin_socket;
197 Commands commands;
198};
199
200Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
201 m_cct(cct),
202 m_args(args),
203 m_lock("rbd::mirror::Mirror"),
204 m_local(new librados::Rados()),
205 m_asok_hook(new MirrorAdminSocketHook(cct, this))
206{
207 cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
208 m_threads, "rbd_mirror::threads");
209}
210
211Mirror::~Mirror()
212{
213 delete m_asok_hook;
214}
215
216void Mirror::handle_signal(int signum)
217{
218 m_stopping = true;
219 {
220 Mutex::Locker l(m_lock);
221 m_cond.Signal();
222 }
223}
224
225int Mirror::init()
226{
227 int r = m_local->init_with_context(m_cct);
228 if (r < 0) {
229 derr << "could not initialize rados handle" << dendl;
230 return r;
231 }
232
233 r = m_local->connect();
234 if (r < 0) {
235 derr << "error connecting to local cluster" << dendl;
236 return r;
237 }
238
239 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock));
240
241 m_image_deleter.reset(new ImageDeleter(m_threads->work_queue,
242 m_threads->timer,
243 &m_threads->timer_lock));
244
245 m_image_sync_throttler.reset(new ImageSyncThrottler<>());
246
247 return r;
248}
249
250void Mirror::run()
251{
252 dout(20) << "enter" << dendl;
253 while (!m_stopping) {
254 m_local_cluster_watcher->refresh_pools();
255 Mutex::Locker l(m_lock);
256 if (!m_manual_stop) {
257 update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
258 }
259 m_cond.WaitInterval(
260 m_lock,
261 utime_t(m_cct->_conf->rbd_mirror_pool_replayers_refresh_interval, 0));
262 }
263
264 // stop all pool replayers in parallel
265 Mutex::Locker locker(m_lock);
266 for (auto &pool_replayer : m_pool_replayers) {
267 pool_replayer.second->stop(false);
268 }
269 dout(20) << "return" << dendl;
270}
271
272void Mirror::print_status(Formatter *f, stringstream *ss)
273{
274 dout(20) << "enter" << dendl;
275
276 Mutex::Locker l(m_lock);
277
278 if (m_stopping) {
279 return;
280 }
281
282 if (f) {
283 f->open_object_section("mirror_status");
284 f->open_array_section("pool_replayers");
285 };
286
287 for (auto &pool_replayer : m_pool_replayers) {
288 pool_replayer.second->print_status(f, ss);
289 }
290
291 if (f) {
292 f->close_section();
293 f->open_object_section("image_deleter");
294 }
295
296 m_image_deleter->print_status(f, ss);
297
298 if (f) {
299 f->close_section();
300 f->open_object_section("sync_throttler");
301 }
302
303 m_image_sync_throttler->print_status(f, ss);
304
305 if (f) {
306 f->close_section();
307 f->close_section();
308 f->flush(*ss);
309 }
310}
311
312void Mirror::start()
313{
314 dout(20) << "enter" << dendl;
315 Mutex::Locker l(m_lock);
316
317 if (m_stopping) {
318 return;
319 }
320
321 m_manual_stop = false;
322
323 for (auto &pool_replayer : m_pool_replayers) {
324 pool_replayer.second->start();
325 }
326}
327
328void Mirror::stop()
329{
330 dout(20) << "enter" << dendl;
331 Mutex::Locker l(m_lock);
332
333 if (m_stopping) {
334 return;
335 }
336
337 m_manual_stop = true;
338
339 for (auto &pool_replayer : m_pool_replayers) {
340 pool_replayer.second->stop(true);
341 }
342}
343
344void Mirror::restart()
345{
346 dout(20) << "enter" << dendl;
347 Mutex::Locker l(m_lock);
348
349 if (m_stopping) {
350 return;
351 }
352
353 m_manual_stop = false;
354
355 for (auto &pool_replayer : m_pool_replayers) {
356 pool_replayer.second->restart();
357 }
358}
359
360void Mirror::flush()
361{
362 dout(20) << "enter" << dendl;
363 Mutex::Locker l(m_lock);
364
365 if (m_stopping || m_manual_stop) {
366 return;
367 }
368
369 for (auto &pool_replayer : m_pool_replayers) {
370 pool_replayer.second->flush();
371 }
372}
373
374void Mirror::release_leader()
375{
376 dout(20) << "enter" << dendl;
377 Mutex::Locker l(m_lock);
378
379 if (m_stopping) {
380 return;
381 }
382
383 for (auto &pool_replayer : m_pool_replayers) {
384 pool_replayer.second->release_leader();
385 }
386}
387
388void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
389{
390 dout(20) << "enter" << dendl;
391 assert(m_lock.is_locked());
392
393 // remove stale pool replayers before creating new pool replayers
394 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
395 auto &peer = it->first.second;
396 auto pool_peer_it = pool_peers.find(it->first.first);
397 if (it->second->is_blacklisted()) {
398 derr << "removing blacklisted pool replayer for " << peer << dendl;
399 // TODO: make async
400 it = m_pool_replayers.erase(it);
401 } else if (pool_peer_it == pool_peers.end() ||
402 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
403 dout(20) << "removing pool replayer for " << peer << dendl;
404 // TODO: make async
405 it = m_pool_replayers.erase(it);
406 } else {
407 ++it;
408 }
409 }
410
411 for (auto &kv : pool_peers) {
412 for (auto &peer : kv.second) {
413 PoolPeer pool_peer(kv.first, peer);
414 if (m_pool_replayers.find(pool_peer) == m_pool_replayers.end()) {
415 dout(20) << "starting pool replayer for " << peer << dendl;
416 unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
417 m_threads, m_image_deleter, m_image_sync_throttler, kv.first, peer,
418 m_args));
419
420 // TODO: make async, and retry connecting within pool replayer
421 int r = pool_replayer->init();
422 if (r < 0) {
423 continue;
424 }
425 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
426 }
427 }
428
429 // TODO currently only support a single peer
430 }
431}
432
433} // namespace mirror
434} // namespace rbd