]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/stringify.h" | |
5 | #include "common/Timer.h" | |
6 | #include "common/WorkQueue.h" | |
7 | #include "common/debug.h" | |
8 | #include "common/errno.h" | |
9 | #include "librbd/Utils.h" | |
10 | #include "InstanceWatcher.h" | |
11 | #include "Instances.h" | |
12 | #include "Threads.h" | |
13 | ||
14 | #define dout_context g_ceph_context | |
15 | #define dout_subsys ceph_subsys_rbd_mirror | |
16 | #undef dout_prefix | |
17 | #define dout_prefix *_dout << "rbd::mirror::Instances: " \ | |
18 | << this << " " << __func__ << ": " | |
19 | ||
20 | namespace rbd { | |
21 | namespace mirror { | |
22 | ||
23 | using librbd::util::create_async_context_callback; | |
24 | using librbd::util::create_context_callback; | |
25 | using librbd::util::create_rados_callback; | |
26 | ||
// Tracks the set of live rbd-mirror instances registered in a pool and
// notifies the listener as peers appear and disappear.  The mutex name
// embeds the pool name so lockdep reports identify the affected pool.
template <typename I>
Instances<I>::Instances(Threads<I> *threads, librados::IoCtx &ioctx,
                        const std::string& instance_id,
                        instances::Listener& listener) :
  m_threads(threads), m_ioctx(ioctx), m_instance_id(instance_id),
  m_listener(listener), m_cct(reinterpret_cast<CephContext *>(ioctx.cct())),
  m_lock(ceph::make_mutex("rbd::mirror::Instances " + ioctx.get_pool_name())) {
}
35 | ||
36 | template <typename I> | |
37 | Instances<I>::~Instances() { | |
38 | } | |
39 | ||
40 | template <typename I> | |
41 | void Instances<I>::init(Context *on_finish) { | |
11fdf7f2 | 42 | dout(10) << dendl; |
7c673cae | 43 | |
9f95a23c | 44 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 45 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
46 | m_on_finish = on_finish; |
47 | get_instances(); | |
48 | } | |
49 | ||
50 | template <typename I> | |
51 | void Instances<I>::shut_down(Context *on_finish) { | |
11fdf7f2 | 52 | dout(10) << dendl; |
7c673cae | 53 | |
9f95a23c | 54 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 55 | ceph_assert(m_on_finish == nullptr); |
7c673cae FG |
56 | m_on_finish = on_finish; |
57 | ||
9f95a23c | 58 | Context *ctx = new LambdaContext( |
7c673cae | 59 | [this](int r) { |
9f95a23c | 60 | std::scoped_lock locker{m_threads->timer_lock, m_lock}; |
11fdf7f2 | 61 | cancel_remove_task(); |
7c673cae FG |
62 | wait_for_ops(); |
63 | }); | |
64 | ||
65 | m_threads->work_queue->queue(ctx, 0); | |
66 | } | |
67 | ||
68 | template <typename I> | |
11fdf7f2 TL |
69 | void Instances<I>::unblock_listener() { |
70 | dout(5) << dendl; | |
7c673cae | 71 | |
9f95a23c | 72 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
73 | ceph_assert(m_listener_blocked); |
74 | m_listener_blocked = false; | |
75 | ||
76 | InstanceIds added_instance_ids; | |
77 | for (auto& pair : m_instances) { | |
78 | if (pair.second.state == INSTANCE_STATE_ADDING) { | |
79 | added_instance_ids.push_back(pair.first); | |
80 | } | |
81 | } | |
82 | ||
83 | if (!added_instance_ids.empty()) { | |
84 | m_threads->work_queue->queue( | |
85 | new C_NotifyInstancesAdded(this, added_instance_ids), 0); | |
86 | } | |
87 | } | |
7c673cae | 88 | |
11fdf7f2 TL |
89 | template <typename I> |
90 | void Instances<I>::acked(const InstanceIds& instance_ids) { | |
91 | dout(10) << "instance_ids=" << instance_ids << dendl; | |
92 | ||
9f95a23c | 93 | std::lock_guard locker{m_lock}; |
7c673cae | 94 | if (m_on_finish != nullptr) { |
11fdf7f2 | 95 | dout(5) << "received on shut down, ignoring" << dendl; |
7c673cae FG |
96 | return; |
97 | } | |
98 | ||
11fdf7f2 | 99 | Context *ctx = new C_HandleAcked(this, instance_ids); |
7c673cae FG |
100 | m_threads->work_queue->queue(ctx, 0); |
101 | } | |
102 | ||
103 | template <typename I> | |
11fdf7f2 TL |
104 | void Instances<I>::handle_acked(const InstanceIds& instance_ids) { |
105 | dout(5) << "instance_ids=" << instance_ids << dendl; | |
7c673cae | 106 | |
9f95a23c | 107 | std::scoped_lock locker{m_threads->timer_lock, m_lock}; |
7c673cae | 108 | if (m_on_finish != nullptr) { |
11fdf7f2 | 109 | dout(5) << "handled on shut down, ignoring" << dendl; |
7c673cae FG |
110 | return; |
111 | } | |
112 | ||
11fdf7f2 | 113 | InstanceIds added_instance_ids; |
9f95a23c | 114 | auto time = clock_t::now(); |
11fdf7f2 TL |
115 | for (auto& instance_id : instance_ids) { |
116 | auto &instance = m_instances.insert( | |
117 | std::make_pair(instance_id, Instance{})).first->second; | |
118 | instance.acked_time = time; | |
119 | if (instance.state == INSTANCE_STATE_ADDING) { | |
120 | added_instance_ids.push_back(instance_id); | |
121 | } | |
122 | } | |
7c673cae | 123 | |
11fdf7f2 TL |
124 | schedule_remove_task(time); |
125 | if (!m_listener_blocked && !added_instance_ids.empty()) { | |
126 | m_threads->work_queue->queue( | |
127 | new C_NotifyInstancesAdded(this, added_instance_ids), 0); | |
128 | } | |
129 | } | |
130 | ||
131 | template <typename I> | |
132 | void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) { | |
9f95a23c | 133 | std::unique_lock locker{m_lock}; |
11fdf7f2 TL |
134 | InstanceIds added_instance_ids; |
135 | for (auto& instance_id : instance_ids) { | |
136 | auto it = m_instances.find(instance_id); | |
137 | if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) { | |
138 | added_instance_ids.push_back(instance_id); | |
139 | } | |
140 | } | |
141 | ||
142 | if (added_instance_ids.empty()) { | |
143 | return; | |
144 | } | |
145 | ||
146 | dout(5) << "instance_ids=" << added_instance_ids << dendl; | |
9f95a23c | 147 | locker.unlock(); |
11fdf7f2 | 148 | m_listener.handle_added(added_instance_ids); |
9f95a23c | 149 | locker.lock(); |
11fdf7f2 TL |
150 | |
151 | for (auto& instance_id : added_instance_ids) { | |
152 | auto it = m_instances.find(instance_id); | |
153 | if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) { | |
154 | it->second.state = INSTANCE_STATE_IDLE; | |
155 | } | |
156 | } | |
157 | } | |
158 | ||
159 | template <typename I> | |
160 | void Instances<I>::notify_instances_removed(const InstanceIds& instance_ids) { | |
161 | dout(5) << "instance_ids=" << instance_ids << dendl; | |
162 | m_listener.handle_removed(instance_ids); | |
163 | ||
9f95a23c | 164 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
165 | for (auto& instance_id : instance_ids) { |
166 | m_instances.erase(instance_id); | |
167 | } | |
7c673cae FG |
168 | } |
169 | ||
170 | template <typename I> | |
171 | void Instances<I>::list(std::vector<std::string> *instance_ids) { | |
172 | dout(20) << dendl; | |
173 | ||
9f95a23c | 174 | std::lock_guard locker{m_lock}; |
7c673cae FG |
175 | |
176 | for (auto it : m_instances) { | |
177 | instance_ids->push_back(it.first); | |
178 | } | |
179 | } | |
180 | ||
181 | ||
182 | template <typename I> | |
183 | void Instances<I>::get_instances() { | |
11fdf7f2 | 184 | dout(10) << dendl; |
7c673cae | 185 | |
9f95a23c | 186 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
187 | |
188 | Context *ctx = create_context_callback< | |
189 | Instances, &Instances<I>::handle_get_instances>(this); | |
190 | ||
191 | InstanceWatcher<I>::get_instances(m_ioctx, &m_instance_ids, ctx); | |
192 | } | |
193 | ||
194 | template <typename I> | |
195 | void Instances<I>::handle_get_instances(int r) { | |
11fdf7f2 | 196 | dout(10) << "r=" << r << dendl; |
7c673cae FG |
197 | |
198 | Context *on_finish = nullptr; | |
199 | { | |
9f95a23c | 200 | std::lock_guard locker{m_lock}; |
7c673cae FG |
201 | std::swap(on_finish, m_on_finish); |
202 | } | |
11fdf7f2 TL |
203 | |
204 | if (r < 0) { | |
205 | derr << "error retrieving instances: " << cpp_strerror(r) << dendl; | |
206 | } else { | |
207 | handle_acked(m_instance_ids); | |
208 | } | |
7c673cae FG |
209 | on_finish->complete(r); |
210 | } | |
211 | ||
212 | template <typename I> | |
213 | void Instances<I>::wait_for_ops() { | |
11fdf7f2 | 214 | dout(10) << dendl; |
7c673cae | 215 | |
9f95a23c | 216 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
217 | |
218 | Context *ctx = create_async_context_callback( | |
219 | m_threads->work_queue, create_context_callback< | |
220 | Instances, &Instances<I>::handle_wait_for_ops>(this)); | |
221 | ||
222 | m_async_op_tracker.wait_for_ops(ctx); | |
223 | } | |
224 | ||
225 | template <typename I> | |
226 | void Instances<I>::handle_wait_for_ops(int r) { | |
11fdf7f2 | 227 | dout(10) << "r=" << r << dendl; |
7c673cae | 228 | |
11fdf7f2 | 229 | ceph_assert(r == 0); |
7c673cae FG |
230 | |
231 | Context *on_finish = nullptr; | |
232 | { | |
9f95a23c | 233 | std::lock_guard locker{m_lock}; |
7c673cae FG |
234 | std::swap(on_finish, m_on_finish); |
235 | } | |
236 | on_finish->complete(r); | |
237 | } | |
238 | ||
// Initiate removal of every non-local instance whose last ack time is at or
// before 'time'.  Invoked from the removal timer task with m_lock held.
template <typename I>
void Instances<I>::remove_instances(const Instances<I>::clock_t::time_point& time) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  InstanceIds instance_ids;
  for (auto& instance_pair : m_instances) {
    if (instance_pair.first == m_instance_id) {
      // never remove the local instance
      continue;
    }
    auto& instance = instance_pair.second;
    if (instance.state != INSTANCE_STATE_REMOVING &&
        instance.acked_time <= time) {
      // mark REMOVING so concurrent acks/timer runs skip this instance
      instance.state = INSTANCE_STATE_REMOVING;
      instance_ids.push_back(instance_pair.first);
    }
  }
  // the timer only fires when at least one instance is due for removal
  ceph_assert(!instance_ids.empty());

  dout(10) << "instance_ids=" << instance_ids << dendl;
  Context* ctx = new LambdaContext([this, instance_ids](int r) {
      handle_remove_instances(r, instance_ids);
    });
  // bounce the gather completion through the work queue; the handler
  // re-acquires the timer lock and m_lock itself
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  // one sub-context per instance removal; ctx fires when all complete
  auto gather_ctx = new C_Gather(m_cct, ctx);
  for (auto& instance_id : instance_ids) {
    InstanceWatcher<I>::remove_instance(m_ioctx, m_threads->work_queue,
                                        instance_id, gather_ctx->new_sub());
  }

  // keep shut_down() waiting until handle_remove_instances() finishes
  m_async_op_tracker.start_op();
  gather_ctx->activate();
}
272 | ||
// Completion for a batch of instance removals started by remove_instances().
template <typename I>
void Instances<I>::handle_remove_instances(
    int r, const InstanceIds& instance_ids) {
  std::scoped_lock locker{m_threads->timer_lock, m_lock};

  dout(10) << "r=" << r << ", instance_ids=" << instance_ids << dendl;
  ceph_assert(r == 0);

  // fire removed notification now that instances have been blacklisted
  m_threads->work_queue->queue(
    new C_NotifyInstancesRemoved(this, instance_ids), 0);

  // reschedule the timer for the next batch
  schedule_remove_task(clock_t::now());
  // matches the start_op() in remove_instances()
  m_async_op_tracker.finish_op();
}
289 | ||
290 | template <typename I> | |
11fdf7f2 | 291 | void Instances<I>::cancel_remove_task() { |
9f95a23c TL |
292 | ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); |
293 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
7c673cae | 294 | |
11fdf7f2 | 295 | if (m_timer_task == nullptr) { |
7c673cae FG |
296 | return; |
297 | } | |
298 | ||
11fdf7f2 | 299 | dout(10) << dendl; |
7c673cae | 300 | |
11fdf7f2 TL |
301 | bool canceled = m_threads->timer->cancel_event(m_timer_task); |
302 | ceph_assert(canceled); | |
303 | m_timer_task = nullptr; | |
7c673cae FG |
304 | } |
305 | ||
// (Re)arm the timer that expires stale instances.  Called with the timer
// lock and m_lock held; 'time' is the baseline ack time for this round.
template <typename I>
void Instances<I>::schedule_remove_task(const Instances<I>::clock_t::time_point& time) {
  cancel_remove_task();
  if (m_on_finish != nullptr) {
    // init/shut_down in progress -- don't arm a new task
    dout(10) << "received on shut down, ignoring" << dendl;
    return;
  }

  // grace period (seconds): heartbeat interval scaled by the number of
  // heartbeats/acquire attempts a leader may miss before being replaced
  int after = m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_heartbeat_interval") *
    (1 + m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats") +
     m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));

  bool schedule = false;
  auto oldest_time = time;
  for (auto& instance : m_instances) {
    if (instance.first == m_instance_id) {
      // the local instance is never expired
      continue;
    }
    if (instance.second.state == INSTANCE_STATE_REMOVING) {
      // removal is already in-flight
      continue;
    }

    oldest_time = std::min(oldest_time, instance.second.acked_time);
    schedule = true;
  }

  if (!schedule) {
    return;
  }

  dout(10) << dendl;

  // schedule a time to fire when the oldest instance should be removed
  m_timer_task = new LambdaContext(
    [this, oldest_time](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      std::lock_guard locker{m_lock};
      m_timer_task = nullptr;

      remove_instances(oldest_time);
    });

  oldest_time += ceph::make_timespan(after);
  m_threads->timer->add_event_at(oldest_time, m_timer_task);
}
352 | ||
353 | } // namespace mirror | |
354 | } // namespace rbd | |
355 | ||
356 | template class rbd::mirror::Instances<librbd::ImageCtx>; |