]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/cephfs_mirror/InstanceWatcher.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / tools / cephfs_mirror / InstanceWatcher.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "common/ceph_context.h"
5#include "common/ceph_json.h"
6#include "common/debug.h"
7#include "common/errno.h"
8#include "common/WorkQueue.h"
9#include "cls/cephfs/cls_cephfs_client.h"
10#include "include/stringify.h"
11#include "aio_utils.h"
12#include "InstanceWatcher.h"
13#include "Types.h"
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_cephfs_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "cephfs::mirror::InstanceWatcher " << __func__
19
20effc67
TL
20using namespace std;
21
f67539c2
TL
22namespace cephfs {
23namespace mirror {
24
25namespace {
26
27std::string instance_oid(const std::string &instance_id) {
28 return CEPHFS_MIRROR_OBJECT + "." + instance_id;
29}
30
31} // anonymous namespace
32
33InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx,
34 Listener &listener, ContextWQ *work_queue)
35 : Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue),
36 m_ioctx(ioctx),
37 m_listener(listener),
38 m_work_queue(work_queue),
39 m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) {
40}
41
42InstanceWatcher::~InstanceWatcher() {
43}
44
45void InstanceWatcher::init(Context *on_finish) {
46 dout(20) << dendl;
47
48 {
49 std::scoped_lock locker(m_lock);
50 ceph_assert(m_on_init_finish == nullptr);
51 m_on_init_finish = new LambdaContext([this, on_finish](int r) {
52 on_finish->complete(r);
53 if (m_on_shutdown_finish != nullptr) {
54 m_on_shutdown_finish->complete(0);
55 }
56 });
57 }
58
59 create_instance();
60}
61
62void InstanceWatcher::shutdown(Context *on_finish) {
63 dout(20) << dendl;
64
65 {
66 std::scoped_lock locker(m_lock);
67 ceph_assert(m_on_shutdown_finish == nullptr);
68 if (m_on_init_finish != nullptr) {
69 dout(10) << ": delaying shutdown -- init in progress" << dendl;
70 m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
71 m_on_shutdown_finish = nullptr;
72 shutdown(on_finish);
73 });
74 return;
75 }
76
77 m_on_shutdown_finish = on_finish;
78 }
79
80 unregister_watcher();
81}
82
83void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
84 uint64_t notifier_id, bufferlist& bl) {
85 dout(20) << dendl;
86
87 std::string dir_path;
88 std::string mode;
89 try {
90 JSONDecoder jd(bl);
91 JSONDecoder::decode_json("dir_path", dir_path, &jd.parser, true);
92 JSONDecoder::decode_json("mode", mode, &jd.parser, true);
93 } catch (const JSONDecoder::err &e) {
94 derr << ": failed to decode notify json: " << e.what() << dendl;
95 }
96
97 dout(20) << ": notifier_id=" << notifier_id << ", dir_path=" << dir_path
98 << ", mode=" << mode << dendl;
99
100 if (mode == "acquire") {
101 m_listener.acquire_directory(dir_path);
102 } else if (mode == "release") {
103 m_listener.release_directory(dir_path);
104 } else {
105 derr << ": unknown mode" << dendl;
106 }
107
108 bufferlist outbl;
109 acknowledge_notify(notify_id, handle, outbl);
110}
111
112void InstanceWatcher::handle_rewatch_complete(int r) {
113 dout(5) << ": r=" << r << dendl;
114
115 if (r == -EBLOCKLISTED) {
116 dout(0) << ": client blocklisted" <<dendl;
117 std::scoped_lock locker(m_lock);
118 m_blocklisted = true;
119 } else if (r == -ENOENT) {
120 derr << ": mirroring object deleted" << dendl;
121 m_failed = true;
122 } else if (r < 0) {
123 derr << ": rewatch error: " << cpp_strerror(r) << dendl;
124 m_failed = true;
125 }
126}
127
128void InstanceWatcher::create_instance() {
129 dout(20) << dendl;
130
131 std::scoped_lock locker(m_lock);
132 librados::ObjectWriteOperation op;
133 op.create(false);
134
135 librados::AioCompletion *aio_comp =
136 librados::Rados::aio_create_completion(
137 this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_create_instance>);
138 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
139 ceph_assert(r == 0);
140 aio_comp->release();
141}
142
143void InstanceWatcher::handle_create_instance(int r) {
144 dout(20) << ": r=" << r << dendl;
145
146 Context *on_init_finish = nullptr;
147 {
148 std::scoped_lock locker(m_lock);
149 if (r < 0) {
150 std::swap(on_init_finish, m_on_init_finish);
151 }
152 }
153
154 if (on_init_finish != nullptr) {
155 on_init_finish->complete(r);
156 return;
157 }
158
159 register_watcher();
160}
161
162void InstanceWatcher::register_watcher() {
163 dout(20) << dendl;
164
165 std::scoped_lock locker(m_lock);
166 Context *on_finish = new C_CallbackAdapter<
167 InstanceWatcher, &InstanceWatcher::handle_register_watcher>(this);
168 register_watch(on_finish);
169}
170
171void InstanceWatcher::handle_register_watcher(int r) {
172 dout(20) << ": r=" << r << dendl;
173
174 Context *on_init_finish = nullptr;
175 {
176 std::scoped_lock locker(m_lock);
177 if (r == 0) {
178 std::swap(on_init_finish, m_on_init_finish);
179 }
180 }
181
182 if (on_init_finish != nullptr) {
183 on_init_finish->complete(r);
184 return;
185 }
186
187 remove_instance();
188}
189
190void InstanceWatcher::unregister_watcher() {
191 dout(20) << dendl;
192
193 std::scoped_lock locker(m_lock);
194 Context *on_finish = new C_CallbackAdapter<
195 InstanceWatcher, &InstanceWatcher::handle_unregister_watcher>(this);
196 unregister_watch(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
197}
198
199void InstanceWatcher::handle_unregister_watcher(int r) {
200 dout(20) << ": r=" << r << dendl;
201
202 Context *on_shutdown_finish = nullptr;
203 {
204 std::scoped_lock locker(m_lock);
205 if (r < 0) {
206 std::swap(on_shutdown_finish, m_on_shutdown_finish);
207 }
208 }
209
210 if (on_shutdown_finish != nullptr) {
211 on_shutdown_finish->complete(r);
212 return;
213 }
214
215 remove_instance();
216}
217
218void InstanceWatcher::remove_instance() {
219 dout(20) << dendl;
220
221 std::scoped_lock locker(m_lock);
222 librados::ObjectWriteOperation op;
223 op.remove();
224
225 librados::AioCompletion *aio_comp =
226 librados::Rados::aio_create_completion(
227 this, &rados_callback<InstanceWatcher, &InstanceWatcher::handle_remove_instance>);
228 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
229 ceph_assert(r == 0);
230 aio_comp->release();
231}
232
233void InstanceWatcher::handle_remove_instance(int r) {
234 dout(20) << ": r=" << r << dendl;
235
236 Context *on_init_finish = nullptr;
237 Context *on_shutdown_finish = nullptr;
238 {
239 std::scoped_lock locker(m_lock);
240 std::swap(on_init_finish, m_on_init_finish);
241 std::swap(on_shutdown_finish, m_on_shutdown_finish);
242 }
243
244 if (on_init_finish != nullptr) {
245 on_init_finish->complete(r);
246 }
247 if (on_shutdown_finish != nullptr) {
248 on_shutdown_finish->complete(r);
249 }
250}
251
252} // namespace mirror
253} // namespace cephfs