]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/compat.h" | |
5 | #include "common/Formatter.h" | |
6 | #include "common/admin_socket.h" | |
7 | #include "common/debug.h" | |
8 | #include "common/errno.h" | |
9 | #include "include/stringify.h" | |
10 | #include "cls/rbd/cls_rbd_client.h" | |
11 | #include "common/Timer.h" | |
12 | #include "global/global_context.h" | |
13 | #include "journal/Journaler.h" | |
14 | #include "librbd/ExclusiveLock.h" | |
15 | #include "librbd/ImageCtx.h" | |
16 | #include "librbd/ImageState.h" | |
17 | #include "librbd/Journal.h" | |
18 | #include "librbd/Operations.h" | |
19 | #include "librbd/Utils.h" | |
20 | #include "librbd/asio/ContextWQ.h" | |
21 | #include "ImageDeleter.h" | |
22 | #include "ImageReplayer.h" | |
23 | #include "MirrorStatusUpdater.h" | |
24 | #include "Threads.h" | |
25 | #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" | |
26 | #include "tools/rbd_mirror/image_replayer/ReplayerListener.h" | |
27 | #include "tools/rbd_mirror/image_replayer/StateBuilder.h" | |
28 | #include "tools/rbd_mirror/image_replayer/Utils.h" | |
29 | #include "tools/rbd_mirror/image_replayer/journal/Replayer.h" | |
30 | #include "tools/rbd_mirror/image_replayer/journal/StateBuilder.h" | |
31 | #include <map> | |
32 | ||
33 | #define dout_context g_ceph_context | |
34 | #define dout_subsys ceph_subsys_rbd_mirror | |
35 | #undef dout_prefix | |
36 | #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \ | |
37 | << __func__ << ": " | |
38 | ||
39 | namespace rbd { | |
40 | namespace mirror { | |
41 | ||
42 | using librbd::util::create_context_callback; | |
43 | ||
44 | template <typename I> | |
45 | std::ostream &operator<<(std::ostream &os, | |
46 | const typename ImageReplayer<I>::State &state); | |
47 | ||
48 | namespace { | |
49 | ||
50 | template <typename I> | |
51 | class ImageReplayerAdminSocketCommand { | |
52 | public: | |
53 | ImageReplayerAdminSocketCommand(const std::string &desc, | |
54 | ImageReplayer<I> *replayer) | |
55 | : desc(desc), replayer(replayer) { | |
56 | } | |
57 | virtual ~ImageReplayerAdminSocketCommand() {} | |
58 | virtual int call(Formatter *f) = 0; | |
59 | ||
60 | std::string desc; | |
61 | ImageReplayer<I> *replayer; | |
62 | bool registered = false; | |
63 | }; | |
64 | ||
65 | template <typename I> | |
66 | class StatusCommand : public ImageReplayerAdminSocketCommand<I> { | |
67 | public: | |
68 | explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer) | |
69 | : ImageReplayerAdminSocketCommand<I>(desc, replayer) { | |
70 | } | |
71 | ||
72 | int call(Formatter *f) override { | |
73 | this->replayer->print_status(f); | |
74 | return 0; | |
75 | } | |
76 | }; | |
77 | ||
78 | template <typename I> | |
79 | class StartCommand : public ImageReplayerAdminSocketCommand<I> { | |
80 | public: | |
81 | explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer) | |
82 | : ImageReplayerAdminSocketCommand<I>(desc, replayer) { | |
83 | } | |
84 | ||
85 | int call(Formatter *f) override { | |
86 | this->replayer->start(nullptr, true); | |
87 | return 0; | |
88 | } | |
89 | }; | |
90 | ||
91 | template <typename I> | |
92 | class StopCommand : public ImageReplayerAdminSocketCommand<I> { | |
93 | public: | |
94 | explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer) | |
95 | : ImageReplayerAdminSocketCommand<I>(desc, replayer) { | |
96 | } | |
97 | ||
98 | int call(Formatter *f) override { | |
99 | this->replayer->stop(nullptr, true); | |
100 | return 0; | |
101 | } | |
102 | }; | |
103 | ||
104 | template <typename I> | |
105 | class RestartCommand : public ImageReplayerAdminSocketCommand<I> { | |
106 | public: | |
107 | explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer) | |
108 | : ImageReplayerAdminSocketCommand<I>(desc, replayer) { | |
109 | } | |
110 | ||
111 | int call(Formatter *f) override { | |
112 | this->replayer->restart(); | |
113 | return 0; | |
114 | } | |
115 | }; | |
116 | ||
117 | template <typename I> | |
118 | class FlushCommand : public ImageReplayerAdminSocketCommand<I> { | |
119 | public: | |
120 | explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer) | |
121 | : ImageReplayerAdminSocketCommand<I>(desc, replayer) { | |
122 | } | |
123 | ||
124 | int call(Formatter *f) override { | |
125 | this->replayer->flush(); | |
126 | return 0; | |
127 | } | |
128 | }; | |
129 | ||
130 | template <typename I> | |
131 | class ImageReplayerAdminSocketHook : public AdminSocketHook { | |
132 | public: | |
133 | ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name, | |
134 | ImageReplayer<I> *replayer) | |
135 | : admin_socket(cct->get_admin_socket()), | |
136 | commands{{"rbd mirror flush " + name, | |
137 | new FlushCommand<I>("flush rbd mirror " + name, replayer)}, | |
138 | {"rbd mirror restart " + name, | |
139 | new RestartCommand<I>("restart rbd mirror " + name, replayer)}, | |
140 | {"rbd mirror start " + name, | |
141 | new StartCommand<I>("start rbd mirror " + name, replayer)}, | |
142 | {"rbd mirror status " + name, | |
143 | new StatusCommand<I>("get status for rbd mirror " + name, replayer)}, | |
144 | {"rbd mirror stop " + name, | |
145 | new StopCommand<I>("stop rbd mirror " + name, replayer)}} { | |
146 | } | |
147 | ||
148 | int register_commands() { | |
149 | for (auto &it : commands) { | |
150 | int r = admin_socket->register_command(it.first, this, | |
151 | it.second->desc); | |
152 | if (r < 0) { | |
153 | return r; | |
154 | } | |
155 | it.second->registered = true; | |
156 | } | |
157 | return 0; | |
158 | } | |
159 | ||
160 | ~ImageReplayerAdminSocketHook() override { | |
161 | admin_socket->unregister_commands(this); | |
162 | for (auto &it : commands) { | |
163 | delete it.second; | |
164 | } | |
165 | commands.clear(); | |
166 | } | |
167 | ||
168 | int call(std::string_view command, const cmdmap_t& cmdmap, | |
169 | Formatter *f, | |
170 | std::ostream& errss, | |
171 | bufferlist& out) override { | |
172 | auto i = commands.find(command); | |
173 | ceph_assert(i != commands.end()); | |
174 | return i->second->call(f); | |
175 | } | |
176 | ||
177 | private: | |
178 | typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*, | |
179 | std::less<>> Commands; | |
180 | ||
181 | AdminSocket *admin_socket; | |
182 | Commands commands; | |
183 | }; | |
184 | ||
185 | } // anonymous namespace | |
186 | ||
187 | template <typename I> | |
188 | void ImageReplayer<I>::BootstrapProgressContext::update_progress( | |
189 | const std::string &description, bool flush) | |
190 | { | |
191 | const std::string desc = "bootstrapping, " + description; | |
192 | replayer->set_state_description(0, desc); | |
193 | if (flush) { | |
194 | replayer->update_mirror_image_status(false, boost::none); | |
195 | } | |
196 | } | |
197 | ||
198 | template <typename I> | |
199 | struct ImageReplayer<I>::ReplayerListener | |
200 | : public image_replayer::ReplayerListener { | |
201 | ImageReplayer<I>* image_replayer; | |
202 | ||
203 | ReplayerListener(ImageReplayer<I>* image_replayer) | |
204 | : image_replayer(image_replayer) { | |
205 | } | |
206 | ||
207 | void handle_notification() override { | |
208 | image_replayer->handle_replayer_notification(); | |
209 | } | |
210 | }; | |
211 | ||
212 | template <typename I> | |
213 | ImageReplayer<I>::ImageReplayer( | |
214 | librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid, | |
215 | const std::string &global_image_id, Threads<I> *threads, | |
216 | InstanceWatcher<I> *instance_watcher, | |
217 | MirrorStatusUpdater<I>* local_status_updater, | |
218 | journal::CacheManagerHandler *cache_manager_handler, | |
219 | PoolMetaCache* pool_meta_cache) : | |
220 | m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid), | |
221 | m_global_image_id(global_image_id), m_threads(threads), | |
222 | m_instance_watcher(instance_watcher), | |
223 | m_local_status_updater(local_status_updater), | |
224 | m_cache_manager_handler(cache_manager_handler), | |
225 | m_pool_meta_cache(pool_meta_cache), | |
226 | m_local_image_name(global_image_id), | |
227 | m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " + | |
228 | stringify(local_io_ctx.get_id()) + " " + global_image_id)), | |
229 | m_progress_cxt(this), | |
230 | m_replayer_listener(new ReplayerListener(this)) | |
231 | { | |
232 | // Register asok commands using a temporary "remote_pool_name/global_image_id" | |
233 | // name. When the image name becomes known on start the asok commands will be | |
234 | // re-registered using "remote_pool_name/remote_image_name" name. | |
235 | ||
236 | m_image_spec = image_replayer::util::compute_image_spec( | |
237 | local_io_ctx, global_image_id); | |
238 | register_admin_socket_hook(); | |
239 | } | |
240 | ||
241 | template <typename I> | |
242 | ImageReplayer<I>::~ImageReplayer() | |
243 | { | |
244 | unregister_admin_socket_hook(); | |
245 | ceph_assert(m_state_builder == nullptr); | |
246 | ceph_assert(m_on_start_finish == nullptr); | |
247 | ceph_assert(m_on_stop_contexts.empty()); | |
248 | ceph_assert(m_bootstrap_request == nullptr); | |
249 | ceph_assert(m_update_status_task == nullptr); | |
250 | delete m_replayer_listener; | |
251 | } | |
252 | ||
253 | template <typename I> | |
254 | image_replayer::HealthState ImageReplayer<I>::get_health_state() const { | |
255 | std::lock_guard locker{m_lock}; | |
256 | ||
257 | if (!m_mirror_image_status_state) { | |
258 | return image_replayer::HEALTH_STATE_OK; | |
259 | } else if (*m_mirror_image_status_state == | |
260 | cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING || | |
261 | *m_mirror_image_status_state == | |
262 | cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) { | |
263 | return image_replayer::HEALTH_STATE_WARNING; | |
264 | } | |
265 | return image_replayer::HEALTH_STATE_ERROR; | |
266 | } | |
267 | ||
268 | template <typename I> | |
269 | void ImageReplayer<I>::add_peer(const Peer<I>& peer) { | |
270 | dout(10) << "peer=" << peer << dendl; | |
271 | ||
272 | std::lock_guard locker{m_lock}; | |
273 | auto it = m_peers.find(peer); | |
274 | if (it == m_peers.end()) { | |
275 | m_peers.insert(peer); | |
276 | } | |
277 | } | |
278 | ||
279 | template <typename I> | |
280 | void ImageReplayer<I>::set_state_description(int r, const std::string &desc) { | |
281 | dout(10) << "r=" << r << ", desc=" << desc << dendl; | |
282 | ||
283 | std::lock_guard l{m_lock}; | |
284 | m_last_r = r; | |
285 | m_state_desc = desc; | |
286 | } | |
287 | ||
288 | template <typename I> | |
289 | void ImageReplayer<I>::start(Context *on_finish, bool manual, bool restart) | |
290 | { | |
291 | dout(10) << "on_finish=" << on_finish << dendl; | |
292 | ||
293 | int r = 0; | |
294 | { | |
295 | std::lock_guard locker{m_lock}; | |
296 | if (!is_stopped_()) { | |
297 | derr << "already running" << dendl; | |
298 | r = -EINVAL; | |
299 | } else if (m_manual_stop && !manual) { | |
300 | dout(5) << "stopped manually, ignoring start without manual flag" | |
301 | << dendl; | |
302 | r = -EPERM; | |
303 | } else if (restart && !m_restart_requested) { | |
304 | dout(10) << "canceled restart" << dendl; | |
305 | r = -ECANCELED; | |
306 | } else { | |
307 | m_state = STATE_STARTING; | |
308 | m_last_r = 0; | |
309 | m_state_desc.clear(); | |
310 | m_manual_stop = false; | |
311 | m_delete_requested = false; | |
312 | m_restart_requested = false; | |
313 | m_status_removed = false; | |
314 | ||
315 | if (on_finish != nullptr) { | |
316 | ceph_assert(m_on_start_finish == nullptr); | |
317 | m_on_start_finish = on_finish; | |
318 | } | |
319 | ceph_assert(m_on_stop_contexts.empty()); | |
320 | } | |
321 | } | |
322 | ||
323 | if (r < 0) { | |
324 | if (on_finish) { | |
325 | on_finish->complete(r); | |
326 | } | |
327 | return; | |
328 | } | |
329 | ||
330 | bootstrap(); | |
331 | } | |
332 | ||
333 | template <typename I> | |
334 | void ImageReplayer<I>::bootstrap() { | |
335 | dout(10) << dendl; | |
336 | ||
337 | std::unique_lock locker{m_lock}; | |
338 | if (m_peers.empty()) { | |
339 | locker.unlock(); | |
340 | ||
341 | dout(5) << "no peer clusters" << dendl; | |
342 | on_start_fail(-ENOENT, "no peer clusters"); | |
343 | return; | |
344 | } | |
345 | ||
346 | // TODO need to support multiple remote images | |
347 | ceph_assert(!m_peers.empty()); | |
348 | m_remote_image_peer = *m_peers.begin(); | |
349 | ||
350 | if (on_start_interrupted(m_lock)) { | |
351 | return; | |
352 | } | |
353 | ||
354 | ceph_assert(m_state_builder == nullptr); | |
355 | auto ctx = create_context_callback< | |
356 | ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this); | |
357 | auto request = image_replayer::BootstrapRequest<I>::create( | |
358 | m_threads, m_local_io_ctx, m_remote_image_peer.io_ctx, m_instance_watcher, | |
359 | m_global_image_id, m_local_mirror_uuid, | |
360 | m_remote_image_peer.remote_pool_meta, m_cache_manager_handler, | |
361 | m_pool_meta_cache, &m_progress_cxt, &m_state_builder, &m_resync_requested, | |
362 | ctx); | |
363 | ||
364 | request->get(); | |
365 | m_bootstrap_request = request; | |
366 | locker.unlock(); | |
367 | ||
368 | update_mirror_image_status(false, boost::none); | |
369 | request->send(); | |
370 | } | |
371 | ||
372 | template <typename I> | |
373 | void ImageReplayer<I>::handle_bootstrap(int r) { | |
374 | dout(10) << "r=" << r << dendl; | |
375 | { | |
376 | std::lock_guard locker{m_lock}; | |
377 | m_bootstrap_request->put(); | |
378 | m_bootstrap_request = nullptr; | |
379 | } | |
380 | ||
381 | if (on_start_interrupted()) { | |
382 | return; | |
383 | } else if (r == -ENOMSG) { | |
384 | dout(5) << "local image is primary" << dendl; | |
385 | on_start_fail(0, "local image is primary"); | |
386 | return; | |
387 | } else if (r == -EREMOTEIO) { | |
388 | dout(5) << "remote image is non-primary" << dendl; | |
389 | on_start_fail(-EREMOTEIO, "remote image is non-primary"); | |
390 | return; | |
391 | } else if (r == -EEXIST) { | |
392 | on_start_fail(r, "split-brain detected"); | |
393 | return; | |
394 | } else if (r == -ENOLINK) { | |
395 | m_delete_requested = true; | |
396 | on_start_fail(0, "remote image no longer exists"); | |
397 | return; | |
398 | } else if (r == -ERESTART) { | |
399 | on_start_fail(r, "image in transient state, try again"); | |
400 | return; | |
401 | } else if (r < 0) { | |
402 | on_start_fail(r, "error bootstrapping replay"); | |
403 | return; | |
404 | } else if (m_resync_requested) { | |
405 | on_start_fail(0, "resync requested"); | |
406 | return; | |
407 | } | |
408 | ||
409 | start_replay(); | |
410 | } | |
411 | ||
412 | template <typename I> | |
413 | void ImageReplayer<I>::start_replay() { | |
414 | dout(10) << dendl; | |
415 | ||
416 | std::unique_lock locker{m_lock}; | |
417 | ceph_assert(m_replayer == nullptr); | |
418 | m_replayer = m_state_builder->create_replayer(m_threads, m_instance_watcher, | |
419 | m_local_mirror_uuid, | |
420 | m_pool_meta_cache, | |
421 | m_replayer_listener); | |
422 | ||
423 | auto ctx = create_context_callback< | |
424 | ImageReplayer<I>, &ImageReplayer<I>::handle_start_replay>(this); | |
425 | m_replayer->init(ctx); | |
426 | } | |
427 | ||
428 | template <typename I> | |
429 | void ImageReplayer<I>::handle_start_replay(int r) { | |
430 | dout(10) << "r=" << r << dendl; | |
431 | ||
432 | if (on_start_interrupted()) { | |
433 | return; | |
434 | } else if (r < 0) { | |
435 | std::string error_description = m_replayer->get_error_description(); | |
436 | if (r == -ENOTCONN && m_replayer->is_resync_requested()) { | |
437 | std::unique_lock locker{m_lock}; | |
438 | m_resync_requested = true; | |
439 | } | |
440 | ||
441 | // shut down not required if init failed | |
442 | m_replayer->destroy(); | |
443 | m_replayer = nullptr; | |
444 | ||
445 | derr << "error starting replay: " << cpp_strerror(r) << dendl; | |
446 | on_start_fail(r, error_description); | |
447 | return; | |
448 | } | |
449 | ||
450 | Context *on_finish = nullptr; | |
451 | { | |
452 | std::unique_lock locker{m_lock}; | |
453 | ceph_assert(m_state == STATE_STARTING); | |
454 | m_state = STATE_REPLAYING; | |
455 | std::swap(m_on_start_finish, on_finish); | |
456 | ||
457 | std::unique_lock timer_locker{m_threads->timer_lock}; | |
458 | schedule_update_mirror_image_replay_status(); | |
459 | } | |
460 | ||
461 | update_mirror_image_status(true, boost::none); | |
462 | if (on_replay_interrupted()) { | |
463 | if (on_finish != nullptr) { | |
464 | on_finish->complete(r); | |
465 | } | |
466 | return; | |
467 | } | |
468 | ||
469 | dout(10) << "start succeeded" << dendl; | |
470 | if (on_finish != nullptr) { | |
471 | dout(10) << "on finish complete, r=" << r << dendl; | |
472 | on_finish->complete(r); | |
473 | } | |
474 | } | |
475 | ||
476 | template <typename I> | |
477 | void ImageReplayer<I>::on_start_fail(int r, const std::string &desc) | |
478 | { | |
479 | dout(10) << "r=" << r << ", desc=" << desc << dendl; | |
480 | Context *ctx = new LambdaContext([this, r, desc](int _r) { | |
481 | { | |
482 | std::lock_guard locker{m_lock}; | |
483 | ceph_assert(m_state == STATE_STARTING); | |
484 | m_state = STATE_STOPPING; | |
485 | if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) { | |
486 | derr << "start failed: " << cpp_strerror(r) << dendl; | |
487 | } else { | |
488 | dout(10) << "start canceled" << dendl; | |
489 | } | |
490 | } | |
491 | ||
492 | set_state_description(r, desc); | |
493 | update_mirror_image_status(false, boost::none); | |
494 | shut_down(r); | |
495 | }); | |
496 | m_threads->work_queue->queue(ctx, 0); | |
497 | } | |
498 | ||
499 | template <typename I> | |
500 | bool ImageReplayer<I>::on_start_interrupted() { | |
501 | std::lock_guard locker{m_lock}; | |
502 | return on_start_interrupted(m_lock); | |
503 | } | |
504 | ||
505 | template <typename I> | |
506 | bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) { | |
507 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
508 | ceph_assert(m_state == STATE_STARTING); | |
509 | if (!m_stop_requested) { | |
510 | return false; | |
511 | } | |
512 | ||
513 | on_start_fail(-ECANCELED, ""); | |
514 | return true; | |
515 | } | |
516 | ||
517 | template <typename I> | |
518 | void ImageReplayer<I>::stop(Context *on_finish, bool manual, bool restart) | |
519 | { | |
520 | dout(10) << "on_finish=" << on_finish << ", manual=" << manual | |
521 | << ", restart=" << restart << dendl; | |
522 | ||
523 | image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr; | |
524 | bool shut_down_replay = false; | |
525 | bool is_stopped = false; | |
526 | { | |
527 | std::lock_guard locker{m_lock}; | |
528 | ||
529 | if (!is_running_()) { | |
530 | if (manual && !m_manual_stop) { | |
531 | dout(10) << "marking manual" << dendl; | |
532 | m_manual_stop = true; | |
533 | } | |
534 | if (!restart && m_restart_requested) { | |
535 | dout(10) << "canceling restart" << dendl; | |
536 | m_restart_requested = false; | |
537 | } | |
538 | if (is_stopped_()) { | |
539 | dout(10) << "already stopped" << dendl; | |
540 | is_stopped = true; | |
541 | } else { | |
542 | dout(10) << "joining in-flight stop" << dendl; | |
543 | if (on_finish != nullptr) { | |
544 | m_on_stop_contexts.push_back(on_finish); | |
545 | } | |
546 | } | |
547 | } else { | |
548 | if (m_state == STATE_STARTING) { | |
549 | dout(10) << "canceling start" << dendl; | |
550 | if (m_bootstrap_request != nullptr) { | |
551 | bootstrap_request = m_bootstrap_request; | |
552 | bootstrap_request->get(); | |
553 | } | |
554 | } else { | |
555 | dout(10) << "interrupting replay" << dendl; | |
556 | shut_down_replay = true; | |
557 | } | |
558 | ||
559 | ceph_assert(m_on_stop_contexts.empty()); | |
560 | if (on_finish != nullptr) { | |
561 | m_on_stop_contexts.push_back(on_finish); | |
562 | } | |
563 | m_stop_requested = true; | |
564 | m_manual_stop = manual; | |
565 | } | |
566 | } | |
567 | ||
568 | if (is_stopped) { | |
569 | if (on_finish) { | |
570 | on_finish->complete(-EINVAL); | |
571 | } | |
572 | return; | |
573 | } | |
574 | ||
575 | // avoid holding lock since bootstrap request will update status | |
576 | if (bootstrap_request != nullptr) { | |
577 | dout(10) << "canceling bootstrap" << dendl; | |
578 | bootstrap_request->cancel(); | |
579 | bootstrap_request->put(); | |
580 | } | |
581 | ||
582 | if (shut_down_replay) { | |
583 | on_stop_journal_replay(); | |
584 | } | |
585 | } | |
586 | ||
587 | template <typename I> | |
588 | void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc) | |
589 | { | |
590 | dout(10) << dendl; | |
591 | ||
592 | { | |
593 | std::lock_guard locker{m_lock}; | |
594 | if (m_state != STATE_REPLAYING) { | |
595 | // might be invoked multiple times while stopping | |
596 | return; | |
597 | } | |
598 | ||
599 | m_stop_requested = true; | |
600 | m_state = STATE_STOPPING; | |
601 | } | |
602 | ||
603 | cancel_update_mirror_image_replay_status(); | |
604 | set_state_description(r, desc); | |
605 | update_mirror_image_status(true, boost::none); | |
606 | shut_down(0); | |
607 | } | |
608 | ||
609 | template <typename I> | |
610 | void ImageReplayer<I>::restart(Context *on_finish) | |
611 | { | |
612 | { | |
613 | std::lock_guard locker{m_lock}; | |
614 | m_restart_requested = true; | |
615 | } | |
616 | ||
617 | auto ctx = new LambdaContext( | |
618 | [this, on_finish](int r) { | |
619 | if (r < 0) { | |
620 | // Try start anyway. | |
621 | } | |
622 | start(on_finish, true, true); | |
623 | }); | |
624 | stop(ctx, false, true); | |
625 | } | |
626 | ||
627 | template <typename I> | |
628 | void ImageReplayer<I>::flush() | |
629 | { | |
630 | C_SaferCond ctx; | |
631 | ||
632 | { | |
633 | std::unique_lock locker{m_lock}; | |
634 | if (m_state != STATE_REPLAYING) { | |
635 | return; | |
636 | } | |
637 | ||
638 | dout(10) << dendl; | |
639 | ceph_assert(m_replayer != nullptr); | |
640 | m_replayer->flush(&ctx); | |
641 | } | |
642 | ||
643 | int r = ctx.wait(); | |
644 | if (r >= 0) { | |
645 | update_mirror_image_status(false, boost::none); | |
646 | } | |
647 | } | |
648 | ||
649 | template <typename I> | |
650 | bool ImageReplayer<I>::on_replay_interrupted() | |
651 | { | |
652 | bool shut_down; | |
653 | { | |
654 | std::lock_guard locker{m_lock}; | |
655 | shut_down = m_stop_requested; | |
656 | } | |
657 | ||
658 | if (shut_down) { | |
659 | on_stop_journal_replay(); | |
660 | } | |
661 | return shut_down; | |
662 | } | |
663 | ||
664 | template <typename I> | |
665 | void ImageReplayer<I>::print_status(Formatter *f) | |
666 | { | |
667 | dout(10) << dendl; | |
668 | ||
669 | std::lock_guard l{m_lock}; | |
670 | ||
671 | f->open_object_section("image_replayer"); | |
672 | f->dump_string("name", m_image_spec); | |
673 | f->dump_string("state", to_string(m_state)); | |
674 | f->close_section(); | |
675 | } | |
676 | ||
677 | template <typename I> | |
678 | void ImageReplayer<I>::schedule_update_mirror_image_replay_status() { | |
679 | ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); | |
680 | ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); | |
681 | if (m_state != STATE_REPLAYING) { | |
682 | return; | |
683 | } | |
684 | ||
685 | dout(10) << dendl; | |
686 | ||
687 | // periodically update the replaying status even if nothing changes | |
688 | // so that we can adjust our performance stats | |
689 | ceph_assert(m_update_status_task == nullptr); | |
690 | m_update_status_task = create_context_callback< | |
691 | ImageReplayer<I>, | |
692 | &ImageReplayer<I>::handle_update_mirror_image_replay_status>(this); | |
693 | m_threads->timer->add_event_after(10, m_update_status_task); | |
694 | } | |
695 | ||
696 | template <typename I> | |
697 | void ImageReplayer<I>::handle_update_mirror_image_replay_status(int r) { | |
698 | dout(10) << dendl; | |
699 | ||
700 | ceph_assert(ceph_mutex_is_locked_by_me(m_threads->timer_lock)); | |
701 | ||
702 | ceph_assert(m_update_status_task != nullptr); | |
703 | m_update_status_task = nullptr; | |
704 | ||
705 | auto ctx = new LambdaContext([this](int) { | |
706 | update_mirror_image_status(false, boost::none); | |
707 | ||
708 | std::unique_lock locker{m_lock}; | |
709 | std::unique_lock timer_locker{m_threads->timer_lock}; | |
710 | ||
711 | schedule_update_mirror_image_replay_status(); | |
712 | m_in_flight_op_tracker.finish_op(); | |
713 | }); | |
714 | ||
715 | m_in_flight_op_tracker.start_op(); | |
716 | m_threads->work_queue->queue(ctx, 0); | |
717 | } | |
718 | ||
719 | template <typename I> | |
720 | void ImageReplayer<I>::cancel_update_mirror_image_replay_status() { | |
721 | std::unique_lock timer_locker{m_threads->timer_lock}; | |
722 | if (m_update_status_task != nullptr) { | |
723 | dout(10) << dendl; | |
724 | ||
725 | if (m_threads->timer->cancel_event(m_update_status_task)) { | |
726 | m_update_status_task = nullptr; | |
727 | } | |
728 | } | |
729 | } | |
730 | ||
731 | template <typename I> | |
732 | void ImageReplayer<I>::update_mirror_image_status( | |
733 | bool force, const OptionalState &opt_state) { | |
734 | dout(15) << "force=" << force << ", " | |
735 | << "state=" << opt_state << dendl; | |
736 | ||
737 | { | |
738 | std::lock_guard locker{m_lock}; | |
739 | if (!force && !is_stopped_() && !is_running_()) { | |
740 | dout(15) << "shut down in-progress: ignoring update" << dendl; | |
741 | return; | |
742 | } | |
743 | } | |
744 | ||
745 | m_in_flight_op_tracker.start_op(); | |
746 | auto ctx = new LambdaContext( | |
747 | [this, force, opt_state](int r) { | |
748 | set_mirror_image_status_update(force, opt_state); | |
749 | }); | |
750 | m_threads->work_queue->queue(ctx, 0); | |
751 | } | |
752 | ||
753 | template <typename I> | |
754 | void ImageReplayer<I>::set_mirror_image_status_update( | |
755 | bool force, const OptionalState &opt_state) { | |
756 | dout(15) << "force=" << force << ", " | |
757 | << "state=" << opt_state << dendl; | |
758 | ||
759 | reregister_admin_socket_hook(); | |
760 | ||
761 | State state; | |
762 | std::string state_desc; | |
763 | int last_r; | |
764 | bool stopping_replay; | |
765 | ||
766 | auto mirror_image_status_state = boost::make_optional( | |
767 | false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN); | |
768 | image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr; | |
769 | { | |
770 | std::lock_guard locker{m_lock}; | |
771 | state = m_state; | |
772 | state_desc = m_state_desc; | |
773 | mirror_image_status_state = m_mirror_image_status_state; | |
774 | last_r = m_last_r; | |
775 | stopping_replay = (m_replayer != nullptr); | |
776 | ||
777 | if (m_bootstrap_request != nullptr) { | |
778 | bootstrap_request = m_bootstrap_request; | |
779 | bootstrap_request->get(); | |
780 | } | |
781 | } | |
782 | ||
783 | bool syncing = false; | |
784 | if (bootstrap_request != nullptr) { | |
785 | syncing = bootstrap_request->is_syncing(); | |
786 | bootstrap_request->put(); | |
787 | bootstrap_request = nullptr; | |
788 | } | |
789 | ||
790 | if (opt_state) { | |
791 | state = *opt_state; | |
792 | } | |
793 | ||
794 | cls::rbd::MirrorImageSiteStatus status; | |
795 | status.up = true; | |
796 | switch (state) { | |
797 | case STATE_STARTING: | |
798 | if (syncing) { | |
799 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING; | |
800 | status.description = state_desc.empty() ? "syncing" : state_desc; | |
801 | mirror_image_status_state = status.state; | |
802 | } else { | |
803 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY; | |
804 | status.description = "starting replay"; | |
805 | } | |
806 | break; | |
807 | case STATE_REPLAYING: | |
808 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING; | |
809 | { | |
810 | std::string desc; | |
811 | auto on_req_finish = new LambdaContext( | |
812 | [this, force](int r) { | |
813 | dout(15) << "replay status ready: r=" << r << dendl; | |
814 | if (r >= 0) { | |
815 | set_mirror_image_status_update(force, boost::none); | |
816 | } else if (r == -EAGAIN) { | |
817 | m_in_flight_op_tracker.finish_op(); | |
818 | } | |
819 | }); | |
820 | ||
821 | ceph_assert(m_replayer != nullptr); | |
822 | if (!m_replayer->get_replay_status(&desc, on_req_finish)) { | |
823 | dout(15) << "waiting for replay status" << dendl; | |
824 | return; | |
825 | } | |
826 | ||
827 | status.description = "replaying, " + desc; | |
828 | mirror_image_status_state = boost::make_optional( | |
829 | false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN); | |
830 | } | |
831 | break; | |
832 | case STATE_STOPPING: | |
833 | if (stopping_replay) { | |
834 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY; | |
835 | status.description = state_desc.empty() ? "stopping replay" : state_desc; | |
836 | break; | |
837 | } | |
838 | // FALLTHROUGH | |
839 | case STATE_STOPPED: | |
840 | if (last_r == -EREMOTEIO) { | |
841 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN; | |
842 | status.description = state_desc; | |
843 | mirror_image_status_state = status.state; | |
844 | } else if (last_r < 0 && last_r != -ECANCELED) { | |
845 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR; | |
846 | status.description = state_desc; | |
847 | mirror_image_status_state = status.state; | |
848 | } else { | |
849 | status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED; | |
850 | status.description = state_desc.empty() ? "stopped" : state_desc; | |
851 | mirror_image_status_state = boost::none; | |
852 | } | |
853 | break; | |
854 | default: | |
855 | ceph_assert(!"invalid state"); | |
856 | } | |
857 | ||
858 | { | |
859 | std::lock_guard locker{m_lock}; | |
860 | m_mirror_image_status_state = mirror_image_status_state; | |
861 | } | |
862 | ||
863 | // prevent the status from ping-ponging when failed replays are restarted | |
864 | if (mirror_image_status_state && | |
865 | *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) { | |
866 | status.state = *mirror_image_status_state; | |
867 | } | |
868 | ||
869 | dout(15) << "status=" << status << dendl; | |
870 | m_local_status_updater->set_mirror_image_status(m_global_image_id, status, | |
871 | force); | |
872 | if (m_remote_image_peer.mirror_status_updater != nullptr) { | |
873 | m_remote_image_peer.mirror_status_updater->set_mirror_image_status( | |
874 | m_global_image_id, status, force); | |
875 | } | |
876 | ||
877 | m_in_flight_op_tracker.finish_op(); | |
878 | } | |
879 | ||
880 | template <typename I> | |
881 | void ImageReplayer<I>::shut_down(int r) { | |
882 | dout(10) << "r=" << r << dendl; | |
883 | ||
884 | { | |
885 | std::lock_guard locker{m_lock}; | |
886 | ceph_assert(m_state == STATE_STOPPING); | |
887 | } | |
888 | ||
889 | if (!m_in_flight_op_tracker.empty()) { | |
890 | dout(15) << "waiting for in-flight operations to complete" << dendl; | |
891 | m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) { | |
892 | shut_down(r); | |
893 | })); | |
894 | return; | |
895 | } | |
896 | ||
897 | // chain the shut down sequence (reverse order) | |
898 | Context *ctx = new LambdaContext( | |
899 | [this, r](int _r) { | |
900 | update_mirror_image_status(true, STATE_STOPPED); | |
901 | handle_shut_down(r); | |
902 | }); | |
903 | ||
904 | // destruct the state builder | |
905 | if (m_state_builder != nullptr) { | |
906 | ctx = new LambdaContext([this, ctx](int r) { | |
907 | m_state_builder->close(ctx); | |
908 | }); | |
909 | } | |
910 | ||
911 | // close the replayer | |
912 | if (m_replayer != nullptr) { | |
913 | ctx = new LambdaContext([this, ctx](int r) { | |
914 | m_replayer->destroy(); | |
915 | m_replayer = nullptr; | |
916 | ctx->complete(0); | |
917 | }); | |
918 | ctx = new LambdaContext([this, ctx](int r) { | |
919 | m_replayer->shut_down(ctx); | |
920 | }); | |
921 | } | |
922 | ||
923 | m_threads->work_queue->queue(ctx, 0); | |
924 | } | |
925 | ||
926 | template <typename I> | |
927 | void ImageReplayer<I>::handle_shut_down(int r) { | |
928 | bool resync_requested = false; | |
929 | bool delete_requested = false; | |
930 | bool unregister_asok_hook = false; | |
931 | { | |
932 | std::lock_guard locker{m_lock}; | |
933 | ||
934 | if (m_delete_requested && m_state_builder != nullptr && | |
935 | !m_state_builder->local_image_id.empty()) { | |
936 | ceph_assert(m_state_builder->remote_image_id.empty()); | |
937 | dout(0) << "remote image no longer exists: scheduling deletion" << dendl; | |
938 | unregister_asok_hook = true; | |
939 | std::swap(delete_requested, m_delete_requested); | |
940 | m_delete_in_progress = true; | |
941 | } | |
942 | ||
943 | std::swap(resync_requested, m_resync_requested); | |
944 | if (!delete_requested && !resync_requested && m_last_r == -ENOENT && | |
945 | ((m_state_builder == nullptr) || | |
946 | (m_state_builder->local_image_id.empty() && | |
947 | m_state_builder->remote_image_id.empty()))) { | |
948 | dout(0) << "mirror image no longer exists" << dendl; | |
949 | unregister_asok_hook = true; | |
950 | m_finished = true; | |
951 | } | |
952 | } | |
953 | ||
954 | if (unregister_asok_hook) { | |
955 | unregister_admin_socket_hook(); | |
956 | } | |
957 | ||
958 | if (delete_requested || resync_requested) { | |
959 | dout(5) << "moving image to trash" << dendl; | |
960 | auto ctx = new LambdaContext([this, r](int) { | |
961 | handle_shut_down(r); | |
962 | }); | |
963 | ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id, | |
964 | resync_requested, m_threads->work_queue, ctx); | |
965 | return; | |
966 | } | |
967 | ||
968 | if (!m_in_flight_op_tracker.empty()) { | |
969 | dout(15) << "waiting for in-flight operations to complete" << dendl; | |
970 | m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) { | |
971 | handle_shut_down(r); | |
972 | })); | |
973 | return; | |
974 | } | |
975 | ||
976 | if (!m_status_removed) { | |
977 | auto ctx = new LambdaContext([this, r](int) { | |
978 | m_status_removed = true; | |
979 | handle_shut_down(r); | |
980 | }); | |
981 | remove_image_status(m_delete_in_progress, ctx); | |
982 | return; | |
983 | } | |
984 | ||
985 | if (m_state_builder != nullptr) { | |
986 | m_state_builder->destroy(); | |
987 | m_state_builder = nullptr; | |
988 | } | |
989 | ||
990 | dout(10) << "stop complete" << dendl; | |
991 | Context *on_start = nullptr; | |
992 | Contexts on_stop_contexts; | |
993 | { | |
994 | std::lock_guard locker{m_lock}; | |
995 | std::swap(on_start, m_on_start_finish); | |
996 | on_stop_contexts = std::move(m_on_stop_contexts); | |
997 | m_stop_requested = false; | |
998 | ceph_assert(m_state == STATE_STOPPING); | |
999 | m_state = STATE_STOPPED; | |
1000 | } | |
1001 | ||
1002 | if (on_start != nullptr) { | |
1003 | dout(10) << "on start finish complete, r=" << r << dendl; | |
1004 | on_start->complete(r); | |
1005 | r = 0; | |
1006 | } | |
1007 | for (auto ctx : on_stop_contexts) { | |
1008 | dout(10) << "on stop finish " << ctx << " complete, r=" << r << dendl; | |
1009 | ctx->complete(r); | |
1010 | } | |
1011 | } | |
1012 | ||
1013 | template <typename I> | |
1014 | void ImageReplayer<I>::handle_replayer_notification() { | |
1015 | dout(10) << dendl; | |
1016 | ||
1017 | std::unique_lock locker{m_lock}; | |
1018 | if (m_state != STATE_REPLAYING) { | |
1019 | // might be attempting to shut down | |
1020 | return; | |
1021 | } | |
1022 | ||
1023 | { | |
1024 | // detect a rename of the local image | |
1025 | ceph_assert(m_state_builder != nullptr && | |
1026 | m_state_builder->local_image_ctx != nullptr); | |
1027 | std::shared_lock image_locker{m_state_builder->local_image_ctx->image_lock}; | |
1028 | if (m_local_image_name != m_state_builder->local_image_ctx->name) { | |
1029 | // will re-register with new name after next status update | |
1030 | dout(10) << "image renamed" << dendl; | |
1031 | m_local_image_name = m_state_builder->local_image_ctx->name; | |
1032 | } | |
1033 | } | |
1034 | ||
1035 | // replayer cannot be shut down while notification is in-flight | |
1036 | ceph_assert(m_replayer != nullptr); | |
1037 | locker.unlock(); | |
1038 | ||
1039 | if (m_replayer->is_resync_requested()) { | |
1040 | dout(10) << "resync requested" << dendl; | |
1041 | m_resync_requested = true; | |
1042 | on_stop_journal_replay(0, "resync requested"); | |
1043 | return; | |
1044 | } | |
1045 | ||
1046 | if (!m_replayer->is_replaying()) { | |
1047 | auto error_code = m_replayer->get_error_code(); | |
1048 | auto error_description = m_replayer->get_error_description(); | |
1049 | dout(10) << "replay interrupted: " | |
1050 | << "r=" << error_code << ", " | |
1051 | << "error=" << error_description << dendl; | |
1052 | on_stop_journal_replay(error_code, error_description); | |
1053 | return; | |
1054 | } | |
1055 | ||
1056 | update_mirror_image_status(false, {}); | |
1057 | } | |
1058 | ||
1059 | template <typename I> | |
1060 | std::string ImageReplayer<I>::to_string(const State state) { | |
1061 | switch (state) { | |
1062 | case ImageReplayer<I>::STATE_STARTING: | |
1063 | return "Starting"; | |
1064 | case ImageReplayer<I>::STATE_REPLAYING: | |
1065 | return "Replaying"; | |
1066 | case ImageReplayer<I>::STATE_STOPPING: | |
1067 | return "Stopping"; | |
1068 | case ImageReplayer<I>::STATE_STOPPED: | |
1069 | return "Stopped"; | |
1070 | default: | |
1071 | break; | |
1072 | } | |
1073 | return "Unknown(" + stringify(state) + ")"; | |
1074 | } | |
1075 | ||
1076 | template <typename I> | |
1077 | void ImageReplayer<I>::register_admin_socket_hook() { | |
1078 | ImageReplayerAdminSocketHook<I> *asok_hook; | |
1079 | { | |
1080 | std::lock_guard locker{m_lock}; | |
1081 | if (m_asok_hook != nullptr) { | |
1082 | return; | |
1083 | } | |
1084 | ||
1085 | dout(15) << "registered asok hook: " << m_image_spec << dendl; | |
1086 | asok_hook = new ImageReplayerAdminSocketHook<I>( | |
1087 | g_ceph_context, m_image_spec, this); | |
1088 | int r = asok_hook->register_commands(); | |
1089 | if (r == 0) { | |
1090 | m_asok_hook = asok_hook; | |
1091 | return; | |
1092 | } | |
1093 | derr << "error registering admin socket commands" << dendl; | |
1094 | } | |
1095 | delete asok_hook; | |
1096 | } | |
1097 | ||
1098 | template <typename I> | |
1099 | void ImageReplayer<I>::unregister_admin_socket_hook() { | |
1100 | dout(15) << dendl; | |
1101 | ||
1102 | AdminSocketHook *asok_hook = nullptr; | |
1103 | { | |
1104 | std::lock_guard locker{m_lock}; | |
1105 | std::swap(asok_hook, m_asok_hook); | |
1106 | } | |
1107 | delete asok_hook; | |
1108 | } | |
1109 | ||
1110 | template <typename I> | |
1111 | void ImageReplayer<I>::reregister_admin_socket_hook() { | |
1112 | std::unique_lock locker{m_lock}; | |
1113 | if (m_state == STATE_STARTING && m_bootstrap_request != nullptr) { | |
1114 | m_local_image_name = m_bootstrap_request->get_local_image_name(); | |
1115 | } | |
1116 | ||
1117 | auto image_spec = image_replayer::util::compute_image_spec( | |
1118 | m_local_io_ctx, m_local_image_name); | |
1119 | if (m_asok_hook != nullptr && m_image_spec == image_spec) { | |
1120 | return; | |
1121 | } | |
1122 | ||
1123 | dout(15) << "old_image_spec=" << m_image_spec << ", " | |
1124 | << "new_image_spec=" << image_spec << dendl; | |
1125 | m_image_spec = image_spec; | |
1126 | ||
1127 | if (m_state == STATE_STOPPING || m_state == STATE_STOPPED) { | |
1128 | // no need to re-register if stopping | |
1129 | return; | |
1130 | } | |
1131 | locker.unlock(); | |
1132 | ||
1133 | unregister_admin_socket_hook(); | |
1134 | register_admin_socket_hook(); | |
1135 | } | |
1136 | ||
1137 | template <typename I> | |
1138 | void ImageReplayer<I>::remove_image_status(bool force, Context *on_finish) | |
1139 | { | |
1140 | auto ctx = new LambdaContext([this, force, on_finish](int) { | |
1141 | remove_image_status_remote(force, on_finish); | |
1142 | }); | |
1143 | ||
1144 | if (m_local_status_updater->exists(m_global_image_id)) { | |
1145 | dout(15) << "removing local mirror image status" << dendl; | |
1146 | if (force) { | |
1147 | m_local_status_updater->remove_mirror_image_status( | |
1148 | m_global_image_id, true, ctx); | |
1149 | } else { | |
1150 | m_local_status_updater->remove_refresh_mirror_image_status( | |
1151 | m_global_image_id, ctx); | |
1152 | } | |
1153 | return; | |
1154 | } | |
1155 | ||
1156 | ctx->complete(0); | |
1157 | } | |
1158 | ||
1159 | template <typename I> | |
1160 | void ImageReplayer<I>::remove_image_status_remote(bool force, Context *on_finish) | |
1161 | { | |
1162 | if (m_remote_image_peer.mirror_status_updater != nullptr && | |
1163 | m_remote_image_peer.mirror_status_updater->exists(m_global_image_id)) { | |
1164 | dout(15) << "removing remote mirror image status" << dendl; | |
1165 | if (force) { | |
1166 | m_remote_image_peer.mirror_status_updater->remove_mirror_image_status( | |
1167 | m_global_image_id, true, on_finish); | |
1168 | } else { | |
1169 | m_remote_image_peer.mirror_status_updater->remove_refresh_mirror_image_status( | |
1170 | m_global_image_id, on_finish); | |
1171 | } | |
1172 | return; | |
1173 | } | |
1174 | if (on_finish) { | |
1175 | on_finish->complete(0); | |
1176 | } | |
1177 | } | |
1178 | ||
1179 | template <typename I> | |
1180 | std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer) | |
1181 | { | |
1182 | os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id() | |
1183 | << "/" << replayer.get_global_image_id() << "]"; | |
1184 | return os; | |
1185 | } | |
1186 | ||
1187 | } // namespace mirror | |
1188 | } // namespace rbd | |
1189 | ||
1190 | template class rbd::mirror::ImageReplayer<librbd::ImageCtx>; |