]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/cephfs_mirror/MirrorWatcher.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / tools / cephfs_mirror / MirrorWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/ceph_context.h"
5 #include "common/ceph_json.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "common/WorkQueue.h"
9 #include "include/stringify.h"
10 #include "msg/Messenger.h"
11 #include "aio_utils.h"
12 #include "MirrorWatcher.h"
13 #include "FSMirror.h"
14 #include "Types.h"
15
16 #define dout_context g_ceph_context
17 #define dout_subsys ceph_subsys_cephfs_mirror
18 #undef dout_prefix
19 #define dout_prefix *_dout << "cephfs::mirror::MirrorWatcher " << __func__
20
21 namespace cephfs {
22 namespace mirror {
23
24 MirrorWatcher::MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
25 ContextWQ *work_queue)
26 : Watcher(ioctx, CEPHFS_MIRROR_OBJECT, work_queue),
27 m_ioctx(ioctx),
28 m_fs_mirror(fs_mirror),
29 m_work_queue(work_queue),
30 m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")),
31 m_instance_id(stringify(m_ioctx.get_instance_id())) {
32 }
33
34 MirrorWatcher::~MirrorWatcher() {
35 }
36
37 void MirrorWatcher::init(Context *on_finish) {
38 dout(20) << dendl;
39
40 {
41 std::scoped_lock locker(m_lock);
42 ceph_assert(m_on_init_finish == nullptr);
43 m_on_init_finish = new LambdaContext([this, on_finish](int r) {
44 on_finish->complete(r);
45 if (m_on_shutdown_finish != nullptr) {
46 m_on_shutdown_finish->complete(0);
47 }
48 });
49 }
50
51 register_watcher();
52 }
53
54 void MirrorWatcher::shutdown(Context *on_finish) {
55 dout(20) << dendl;
56
57 {
58 std::scoped_lock locker(m_lock);
59 ceph_assert(m_on_shutdown_finish == nullptr);
60 if (m_on_init_finish != nullptr) {
61 dout(10) << ": delaying shutdown -- init in progress" << dendl;
62 m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
63 m_on_shutdown_finish = nullptr;
64 shutdown(on_finish);
65 });
66 return;
67 }
68
69 m_on_shutdown_finish = on_finish;
70 }
71
72 unregister_watcher();
73 }
74
75 void MirrorWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
76 uint64_t notifier_id, bufferlist& bl) {
77 dout(20) << dendl;
78
79 JSONFormatter f;
80 f.open_object_section("info");
81 encode_json("addr", m_fs_mirror->get_instance_addr(), &f);
82 f.close_section();
83
84 bufferlist outbl;
85 f.flush(outbl);
86 acknowledge_notify(notify_id, handle, outbl);
87 }
88
89 void MirrorWatcher::handle_rewatch_complete(int r) {
90 dout(5) << ": r=" << r << dendl;
91
92 if (r == -EBLOCKLISTED) {
93 dout(0) << ": client blocklisted" <<dendl;
94 std::scoped_lock locker(m_lock);
95 m_blocklisted = true;
96 } else if (r == -ENOENT) {
97 derr << ": mirroring object deleted" << dendl;
98 m_failed = true;
99 } else if (r < 0) {
100 derr << ": rewatch error: " << cpp_strerror(r) << dendl;
101 m_failed = true;
102 }
103 }
104
105 void MirrorWatcher::register_watcher() {
106 dout(20) << dendl;
107
108 std::scoped_lock locker(m_lock);
109 Context *on_finish = new C_CallbackAdapter<
110 MirrorWatcher, &MirrorWatcher::handle_register_watcher>(this);
111 register_watch(on_finish);
112 }
113
114 void MirrorWatcher::handle_register_watcher(int r) {
115 dout(20) << ": r=" << r << dendl;
116
117 Context *on_init_finish = nullptr;
118 {
119 std::scoped_lock locker(m_lock);
120 std::swap(on_init_finish, m_on_init_finish);
121 }
122
123 on_init_finish->complete(r);
124 }
125
126 void MirrorWatcher::unregister_watcher() {
127 dout(20) << dendl;
128
129 std::scoped_lock locker(m_lock);
130 Context *on_finish = new C_CallbackAdapter<
131 MirrorWatcher, &MirrorWatcher::handle_unregister_watcher>(this);
132 unregister_watch(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
133 }
134
135 void MirrorWatcher::handle_unregister_watcher(int r) {
136 dout(20) << ": r=" << r << dendl;
137
138 Context *on_shutdown_finish = nullptr;
139 {
140 std::scoped_lock locker(m_lock);
141 std::swap(on_shutdown_finish, m_on_shutdown_finish);
142 }
143
144 on_shutdown_finish->complete(r);
145 }
146
147 } // namespace mirror
148 } // namespace cephfs