// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};
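
// Example invocation of the commands registered above via the daemon's
// admin socket.  The socket path below is only illustrative; the actual path
// comes from the 'admin_socket' config option, and the name suffix expands
// to "<local pool name> <peer cluster name>":
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.0.asok \
//       rbd mirror status <local pool name> <peer cluster name>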

} // anonymous namespace

PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
                           ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_image_deleter(image_deleter),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

bool PoolReplayer::is_running() const {
  return m_pool_replayer_thread.is_started();
}

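// Bootstrap the pool replayer: connect to the local and remote clusters,
// open the corresponding pool IO contexts, fetch the local mirror uuid and
// create the instance replayer, instance watcher and leader watcher before
// starting the replayer thread.  Any failure is reported to the service
// daemon as a callout and leaves the replayer uninitialized.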
void PoolReplayer::init()
{
  assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(20) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<>::create(
    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
    local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

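// Tear-down counterpart of init(): wake and join the replayer thread, then
// shut down the leader watcher, instance watcher and instance replayer
// before dropping the cluster connections.  The pool watchers are expected
// to have been shut down already via the leader release path (see the
// asserts below).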
void PoolReplayer::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
    m_leader_watcher.reset();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
    m_instance_watcher.reset();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
    m_instance_replayer.reset();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

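// Create a librados handle for a single cluster.  A private CephContext is
// bootstrapped by hand (mirroring librados' conf_read_file, conf_parse_env
// and conf_parse_argv behaviour) so the daemon's global singletons are not
// shared with the librados shared library, and the librbd cache is disabled
// since it is unnecessary for replay.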
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ":"
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

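// Main loop of the pool replayer thread: (re)register the admin socket hook
// whenever the "<pool> <peer cluster>" name changes, bail out if either pool
// watcher reports that this client has been blacklisted, and otherwise poll
// roughly once per second until a stop or shut down is requested.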
void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    if (!m_stopping) {
      m_cond.WaitInterval(m_lock, utime_t(1, 0));
    }
  }
}

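// Back-end for the "rbd mirror status" admin socket command.  Output is only
// produced when a Formatter is supplied and includes the pool and peer, this
// instance's id, the current leader (plus the list of instances when this
// daemon is the leader), the admin socket paths of both cluster connections,
// the sync throttler state and the per-image replayer status.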
void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                     admin_socket);
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                     admin_socket);

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}

void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

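// PoolWatcher listener callback for image additions and removals.  Only the
// current leader acts on updates: it publishes the local/remote image counts
// to the service daemon and notifies the instance watcher to acquire newly
// added images and to record peer image removals.  The C_Gather below tracks
// the outstanding notifications so that the leader release path can drain
// them through m_update_op_tracker.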
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             gather_ctx->new_sub());
  }

  if (!mirror_uuid.empty()) {
    for (auto &image_id : removed_image_ids) {
      // for now always send to myself (the leader)
      std::string &instance_id = m_instance_watcher->get_instance_id();
      m_instance_watcher->notify_peer_image_removed(instance_id,
                                                    image_id.global_id,
                                                    mirror_uuid,
                                                    gather_ctx->new_sub());
    }
  }

  gather_ctx->activate();
}

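// Leader election callbacks from the LeaderWatcher: on acquiring leadership
// this instance advertises itself to the service daemon, informs the
// instance watcher and starts the local and remote pool watchers; on
// releasing leadership the pool watchers are shut down again and in-flight
// update operations are drained before all images are released.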
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd