1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "librbd/asio/ContextWQ.h"
10 #include "InstanceWatcher.h"
11 #include "Instances.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::Instances: " \
18 << this << " " << __func__ << ": "
23 using librbd::util::create_async_context_callback
;
24 using librbd::util::create_context_callback
;
25 using librbd::util::create_rados_callback
;
28 Instances
<I
>::Instances(Threads
<I
> *threads
, librados::IoCtx
&ioctx
,
29 const std::string
& instance_id
,
30 instances::Listener
& listener
) :
31 m_threads(threads
), m_ioctx(ioctx
), m_instance_id(instance_id
),
32 m_listener(listener
), m_cct(reinterpret_cast<CephContext
*>(ioctx
.cct())),
33 m_lock(ceph::make_mutex("rbd::mirror::Instances " + ioctx
.get_pool_name())) {
// Destructor.
// NOTE(review): the destructor body, its closing brace, and the preceding
// `template <typename I>` header fall outside the visible chunk.
Instances<I>::~Instances() {
// Begin initialization: stash the completion callback to be fired once the
// initial instance query finishes.
// NOTE(review): the remainder of this function is outside the visible
// chunk; presumably it kicks off get_instances() — confirm against the
// full file.
void Instances<I>::init(Context *on_finish) {
  std::lock_guard locker{m_lock};
  // only one pending init/shut_down completion may exist at a time
  ceph_assert(m_on_finish == nullptr);
  m_on_finish = on_finish;
// Begin shut down: record the completion callback, then queue a context
// that performs the teardown under both timer_lock and m_lock.
// NOTE(review): the LambdaContext capture list / body and this function's
// tail are partially outside the visible chunk.
void Instances<I>::shut_down(Context *on_finish) {
  std::lock_guard locker{m_lock};
  ceph_assert(m_on_finish == nullptr);
  m_on_finish = on_finish;
  Context *ctx = new LambdaContext(
      // both locks are taken: timer state and instance map are touched
      std::scoped_lock locker{m_threads->timer_lock, m_lock};
  // defer execution to the work queue rather than running inline
  m_threads->work_queue->queue(ctx, 0);
69 void Instances
<I
>::unblock_listener() {
72 std::lock_guard locker
{m_lock
};
73 ceph_assert(m_listener_blocked
);
74 m_listener_blocked
= false;
76 InstanceIds added_instance_ids
;
77 for (auto& pair
: m_instances
) {
78 if (pair
.second
.state
== INSTANCE_STATE_ADDING
) {
79 added_instance_ids
.push_back(pair
.first
);
83 if (!added_instance_ids
.empty()) {
84 m_threads
->work_queue
->queue(
85 new C_NotifyInstancesAdded(this, added_instance_ids
), 0);
90 void Instances
<I
>::acked(const InstanceIds
& instance_ids
) {
91 dout(10) << "instance_ids=" << instance_ids
<< dendl
;
93 std::lock_guard locker
{m_lock
};
94 if (m_on_finish
!= nullptr) {
95 dout(5) << "received on shut down, ignoring" << dendl
;
99 Context
*ctx
= new C_HandleAcked(this, instance_ids
);
100 m_threads
->work_queue
->queue(ctx
, 0);
103 template <typename I
>
104 void Instances
<I
>::handle_acked(const InstanceIds
& instance_ids
) {
105 dout(5) << "instance_ids=" << instance_ids
<< dendl
;
107 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
108 if (m_on_finish
!= nullptr) {
109 dout(5) << "handled on shut down, ignoring" << dendl
;
113 InstanceIds added_instance_ids
;
114 auto time
= clock_t::now();
115 for (auto& instance_id
: instance_ids
) {
116 auto &instance
= m_instances
.insert(
117 std::make_pair(instance_id
, Instance
{})).first
->second
;
118 instance
.acked_time
= time
;
119 if (instance
.state
== INSTANCE_STATE_ADDING
) {
120 added_instance_ids
.push_back(instance_id
);
124 schedule_remove_task(time
);
125 if (!m_listener_blocked
&& !added_instance_ids
.empty()) {
126 m_threads
->work_queue
->queue(
127 new C_NotifyInstancesAdded(this, added_instance_ids
), 0);
// Invoked on the work queue: report instances still in ADDING state to the
// listener, then mark them IDLE.
// NOTE(review): several lines are outside the visible chunk (early-return
// body, closing braces, and — given the std::unique_lock — likely an
// unlock/relock around the listener callback); confirm against the full
// file.
template <typename I>
void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
  // unique_lock (not lock_guard): allows dropping the lock mid-function
  std::unique_lock locker{m_lock};
  // filter to ids still pending addition
  InstanceIds added_instance_ids;
  for (auto& instance_id : instance_ids) {
    auto it = m_instances.find(instance_id);
    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
      added_instance_ids.push_back(instance_id);
  if (added_instance_ids.empty()) {
  dout(5) << "instance_ids=" << added_instance_ids << dendl;
  m_listener.handle_added(added_instance_ids);
  // re-check state before promoting: it may have changed meanwhile
  for (auto& instance_id : added_instance_ids) {
    auto it = m_instances.find(instance_id);
    if (it != m_instances.end() && it->second.state == INSTANCE_STATE_ADDING) {
      it->second.state = INSTANCE_STATE_IDLE;
159 template <typename I
>
160 void Instances
<I
>::notify_instances_removed(const InstanceIds
& instance_ids
) {
161 dout(5) << "instance_ids=" << instance_ids
<< dendl
;
162 m_listener
.handle_removed(instance_ids
);
164 std::lock_guard locker
{m_lock
};
165 for (auto& instance_id
: instance_ids
) {
166 m_instances
.erase(instance_id
);
170 template <typename I
>
171 void Instances
<I
>::list(std::vector
<std::string
> *instance_ids
) {
174 std::lock_guard locker
{m_lock
};
176 for (auto it
: m_instances
) {
177 instance_ids
->push_back(it
.first
);
182 template <typename I
>
183 void Instances
<I
>::get_instances() {
186 ceph_assert(ceph_mutex_is_locked(m_lock
));
188 Context
*ctx
= create_context_callback
<
189 Instances
, &Instances
<I
>::handle_get_instances
>(this);
191 InstanceWatcher
<I
>::get_instances(m_ioctx
, &m_instance_ids
, ctx
);
// Completion handler for get_instances(): swap out the pending init
// completion, process the fetched ids, and fire the completion.
// NOTE(review): the control flow around the derr/handle_acked pair
// (presumably `if (r < 0) ... else ...`) and the inner scope braces
// around the lock are outside the visible chunk — confirm against the
// full file.
template <typename I>
void Instances<I>::handle_get_instances(int r) {
  dout(10) << "r=" << r << dendl;

  Context *on_finish = nullptr;
  std::lock_guard locker{m_lock};
  // claim the pending completion so it fires exactly once
  std::swap(on_finish, m_on_finish);
  derr << "error retrieving instances: " << cpp_strerror(r) << dendl;
  handle_acked(m_instance_ids);
  on_finish->complete(r);
212 template <typename I
>
213 void Instances
<I
>::wait_for_ops() {
216 ceph_assert(ceph_mutex_is_locked(m_lock
));
218 Context
*ctx
= create_async_context_callback(
219 m_threads
->work_queue
, create_context_callback
<
220 Instances
, &Instances
<I
>::handle_wait_for_ops
>(this));
222 m_async_op_tracker
.wait_for_ops(ctx
);
// All async ops have drained: complete the pending shut_down callback.
// NOTE(review): an inner scope around the lock (so that
// on_finish->complete() runs unlocked) appears to be missing from the
// visible chunk — confirm against the full file.
template <typename I>
void Instances<I>::handle_wait_for_ops(int r) {
  dout(10) << "r=" << r << dendl;

  Context *on_finish = nullptr;
  std::lock_guard locker{m_lock};
  // claim the pending completion so it fires exactly once
  std::swap(on_finish, m_on_finish);
  on_finish->complete(r);
// Remove all instances whose last ack is at or before `time`: mark them
// REMOVING, then blocklist/remove each via InstanceWatcher, gathering the
// per-instance completions into one callback.  Caller must hold m_lock.
// NOTE(review): loop-body `continue;` statements, closing braces, and the
// LambdaContext's closing `});` are outside the visible chunk.
template <typename I>
void Instances<I>::remove_instances(
    const Instances<I>::clock_t::time_point& time) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  InstanceIds instance_ids;
  for (auto& instance_pair : m_instances) {
    // never remove the local instance
    if (instance_pair.first == m_instance_id) {
    auto& instance = instance_pair.second;
    // skip instances already being removed or acked after the cutoff
    if (instance.state != INSTANCE_STATE_REMOVING &&
        instance.acked_time <= time) {
      instance.state = INSTANCE_STATE_REMOVING;
      instance_ids.push_back(instance_pair.first);
  ceph_assert(!instance_ids.empty());

  dout(10) << "instance_ids=" << instance_ids << dendl;
  // completion for the whole batch; instance_ids captured by copy since
  // the callback outlives this stack frame
  Context *ctx = new LambdaContext([this, instance_ids](int r) {
      handle_remove_instances(r, instance_ids);
  // re-dispatch the gather completion through the work queue
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  auto gather_ctx = new C_Gather(m_cct, ctx);
  for (auto& instance_id : instance_ids) {
    InstanceWatcher<I>::remove_instance(m_ioctx, *m_threads->asio_engine,
                                        instance_id, gather_ctx->new_sub());
  // balanced by finish_op() in handle_remove_instances()
  m_async_op_tracker.start_op();
  gather_ctx->activate();
273 template <typename I
>
274 void Instances
<I
>::handle_remove_instances(
275 int r
, const InstanceIds
& instance_ids
) {
276 std::scoped_lock locker
{m_threads
->timer_lock
, m_lock
};
278 dout(10) << "r=" << r
<< ", instance_ids=" << instance_ids
<< dendl
;
281 // fire removed notification now that instances have been blocklisted
282 m_threads
->work_queue
->queue(
283 new C_NotifyInstancesRemoved(this, instance_ids
), 0);
285 // reschedule the timer for the next batch
286 schedule_remove_task(clock_t::now());
287 m_async_op_tracker
.finish_op();
290 template <typename I
>
291 void Instances
<I
>::cancel_remove_task() {
292 ceph_assert(ceph_mutex_is_locked(m_threads
->timer_lock
));
293 ceph_assert(ceph_mutex_is_locked(m_lock
));
295 if (m_timer_task
== nullptr) {
301 bool canceled
= m_threads
->timer
->cancel_event(m_timer_task
);
302 ceph_assert(canceled
);
303 m_timer_task
= nullptr;
// (Re)schedule the timer that removes stale instances: compute the grace
// period from heartbeat config, find the oldest ack time among peers, and
// arm a timer event at oldest_time + grace.  Caller must hold timer_lock
// and m_lock.
// NOTE(review): loop `continue;`s, the consumer of the `schedule` flag
// (presumably an early return when nothing needs removal), closing
// braces, and the LambdaContext's closing `});` are outside the visible
// chunk.
template <typename I>
void Instances<I>::schedule_remove_task(
    const Instances<I>::clock_t::time_point& time) {
  cancel_remove_task();
  if (m_on_finish != nullptr) {
    dout(10) << "received on shut down, ignoring" << dendl;
  // grace period: heartbeat interval * (1 + missed heartbeats allowed +
  // acquire attempts before breaking the lock)
  int after = m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_heartbeat_interval") *
    (1 + m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats") +
     m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_acquire_attempts_before_break"));

  bool schedule = false;
  auto oldest_time = time;
  for (auto& instance : m_instances) {
    // the local instance is never a removal candidate
    if (instance.first == m_instance_id) {
    if (instance.second.state == INSTANCE_STATE_REMOVING) {
      // removal is already in-flight
    oldest_time = std::min(oldest_time, instance.second.acked_time);

  // schedule a time to fire when the oldest instance should be removed
  m_timer_task = new LambdaContext(
    [this, oldest_time](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      std::lock_guard locker{m_lock};
      m_timer_task = nullptr;

      remove_instances(oldest_time);
  oldest_time += ceph::make_timespan(after);
  m_threads->timer->add_event_at(oldest_time, m_timer_task);
353 } // namespace mirror
// Explicit instantiation for the concrete librbd image context type.
template class rbd::mirror::Instances<librbd::ImageCtx>;