1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/WorkQueue.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "librbd/Utils.h"
10 #include "InstanceWatcher.h"
11 #include "Instances.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::Instances: " \
18 << this << " " << __func__ << ": "
23 using librbd::util::create_async_context_callback
;
24 using librbd::util::create_context_callback
;
25 using librbd::util::create_rados_callback
;
// Constructor: stores the Threads pointer and the IoCtx, derives m_cct by
// reinterpret_cast-ing ioctx.cct() to CephContext*, and constructs m_lock
// with the name "rbd::mirror::Instances " followed by the pool name.
// NOTE(review): the template header and the closing brace of this definition
// are not visible in this listing.
28 Instances
<I
>::Instances(Threads
<I
> *threads
, librados::IoCtx
&ioctx
) :
29 m_threads(threads
), m_ioctx(ioctx
),
30 m_cct(reinterpret_cast<CephContext
*>(ioctx
.cct())),
31 m_lock("rbd::mirror::Instances " + ioctx
.get_pool_name()) {
// Destructor. No body statements are visible in this listing.
35 Instances
<I
>::~Instances() {
// Begins initialization. Under m_lock, asserts that no completion is already
// pending (m_on_finish == nullptr) and records on_finish in m_on_finish.
// NOTE(review): nothing visible here starts the work that eventually
// completes on_finish; handle_get_instances() swaps out and completes
// m_on_finish, so presumably get_instances() is invoked on a line missing
// from this listing -- confirm against the full file.
39 void Instances
<I
>::init(Context
*on_finish
) {
42 Mutex::Locker
locker(m_lock
);
43 assert(m_on_finish
== nullptr);
44 m_on_finish
= on_finish
;
// Begins shut down. Under m_lock, asserts that no completion is pending and
// records on_finish in m_on_finish; notify() and handle_notify() treat a
// non-null m_on_finish as "shutting down" and ignore notifications.
// A FunctionContext is then created and queued on m_threads->work_queue.
// The statements following `new FunctionContext(` (the callable's header is
// not visible in this listing, so this is presumably its body) take
// m_threads->timer_lock, then m_lock, and cancel the remove task of every
// entry in m_instances.
// NOTE(review): nothing visible here completes on_finish;
// handle_wait_for_ops() does, so wait_for_ops() is presumably called on a
// missing line -- confirm.
49 void Instances
<I
>::shut_down(Context
*on_finish
) {
52 Mutex::Locker
locker(m_lock
);
53 assert(m_on_finish
== nullptr);
54 m_on_finish
= on_finish
;
56 Context
*ctx
= new FunctionContext(
58 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
59 Mutex::Locker
locker(m_lock
);
// NOTE(review): `auto it` copies each container element, so
// cancel_remove_task() receives a reference to the copy. The timer event is
// still cancelled through the copied pointer, but `timer_task = nullptr` is
// applied to the copy, leaving the entry stored in m_instances holding a
// stale timer_task pointer. Looks like this should be `auto &it` -- confirm.
61 for (auto it
: m_instances
) {
62 cancel_remove_task(it
.second
);
67 m_threads
->work_queue
->queue(ctx
, 0);
// Entry point for a notification from the instance identified by
// instance_id. Logs the id, then under m_lock checks m_on_finish: when it is
// non-null the notification is logged as "received on shut down, ignoring"
// (the early return itself is not visible in this listing -- presumably
// present). Otherwise a C_Notify context carrying `this` and the id is
// queued on m_threads->work_queue with result 0, so the actual handling does
// not happen in the caller's context.
71 void Instances
<I
>::notify(const std::string
&instance_id
) {
72 dout(20) << instance_id
<< dendl
;
74 Mutex::Locker
locker(m_lock
);
76 if (m_on_finish
!= nullptr) {
77 dout(20) << "received on shut down, ignoring" << dendl
;
81 Context
*ctx
= new C_Notify(this, instance_id
);
83 m_threads
->work_queue
->queue(ctx
, 0);
// Processes a notification for instance_id. Takes m_threads->timer_lock and
// then m_lock (the same order used by the other timer-related paths in this
// file). When m_on_finish is non-null the notification is logged as
// "handled on shut down, ignoring" (early return not visible in this
// listing -- presumably present). Otherwise the id is inserted into
// m_instances -- assuming a std::map-like container, insert() leaves an
// existing entry untouched and returns it -- and schedule_remove_task() is
// called on that entry, which cancels any pending remove task and arms a new
// one.
87 void Instances
<I
>::handle_notify(const std::string
&instance_id
) {
88 dout(20) << instance_id
<< dendl
;
90 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
91 Mutex::Locker
locker(m_lock
);
93 if (m_on_finish
!= nullptr) {
94 dout(20) << "handled on shut down, ignoring" << dendl
;
98 auto &instance
= m_instances
.insert(
99 std::make_pair(instance_id
, Instance(instance_id
))).first
->second
;
101 schedule_remove_task(instance
);
// Appends the key of every entry in m_instances to *instance_ids while
// holding m_lock. The output vector is only appended to; no clearing of it
// is visible here.
// NOTE(review): `auto it` copies each element (key and Instance) per
// iteration; a const reference would avoid the copy.
104 template <typename I
>
105 void Instances
<I
>::list(std::vector
<std::string
> *instance_ids
) {
108 Mutex::Locker
locker(m_lock
);
110 for (auto it
: m_instances
) {
111 instance_ids
->push_back(it
.first
);
// Starts retrieval of the instance ids. The caller must hold m_lock
// (asserted). Builds a callback bound to handle_get_instances() and passes
// it to InstanceWatcher<I>::get_instances() together with m_ioctx and the
// m_instance_ids member that is to receive the result.
116 template <typename I
>
117 void Instances
<I
>::get_instances() {
120 assert(m_lock
.is_locked());
122 Context
*ctx
= create_context_callback
<
123 Instances
, &Instances
<I
>::handle_get_instances
>(this);
125 InstanceWatcher
<I
>::get_instances(m_ioctx
, &m_instance_ids
, ctx
);
// Completion of get_instances(); r is the result code. Takes
// m_threads->timer_lock and then m_lock. The derr line logs a retrieval
// error (the guarding condition is not visible in this listing --
// presumably r < 0). Each id in m_instance_ids is compared against this
// client's own id (stringify(m_ioctx.get_instance_id())); the action taken
// on a match is not visible -- presumably the own id is skipped. Ids are
// inserted into m_instances and a remove task is scheduled for each.
// Finally m_on_finish is swapped out into a local and completed with r.
// NOTE(review): whether the locks are released before complete() depends on
// scope braces that are missing from this listing -- confirm.
128 template <typename I
>
129 void Instances
<I
>::handle_get_instances(int r
) {
130 dout(20) << "r=" << r
<< dendl
;
132 Context
*on_finish
= nullptr;
134 Mutex::Locker
timer_locker(m_threads
->timer_lock
);
135 Mutex::Locker
locker(m_lock
);
138 derr
<< "error retrieving instances: " << cpp_strerror(r
) << dendl
;
140 auto my_instance_id
= stringify(m_ioctx
.get_instance_id());
141 for (auto &instance_id
: m_instance_ids
) {
142 if (instance_id
== my_instance_id
) {
145 auto &instance
= m_instances
.insert(
146 std::make_pair(instance_id
, Instance(instance_id
))).first
->second
;
147 schedule_remove_task(instance
);
150 std::swap(on_finish
, m_on_finish
);
152 on_finish
->complete(r
);
// Waits for in-flight operations tracked by m_async_op_tracker. The caller
// must hold m_lock (asserted). The completion is a callback bound to
// handle_wait_for_ops(), wrapped by create_async_context_callback() with
// m_threads->work_queue, and handed to m_async_op_tracker.wait_for_ops().
155 template <typename I
>
156 void Instances
<I
>::wait_for_ops() {
159 assert(m_lock
.is_locked());
161 Context
*ctx
= create_async_context_callback(
162 m_threads
->work_queue
, create_context_callback
<
163 Instances
, &Instances
<I
>::handle_wait_for_ops
>(this));
165 m_async_op_tracker
.wait_for_ops(ctx
);
// Completion of wait_for_ops(); r is the result code. Logs r, swaps
// m_on_finish out into a local under m_lock (leaving m_on_finish null), and
// completes that context with r.
// NOTE(review): scope braces around the locker are missing from this
// listing, so whether m_lock is released before complete() cannot be
// confirmed from here.
168 template <typename I
>
169 void Instances
<I
>::handle_wait_for_ops(int r
) {
170 dout(20) << "r=" << r
<< dendl
;
174 Context
*on_finish
= nullptr;
176 Mutex::Locker
locker(m_lock
);
177 std::swap(on_finish
, m_on_finish
);
179 on_finish
->complete(r
);
// Removes `instance`. The caller must hold m_lock (asserted). Logs the
// instance id, builds an async callback bound to handle_remove_instance(),
// registers an operation with m_async_op_tracker (start_op, matched by
// finish_op in handle_remove_instance), calls
// InstanceWatcher<I>::remove_instance() -- its trailing arguments are not
// visible in this listing -- and finally erases the entry keyed by
// instance.id from m_instances.
// NOTE(review): if `instance` refers to the element stored in m_instances
// (as it does for the callers visible in this file), the reference is
// dangling once erase() returns; it must not be used afterwards.
182 template <typename I
>
183 void Instances
<I
>::remove_instance(Instance
&instance
) {
184 assert(m_lock
.is_locked());
186 dout(20) << instance
.id
<< dendl
;
188 Context
*ctx
= create_async_context_callback(
189 m_threads
->work_queue
, create_context_callback
<
190 Instances
, &Instances
<I
>::handle_remove_instance
>(this));
192 m_async_op_tracker
.start_op();
193 InstanceWatcher
<I
>::remove_instance(m_ioctx
, m_threads
->work_queue
,
195 m_instances
.erase(instance
.id
);
// Completion of the removal started by remove_instance(); r is the result
// code. Under m_lock, logs r and marks the tracked operation finished via
// m_async_op_tracker.finish_op(), pairing with the start_op() issued in
// remove_instance().
198 template <typename I
>
199 void Instances
<I
>::handle_remove_instance(int r
) {
200 Mutex::Locker
locker(m_lock
);
202 dout(20) << " r=" << r
<< dendl
;
206 m_async_op_tracker
.finish_op();
// Cancels the pending remove task of `instance`, if any. The caller must
// hold both m_threads->timer_lock and m_lock (asserted). When
// instance.timer_task is null there is nothing to cancel (the early return
// is not visible in this listing -- presumably present). Otherwise the task
// pointer is logged, the event is cancelled through m_threads->timer, and
// instance.timer_task is reset to nullptr.
// NOTE(review): no use of `canceled` is visible in this listing; it is
// presumably checked on a missing line -- confirm.
209 template <typename I
>
210 void Instances
<I
>::cancel_remove_task(Instance
&instance
) {
211 assert(m_threads
->timer_lock
.is_locked());
212 assert(m_lock
.is_locked());
214 if (instance
.timer_task
== nullptr) {
218 dout(20) << instance
.timer_task
<< dendl
;
220 bool canceled
= m_threads
->timer
->cancel_event(instance
.timer_task
);
222 instance
.timer_task
= nullptr;
// (Re)arms the remove task of `instance`. Any existing task is cancelled
// first via cancel_remove_task(), which asserts that timer_lock and m_lock
// are held, so the caller must hold both. The delay passed to the timer is
//   rbd_mirror_leader_heartbeat_interval *
//     (1 + rbd_mirror_leader_max_missed_heartbeats +
//      rbd_mirror_leader_max_acquire_attempts_before_break)
// and is logged with the unit "sec". The new task is a FunctionContext
// whose lambda asserts that timer_lock is held, takes m_lock, clears
// instance.timer_task and calls remove_instance(). The task is registered
// with m_threads->timer->add_event_after().
// NOTE(review): the lambda captures `instance` by reference, so the
// referenced Instance must stay alive until the task has fired or been
// cancelled.
225 template <typename I
>
226 void Instances
<I
>::schedule_remove_task(Instance
&instance
) {
229 cancel_remove_task(instance
);
231 int after
= m_cct
->_conf
->rbd_mirror_leader_heartbeat_interval
*
232 (1 + m_cct
->_conf
->rbd_mirror_leader_max_missed_heartbeats
+
233 m_cct
->_conf
->rbd_mirror_leader_max_acquire_attempts_before_break
);
235 instance
.timer_task
= new FunctionContext(
236 [this, &instance
](int r
) {
237 assert(m_threads
->timer_lock
.is_locked());
238 Mutex::Locker
locker(m_lock
);
239 instance
.timer_task
= nullptr;
240 remove_instance(instance
);
243 dout(20) << "scheduling instance " << instance
.id
<< " remove after " << after
244 << " sec (task " << instance
.timer_task
<< ")" << dendl
;
246 m_threads
->timer
->add_event_after(after
, instance
.timer_task
);
249 } // namespace mirror
// Explicit instantiation of the Instances template for librbd::ImageCtx.
252 template class rbd::mirror::Instances
<librbd::ImageCtx
>;