// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/stringify.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/debug.h"
#include "common/errno.h"
#include "librbd/Utils.h"
#include "InstanceWatcher.h"
#include "Instances.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::Instances: " \
                           << this << " " << __func__ << ": "

namespace rbd {
namespace mirror {

using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;
using librbd::util::create_rados_callback;

template <typename I>
Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx,
                        const std::string& instance_id,
                        instances::Listener& listener) :
  m_threads(threads), m_ioctx(ioctx), m_instance_id(instance_id),
  m_listener(listener), m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
  m_lock(ceph::make_mutex("rbd::mirror::Instances " + ioctx.get_pool_name())) {
}

template <typename I>
Instances<I>::~Instances() {
}

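// Fetch the currently registered mirror instances and seed the in-memory
// instance map; 'on_finish' completes once the initial listing is handled.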
template <typename I>
void Instances<I>::init(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_finish == nullptr);
  m_on_finish = on_finish;
  get_instances();
}

template <typename I>
void Instances<I>::shut_down(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_finish == nullptr);
  m_on_finish = on_finish;

  Context *ctx = new LambdaContext(
    [this](int r) {
      std::scoped_lock locker{m_threads->timer_lock, m_lock};
      cancel_remove_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}

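// Instances observed while the listener is blocked remain in the ADDING
// state; unblocking queues an added notification for any still pending.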
template <typename I>
void Instances<I>::unblock_listener() {
  dout(5) << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_listener_blocked);
  m_listener_blocked = false;

  InstanceIds added_instance_ids;
  for (auto& pair : m_instances) {
    if (pair.second.state == INSTANCE_STATE_ADDING) {
      added_instance_ids.push_back(pair.first);
    }
  }

  if (!added_instance_ids.empty()) {
    m_threads->work_queue->queue(
      new C_NotifyInstancesAdded(this, added_instance_ids), 0);
  }
}

template <typename I>
void Instances<I>::acked(const InstanceIds& instance_ids) {
  dout(10) << "instance_ids=" << instance_ids << dendl;

  std::lock_guard locker{m_lock};
  if (m_on_finish != nullptr) {
    dout(5) << "received on shut down, ignoring" << dendl;
    return;
  }

  Context *ctx = new C_HandleAcked(this, instance_ids);
  m_threads->work_queue->queue(ctx, 0);
}

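// Record the current time as the last-acked timestamp for each instance,
// creating entries for instances seen for the first time, then (re)arm the
// stale-instance removal timer and notify the listener of new additions.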
template <typename I>
void Instances<I>::handle_acked(const InstanceIds& instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;

  std::scoped_lock locker{m_threads->timer_lock, m_lock};
  if (m_on_finish != nullptr) {
    dout(5) << "handled on shut down, ignoring" << dendl;
    return;
  }

  InstanceIds added_instance_ids;
  auto time = clock_t::now();
  for (auto& instance_id : instance_ids) {
    auto &instance = m_instances.insert(
      std::make_pair(instance_id, Instance{})).first->second;
    instance.acked_time = time;
    if (instance.state == INSTANCE_STATE_ADDING) {
      added_instance_ids.push_back(instance_id);
    }
  }

  schedule_remove_task(time);
  if (!m_listener_blocked && !added_instance_ids.empty()) {
    m_threads->work_queue->queue(
      new C_NotifyInstancesAdded(this, added_instance_ids), 0);
  }
}

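// Invoke the listener for instances that are still in the ADDING state.  The
// lock is dropped around the callback, so each instance's state is re-checked
// before it is transitioned to IDLE.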
template <typename I>
void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
  std::unique_lock locker{m_lock};
  InstanceIds added_instance_ids;
  for (auto& instance_id : instance_ids) {
    auto it = m_instances.find(instance_id);
    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
      added_instance_ids.push_back(instance_id);
    }
  }

  if (added_instance_ids.empty()) {
    return;
  }

  dout(5) << "instance_ids=" << added_instance_ids << dendl;
  locker.unlock();
  m_listener.handle_added(added_instance_ids);
  locker.lock();

  for (auto& instance_id : added_instance_ids) {
    auto it = m_instances.find(instance_id);
    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
      it->second.state = INSTANCE_STATE_IDLE;
    }
  }
}

template <typename I>
void Instances<I>::notify_instances_removed(const InstanceIds& instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;
  m_listener.handle_removed(instance_ids);

  std::lock_guard locker{m_lock};
  for (auto& instance_id : instance_ids) {
    m_instances.erase(instance_id);
  }
}

template <typename I>
void Instances<I>::list(std::vector<std::string> *instance_ids) {
  dout(20) << dendl;

  std::lock_guard locker{m_lock};

  for (auto it : m_instances) {
    instance_ids->push_back(it.first);
  }
}


template <typename I>
void Instances<I>::get_instances() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  Context *ctx = create_context_callback<
    Instances, &Instances<I>::handle_get_instances>(this);

  InstanceWatcher<I>::get_instances(m_ioctx, &m_instance_ids, ctx);
}

template <typename I>
void Instances<I>::handle_get_instances(int r) {
  dout(10) << "r=" << r << dendl;

  Context *on_finish = nullptr;
  {
    std::lock_guard locker{m_lock};
    std::swap(on_finish, m_on_finish);
  }

  if (r < 0) {
    derr << "error retrieving instances: " << cpp_strerror(r) << dendl;
  } else {
    handle_acked(m_instance_ids);
  }
  on_finish->complete(r);
}

template <typename I>
void Instances<I>::wait_for_ops() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<
      Instances, &Instances<I>::handle_wait_for_ops>(this));

  m_async_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void Instances<I>::handle_wait_for_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ceph_assert(r == 0);

  Context *on_finish = nullptr;
  {
    std::lock_guard locker{m_lock};
    std::swap(on_finish, m_on_finish);
  }
  on_finish->complete(r);
}

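// Transition every peer instance whose last ack is at or before 'time' into
// the REMOVING state and asynchronously remove each one via InstanceWatcher,
// gathering the per-instance completions into a single callback.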
template <typename I>
void Instances<I>::remove_instances(const Instances<I>::clock_t::time_point& time) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  InstanceIds instance_ids;
  for (auto& instance_pair : m_instances) {
    if (instance_pair.first == m_instance_id) {
      continue;
    }
    auto& instance = instance_pair.second;
    if (instance.state != INSTANCE_STATE_REMOVING &&
        instance.acked_time <= time) {
      instance.state = INSTANCE_STATE_REMOVING;
      instance_ids.push_back(instance_pair.first);
    }
  }
  ceph_assert(!instance_ids.empty());

  dout(10) << "instance_ids=" << instance_ids << dendl;
  Context* ctx = new LambdaContext([this, instance_ids](int r) {
      handle_remove_instances(r, instance_ids);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  auto gather_ctx = new C_Gather(m_cct, ctx);
  for (auto& instance_id : instance_ids) {
    InstanceWatcher<I>::remove_instance(m_ioctx, m_threads->work_queue,
                                        instance_id, gather_ctx->new_sub());
  }

  m_async_op_tracker.start_op();
  gather_ctx->activate();
}

template <typename I>
void Instances<I>::handle_remove_instances(
    int r, const InstanceIds& instance_ids) {
  std::scoped_lock locker{m_threads->timer_lock, m_lock};

  dout(10) << "r=" << r << ", instance_ids=" << instance_ids << dendl;
  ceph_assert(r == 0);

  // fire removed notification now that instances have been blacklisted
  m_threads->work_queue->queue(
    new C_NotifyInstancesRemoved(this, instance_ids), 0);

  // reschedule the timer for the next batch
  schedule_remove_task(clock_t::now());
  m_async_op_tracker.finish_op();
}

template <typename I>
void Instances<I>::cancel_remove_task() {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(ceph_mutex_is_locked(m_lock));

  if (m_timer_task == nullptr) {
    return;
  }

  dout(10) << dendl;

  bool canceled = m_threads->timer->cancel_event(m_timer_task);
  ceph_assert(canceled);
  m_timer_task = nullptr;
}

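// Arm a timer that fires when the longest-unacked peer instance becomes
// eligible for removal.  The grace period is derived from the leader
// heartbeat settings: heartbeat interval * (1 + max missed heartbeats +
// max acquire attempts before break).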
template <typename I>
void Instances<I>::schedule_remove_task(const Instances<I>::clock_t::time_point& time) {
  cancel_remove_task();
  if (m_on_finish != nullptr) {
    dout(10) << "received on shut down, ignoring" << dendl;
    return;
  }

  int after = m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_heartbeat_interval") *
    (1 + m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats") +
     m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));

  bool schedule = false;
  auto oldest_time = time;
  for (auto& instance : m_instances) {
    if (instance.first == m_instance_id) {
      continue;
    }
    if (instance.second.state == INSTANCE_STATE_REMOVING) {
      // removal is already in-flight
      continue;
    }

    oldest_time = std::min(oldest_time, instance.second.acked_time);
    schedule = true;
  }

  if (!schedule) {
    return;
  }

  dout(10) << dendl;

  // schedule a time to fire when the oldest instance should be removed
  m_timer_task = new LambdaContext(
    [this, oldest_time](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      std::lock_guard locker{m_lock};
      m_timer_task = nullptr;

      remove_instances(oldest_time);
    });

  oldest_time += ceph::make_timespan(after);
  m_threads->timer->add_event_at(oldest_time, m_timer_task);
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::Instances<librbd::ImageCtx>;