1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/ceph_context.h"
5 #include "common/ceph_json.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "common/WorkQueue.h"
9 #include "cls/cephfs/cls_cephfs_client.h"
10 #include "include/stringify.h"
11 #include "aio_utils.h"
12 #include "InstanceWatcher.h"
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_cephfs_mirror
18 #define dout_prefix *_dout << "cephfs::mirror::InstanceWatcher " << __func__
27 std::string
instance_oid(const std::string
&instance_id
) {
28 return CEPHFS_MIRROR_OBJECT
+ "." + instance_id
;
31 } // anonymous namespace
33 InstanceWatcher::InstanceWatcher(librados::IoCtx
&ioctx
,
34 Listener
&listener
, ContextWQ
*work_queue
)
35 : Watcher(ioctx
, instance_oid(stringify(ioctx
.get_instance_id())), work_queue
),
38 m_work_queue(work_queue
),
39 m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) {
42 InstanceWatcher::~InstanceWatcher() {
45 void InstanceWatcher::init(Context
*on_finish
) {
49 std::scoped_lock
locker(m_lock
);
50 ceph_assert(m_on_init_finish
== nullptr);
51 m_on_init_finish
= new LambdaContext([this, on_finish
](int r
) {
52 on_finish
->complete(r
);
53 if (m_on_shutdown_finish
!= nullptr) {
54 m_on_shutdown_finish
->complete(0);
62 void InstanceWatcher::shutdown(Context
*on_finish
) {
66 std::scoped_lock
locker(m_lock
);
67 ceph_assert(m_on_shutdown_finish
== nullptr);
68 if (m_on_init_finish
!= nullptr) {
69 dout(10) << ": delaying shutdown -- init in progress" << dendl
;
70 m_on_shutdown_finish
= new LambdaContext([this, on_finish
](int r
) {
71 m_on_shutdown_finish
= nullptr;
77 m_on_shutdown_finish
= on_finish
;
83 void InstanceWatcher::handle_notify(uint64_t notify_id
, uint64_t handle
,
84 uint64_t notifier_id
, bufferlist
& bl
) {
91 JSONDecoder::decode_json("dir_path", dir_path
, &jd
.parser
, true);
92 JSONDecoder::decode_json("mode", mode
, &jd
.parser
, true);
93 } catch (const JSONDecoder::err
&e
) {
94 derr
<< ": failed to decode notify json: " << e
.what() << dendl
;
97 dout(20) << ": notifier_id=" << notifier_id
<< ", dir_path=" << dir_path
98 << ", mode=" << mode
<< dendl
;
100 if (mode
== "acquire") {
101 m_listener
.acquire_directory(dir_path
);
102 } else if (mode
== "release") {
103 m_listener
.release_directory(dir_path
);
105 derr
<< ": unknown mode" << dendl
;
109 acknowledge_notify(notify_id
, handle
, outbl
);
112 void InstanceWatcher::handle_rewatch_complete(int r
) {
113 dout(5) << ": r=" << r
<< dendl
;
115 if (r
== -EBLOCKLISTED
) {
116 dout(0) << ": client blocklisted" <<dendl
;
117 std::scoped_lock
locker(m_lock
);
118 m_blocklisted
= true;
119 } else if (r
== -ENOENT
) {
120 derr
<< ": mirroring object deleted" << dendl
;
123 derr
<< ": rewatch error: " << cpp_strerror(r
) << dendl
;
128 void InstanceWatcher::create_instance() {
131 std::scoped_lock
locker(m_lock
);
132 librados::ObjectWriteOperation op
;
135 librados::AioCompletion
*aio_comp
=
136 librados::Rados::aio_create_completion(
137 this, &rados_callback
<InstanceWatcher
, &InstanceWatcher::handle_create_instance
>);
138 int r
= m_ioctx
.aio_operate(m_oid
, aio_comp
, &op
);
143 void InstanceWatcher::handle_create_instance(int r
) {
144 dout(20) << ": r=" << r
<< dendl
;
146 Context
*on_init_finish
= nullptr;
148 std::scoped_lock
locker(m_lock
);
150 std::swap(on_init_finish
, m_on_init_finish
);
154 if (on_init_finish
!= nullptr) {
155 on_init_finish
->complete(r
);
162 void InstanceWatcher::register_watcher() {
165 std::scoped_lock
locker(m_lock
);
166 Context
*on_finish
= new C_CallbackAdapter
<
167 InstanceWatcher
, &InstanceWatcher::handle_register_watcher
>(this);
168 register_watch(on_finish
);
171 void InstanceWatcher::handle_register_watcher(int r
) {
172 dout(20) << ": r=" << r
<< dendl
;
174 Context
*on_init_finish
= nullptr;
176 std::scoped_lock
locker(m_lock
);
178 std::swap(on_init_finish
, m_on_init_finish
);
182 if (on_init_finish
!= nullptr) {
183 on_init_finish
->complete(r
);
190 void InstanceWatcher::unregister_watcher() {
193 std::scoped_lock
locker(m_lock
);
194 Context
*on_finish
= new C_CallbackAdapter
<
195 InstanceWatcher
, &InstanceWatcher::handle_unregister_watcher
>(this);
196 unregister_watch(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, on_finish
));
199 void InstanceWatcher::handle_unregister_watcher(int r
) {
200 dout(20) << ": r=" << r
<< dendl
;
202 Context
*on_shutdown_finish
= nullptr;
204 std::scoped_lock
locker(m_lock
);
206 std::swap(on_shutdown_finish
, m_on_shutdown_finish
);
210 if (on_shutdown_finish
!= nullptr) {
211 on_shutdown_finish
->complete(r
);
218 void InstanceWatcher::remove_instance() {
221 std::scoped_lock
locker(m_lock
);
222 librados::ObjectWriteOperation op
;
225 librados::AioCompletion
*aio_comp
=
226 librados::Rados::aio_create_completion(
227 this, &rados_callback
<InstanceWatcher
, &InstanceWatcher::handle_remove_instance
>);
228 int r
= m_ioctx
.aio_operate(m_oid
, aio_comp
, &op
);
233 void InstanceWatcher::handle_remove_instance(int r
) {
234 dout(20) << ": r=" << r
<< dendl
;
236 Context
*on_init_finish
= nullptr;
237 Context
*on_shutdown_finish
= nullptr;
239 std::scoped_lock
locker(m_lock
);
240 std::swap(on_init_finish
, m_on_init_finish
);
241 std::swap(on_shutdown_finish
, m_on_shutdown_finish
);
244 if (on_init_finish
!= nullptr) {
245 on_init_finish
->complete(r
);
247 if (on_shutdown_finish
!= nullptr) {
248 on_shutdown_finish
->complete(r
);
252 } // namespace mirror
253 } // namespace cephfs