]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/Instances.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / tools / rbd_mirror / Instances.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "librbd/asio/ContextWQ.h"
10 #include "InstanceWatcher.h"
11 #include "Instances.h"
12 #include "Threads.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::Instances: " \
18 << this << " " << __func__ << ": "
19
20 namespace rbd {
21 namespace mirror {
22
23 using librbd::util::create_async_context_callback;
24 using librbd::util::create_context_callback;
25 using librbd::util::create_rados_callback;
26
// Track the set of active rbd-mirror instances in a pool. Each instance
// is identified by its instance id; this object notifies the supplied
// listener when peers appear (add) or stop heartbeating (remove).
template <typename I>
Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx,
                        const std::string& instance_id,
                        instances::Listener& listener) :
  m_threads(threads), m_ioctx(ioctx), m_instance_id(instance_id),
  m_listener(listener), m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
  // lock name embeds the pool name to disambiguate per-pool instances
  m_lock(ceph::make_mutex("rbd::mirror::Instances " + ioctx.get_pool_name())) {
}
35
// No owned resources beyond members; all async state is expected to have
// been drained via shut_down() before destruction.
template <typename I>
Instances<I>::~Instances() {
}
39
40 template <typename I>
41 void Instances<I>::init(Context *on_finish) {
42 dout(10) << dendl;
43
44 std::lock_guard locker{m_lock};
45 ceph_assert(m_on_finish == nullptr);
46 m_on_finish = on_finish;
47 get_instances();
48 }
49
50 template <typename I>
51 void Instances<I>::shut_down(Context *on_finish) {
52 dout(10) << dendl;
53
54 std::lock_guard locker{m_lock};
55 ceph_assert(m_on_finish == nullptr);
56 m_on_finish = on_finish;
57
58 Context *ctx = new LambdaContext(
59 [this](int r) {
60 std::scoped_lock locker{m_threads->timer_lock, m_lock};
61 cancel_remove_task();
62 wait_for_ops();
63 });
64
65 m_threads->work_queue->queue(ctx, 0);
66 }
67
68 template <typename I>
69 void Instances<I>::unblock_listener() {
70 dout(5) << dendl;
71
72 std::lock_guard locker{m_lock};
73 ceph_assert(m_listener_blocked);
74 m_listener_blocked = false;
75
76 InstanceIds added_instance_ids;
77 for (auto& pair : m_instances) {
78 if (pair.second.state == INSTANCE_STATE_ADDING) {
79 added_instance_ids.push_back(pair.first);
80 }
81 }
82
83 if (!added_instance_ids.empty()) {
84 m_threads->work_queue->queue(
85 new C_NotifyInstancesAdded(this, added_instance_ids), 0);
86 }
87 }
88
89 template <typename I>
90 void Instances<I>::acked(const InstanceIds& instance_ids) {
91 dout(10) << "instance_ids=" << instance_ids << dendl;
92
93 std::lock_guard locker{m_lock};
94 if (m_on_finish != nullptr) {
95 dout(5) << "received on shut down, ignoring" << dendl;
96 return;
97 }
98
99 Context *ctx = new C_HandleAcked(this, instance_ids);
100 m_threads->work_queue->queue(ctx, 0);
101 }
102
// Record heartbeat acks: refresh each instance's acked_time (inserting new
// instances in ADDING state), reschedule the stale-instance removal timer,
// and notify the listener of newly seen instances (unless blocked).
template <typename I>
void Instances<I>::handle_acked(const InstanceIds& instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;

  // timer lock must be acquired before m_lock (schedule_remove_task below
  // manipulates the timer)
  std::scoped_lock locker{m_threads->timer_lock, m_lock};
  if (m_on_finish != nullptr) {
    // init/shut_down in progress -- drop the ack
    dout(5) << "handled on shut down, ignoring" << dendl;
    return;
  }

  InstanceIds added_instance_ids;
  auto time = clock_t::now();
  for (auto& instance_id : instance_ids) {
    // insert() is a no-op for an already-known id; default-constructed
    // Instance starts in INSTANCE_STATE_ADDING
    auto &instance = m_instances.insert(
      std::make_pair(instance_id, Instance{})).first->second;
    instance.acked_time = time;
    if (instance.state == INSTANCE_STATE_ADDING) {
      added_instance_ids.push_back(instance_id);
    }
  }

  schedule_remove_task(time);
  if (!m_listener_blocked && !added_instance_ids.empty()) {
    // listener callbacks are delivered via the work queue, never under
    // m_lock
    m_threads->work_queue->queue(
      new C_NotifyInstancesAdded(this, added_instance_ids), 0);
  }
}
130
// Deliver handle_added() to the listener for instances still in ADDING
// state, then transition them to IDLE. The lock is dropped around the
// listener callback, so state is re-validated after re-acquiring it.
template <typename I>
void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
  std::unique_lock locker{m_lock};
  // filter to instances that are still pending an "added" notification
  // (they may have been removed since this context was queued)
  InstanceIds added_instance_ids;
  for (auto& instance_id : instance_ids) {
    auto it = m_instances.find(instance_id);
    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
      added_instance_ids.push_back(instance_id);
    }
  }

  if (added_instance_ids.empty()) {
    return;
  }

  dout(5) << "instance_ids=" << added_instance_ids << dendl;
  // invoke the listener without holding m_lock to avoid lock cycles
  locker.unlock();
  m_listener.handle_added(added_instance_ids);
  locker.lock();

  // instances may have changed while unlocked -- only promote entries that
  // are still present and still in ADDING state
  for (auto& instance_id : added_instance_ids) {
    auto it = m_instances.find(instance_id);
    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
      it->second.state = INSTANCE_STATE_IDLE;
    }
  }
}
158
159 template <typename I>
160 void Instances<I>::notify_instances_removed(const InstanceIds& instance_ids) {
161 dout(5) << "instance_ids=" << instance_ids << dendl;
162 m_listener.handle_removed(instance_ids);
163
164 std::lock_guard locker{m_lock};
165 for (auto& instance_id : instance_ids) {
166 m_instances.erase(instance_id);
167 }
168 }
169
170 template <typename I>
171 void Instances<I>::list(std::vector<std::string> *instance_ids) {
172 dout(20) << dendl;
173
174 std::lock_guard locker{m_lock};
175
176 for (auto it : m_instances) {
177 instance_ids->push_back(it.first);
178 }
179 }
180
181
// Asynchronously fetch the instance ids registered in the pool into
// m_instance_ids; handle_get_instances() runs on completion.
template <typename I>
void Instances<I>::get_instances() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  Context *ctx = create_context_callback<
    Instances, &Instances<I>::handle_get_instances>(this);

  InstanceWatcher<I>::get_instances(m_ioctx, &m_instance_ids, ctx);
}
193
// Completion of the initial instance-id query issued by init(). On success
// the retrieved ids are treated as freshly acked; the saved init callback
// is then fired with the query result.
template <typename I>
void Instances<I>::handle_get_instances(int r) {
  dout(10) << "r=" << r << dendl;

  // claim m_on_finish under the lock; clearing it also re-enables
  // acked()/handle_acked() processing
  Context *on_finish = nullptr;
  {
    std::lock_guard locker{m_lock};
    std::swap(on_finish, m_on_finish);
  }

  if (r < 0) {
    derr << "error retrieving instances: " << cpp_strerror(r) << dendl;
  } else {
    // seed the tracked set with the queried ids (takes its own locks)
    handle_acked(m_instance_ids);
  }
  on_finish->complete(r);
}
211
212 template <typename I>
213 void Instances<I>::wait_for_ops() {
214 dout(10) << dendl;
215
216 ceph_assert(ceph_mutex_is_locked(m_lock));
217
218 Context *ctx = create_async_context_callback(
219 m_threads->work_queue, create_context_callback<
220 Instances, &Instances<I>::handle_wait_for_ops>(this));
221
222 m_async_op_tracker.wait_for_ops(ctx);
223 }
224
225 template <typename I>
226 void Instances<I>::handle_wait_for_ops(int r) {
227 dout(10) << "r=" << r << dendl;
228
229 ceph_assert(r == 0);
230
231 Context *on_finish = nullptr;
232 {
233 std::lock_guard locker{m_lock};
234 std::swap(on_finish, m_on_finish);
235 }
236 on_finish->complete(r);
237 }
238
// Remove every peer instance whose last ack is at or before 'time':
// mark them REMOVING and issue an InstanceWatcher removal per instance,
// gathered into a single handle_remove_instances() callback.
template <typename I>
void Instances<I>::remove_instances(const Instances<I>::clock_t::time_point& time) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  InstanceIds instance_ids;
  for (auto& instance_pair : m_instances) {
    // never remove our own instance record
    if (instance_pair.first == m_instance_id) {
      continue;
    }
    auto& instance = instance_pair.second;
    if (instance.state != INSTANCE_STATE_REMOVING &&
        instance.acked_time <= time) {
      instance.state = INSTANCE_STATE_REMOVING;
      instance_ids.push_back(instance_pair.first);
    }
  }
  // the timer only fires when at least one instance is due for removal
  ceph_assert(!instance_ids.empty());

  dout(10) << "instance_ids=" << instance_ids << dendl;
  Context* ctx = new LambdaContext([this, instance_ids](int r) {
      handle_remove_instances(r, instance_ids);
    });
  // bounce through the work queue so the gathered callback does not run
  // in the removal completion's context
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  auto gather_ctx = new C_Gather(m_cct, ctx);
  for (auto& instance_id : instance_ids) {
    InstanceWatcher<I>::remove_instance(m_ioctx, *m_threads->asio_engine,
                                        instance_id, gather_ctx->new_sub());
  }

  // balanced by finish_op() in handle_remove_instances(); keeps shut_down
  // from completing while removals are in flight
  m_async_op_tracker.start_op();
  gather_ctx->activate();
}
272
// Completion of a batch of instance removals: queue the listener "removed"
// notification, reschedule the removal timer for remaining instances, and
// release the async-op reference taken in remove_instances().
template <typename I>
void Instances<I>::handle_remove_instances(
    int r, const InstanceIds& instance_ids) {
  // timer lock before m_lock (required order for schedule_remove_task)
  std::scoped_lock locker{m_threads->timer_lock, m_lock};

  dout(10) << "r=" << r << ", instance_ids=" << instance_ids << dendl;
  ceph_assert(r == 0);

  // fire removed notification now that instances have been blocklisted
  m_threads->work_queue->queue(
    new C_NotifyInstancesRemoved(this, instance_ids), 0);

  // reschedule the timer for the next batch
  schedule_remove_task(clock_t::now());
  m_async_op_tracker.finish_op();
}
289
290 template <typename I>
291 void Instances<I>::cancel_remove_task() {
292 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
293 ceph_assert(ceph_mutex_is_locked(m_lock));
294
295 if (m_timer_task == nullptr) {
296 return;
297 }
298
299 dout(10) << dendl;
300
301 bool canceled = m_threads->timer->cancel_event(m_timer_task);
302 ceph_assert(canceled);
303 m_timer_task = nullptr;
304 }
305
// (Re)arm the timer that removes stale instances: find the oldest ack time
// among peer instances not already being removed and schedule
// remove_instances() to fire once that instance's grace period expires.
template <typename I>
void Instances<I>::schedule_remove_task(const Instances<I>::clock_t::time_point& time) {
  cancel_remove_task();
  if (m_on_finish != nullptr) {
    // init/shut_down in progress -- do not arm a new timer
    dout(10) << "received on shut down, ignoring" << dendl;
    return;
  }

  // grace period (seconds) before a non-acking instance is removed:
  // heartbeat interval times (1 + missed heartbeats + acquire attempts)
  int after = m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_heartbeat_interval") *
    (1 + m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats") +
     m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));

  bool schedule = false;
  auto oldest_time = time;
  for (auto& instance : m_instances) {
    if (instance.first == m_instance_id) {
      // never schedule removal of our own instance
      continue;
    }
    if (instance.second.state == INSTANCE_STATE_REMOVING) {
      // removal is already in-flight
      continue;
    }

    oldest_time = std::min(oldest_time, instance.second.acked_time);
    schedule = true;
  }

  if (!schedule) {
    // no removable peer instances remain
    return;
  }

  dout(10) << dendl;

  // schedule a time to fire when the oldest instance should be removed
  m_timer_task = new LambdaContext(
    [this, oldest_time](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      std::lock_guard locker{m_lock};
      m_timer_task = nullptr;

      remove_instances(oldest_time);
    });

  oldest_time += ceph::make_timespan(after);
  m_threads->timer->add_event_at(oldest_time, m_timer_task);
}
352
} // namespace mirror
} // namespace rbd

// explicit instantiation for the production image-context type
template class rbd::mirror::Instances<librbd::ImageCtx>;