1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/ceph_context.h"
5 #include "common/ceph_json.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "common/WorkQueue.h"
9 #include "include/stringify.h"
10 #include "msg/Messenger.h"
11 #include "aio_utils.h"
12 #include "MirrorWatcher.h"
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_cephfs_mirror
19 #define dout_prefix *_dout << "cephfs::mirror::MirrorWatcher " << __func__
// Constructs a MirrorWatcher.
//
// Parameters:
//   ioctx      - librados I/O context; forwarded to the Watcher initializer.
//   fs_mirror  - stored in m_fs_mirror (used by handle_notify() to obtain the
//                instance address).
//   work_queue - forwarded to the Watcher initializer and also stored in
//                m_work_queue (used by unregister_watcher()).
//
// The Watcher initializer is given CEPHFS_MIRROR_OBJECT as its second
// argument. m_lock is created with the name "cephfs::mirror::mirror_watcher"
// and m_instance_id is the stringified result of m_ioctx.get_instance_id().
//
// NOTE(review): the embedded line numbering skips a line inside the
// initializer list and the constructor body is not visible in this excerpt --
// confirm against the complete file.
24 MirrorWatcher::MirrorWatcher(librados::IoCtx
&ioctx
, FSMirror
*fs_mirror
,
25 ContextWQ
*work_queue
)
26 : Watcher(ioctx
, CEPHFS_MIRROR_OBJECT
, work_queue
),
28 m_fs_mirror(fs_mirror
),
29 m_work_queue(work_queue
),
30 m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")),
31 m_instance_id(stringify(m_ioctx
.get_instance_id())) {
// Destructor.
// NOTE(review): only the opening line is visible in this excerpt; the body
// (if any) cannot be described from here.
34 MirrorWatcher::~MirrorWatcher() {
// Starts initialization of the watcher.
//
// Parameters:
//   on_finish - completion context; it is completed with the result code `r`
//               passed to the stored init context.
//
// While holding m_lock, asserts that no init is already pending
// (m_on_init_finish == nullptr) and installs a LambdaContext in
// m_on_init_finish. When that context runs it first completes `on_finish`
// with `r`, then -- if a shutdown context was stored in m_on_shutdown_finish
// in the meantime -- completes that context with 0.
//
// The stored context is swapped out and completed by
// handle_register_watcher().
//
// NOTE(review): the lines following the lambda (closing braces and whatever
// kicks off the registration) are not visible in this excerpt.
37 void MirrorWatcher::init(Context
*on_finish
) {
41 std::scoped_lock
locker(m_lock
);
// A second init() while one is still pending is a programming error.
42 ceph_assert(m_on_init_finish
== nullptr);
43 m_on_init_finish
= new LambdaContext([this, on_finish
](int r
) {
44 on_finish
->complete(r
);
// A non-null m_on_shutdown_finish here means shutdown() stored a context
// while init was in progress; fire it now that init has completed.
45 if (m_on_shutdown_finish
!= nullptr) {
46 m_on_shutdown_finish
->complete(0);
// Starts shutdown of the watcher.
//
// Parameters:
//   on_finish - completion context for the shutdown; stored in
//               m_on_shutdown_finish when no init is in progress.
//
// While holding m_lock, asserts that no shutdown is already pending
// (m_on_shutdown_finish == nullptr). If an init is still in progress
// (m_on_init_finish != nullptr) the shutdown is delayed: a LambdaContext is
// stored in m_on_shutdown_finish, which the init completion lambda (see
// init()) completes once init has finished. The visible part of that lambda
// clears m_on_shutdown_finish again.
//
// NOTE(review): the remainder of the lambda body, the control flow between
// the delayed path and the direct assignment below, and the end of the
// function are not visible in this excerpt -- confirm against the complete
// file.
54 void MirrorWatcher::shutdown(Context
*on_finish
) {
58 std::scoped_lock
locker(m_lock
);
59 ceph_assert(m_on_shutdown_finish
== nullptr);
// Init still pending: defer the shutdown until the init context has run.
60 if (m_on_init_finish
!= nullptr) {
61 dout(10) << ": delaying shutdown -- init in progress" << dendl
;
62 m_on_shutdown_finish
= new LambdaContext([this, on_finish
](int r
) {
// Reset the member so the pending-shutdown assertion above holds again.
63 m_on_shutdown_finish
= nullptr;
// No init in progress (presumably reached only when the branch above was
// not taken -- the separating lines are not visible): remember the caller's
// context as the shutdown completion.
69 m_on_shutdown_finish
= on_finish
;
// Handles a notification on the watched object.
//
// Parameters:
//   notify_id   - passed through to acknowledge_notify().
//   handle      - passed through to acknowledge_notify().
//   notifier_id - not referenced in the visible lines.
//   bl          - notification payload; not referenced in the visible lines.
//
// Builds a reply on formatter `f`: opens an object section named "info" and
// JSON-encodes the value of m_fs_mirror->get_instance_addr() under the key
// "addr". The notification is then acknowledged with `outbl`.
//
// NOTE(review): the declarations of `f` and `outbl`, and the step that
// transfers the formatter output into `outbl`, are not visible in this
// excerpt.
75 void MirrorWatcher::handle_notify(uint64_t notify_id
, uint64_t handle
,
76 uint64_t notifier_id
, bufferlist
& bl
) {
80 f
.open_object_section("info");
81 encode_json("addr", m_fs_mirror
->get_instance_addr(), &f
);
86 acknowledge_notify(notify_id
, handle
, outbl
);
// Called with the result of a rewatch attempt.
//
// Parameters:
//   r - result code of the rewatch.
//
// Logs `r`, then distinguishes the cases visible here:
//   * -EBLOCKLISTED: logs ": client blocklisted" and takes m_lock.
//   * -ENOENT:       logs ": mirroring object deleted" as an error.
//   * a further branch logs ": rewatch error: " followed by cpp_strerror(r).
//
// NOTE(review): the statements that record the outcome in each branch, and
// the condition guarding the final "rewatch error" branch, are not visible
// in this excerpt.
89 void MirrorWatcher::handle_rewatch_complete(int r
) {
90 dout(5) << ": r=" << r
<< dendl
;
92 if (r
== -EBLOCKLISTED
) {
93 dout(0) << ": client blocklisted" <<dendl
;
// The lock is taken here for whatever state is updated next; that update
// is on a line not shown in this excerpt.
94 std::scoped_lock
locker(m_lock
);
96 } else if (r
== -ENOENT
) {
97 derr
<< ": mirroring object deleted" << dendl
;
100 derr
<< ": rewatch error: " << cpp_strerror(r
) << dendl
;
// Issues the watch registration.
//
// While holding m_lock, allocates a C_CallbackAdapter bound to
// MirrorWatcher::handle_register_watcher on this object and passes it to
// register_watch() as the completion context.
//
// NOTE(review): the embedded numbering shows lines missing between the
// signature and the lock acquisition; they are not visible in this excerpt.
105 void MirrorWatcher::register_watcher() {
108 std::scoped_lock
locker(m_lock
);
109 Context
*on_finish
= new C_CallbackAdapter
<
110 MirrorWatcher
, &MirrorWatcher::handle_register_watcher
>(this);
111 register_watch(on_finish
);
// Completion callback for register_watcher().
//
// Parameters:
//   r - result code of the watch registration.
//
// Logs `r`, then swaps m_on_init_finish with a local nullptr while holding
// m_lock, so the member is left null and the pending init context is owned by
// the local. That context is then completed with `r`.
//
// The local is dereferenced without a null check, so this relies on init()
// having stored a context in m_on_init_finish beforehand.
//
// NOTE(review): the braces delimiting the scope of the lock are not visible
// in this excerpt, so whether complete() runs with m_lock released cannot be
// confirmed from here.
114 void MirrorWatcher::handle_register_watcher(int r
) {
115 dout(20) << ": r=" << r
<< dendl
;
117 Context
*on_init_finish
= nullptr;
119 std::scoped_lock
locker(m_lock
);
120 std::swap(on_init_finish
, m_on_init_finish
);
123 on_init_finish
->complete(r
);
// Issues the watch unregistration.
//
// While holding m_lock, allocates a C_CallbackAdapter bound to
// MirrorWatcher::handle_unregister_watcher on this object, wraps it in a
// C_AsyncCallback<ContextWQ> constructed with m_work_queue, and passes the
// wrapper to unregister_watch() as the completion context.
//
// NOTE(review): presumably the C_AsyncCallback wrapper defers the completion
// onto m_work_queue -- confirm against its definition, which is not part of
// this file. Lines between the signature and the lock acquisition are not
// visible in this excerpt.
126 void MirrorWatcher::unregister_watcher() {
129 std::scoped_lock
locker(m_lock
);
130 Context
*on_finish
= new C_CallbackAdapter
<
131 MirrorWatcher
, &MirrorWatcher::handle_unregister_watcher
>(this);
132 unregister_watch(new C_AsyncCallback
<ContextWQ
>(m_work_queue
, on_finish
));
// Completion callback for unregister_watcher().
//
// Parameters:
//   r - result code of the watch unregistration.
//
// Logs `r`, then swaps m_on_shutdown_finish with a local nullptr while
// holding m_lock, so the member is left null and the pending shutdown context
// is owned by the local. That context is then completed with `r`.
//
// The local is dereferenced without a null check, so this relies on
// shutdown() having stored a context in m_on_shutdown_finish beforehand.
//
// NOTE(review): the braces delimiting the scope of the lock are not visible
// in this excerpt, so whether complete() runs with m_lock released cannot be
// confirmed from here.
135 void MirrorWatcher::handle_unregister_watcher(int r
) {
136 dout(20) << ": r=" << r
<< dendl
;
138 Context
*on_shutdown_finish
= nullptr;
140 std::scoped_lock
locker(m_lock
);
141 std::swap(on_shutdown_finish
, m_on_shutdown_finish
);
144 on_shutdown_finish
->complete(r
);
147 } // namespace mirror
148 } // namespace cephfs