]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageMap.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / tools / rbd_mirror / ImageMap.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "common/debug.h"
5#include "common/errno.h"
6#include "common/Timer.h"
11fdf7f2
TL
7
8#include "librbd/Utils.h"
f67539c2 9#include "librbd/asio/ContextWQ.h"
11fdf7f2
TL
10#include "tools/rbd_mirror/Threads.h"
11
12#include "ImageMap.h"
13#include "image_map/LoadRequest.h"
14#include "image_map/SimplePolicy.h"
15#include "image_map/UpdateRequest.h"
16
17#define dout_context g_ceph_context
18#define dout_subsys ceph_subsys_rbd_mirror
19#undef dout_prefix
20#define dout_prefix *_dout << "rbd::mirror::ImageMap: " << this << " " \
21 << __func__ << ": "
22
20effc67
TL
23using namespace std;
24
11fdf7f2
TL
25namespace rbd {
26namespace mirror {
27
28using ::operator<<;
29using image_map::Policy;
30
31using librbd::util::unique_lock_name;
32using librbd::util::create_async_context_callback;
33
34template <typename I>
35struct ImageMap<I>::C_NotifyInstance : public Context {
36 ImageMap* image_map;
37 std::string global_image_id;
38 bool acquire_release;
39
40 C_NotifyInstance(ImageMap* image_map, const std::string& global_image_id,
41 bool acquire_release)
42 : image_map(image_map), global_image_id(global_image_id),
43 acquire_release(acquire_release) {
44 image_map->start_async_op();
45 }
46
47 void finish(int r) override {
48 if (acquire_release) {
49 image_map->handle_peer_ack(global_image_id, r);
50 } else {
51 image_map->handle_peer_ack_remove(global_image_id, r);
52 }
53 image_map->finish_async_op();
54 }
55};
56
57template <typename I>
58ImageMap<I>::ImageMap(librados::IoCtx &ioctx, Threads<I> *threads,
59 const std::string& instance_id,
60 image_map::Listener &listener)
61 : m_ioctx(ioctx), m_threads(threads), m_instance_id(instance_id),
62 m_listener(listener),
9f95a23c
TL
63 m_lock(ceph::make_mutex(
64 unique_lock_name("rbd::mirror::ImageMap::m_lock", this))) {
11fdf7f2
TL
65}
66
67template <typename I>
68ImageMap<I>::~ImageMap() {
69 ceph_assert(m_async_op_tracker.empty());
70 ceph_assert(m_timer_task == nullptr);
71 ceph_assert(m_rebalance_task == nullptr);
72}
73
74template <typename I>
75void ImageMap<I>::continue_action(const std::set<std::string> &global_image_ids,
76 int r) {
77 dout(20) << dendl;
78
79 {
9f95a23c 80 std::lock_guard locker{m_lock};
11fdf7f2
TL
81 if (m_shutting_down) {
82 return;
83 }
84
85 for (auto const &global_image_id : global_image_ids) {
86 bool schedule = m_policy->finish_action(global_image_id, r);
87 if (schedule) {
88 schedule_action(global_image_id);
89 }
90 }
91 }
92
93 schedule_update_task();
94}
95
96template <typename I>
97void ImageMap<I>::handle_update_request(
98 const Updates &updates,
99 const std::set<std::string> &remove_global_image_ids, int r) {
100 dout(20) << "r=" << r << dendl;
101
102 std::set<std::string> global_image_ids;
103
104 global_image_ids.insert(remove_global_image_ids.begin(),
105 remove_global_image_ids.end());
106 for (auto const &update : updates) {
107 global_image_ids.insert(update.global_image_id);
108 }
109
110 continue_action(global_image_ids, r);
111}
112
113template <typename I>
114void ImageMap<I>::update_image_mapping(Updates&& map_updates,
115 std::set<std::string>&& map_removals) {
116 if (map_updates.empty() && map_removals.empty()) {
117 return;
118 }
119
120 dout(5) << "updates=[" << map_updates << "], "
121 << "removes=[" << map_removals << "]" << dendl;
122
9f95a23c 123 Context *on_finish = new LambdaContext(
11fdf7f2
TL
124 [this, map_updates, map_removals](int r) {
125 handle_update_request(map_updates, map_removals, r);
126 finish_async_op();
127 });
128 on_finish = create_async_context_callback(m_threads->work_queue, on_finish);
129
130 // empty meta policy for now..
131 image_map::PolicyMetaNone policy_meta;
132
133 bufferlist bl;
134 encode(image_map::PolicyData(policy_meta), bl);
135
136 // prepare update map
137 std::map<std::string, cls::rbd::MirrorImageMap> update_mapping;
138 for (auto const &update : map_updates) {
139 update_mapping.emplace(
140 update.global_image_id, cls::rbd::MirrorImageMap(update.instance_id,
141 update.mapped_time, bl));
142 }
143
144 start_async_op();
145 image_map::UpdateRequest<I> *req = image_map::UpdateRequest<I>::create(
146 m_ioctx, std::move(update_mapping), std::move(map_removals), on_finish);
147 req->send();
148}
149
150template <typename I>
151void ImageMap<I>::process_updates() {
152 dout(20) << dendl;
153
9f95a23c 154 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
11fdf7f2
TL
155 ceph_assert(m_timer_task == nullptr);
156
157 Updates map_updates;
158 std::set<std::string> map_removals;
159 Updates acquire_updates;
160 Updates release_updates;
161
162 // gather updates by advancing the state machine
9f95a23c 163 m_lock.lock();
11fdf7f2
TL
164 for (auto const &global_image_id : m_global_image_ids) {
165 image_map::ActionType action_type =
166 m_policy->start_action(global_image_id);
167 image_map::LookupInfo info = m_policy->lookup(global_image_id);
168
169 dout(15) << "global_image_id=" << global_image_id << ", "
170 << "action=" << action_type << ", "
171 << "instance=" << info.instance_id << dendl;
172 switch (action_type) {
173 case image_map::ACTION_TYPE_NONE:
174 continue;
175 case image_map::ACTION_TYPE_MAP_UPDATE:
176 ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
177 map_updates.emplace_back(global_image_id, info.instance_id,
178 info.mapped_time);
179 break;
180 case image_map::ACTION_TYPE_MAP_REMOVE:
181 map_removals.emplace(global_image_id);
182 break;
183 case image_map::ACTION_TYPE_ACQUIRE:
184 ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
185 acquire_updates.emplace_back(global_image_id, info.instance_id);
186 break;
187 case image_map::ACTION_TYPE_RELEASE:
188 ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
189 release_updates.emplace_back(global_image_id, info.instance_id);
190 break;
191 }
192 }
193 m_global_image_ids.clear();
9f95a23c 194 m_lock.unlock();
11fdf7f2
TL
195
196 // notify listener (acquire, release) and update on-disk map. note
197 // that its safe to process this outside m_lock as we still hold
198 // timer lock.
199 notify_listener_acquire_release_images(acquire_updates, release_updates);
200 update_image_mapping(std::move(map_updates), std::move(map_removals));
201}
202
203template <typename I>
204void ImageMap<I>::schedule_update_task() {
9f95a23c 205 std::lock_guard timer_lock{m_threads->timer_lock};
11fdf7f2
TL
206 schedule_update_task(m_threads->timer_lock);
207}
208
209template <typename I>
9f95a23c
TL
210void ImageMap<I>::schedule_update_task(const ceph::mutex &timer_lock) {
211 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
11fdf7f2
TL
212
213 schedule_rebalance_task();
214
215 if (m_timer_task != nullptr) {
216 return;
217 }
218
219 {
9f95a23c 220 std::lock_guard locker{m_lock};
11fdf7f2
TL
221 if (m_global_image_ids.empty()) {
222 return;
223 }
224 }
225
9f95a23c
TL
226 m_timer_task = new LambdaContext([this](int r) {
227 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
11fdf7f2
TL
228 m_timer_task = nullptr;
229
230 process_updates();
231 });
232
233 CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
234 double after = cct->_conf.get_val<double>("rbd_mirror_image_policy_update_throttle_interval");
235
236 dout(20) << "scheduling image check update (" << m_timer_task << ")"
237 << " after " << after << " second(s)" << dendl;
238 m_threads->timer->add_event_after(after, m_timer_task);
239}
240
241template <typename I>
242void ImageMap<I>::rebalance() {
243 ceph_assert(m_rebalance_task == nullptr);
244
245 {
9f95a23c 246 std::lock_guard locker{m_lock};
11fdf7f2
TL
247 if (m_async_op_tracker.empty() && m_global_image_ids.empty()){
248 dout(20) << "starting rebalance" << dendl;
249
250 std::set<std::string> remap_global_image_ids;
251 m_policy->add_instances({}, &remap_global_image_ids);
252
253 for (auto const &global_image_id : remap_global_image_ids) {
254 schedule_action(global_image_id);
255 }
256 }
257 }
258
259 schedule_update_task(m_threads->timer_lock);
260}
261
262template <typename I>
263void ImageMap<I>::schedule_rebalance_task() {
9f95a23c 264 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
11fdf7f2
TL
265
266 CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
267
268 // fetch the updated value of idle timeout for (re)scheduling
269 double resched_after = cct->_conf.get_val<double>(
270 "rbd_mirror_image_policy_rebalance_timeout");
271 if (!resched_after) {
272 return;
273 }
274
275 // cancel existing rebalance task if any before scheduling
276 if (m_rebalance_task != nullptr) {
277 m_threads->timer->cancel_event(m_rebalance_task);
278 }
279
9f95a23c
TL
280 m_rebalance_task = new LambdaContext([this](int _) {
281 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
11fdf7f2
TL
282 m_rebalance_task = nullptr;
283
284 rebalance();
285 });
286
287 dout(20) << "scheduling rebalance (" << m_rebalance_task << ")"
288 << " after " << resched_after << " second(s)" << dendl;
289 m_threads->timer->add_event_after(resched_after, m_rebalance_task);
290}
291
292template <typename I>
293void ImageMap<I>::schedule_action(const std::string &global_image_id) {
294 dout(20) << "global_image_id=" << global_image_id << dendl;
9f95a23c 295 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
296
297 m_global_image_ids.emplace(global_image_id);
298}
299
300template <typename I>
301void ImageMap<I>::notify_listener_acquire_release_images(
302 const Updates &acquire, const Updates &release) {
303 if (acquire.empty() && release.empty()) {
304 return;
305 }
306
307 dout(5) << "acquire=[" << acquire << "], "
308 << "release=[" << release << "]" << dendl;
309
310 for (auto const &update : acquire) {
311 m_listener.acquire_image(
312 update.global_image_id, update.instance_id,
313 create_async_context_callback(
314 m_threads->work_queue,
315 new C_NotifyInstance(this, update.global_image_id, true)));
316 }
317
318 for (auto const &update : release) {
319 m_listener.release_image(
320 update.global_image_id, update.instance_id,
321 create_async_context_callback(
322 m_threads->work_queue,
323 new C_NotifyInstance(this, update.global_image_id, true)));
324 }
325}
326
327template <typename I>
328void ImageMap<I>::notify_listener_remove_images(const std::string &peer_uuid,
329 const Updates &remove) {
330 dout(5) << "peer_uuid=" << peer_uuid << ", "
331 << "remove=[" << remove << "]" << dendl;
332
333 for (auto const &update : remove) {
334 m_listener.remove_image(
335 peer_uuid, update.global_image_id, update.instance_id,
336 create_async_context_callback(
337 m_threads->work_queue,
338 new C_NotifyInstance(this, update.global_image_id, false)));
339 }
340}
341
342template <typename I>
343void ImageMap<I>::handle_load(const std::map<std::string,
344 cls::rbd::MirrorImageMap> &image_mapping) {
345 dout(20) << dendl;
346
347 {
9f95a23c 348 std::lock_guard locker{m_lock};
11fdf7f2
TL
349 m_policy->init(image_mapping);
350
351 for (auto& pair : image_mapping) {
352 schedule_action(pair.first);
353 }
354 }
355 schedule_update_task();
356}
357
358template <typename I>
359void ImageMap<I>::handle_peer_ack_remove(const std::string &global_image_id,
360 int r) {
9f95a23c 361 std::lock_guard locker{m_lock};
11fdf7f2
TL
362 dout(5) << "global_image_id=" << global_image_id << dendl;
363
364 if (r < 0) {
365 derr << "failed to remove global_image_id=" << global_image_id << dendl;
366 }
367
368 auto peer_it = m_peer_map.find(global_image_id);
369 if (peer_it == m_peer_map.end()) {
370 return;
371 }
372
373 m_peer_map.erase(peer_it);
374}
375
376template <typename I>
377void ImageMap<I>::update_images_added(
378 const std::string &peer_uuid,
379 const std::set<std::string> &global_image_ids) {
380 dout(5) << "peer_uuid=" << peer_uuid << ", "
381 << "global_image_ids=[" << global_image_ids << "]" << dendl;
9f95a23c 382 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
383
384 for (auto const &global_image_id : global_image_ids) {
385 auto result = m_peer_map[global_image_id].insert(peer_uuid);
386 if (result.second && m_peer_map[global_image_id].size() == 1) {
387 if (m_policy->add_image(global_image_id)) {
388 schedule_action(global_image_id);
389 }
390 }
391 }
392}
393
394template <typename I>
395void ImageMap<I>::update_images_removed(
396 const std::string &peer_uuid,
397 const std::set<std::string> &global_image_ids) {
398 dout(5) << "peer_uuid=" << peer_uuid << ", "
399 << "global_image_ids=[" << global_image_ids << "]" << dendl;
9f95a23c 400 ceph_assert(ceph_mutex_is_locked(m_lock));
11fdf7f2
TL
401
402 Updates to_remove;
403 for (auto const &global_image_id : global_image_ids) {
404 image_map::LookupInfo info = m_policy->lookup(global_image_id);
405 bool image_mapped = (info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
406
407 bool image_removed = image_mapped;
408 bool peer_removed = false;
409 auto peer_it = m_peer_map.find(global_image_id);
410 if (peer_it != m_peer_map.end()) {
411 auto& peer_set = peer_it->second;
412 peer_removed = peer_set.erase(peer_uuid);
413 image_removed = peer_removed && peer_set.empty();
414 }
415
416 if (image_mapped && peer_removed && !peer_uuid.empty()) {
417 // peer image has been deleted
418 to_remove.emplace_back(global_image_id, info.instance_id);
419 }
420
a4b75251 421 if (image_removed) {
11fdf7f2
TL
422 // local and peer images have been deleted
423 if (m_policy->remove_image(global_image_id)) {
424 schedule_action(global_image_id);
425 }
426 }
427 }
428
429 if (!to_remove.empty()) {
430 // removal notification will be notified instantly. this is safe
431 // even after scheduling action for images as we still hold m_lock
432 notify_listener_remove_images(peer_uuid, to_remove);
433 }
434}
435
436template <typename I>
437void ImageMap<I>::update_instances_added(
438 const std::vector<std::string> &instance_ids) {
439 {
9f95a23c 440 std::lock_guard locker{m_lock};
11fdf7f2
TL
441 if (m_shutting_down) {
442 return;
443 }
444
445 std::vector<std::string> filtered_instance_ids;
446 filter_instance_ids(instance_ids, &filtered_instance_ids, false);
447 if (filtered_instance_ids.empty()) {
448 return;
449 }
450
451 dout(20) << "instance_ids=" << filtered_instance_ids << dendl;
452
453 std::set<std::string> remap_global_image_ids;
454 m_policy->add_instances(filtered_instance_ids, &remap_global_image_ids);
455
456 for (auto const &global_image_id : remap_global_image_ids) {
457 schedule_action(global_image_id);
458 }
459 }
460
461 schedule_update_task();
462}
463
464template <typename I>
465void ImageMap<I>::update_instances_removed(
466 const std::vector<std::string> &instance_ids) {
467 {
9f95a23c 468 std::lock_guard locker{m_lock};
11fdf7f2
TL
469 if (m_shutting_down) {
470 return;
471 }
472
473 std::vector<std::string> filtered_instance_ids;
474 filter_instance_ids(instance_ids, &filtered_instance_ids, true);
475 if (filtered_instance_ids.empty()) {
476 return;
477 }
478
479 dout(20) << "instance_ids=" << filtered_instance_ids << dendl;
480
481 std::set<std::string> remap_global_image_ids;
482 m_policy->remove_instances(filtered_instance_ids, &remap_global_image_ids);
483
484 for (auto const &global_image_id : remap_global_image_ids) {
485 schedule_action(global_image_id);
486 }
487 }
488
489 schedule_update_task();
490}
491
492template <typename I>
493void ImageMap<I>::update_images(const std::string &peer_uuid,
494 std::set<std::string> &&added_global_image_ids,
495 std::set<std::string> &&removed_global_image_ids) {
496 dout(5) << "peer_uuid=" << peer_uuid << ", " << "added_count="
497 << added_global_image_ids.size() << ", " << "removed_count="
498 << removed_global_image_ids.size() << dendl;
499
500 {
9f95a23c 501 std::lock_guard locker{m_lock};
11fdf7f2
TL
502 if (m_shutting_down) {
503 return;
504 }
505
506 if (!removed_global_image_ids.empty()) {
507 update_images_removed(peer_uuid, removed_global_image_ids);
508 }
509 if (!added_global_image_ids.empty()) {
510 update_images_added(peer_uuid, added_global_image_ids);
511 }
512 }
513
514 schedule_update_task();
515}
516
517template <typename I>
518void ImageMap<I>::handle_peer_ack(const std::string &global_image_id, int r) {
519 dout (20) << "global_image_id=" << global_image_id << ", r=" << r
520 << dendl;
521
522 continue_action({global_image_id}, r);
523}
524
525template <typename I>
526void ImageMap<I>::init(Context *on_finish) {
527 dout(20) << dendl;
528
529 CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
530 std::string policy_type = cct->_conf.get_val<string>("rbd_mirror_image_policy_type");
531
532 if (policy_type == "none" || policy_type == "simple") {
533 m_policy.reset(image_map::SimplePolicy::create(m_ioctx));
534 } else {
535 ceph_abort(); // not really needed as such, but catch it.
536 }
537
538 dout(20) << "mapping policy=" << policy_type << dendl;
539
540 start_async_op();
541 C_LoadMap *ctx = new C_LoadMap(this, on_finish);
542 image_map::LoadRequest<I> *req = image_map::LoadRequest<I>::create(
543 m_ioctx, &ctx->image_mapping, ctx);
544 req->send();
545}
546
547template <typename I>
548void ImageMap<I>::shut_down(Context *on_finish) {
549 dout(20) << dendl;
550
551 {
9f95a23c 552 std::lock_guard timer_lock{m_threads->timer_lock};
11fdf7f2
TL
553
554 {
9f95a23c 555 std::lock_guard locker{m_lock};
11fdf7f2
TL
556 ceph_assert(!m_shutting_down);
557
558 m_shutting_down = true;
559 m_policy.reset();
560 }
561
562 if (m_timer_task != nullptr) {
563 m_threads->timer->cancel_event(m_timer_task);
564 m_timer_task = nullptr;
565 }
566 if (m_rebalance_task != nullptr) {
567 m_threads->timer->cancel_event(m_rebalance_task);
568 m_rebalance_task = nullptr;
569 }
570 }
571
572 wait_for_async_ops(on_finish);
573}
574
575template <typename I>
576void ImageMap<I>::filter_instance_ids(
577 const std::vector<std::string> &instance_ids,
578 std::vector<std::string> *filtered_instance_ids, bool removal) const {
579 CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
580 std::string policy_type = cct->_conf.get_val<string>("rbd_mirror_image_policy_type");
581
582 if (policy_type != "none") {
583 *filtered_instance_ids = instance_ids;
584 return;
585 }
586
587 if (removal) {
588 // propagate removals for external instances
589 for (auto& instance_id : instance_ids) {
590 if (instance_id != m_instance_id) {
591 filtered_instance_ids->push_back(instance_id);
592 }
593 }
594 } else if (std::find(instance_ids.begin(), instance_ids.end(),
595 m_instance_id) != instance_ids.end()) {
596 // propagate addition only for local instance
597 filtered_instance_ids->push_back(m_instance_id);
598 }
599}
600
601} // namespace mirror
602} // namespace rbd
603
604template class rbd::mirror::ImageMap<librbd::ImageCtx>;