]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 TL |
5 | #include "rgw_common.h" |
6 | #include "rgw_coroutine.h" | |
7 | #include "rgw_sync_module.h" | |
8 | #include "rgw_data_sync.h" | |
9 | #include "rgw_sync_module_pubsub.h" | |
10 | #include "rgw_sync_module_pubsub_rest.h" | |
11 | #include "rgw_rest_conn.h" | |
12 | #include "rgw_cr_rados.h" | |
13 | #include "rgw_cr_rest.h" | |
14 | #include "rgw_cr_tools.h" | |
15 | #include "rgw_op.h" | |
16 | #include "rgw_pubsub.h" | |
17 | #include "rgw_pubsub_push.h" | |
eafe8130 | 18 | #include "rgw_notify_event_type.h" |
11fdf7f2 TL |
19 | #include "rgw_perf_counters.h" |
20 | ||
eafe8130 | 21 | #include <boost/algorithm/hex.hpp> |
11fdf7f2 TL |
22 | #include <boost/asio/yield.hpp> |
23 | ||
24 | #define dout_subsys ceph_subsys_rgw | |
25 | ||
26 | ||
27 | #define PUBSUB_EVENTS_RETENTION_DEFAULT 7 | |
28 | ||
29 | /* | |
30 | ||
31 | config: | |
32 | ||
33 | { | |
34 | "tenant": <tenant>, # default: <empty> | |
35 | "uid": <uid>, # default: "pubsub" | |
36 | "data_bucket_prefix": <prefix> # default: "pubsub-" | |
37 | "data_oid_prefix": <prefix> # | |
eafe8130 TL |
38 | "events_retention_days": <int> # default: 7 |
39 | "start_with_full_sync" <bool> # default: false | |
11fdf7f2 TL |
40 | } |
41 | ||
42 | */ | |
43 | ||
44 | // utility function to convert the args list from string format | |
45 | // (ampresend separated with equal sign) to prased structure | |
46 | RGWHTTPArgs string_to_args(const std::string& str_args) { | |
47 | RGWHTTPArgs args; | |
48 | args.set(str_args); | |
49 | args.parse(); | |
50 | return args; | |
51 | } | |
52 | ||
eafe8130 TL |
53 | struct PSSubConfig { |
54 | std::string name; | |
55 | std::string topic; | |
56 | std::string push_endpoint_name; | |
57 | std::string push_endpoint_args; | |
58 | std::string data_bucket_name; | |
59 | std::string data_oid_prefix; | |
60 | std::string s3_id; | |
61 | std::string arn_topic; | |
11fdf7f2 TL |
62 | RGWPubSubEndpoint::Ptr push_endpoint; |
63 | ||
11fdf7f2 TL |
64 | void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) { |
65 | name = uc.name; | |
66 | topic = uc.topic; | |
67 | push_endpoint_name = uc.dest.push_endpoint; | |
68 | data_bucket_name = uc.dest.bucket_name; | |
69 | data_oid_prefix = uc.dest.oid_prefix; | |
eafe8130 TL |
70 | s3_id = uc.s3_id; |
71 | arn_topic = uc.dest.arn_topic; | |
72 | if (!push_endpoint_name.empty()) { | |
11fdf7f2 TL |
73 | push_endpoint_args = uc.dest.push_endpoint_args; |
74 | try { | |
eafe8130 TL |
75 | push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); |
76 | ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; | |
11fdf7f2 | 77 | } catch (const RGWPubSubEndpoint::configuration_error& e) { |
eafe8130 | 78 | ldout(cct, 1) << "ERROR: failed to create push endpoint: " |
11fdf7f2 TL |
79 | << push_endpoint_name << " due to: " << e.what() << dendl; |
80 | } | |
81 | } | |
82 | } | |
83 | ||
84 | void dump(Formatter *f) const { | |
85 | encode_json("name", name, f); | |
86 | encode_json("topic", topic, f); | |
87 | encode_json("push_endpoint", push_endpoint_name, f); | |
eafe8130 | 88 | encode_json("push_endpoint_args", push_endpoint_args, f); |
11fdf7f2 TL |
89 | encode_json("data_bucket_name", data_bucket_name, f); |
90 | encode_json("data_oid_prefix", data_oid_prefix, f); | |
eafe8130 | 91 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
92 | } |
93 | ||
11fdf7f2 TL |
94 | }; |
95 | ||
96 | using PSSubConfigRef = std::shared_ptr<PSSubConfig>; | |
97 | ||
98 | struct PSTopicConfig { | |
eafe8130 TL |
99 | std::string name; |
100 | std::set<std::string> subs; | |
9f95a23c | 101 | std::string opaque_data; |
11fdf7f2 TL |
102 | |
103 | void dump(Formatter *f) const { | |
104 | encode_json("name", name, f); | |
105 | encode_json("subs", subs, f); | |
9f95a23c | 106 | encode_json("opaque", opaque_data, f); |
11fdf7f2 TL |
107 | } |
108 | }; | |
109 | ||
110 | struct PSNotificationConfig { | |
111 | uint64_t id{0}; | |
112 | string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */ | |
113 | string topic; | |
114 | bool is_prefix{false}; | |
115 | ||
116 | ||
117 | void dump(Formatter *f) const { | |
118 | encode_json("id", id, f); | |
119 | encode_json("path", path, f); | |
120 | encode_json("topic", topic, f); | |
121 | encode_json("is_prefix", is_prefix, f); | |
122 | } | |
123 | ||
124 | void init(CephContext *cct, const JSONFormattable& config) { | |
125 | path = config["path"]; | |
126 | if (!path.empty() && path[path.size() - 1] == '*') { | |
127 | path = path.substr(0, path.size() - 1); | |
128 | is_prefix = true; | |
129 | } | |
130 | topic = config["topic"]; | |
131 | } | |
132 | }; | |
133 | ||
134 | template<class T> | |
135 | static string json_str(const char *name, const T& obj, bool pretty = false) | |
136 | { | |
137 | stringstream ss; | |
138 | JSONFormatter f(pretty); | |
139 | ||
140 | encode_json(name, obj, &f); | |
141 | f.flush(ss); | |
142 | ||
143 | return ss.str(); | |
144 | } | |
145 | ||
146 | using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>; | |
eafe8130 | 147 | using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>; |
11fdf7f2 | 148 | |
f67539c2 | 149 | // global pubsub configuration |
11fdf7f2 | 150 | struct PSConfig { |
9f95a23c | 151 | const std::string id{"pubsub"}; |
11fdf7f2 | 152 | rgw_user user; |
9f95a23c TL |
153 | std::string data_bucket_prefix; |
154 | std::string data_oid_prefix; | |
11fdf7f2 | 155 | int events_retention_days{0}; |
11fdf7f2 | 156 | uint64_t sync_instance{0}; |
eafe8130 | 157 | bool start_with_full_sync{false}; |
11fdf7f2 TL |
158 | |
159 | void dump(Formatter *f) const { | |
160 | encode_json("id", id, f); | |
161 | encode_json("user", user, f); | |
162 | encode_json("data_bucket_prefix", data_bucket_prefix, f); | |
163 | encode_json("data_oid_prefix", data_oid_prefix, f); | |
164 | encode_json("events_retention_days", events_retention_days, f); | |
165 | encode_json("sync_instance", sync_instance, f); | |
eafe8130 | 166 | encode_json("start_with_full_sync", start_with_full_sync, f); |
11fdf7f2 TL |
167 | } |
168 | ||
169 | void init(CephContext *cct, const JSONFormattable& config) { | |
170 | string uid = config["uid"]("pubsub"); | |
171 | user = rgw_user(config["tenant"], uid); | |
172 | data_bucket_prefix = config["data_bucket_prefix"]("pubsub-"); | |
173 | data_oid_prefix = config["data_oid_prefix"]; | |
174 | events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT); | |
eafe8130 | 175 | start_with_full_sync = config["start_with_full_sync"](false); |
11fdf7f2 | 176 | |
f67539c2 | 177 | ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl; |
11fdf7f2 TL |
178 | } |
179 | ||
180 | void init_instance(const RGWRealm& realm, uint64_t instance_id) { | |
181 | sync_instance = instance_id; | |
182 | } | |
11fdf7f2 TL |
183 | }; |
184 | ||
11fdf7f2 | 185 | using PSConfigRef = std::shared_ptr<PSConfig>; |
eafe8130 TL |
186 | template<typename EventType> |
187 | using EventRef = std::shared_ptr<EventType>; | |
11fdf7f2 TL |
188 | |
189 | struct objstore_event { | |
190 | string id; | |
191 | const rgw_bucket& bucket; | |
192 | const rgw_obj_key& key; | |
193 | const ceph::real_time& mtime; | |
194 | const std::vector<std::pair<std::string, std::string> > *attrs; | |
195 | ||
196 | objstore_event(const rgw_bucket& _bucket, | |
197 | const rgw_obj_key& _key, | |
198 | const ceph::real_time& _mtime, | |
199 | const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket), | |
200 | key(_key), | |
201 | mtime(_mtime), | |
202 | attrs(_attrs) {} | |
203 | ||
204 | string get_hash() { | |
205 | string etag; | |
206 | RGWMD5Etag hash; | |
207 | hash.update(bucket.bucket_id); | |
208 | hash.update(key.name); | |
209 | hash.update(key.instance); | |
210 | hash.finish(&etag); | |
211 | ||
212 | assert(etag.size() > 8); | |
213 | ||
214 | return etag.substr(0, 8); | |
215 | } | |
216 | ||
217 | void dump(Formatter *f) const { | |
218 | { | |
219 | Formatter::ObjectSection s(*f, "bucket"); | |
220 | encode_json("name", bucket.name, f); | |
221 | encode_json("tenant", bucket.tenant, f); | |
222 | encode_json("bucket_id", bucket.bucket_id, f); | |
223 | } | |
224 | { | |
225 | Formatter::ObjectSection s(*f, "key"); | |
226 | encode_json("name", key.name, f); | |
227 | encode_json("instance", key.instance, f); | |
228 | } | |
229 | utime_t mt(mtime); | |
230 | encode_json("mtime", mt, f); | |
231 | Formatter::ObjectSection s(*f, "attrs"); | |
232 | if (attrs) { | |
233 | for (auto& attr : *attrs) { | |
234 | encode_json(attr.first.c_str(), attr.second.c_str(), f); | |
235 | } | |
236 | } | |
237 | } | |
238 | }; | |
239 | ||
240 | static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, | |
241 | const rgw_obj_key& key, | |
242 | const ceph::real_time& mtime, | |
243 | const std::vector<std::pair<std::string, std::string> > *attrs, | |
eafe8130 TL |
244 | rgw::notify::EventType event_type, |
245 | EventRef<rgw_pubsub_event> *event) { | |
11fdf7f2 TL |
246 | *event = std::make_shared<rgw_pubsub_event>(); |
247 | ||
eafe8130 TL |
248 | EventRef<rgw_pubsub_event>& e = *event; |
249 | e->event_name = rgw::notify::to_ceph_string(event_type); | |
11fdf7f2 TL |
250 | e->source = bucket.name + "/" + key.name; |
251 | e->timestamp = real_clock::now(); | |
252 | ||
253 | objstore_event oevent(bucket, key, mtime, attrs); | |
254 | ||
eafe8130 TL |
255 | const utime_t ts(e->timestamp); |
256 | set_event_id(e->id, oevent.get_hash(), ts); | |
11fdf7f2 TL |
257 | |
258 | encode_json("info", oevent, &e->info); | |
259 | } | |
260 | ||
f67539c2 | 261 | static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket, |
eafe8130 TL |
262 | const rgw_user& owner, |
263 | const rgw_obj_key& key, | |
264 | const ceph::real_time& mtime, | |
f67539c2 | 265 | const std::vector<std::pair<std::string, std::string>>* attrs, |
eafe8130 | 266 | rgw::notify::EventType event_type, |
f67539c2 TL |
267 | EventRef<rgw_pubsub_s3_event>* event) { |
268 | *event = std::make_shared<rgw_pubsub_s3_event>(); | |
eafe8130 | 269 | |
f67539c2 TL |
270 | EventRef<rgw_pubsub_s3_event>& e = *event; |
271 | e->eventTime = mtime; | |
272 | e->eventName = rgw::notify::to_string(event_type); | |
92f5a8d4 TL |
273 | // userIdentity: not supported in sync module |
274 | // x_amz_request_id: not supported in sync module | |
275 | // x_amz_id_2: not supported in sync module | |
eafe8130 | 276 | // configurationId is filled from subscription configuration |
f67539c2 TL |
277 | e->bucket_name = bucket.name; |
278 | e->bucket_ownerIdentity = owner.to_str(); | |
279 | e->bucket_arn = to_string(rgw::ARN(bucket)); | |
280 | e->bucket_id = bucket.bucket_id; // rgw extension | |
281 | e->object_key = key.name; | |
92f5a8d4 | 282 | // object_size not supported in sync module |
eafe8130 | 283 | objstore_event oevent(bucket, key, mtime, attrs); |
f67539c2 TL |
284 | e->object_etag = oevent.get_hash(); |
285 | e->object_versionId = key.instance; | |
eafe8130 TL |
286 | |
287 | // use timestamp as per key sequence id (hex encoded) | |
288 | const utime_t ts(real_clock::now()); | |
289 | boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), | |
f67539c2 | 290 | std::back_inserter(e->object_sequencer)); |
eafe8130 | 291 | |
f67539c2 | 292 | set_event_id(e->id, e->object_etag, ts); |
eafe8130 TL |
293 | } |
294 | ||
11fdf7f2 TL |
295 | class PSManager; |
296 | using PSManagerRef = std::shared_ptr<PSManager>; | |
297 | ||
298 | struct PSEnv { | |
299 | PSConfigRef conf; | |
300 | shared_ptr<RGWUserInfo> data_user_info; | |
301 | PSManagerRef manager; | |
302 | ||
303 | PSEnv() : conf(make_shared<PSConfig>()), | |
304 | data_user_info(make_shared<RGWUserInfo>()) {} | |
305 | ||
306 | void init(CephContext *cct, const JSONFormattable& config) { | |
307 | conf->init(cct, config); | |
308 | } | |
309 | ||
310 | void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr); | |
311 | }; | |
312 | ||
313 | using PSEnvRef = std::shared_ptr<PSEnv>; | |
314 | ||
eafe8130 | 315 | template<typename EventType> |
11fdf7f2 | 316 | class PSEvent { |
eafe8130 | 317 | const EventRef<EventType> event; |
11fdf7f2 TL |
318 | |
319 | public: | |
eafe8130 | 320 | PSEvent(const EventRef<EventType>& _event) : event(_event) {} |
11fdf7f2 TL |
321 | |
322 | void format(bufferlist *bl) const { | |
92f5a8d4 | 323 | bl->append(json_str("", *event)); |
11fdf7f2 TL |
324 | } |
325 | ||
326 | void encode_event(bufferlist& bl) const { | |
327 | encode(*event, bl); | |
328 | } | |
329 | ||
330 | const string& id() const { | |
331 | return event->id; | |
332 | } | |
333 | }; | |
334 | ||
335 | template <class T> | |
336 | class RGWSingletonCR : public RGWCoroutine { | |
337 | friend class WrapperCR; | |
338 | ||
339 | boost::asio::coroutine wrapper_state; | |
340 | bool started{false}; | |
341 | int operate_ret{0}; | |
342 | ||
343 | struct WaiterInfo { | |
344 | RGWCoroutine *cr{nullptr}; | |
345 | T *result; | |
346 | }; | |
347 | using WaiterInfoRef = std::shared_ptr<WaiterInfo>; | |
348 | ||
349 | deque<WaiterInfoRef> waiters; | |
350 | ||
351 | void add_waiter(RGWCoroutine *cr, T *result) { | |
352 | auto waiter = std::make_shared<WaiterInfo>(); | |
353 | waiter->cr = cr; | |
354 | waiter->result = result; | |
355 | waiters.push_back(waiter); | |
356 | }; | |
357 | ||
358 | bool get_next_waiter(WaiterInfoRef *waiter) { | |
359 | if (waiters.empty()) { | |
360 | waiter->reset(); | |
361 | return false; | |
362 | } | |
363 | ||
364 | *waiter = waiters.front(); | |
365 | waiters.pop_front(); | |
366 | return true; | |
367 | } | |
368 | ||
369 | int operate_wrapper() override { | |
370 | reenter(&wrapper_state) { | |
371 | while (!is_done()) { | |
372 | ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl; | |
373 | operate_ret = operate(); | |
374 | if (operate_ret < 0) { | |
375 | ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl; | |
376 | } | |
377 | if (!is_done()) { | |
378 | yield; | |
379 | } | |
380 | } | |
381 | ||
382 | ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl; | |
383 | /* we're done, can't yield anymore */ | |
384 | ||
385 | WaiterInfoRef waiter; | |
386 | while (get_next_waiter(&waiter)) { | |
387 | ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl; | |
388 | waiter->cr->set_retcode(retcode); | |
389 | waiter->cr->set_sleeping(false); | |
390 | return_result(waiter->result); | |
391 | put(); | |
392 | } | |
393 | ||
394 | return retcode; | |
395 | } | |
396 | return 0; | |
397 | } | |
398 | ||
399 | virtual void return_result(T *result) {} | |
400 | ||
401 | public: | |
402 | RGWSingletonCR(CephContext *_cct) | |
403 | : RGWCoroutine(_cct) {} | |
404 | ||
405 | int execute(RGWCoroutine *caller, T *result = nullptr) { | |
406 | if (!started) { | |
407 | ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl; | |
408 | started = true; | |
409 | caller->call(this); | |
410 | return 0; | |
411 | } else if (!is_done()) { | |
412 | ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl; | |
413 | get(); | |
414 | add_waiter(caller, result); | |
415 | caller->set_sleeping(true); | |
416 | return 0; | |
417 | } | |
418 | ||
419 | ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl; | |
420 | caller->set_retcode(retcode); | |
421 | return_result(result); | |
422 | return retcode; | |
423 | } | |
424 | }; | |
425 | ||
426 | ||
427 | class PSSubscription; | |
428 | using PSSubscriptionRef = std::shared_ptr<PSSubscription>; | |
429 | ||
430 | class PSSubscription { | |
431 | class InitCR; | |
432 | friend class InitCR; | |
433 | friend class RGWPSHandleObjEventCR; | |
434 | ||
9f95a23c | 435 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
436 | RGWDataSyncEnv *sync_env; |
437 | PSEnvRef env; | |
438 | PSSubConfigRef sub_conf; | |
eafe8130 | 439 | std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result; |
11fdf7f2 TL |
440 | RGWBucketInfo *bucket_info{nullptr}; |
441 | RGWDataAccessRef data_access; | |
442 | RGWDataAccess::BucketRef bucket; | |
443 | ||
11fdf7f2 TL |
444 | InitCR *init_cr{nullptr}; |
445 | ||
446 | class InitBucketLifecycleCR : public RGWCoroutine { | |
9f95a23c | 447 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
448 | RGWDataSyncEnv *sync_env; |
449 | PSConfigRef& conf; | |
450 | LCRule rule; | |
451 | ||
452 | int retention_days; | |
453 | ||
454 | rgw_bucket_lifecycle_config_params lc_config; | |
455 | ||
456 | public: | |
9f95a23c | 457 | InitBucketLifecycleCR(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
458 | PSConfigRef& _conf, |
459 | RGWBucketInfo& _bucket_info, | |
9f95a23c TL |
460 | std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sc->cct), |
461 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
462 | conf(_conf) { |
463 | lc_config.bucket_info = _bucket_info; | |
464 | lc_config.bucket_attrs = _bucket_attrs; | |
465 | retention_days = conf->events_retention_days; | |
466 | } | |
467 | ||
468 | int operate() override { | |
469 | reenter(this) { | |
470 | ||
471 | rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days); | |
472 | ||
473 | { | |
474 | /* maybe we already have it configured? */ | |
475 | RGWLifecycleConfiguration old_config; | |
476 | auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC); | |
477 | if (aiter != lc_config.bucket_attrs.end()) { | |
478 | bufferlist::const_iterator iter{&aiter->second}; | |
479 | try { | |
480 | old_config.decode(iter); | |
481 | } catch (const buffer::error& e) { | |
9f95a23c | 482 | ldpp_dout(sync_env->dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl; |
11fdf7f2 TL |
483 | } |
484 | } | |
485 | ||
486 | auto old_rules = old_config.get_rule_map(); | |
487 | for (auto ori : old_rules) { | |
488 | auto& old_rule = ori.second; | |
489 | ||
490 | if (old_rule.get_prefix().empty() && | |
491 | old_rule.get_expiration().get_days() == retention_days && | |
492 | old_rule.is_enabled()) { | |
9f95a23c | 493 | ldpp_dout(sync_env->dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl; |
11fdf7f2 TL |
494 | return set_cr_done(); |
495 | } | |
496 | } | |
497 | } | |
498 | ||
499 | lc_config.config.add_rule(rule); | |
500 | yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados, | |
501 | sync_env->store, | |
9f95a23c TL |
502 | lc_config, |
503 | sync_env->dpp)); | |
11fdf7f2 | 504 | if (retcode < 0) { |
9f95a23c | 505 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl; |
11fdf7f2 TL |
506 | return set_cr_error(retcode); |
507 | } | |
508 | ||
509 | return set_cr_done(); | |
510 | } | |
511 | return 0; | |
512 | } | |
513 | }; | |
514 | ||
515 | class InitCR : public RGWSingletonCR<bool> { | |
9f95a23c | 516 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
517 | RGWDataSyncEnv *sync_env; |
518 | PSSubscriptionRef sub; | |
519 | rgw_get_bucket_info_params get_bucket_info; | |
520 | rgw_bucket_create_local_params create_bucket; | |
521 | PSConfigRef& conf; | |
522 | PSSubConfigRef& sub_conf; | |
523 | int i; | |
524 | ||
525 | public: | |
9f95a23c TL |
526 | InitCR(RGWDataSyncCtx *_sc, |
527 | PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct), | |
528 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
529 | sub(_sub), conf(sub->env->conf), |
530 | sub_conf(sub->sub_conf) { | |
531 | } | |
532 | ||
533 | int operate() override { | |
534 | reenter(this) { | |
535 | get_bucket_info.tenant = conf->user.tenant; | |
536 | get_bucket_info.bucket_name = sub_conf->data_bucket_name; | |
537 | sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>(); | |
538 | ||
539 | for (i = 0; i < 2; ++i) { | |
540 | yield call(new RGWGetBucketInfoCR(sync_env->async_rados, | |
541 | sync_env->store, | |
542 | get_bucket_info, | |
543 | sub->get_bucket_info_result)); | |
544 | if (retcode < 0 && retcode != -ENOENT) { | |
9f95a23c | 545 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant=" |
11fdf7f2 TL |
546 | << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; |
547 | } | |
548 | if (retcode == 0) { | |
549 | { | |
550 | auto& result = sub->get_bucket_info_result; | |
551 | sub->bucket_info = &result->bucket_info; | |
552 | ||
553 | int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); | |
554 | if (ret < 0) { | |
9f95a23c | 555 | ldpp_dout(sync_env->dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; |
11fdf7f2 TL |
556 | return set_cr_error(ret); |
557 | } | |
558 | } | |
559 | ||
9f95a23c | 560 | yield call(new InitBucketLifecycleCR(sc, conf, |
11fdf7f2 TL |
561 | sub->get_bucket_info_result->bucket_info, |
562 | sub->get_bucket_info_result->attrs)); | |
563 | if (retcode < 0) { | |
9f95a23c | 564 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; |
11fdf7f2 TL |
565 | return set_cr_error(retcode); |
566 | } | |
567 | ||
568 | return set_cr_done(); | |
569 | } | |
570 | ||
571 | create_bucket.user_info = sub->env->data_user_info; | |
572 | create_bucket.bucket_name = sub_conf->data_bucket_name; | |
9f95a23c | 573 | ldpp_dout(sync_env->dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl; |
11fdf7f2 TL |
574 | yield call(new RGWBucketCreateLocalCR(sync_env->async_rados, |
575 | sync_env->store, | |
9f95a23c TL |
576 | create_bucket, |
577 | sync_env->dpp)); | |
11fdf7f2 | 578 | if (retcode < 0) { |
9f95a23c | 579 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket: " << "tenant=" |
11fdf7f2 TL |
580 | << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; |
581 | return set_cr_error(retcode); | |
582 | } | |
583 | ||
584 | /* second iteration: we got -ENOENT and created a bucket */ | |
585 | } | |
586 | ||
587 | /* failed twice on -ENOENT, unexpected */ | |
9f95a23c | 588 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant |
11fdf7f2 TL |
589 | << " name=" << get_bucket_info.bucket_name << dendl; |
590 | return set_cr_error(-EIO); | |
591 | } | |
592 | return 0; | |
593 | } | |
594 | }; | |
595 | ||
eafe8130 | 596 | template<typename EventType> |
11fdf7f2 | 597 | class StoreEventCR : public RGWCoroutine { |
9f95a23c | 598 | RGWDataSyncCtx* const sc; |
11fdf7f2 TL |
599 | RGWDataSyncEnv* const sync_env; |
600 | const PSSubscriptionRef sub; | |
eafe8130 | 601 | const PSEvent<EventType> pse; |
11fdf7f2 TL |
602 | const string oid_prefix; |
603 | ||
604 | public: | |
9f95a23c | 605 | StoreEventCR(RGWDataSyncCtx* const _sc, |
11fdf7f2 | 606 | const PSSubscriptionRef& _sub, |
9f95a23c TL |
607 | const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct), |
608 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
609 | sub(_sub), |
610 | pse(_event), | |
611 | oid_prefix(sub->sub_conf->data_oid_prefix) { | |
612 | } | |
613 | ||
614 | int operate() override { | |
11fdf7f2 TL |
615 | rgw_object_simple_put_params put_obj; |
616 | reenter(this) { | |
617 | ||
618 | put_obj.bucket = sub->bucket; | |
619 | put_obj.key = rgw_obj_key(oid_prefix + pse.id()); | |
620 | ||
621 | pse.format(&put_obj.data); | |
622 | ||
623 | { | |
624 | bufferlist bl; | |
625 | pse.encode_event(bl); | |
626 | bufferlist bl64; | |
627 | bl.encode_base64(bl64); | |
628 | put_obj.user_data = bl64.to_str(); | |
629 | } | |
630 | ||
631 | yield call(new RGWObjectSimplePutCR(sync_env->async_rados, | |
632 | sync_env->store, | |
9f95a23c TL |
633 | put_obj, |
634 | sync_env->dpp)); | |
11fdf7f2 | 635 | if (retcode < 0) { |
eafe8130 | 636 | ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; |
11fdf7f2 TL |
637 | return set_cr_error(retcode); |
638 | } else { | |
eafe8130 | 639 | ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl; |
11fdf7f2 TL |
640 | } |
641 | ||
642 | return set_cr_done(); | |
643 | } | |
644 | return 0; | |
645 | } | |
646 | }; | |
647 | ||
eafe8130 | 648 | template<typename EventType> |
11fdf7f2 | 649 | class PushEventCR : public RGWCoroutine { |
9f95a23c | 650 | RGWDataSyncCtx* const sc; |
11fdf7f2 | 651 | RGWDataSyncEnv* const sync_env; |
eafe8130 | 652 | const EventRef<EventType> event; |
11fdf7f2 TL |
653 | const PSSubConfigRef& sub_conf; |
654 | ||
655 | public: | |
9f95a23c | 656 | PushEventCR(RGWDataSyncCtx* const _sc, |
11fdf7f2 | 657 | const PSSubscriptionRef& _sub, |
9f95a23c TL |
658 | const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct), |
659 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
660 | event(_event), |
661 | sub_conf(_sub->sub_conf) { | |
662 | } | |
663 | ||
664 | int operate() override { | |
665 | reenter(this) { | |
666 | ceph_assert(sub_conf->push_endpoint); | |
667 | yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env)); | |
668 | ||
669 | if (retcode < 0) { | |
eafe8130 | 670 | ldout(sync_env->cct, 10) << "failed to push event: " << event->id << |
11fdf7f2 | 671 | " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl; |
11fdf7f2 TL |
672 | return set_cr_error(retcode); |
673 | } | |
674 | ||
eafe8130 | 675 | ldout(sync_env->cct, 20) << "event: " << event->id << |
11fdf7f2 | 676 | " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl; |
11fdf7f2 TL |
677 | return set_cr_done(); |
678 | } | |
679 | return 0; | |
680 | } | |
681 | }; | |
682 | ||
683 | public: | |
9f95a23c | 684 | PSSubscription(RGWDataSyncCtx *_sc, |
11fdf7f2 | 685 | PSEnvRef _env, |
9f95a23c | 686 | PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env), |
11fdf7f2 TL |
687 | env(_env), |
688 | sub_conf(_sub_conf), | |
689 | data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {} | |
690 | ||
9f95a23c | 691 | PSSubscription(RGWDataSyncCtx *_sc, |
11fdf7f2 | 692 | PSEnvRef _env, |
9f95a23c | 693 | rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env), |
11fdf7f2 TL |
694 | env(_env), |
695 | sub_conf(std::make_shared<PSSubConfig>()), | |
696 | data_access(std::make_shared<RGWDataAccess>(sync_env->store)) { | |
697 | sub_conf->from_user_conf(sync_env->cct, user_sub_conf); | |
698 | } | |
699 | virtual ~PSSubscription() { | |
700 | if (init_cr) { | |
701 | init_cr->put(); | |
702 | } | |
703 | } | |
704 | ||
705 | template <class C> | |
9f95a23c | 706 | static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
707 | PSEnvRef _env, |
708 | C& _sub_conf) { | |
9f95a23c TL |
709 | auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf); |
710 | sub->init_cr = new InitCR(_sc, sub); | |
11fdf7f2 TL |
711 | sub->init_cr->get(); |
712 | return sub; | |
713 | } | |
714 | ||
715 | int call_init_cr(RGWCoroutine *caller) { | |
716 | return init_cr->execute(caller); | |
717 | } | |
718 | ||
eafe8130 | 719 | template<typename EventType> |
9f95a23c TL |
720 | static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) { |
721 | return new StoreEventCR<EventType>(sc, sub, event); | |
11fdf7f2 TL |
722 | } |
723 | ||
eafe8130 | 724 | template<typename EventType> |
9f95a23c TL |
725 | static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) { |
726 | return new PushEventCR<EventType>(sc, sub, event); | |
11fdf7f2 TL |
727 | } |
728 | friend class InitCR; | |
729 | }; | |
730 | ||
11fdf7f2 TL |
731 | class PSManager |
732 | { | |
9f95a23c | 733 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
734 | RGWDataSyncEnv *sync_env; |
735 | PSEnvRef env; | |
736 | ||
eafe8130 | 737 | std::map<string, PSSubscriptionRef> subs; |
11fdf7f2 TL |
738 | |
739 | class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> { | |
9f95a23c | 740 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
741 | RGWDataSyncEnv *sync_env; |
742 | PSManagerRef mgr; | |
743 | rgw_user owner; | |
744 | string sub_name; | |
745 | string sub_id; | |
746 | PSSubscriptionRef *ref; | |
747 | ||
748 | PSConfigRef conf; | |
749 | ||
750 | PSSubConfigRef sub_conf; | |
751 | rgw_pubsub_sub_config user_sub_conf; | |
eafe8130 | 752 | |
11fdf7f2 | 753 | public: |
9f95a23c | 754 | GetSubCR(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
755 | PSManagerRef& _mgr, |
756 | const rgw_user& _owner, | |
757 | const string& _sub_name, | |
9f95a23c TL |
758 | PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct), |
759 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
760 | mgr(_mgr), |
761 | owner(_owner), | |
762 | sub_name(_sub_name), | |
763 | ref(_ref), | |
764 | conf(mgr->env->conf) { | |
765 | } | |
eafe8130 | 766 | ~GetSubCR() { } |
11fdf7f2 TL |
767 | |
768 | int operate() override { | |
769 | reenter(this) { | |
770 | if (owner.empty()) { | |
f67539c2 | 771 | ldout(sync_env->cct, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl; |
11fdf7f2 | 772 | mgr->remove_get_sub(owner, sub_name); |
f67539c2 | 773 | return set_cr_error(-EINVAL); |
11fdf7f2 TL |
774 | } else { |
775 | using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>; | |
776 | yield { | |
f67539c2 | 777 | RGWPubSub ps(sync_env->store, owner.tenant); |
11fdf7f2 | 778 | rgw_raw_obj obj; |
f67539c2 | 779 | ps.get_sub_meta_obj(sub_name, &obj); |
11fdf7f2 | 780 | bool empty_on_enoent = false; |
9f95a23c | 781 | call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, |
11fdf7f2 TL |
782 | obj, |
783 | &user_sub_conf, empty_on_enoent)); | |
784 | } | |
785 | if (retcode < 0) { | |
786 | mgr->remove_get_sub(owner, sub_name); | |
787 | return set_cr_error(retcode); | |
788 | } | |
789 | ||
9f95a23c | 790 | *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf); |
11fdf7f2 TL |
791 | } |
792 | ||
793 | yield (*ref)->call_init_cr(this); | |
794 | if (retcode < 0) { | |
f67539c2 | 795 | ldout(sync_env->cct, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl; |
11fdf7f2 TL |
796 | mgr->remove_get_sub(owner, sub_name); |
797 | return set_cr_error(retcode); | |
798 | } | |
799 | ||
11fdf7f2 TL |
800 | mgr->remove_get_sub(owner, sub_name); |
801 | ||
802 | return set_cr_done(); | |
803 | } | |
804 | return 0; | |
805 | } | |
806 | ||
807 | void return_result(PSSubscriptionRef *result) override { | |
808 | ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl; | |
809 | if (retcode >= 0) { | |
810 | *result = *ref; | |
811 | } | |
812 | } | |
813 | }; | |
814 | ||
815 | string sub_id(const rgw_user& owner, const string& sub_name) { | |
816 | string owner_prefix; | |
817 | if (!owner.empty()) { | |
818 | owner_prefix = owner.to_str() + "/"; | |
819 | } | |
820 | ||
821 | return owner_prefix + sub_name; | |
822 | } | |
823 | ||
eafe8130 | 824 | std::map<std::string, GetSubCR *> get_subs; |
11fdf7f2 TL |
825 | |
826 | GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) { | |
827 | return get_subs[sub_id(owner, name)]; | |
828 | } | |
829 | ||
830 | void remove_get_sub(const rgw_user& owner, const string& name) { | |
831 | get_subs.erase(sub_id(owner, name)); | |
832 | } | |
833 | ||
834 | bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) { | |
835 | auto iter = subs.find(sub_id(owner, sub_name)); | |
836 | if (iter != subs.end()) { | |
837 | *sub = iter->second; | |
838 | return true; | |
839 | } | |
840 | return false; | |
841 | } | |
842 | ||
9f95a23c TL |
843 | PSManager(RGWDataSyncCtx *_sc, |
844 | PSEnvRef _env) : sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
845 | env(_env) {} |
846 | ||
847 | public: | |
9f95a23c | 848 | static PSManagerRef get_shared(RGWDataSyncCtx *_sc, |
11fdf7f2 | 849 | PSEnvRef _env) { |
9f95a23c | 850 | return std::shared_ptr<PSManager>(new PSManager(_sc, _env)); |
11fdf7f2 TL |
851 | } |
852 | ||
9f95a23c | 853 | static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr, |
11fdf7f2 TL |
854 | RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { |
855 | if (mgr->find_sub_instance(owner, sub_name, ref)) { | |
856 | /* found it! nothing to execute */ | |
9f95a23c | 857 | ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl; |
11fdf7f2 TL |
858 | } |
859 | auto& gs = mgr->get_get_subs(owner, sub_name); | |
860 | if (!gs) { | |
9f95a23c TL |
861 | ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl; |
862 | gs = new GetSubCR(sc, mgr, owner, sub_name, ref); | |
11fdf7f2 | 863 | } |
9f95a23c | 864 | ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl; |
11fdf7f2 TL |
865 | return gs->execute(caller, ref); |
866 | } | |
867 | ||
868 | friend class GetSubCR; | |
869 | }; | |
870 | ||
871 | void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) { | |
872 | manager = mgr; | |
873 | conf->init_instance(realm, instance_id); | |
874 | } | |
875 | ||
876 | class RGWPSInitEnvCBCR : public RGWCoroutine { | |
9f95a23c | 877 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
878 | RGWDataSyncEnv *sync_env; |
879 | PSEnvRef env; | |
880 | PSConfigRef& conf; | |
881 | ||
882 | rgw_user_create_params create_user; | |
883 | rgw_get_user_info_params get_user_info; | |
884 | public: | |
9f95a23c TL |
885 | RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc, |
886 | PSEnvRef& _env) : RGWCoroutine(_sc->cct), | |
887 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
888 | env(_env), conf(env->conf) {} |
889 | int operate() override { | |
890 | reenter(this) { | |
9f95a23c | 891 | ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl; |
11fdf7f2 TL |
892 | |
893 | /* nothing to do here right now */ | |
894 | create_user.user = conf->user; | |
895 | create_user.max_buckets = 0; /* unlimited */ | |
896 | create_user.display_name = "pubsub"; | |
897 | create_user.generate_key = false; | |
9f95a23c | 898 | yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, sync_env->dpp)); |
1911f103 | 899 | if (retcode < 0 && retcode != -ERR_USER_EXIST) { |
9f95a23c | 900 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; |
11fdf7f2 TL |
901 | return set_cr_error(retcode); |
902 | } | |
903 | ||
904 | get_user_info.user = conf->user; | |
905 | yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info)); | |
906 | if (retcode < 0) { | |
9f95a23c | 907 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; |
11fdf7f2 TL |
908 | return set_cr_error(retcode); |
909 | } | |
910 | ||
9f95a23c | 911 | ldpp_dout(sync_env->dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl; |
11fdf7f2 TL |
912 | |
913 | ||
914 | return set_cr_done(); | |
915 | } | |
916 | return 0; | |
917 | } | |
918 | }; | |
919 | ||
eafe8130 TL |
920 | bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) { |
921 | if (!match(filter.events, event_type)) { | |
922 | return false; | |
923 | } | |
924 | if (!match(filter.s3_filter.key_filter, key_name)) { | |
925 | return false; | |
926 | } | |
927 | return true; | |
928 | } | |
929 | ||
11fdf7f2 | 930 | class RGWPSFindBucketTopicsCR : public RGWCoroutine { |
9f95a23c | 931 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
932 | RGWDataSyncEnv *sync_env; |
933 | PSEnvRef env; | |
934 | rgw_user owner; | |
935 | rgw_bucket bucket; | |
936 | rgw_obj_key key; | |
eafe8130 | 937 | rgw::notify::EventType event_type; |
11fdf7f2 | 938 | |
f67539c2 | 939 | RGWPubSub ps; |
11fdf7f2 TL |
940 | |
941 | rgw_raw_obj bucket_obj; | |
942 | rgw_raw_obj user_obj; | |
943 | rgw_pubsub_bucket_topics bucket_topics; | |
f67539c2 | 944 | rgw_pubsub_topics user_topics; |
11fdf7f2 TL |
945 | TopicsRef *topics; |
946 | public: | |
9f95a23c | 947 | RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
948 | PSEnvRef& _env, |
949 | const rgw_user& _owner, | |
950 | const rgw_bucket& _bucket, | |
951 | const rgw_obj_key& _key, | |
eafe8130 | 952 | rgw::notify::EventType _event_type, |
9f95a23c TL |
953 | TopicsRef *_topics) : RGWCoroutine(_sc->cct), |
954 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
955 | env(_env), |
956 | owner(_owner), | |
957 | bucket(_bucket), | |
958 | key(_key), | |
eafe8130 | 959 | event_type(_event_type), |
f67539c2 | 960 | ps(sync_env->store, owner.tenant), |
11fdf7f2 TL |
961 | topics(_topics) { |
962 | *topics = std::make_shared<vector<PSTopicConfigRef> >(); | |
963 | } | |
964 | int operate() override { | |
965 | reenter(this) { | |
f67539c2 TL |
966 | ps.get_bucket_meta_obj(bucket, &bucket_obj); |
967 | ps.get_meta_obj(&user_obj); | |
11fdf7f2 TL |
968 | |
969 | using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>; | |
970 | yield { | |
971 | bool empty_on_enoent = true; | |
9f95a23c | 972 | call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, |
11fdf7f2 TL |
973 | bucket_obj, |
974 | &bucket_topics, empty_on_enoent)); | |
975 | } | |
976 | if (retcode < 0 && retcode != -ENOENT) { | |
977 | return set_cr_error(retcode); | |
978 | } | |
979 | ||
980 | ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl; | |
981 | ||
982 | if (!bucket_topics.topics.empty()) { | |
f67539c2 | 983 | using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>; |
11fdf7f2 TL |
984 | yield { |
985 | bool empty_on_enoent = true; | |
9f95a23c | 986 | call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, |
11fdf7f2 TL |
987 | user_obj, |
988 | &user_topics, empty_on_enoent)); | |
989 | } | |
990 | if (retcode < 0 && retcode != -ENOENT) { | |
991 | return set_cr_error(retcode); | |
992 | } | |
993 | } | |
994 | ||
995 | for (auto& titer : bucket_topics.topics) { | |
996 | auto& topic_filter = titer.second; | |
997 | auto& info = topic_filter.topic; | |
eafe8130 | 998 | if (!match(topic_filter, key.name, event_type)) { |
11fdf7f2 TL |
999 | continue; |
1000 | } | |
eafe8130 | 1001 | std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>(); |
11fdf7f2 TL |
1002 | tc->name = info.name; |
1003 | tc->subs = user_topics.topics[info.name].subs; | |
9f95a23c | 1004 | tc->opaque_data = info.opaque_data; |
11fdf7f2 TL |
1005 | (*topics)->push_back(tc); |
1006 | } | |
1007 | ||
11fdf7f2 TL |
1008 | return set_cr_done(); |
1009 | } | |
1010 | return 0; | |
1011 | } | |
1012 | }; | |
1013 | ||
1014 | class RGWPSHandleObjEventCR : public RGWCoroutine { | |
9f95a23c | 1015 | RGWDataSyncCtx* const sc; |
11fdf7f2 | 1016 | const PSEnvRef env; |
f67539c2 | 1017 | const rgw_user owner; |
eafe8130 | 1018 | const EventRef<rgw_pubsub_event> event; |
f67539c2 | 1019 | const EventRef<rgw_pubsub_s3_event> s3_event; |
11fdf7f2 | 1020 | const TopicsRef topics; |
11fdf7f2 TL |
1021 | bool has_subscriptions; |
1022 | bool event_handled; | |
11fdf7f2 | 1023 | PSSubscriptionRef sub; |
eafe8130 | 1024 | std::vector<PSTopicConfigRef>::const_iterator titer; |
9f95a23c | 1025 | std::set<std::string>::const_iterator siter; |
11fdf7f2 TL |
1026 | |
1027 | public: | |
9f95a23c | 1028 | RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc, |
11fdf7f2 TL |
1029 | const PSEnvRef _env, |
1030 | const rgw_user& _owner, | |
eafe8130 | 1031 | const EventRef<rgw_pubsub_event>& _event, |
f67539c2 | 1032 | const EventRef<rgw_pubsub_s3_event>& _s3_event, |
9f95a23c TL |
1033 | const TopicsRef& _topics) : RGWCoroutine(_sc->cct), |
1034 | sc(_sc), | |
11fdf7f2 TL |
1035 | env(_env), |
1036 | owner(_owner), | |
1037 | event(_event), | |
f67539c2 | 1038 | s3_event(_s3_event), |
11fdf7f2 | 1039 | topics(_topics), |
11fdf7f2 TL |
1040 | has_subscriptions(false), |
1041 | event_handled(false) {} | |
1042 | ||
1043 | int operate() override { | |
1044 | reenter(this) { | |
9f95a23c | 1045 | ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone |
11fdf7f2 TL |
1046 | << " event=" << json_str("event", *event, false) |
1047 | << " owner=" << owner << dendl; | |
1048 | ||
9f95a23c | 1049 | ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; |
eafe8130 TL |
1050 | |
1051 | // outside caller should check that | |
1052 | ceph_assert(!topics->empty()); | |
11fdf7f2 TL |
1053 | |
1054 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered); | |
1055 | ||
eafe8130 | 1056 | // loop over all topics related to the bucket/object |
11fdf7f2 | 1057 | for (titer = topics->begin(); titer != topics->end(); ++titer) { |
9f95a23c | 1058 | ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" << |
11fdf7f2 | 1059 | (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; |
eafe8130 | 1060 | // loop over all subscriptions of the topic |
11fdf7f2 | 1061 | for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { |
9f95a23c | 1062 | ldout(sc->cct, 20) << ": subscription: " << *siter << dendl; |
11fdf7f2 | 1063 | has_subscriptions = true; |
f67539c2 TL |
1064 | // try to read subscription configuration |
1065 | yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub); | |
1066 | if (retcode < 0) { | |
1067 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); | |
1068 | ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter | |
1069 | << " ret=" << retcode << dendl; | |
1070 | if (retcode == -ENOENT) { | |
1071 | // missing subscription info should be reflected back as invalid argument | |
1072 | // and not as missing object | |
1073 | retcode = -EINVAL; | |
1074 | } | |
1075 | // try the next subscription | |
1076 | continue; | |
1077 | } | |
1078 | if (sub->sub_conf->s3_id.empty()) { | |
1079 | // subscription was not made by S3 compatible API | |
1080 | ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; | |
1081 | yield call(PSSubscription::store_event_cr(sc, sub, event)); | |
11fdf7f2 | 1082 | if (retcode < 0) { |
f67539c2 TL |
1083 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); |
1084 | ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; | |
1085 | } else { | |
1086 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); | |
1087 | event_handled = true; | |
11fdf7f2 | 1088 | } |
f67539c2 TL |
1089 | if (sub->sub_conf->push_endpoint) { |
1090 | ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; | |
1091 | yield call(PSSubscription::push_event_cr(sc, sub, event)); | |
eafe8130 | 1092 | if (retcode < 0) { |
f67539c2 TL |
1093 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); |
1094 | ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; | |
eafe8130 | 1095 | } else { |
f67539c2 | 1096 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); |
eafe8130 TL |
1097 | event_handled = true; |
1098 | } | |
f67539c2 TL |
1099 | } |
1100 | } else { | |
1101 | // subscription was made by S3 compatible API | |
1102 | ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; | |
1103 | s3_event->configurationId = sub->sub_conf->s3_id; | |
1104 | s3_event->opaque_data = (*titer)->opaque_data; | |
1105 | yield call(PSSubscription::store_event_cr(sc, sub, s3_event)); | |
1106 | if (retcode < 0) { | |
1107 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); | |
1108 | ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl; | |
eafe8130 | 1109 | } else { |
f67539c2 TL |
1110 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); |
1111 | event_handled = true; | |
1112 | } | |
1113 | if (sub->sub_conf->push_endpoint) { | |
1114 | ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl; | |
1115 | yield call(PSSubscription::push_event_cr(sc, sub, s3_event)); | |
11fdf7f2 | 1116 | if (retcode < 0) { |
f67539c2 TL |
1117 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); |
1118 | ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl; | |
11fdf7f2 | 1119 | } else { |
f67539c2 | 1120 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); |
11fdf7f2 TL |
1121 | event_handled = true; |
1122 | } | |
1123 | } | |
1124 | } | |
11fdf7f2 TL |
1125 | } |
1126 | } | |
1127 | if (has_subscriptions && !event_handled) { | |
1128 | // event is considered "lost" of it has subscriptions on any of its topics | |
1129 | // but it was not stored in, or pushed to, any of them | |
1130 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost); | |
1131 | } | |
1132 | if (retcode < 0) { | |
1133 | return set_cr_error(retcode); | |
1134 | } | |
1135 | return set_cr_done(); | |
1136 | } | |
1137 | return 0; | |
1138 | } | |
1139 | }; | |
1140 | ||
eafe8130 | 1141 | // coroutine invoked on remote object creation |
11fdf7f2 | 1142 | class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { |
9f95a23c TL |
1143 | RGWDataSyncCtx *sc; |
1144 | rgw_bucket_sync_pipe sync_pipe; | |
11fdf7f2 TL |
1145 | PSEnvRef env; |
1146 | std::optional<uint64_t> versioned_epoch; | |
eafe8130 | 1147 | EventRef<rgw_pubsub_event> event; |
f67539c2 | 1148 | EventRef<rgw_pubsub_s3_event> s3_event; |
11fdf7f2 TL |
1149 | TopicsRef topics; |
1150 | public: | |
9f95a23c TL |
1151 | RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, |
1152 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
11fdf7f2 | 1153 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch, |
9f95a23c TL |
1154 | TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), |
1155 | sc(_sc), | |
1156 | sync_pipe(_sync_pipe), | |
11fdf7f2 TL |
1157 | env(_env), |
1158 | versioned_epoch(_versioned_epoch), | |
1159 | topics(_topics) { | |
1160 | } | |
1161 | int operate() override { | |
1162 | reenter(this) { | |
9f95a23c TL |
1163 | ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone |
1164 | << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime | |
11fdf7f2 TL |
1165 | << " attrs=" << attrs << dendl; |
1166 | { | |
1167 | std::vector<std::pair<std::string, std::string> > attrs; | |
1168 | for (auto& attr : attrs) { | |
9f95a23c | 1169 | std::string k = attr.first; |
11fdf7f2 TL |
1170 | if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { |
1171 | k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); | |
1172 | } | |
1173 | attrs.push_back(std::make_pair(k, attr.second)); | |
eafe8130 | 1174 | } |
f67539c2 | 1175 | // at this point we don't know whether we need the ceph event or S3 event |
eafe8130 TL |
1176 | // this is why both are created here, once we have information about the |
1177 | // subscription, we will store/push only the relevant ones | |
9f95a23c TL |
1178 | make_event_ref(sc->cct, |
1179 | sync_pipe.info.source_bs.bucket, key, | |
11fdf7f2 | 1180 | mtime, &attrs, |
eafe8130 | 1181 | rgw::notify::ObjectCreated, &event); |
f67539c2 | 1182 | make_s3_event_ref(sc->cct, |
9f95a23c | 1183 | sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, |
eafe8130 | 1184 | mtime, &attrs, |
f67539c2 | 1185 | rgw::notify::ObjectCreated, &s3_event); |
11fdf7f2 TL |
1186 | } |
1187 | ||
f67539c2 | 1188 | yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics)); |
11fdf7f2 TL |
1189 | if (retcode < 0) { |
1190 | return set_cr_error(retcode); | |
1191 | } | |
1192 | return set_cr_done(); | |
1193 | } | |
1194 | return 0; | |
1195 | } | |
1196 | }; | |
1197 | ||
1198 | class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
9f95a23c | 1199 | rgw_bucket_sync_pipe sync_pipe; |
11fdf7f2 TL |
1200 | PSEnvRef env; |
1201 | std::optional<uint64_t> versioned_epoch; | |
1202 | TopicsRef topics; | |
1203 | public: | |
9f95a23c TL |
1204 | RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc, |
1205 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
11fdf7f2 | 1206 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch, |
9f95a23c TL |
1207 | TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), |
1208 | sync_pipe(_sync_pipe), | |
11fdf7f2 TL |
1209 | env(_env), versioned_epoch(_versioned_epoch), |
1210 | topics(_topics) { | |
1211 | } | |
1212 | ||
1213 | ~RGWPSHandleRemoteObjCR() override {} | |
1214 | ||
1215 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
9f95a23c | 1216 | return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics); |
11fdf7f2 TL |
1217 | } |
1218 | }; | |
1219 | ||
1220 | class RGWPSHandleObjCreateCR : public RGWCoroutine { | |
9f95a23c TL |
1221 | RGWDataSyncCtx *sc; |
1222 | rgw_bucket_sync_pipe sync_pipe; | |
11fdf7f2 TL |
1223 | rgw_obj_key key; |
1224 | PSEnvRef env; | |
1225 | std::optional<uint64_t> versioned_epoch; | |
1226 | TopicsRef topics; | |
1227 | public: | |
9f95a23c TL |
1228 | RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc, |
1229 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
1230 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct), | |
1231 | sc(_sc), | |
1232 | sync_pipe(_sync_pipe), | |
11fdf7f2 TL |
1233 | key(_key), |
1234 | env(_env), | |
1235 | versioned_epoch(_versioned_epoch) { | |
1236 | } | |
1237 | ||
1238 | ~RGWPSHandleObjCreateCR() override {} | |
1239 | ||
1240 | int operate() override { | |
1241 | reenter(this) { | |
9f95a23c TL |
1242 | yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner, |
1243 | sync_pipe.info.source_bs.bucket, key, | |
eafe8130 | 1244 | rgw::notify::ObjectCreated, |
11fdf7f2 TL |
1245 | &topics)); |
1246 | if (retcode < 0) { | |
9f95a23c | 1247 | ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; |
11fdf7f2 TL |
1248 | return set_cr_error(retcode); |
1249 | } | |
1250 | if (topics->empty()) { | |
9f95a23c | 1251 | ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl; |
11fdf7f2 TL |
1252 | return set_cr_done(); |
1253 | } | |
9f95a23c | 1254 | yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics)); |
11fdf7f2 TL |
1255 | if (retcode < 0) { |
1256 | return set_cr_error(retcode); | |
1257 | } | |
1258 | return set_cr_done(); | |
1259 | } | |
1260 | return 0; | |
1261 | } | |
1262 | }; | |
1263 | ||
eafe8130 | 1264 | // coroutine invoked on remote object deletion |
11fdf7f2 | 1265 | class RGWPSGenericObjEventCBCR : public RGWCoroutine { |
9f95a23c | 1266 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
1267 | PSEnvRef env; |
1268 | rgw_user owner; | |
1269 | rgw_bucket bucket; | |
1270 | rgw_obj_key key; | |
1271 | ceph::real_time mtime; | |
eafe8130 TL |
1272 | rgw::notify::EventType event_type; |
1273 | EventRef<rgw_pubsub_event> event; | |
f67539c2 | 1274 | EventRef<rgw_pubsub_s3_event> s3_event; |
11fdf7f2 TL |
1275 | TopicsRef topics; |
1276 | public: | |
9f95a23c | 1277 | RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc, |
11fdf7f2 | 1278 | PSEnvRef _env, |
9f95a23c TL |
1279 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, |
1280 | rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct), | |
1281 | sc(_sc), | |
11fdf7f2 | 1282 | env(_env), |
9f95a23c TL |
1283 | owner(_sync_pipe.dest_bucket_info.owner), |
1284 | bucket(_sync_pipe.dest_bucket_info.bucket), | |
11fdf7f2 | 1285 | key(_key), |
eafe8130 | 1286 | mtime(_mtime), event_type(_event_type) {} |
11fdf7f2 TL |
1287 | int operate() override { |
1288 | reenter(this) { | |
9f95a23c | 1289 | ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone |
11fdf7f2 | 1290 | << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; |
9f95a23c | 1291 | yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics)); |
11fdf7f2 | 1292 | if (retcode < 0) { |
9f95a23c | 1293 | ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; |
11fdf7f2 TL |
1294 | return set_cr_error(retcode); |
1295 | } | |
1296 | if (topics->empty()) { | |
9f95a23c | 1297 | ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; |
11fdf7f2 TL |
1298 | return set_cr_done(); |
1299 | } | |
f67539c2 | 1300 | // at this point we don't know whether we need the ceph event or S3 event |
eafe8130 TL |
1301 | // this is why both are created here, once we have information about the |
1302 | // subscription, we will store/push only the relevant ones | |
9f95a23c | 1303 | make_event_ref(sc->cct, |
eafe8130 TL |
1304 | bucket, key, |
1305 | mtime, nullptr, | |
1306 | event_type, &event); | |
f67539c2 | 1307 | make_s3_event_ref(sc->cct, |
eafe8130 TL |
1308 | bucket, owner, key, |
1309 | mtime, nullptr, | |
f67539c2 TL |
1310 | event_type, &s3_event); |
1311 | yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics)); | |
11fdf7f2 TL |
1312 | if (retcode < 0) { |
1313 | return set_cr_error(retcode); | |
1314 | } | |
1315 | return set_cr_done(); | |
1316 | } | |
1317 | return 0; | |
1318 | } | |
1319 | ||
1320 | }; | |
1321 | ||
1322 | class RGWPSDataSyncModule : public RGWDataSyncModule { | |
1323 | PSEnvRef env; | |
1324 | PSConfigRef& conf; | |
eafe8130 | 1325 | |
11fdf7f2 TL |
1326 | public: |
1327 | RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) { | |
1328 | env->init(cct, config); | |
1329 | } | |
eafe8130 | 1330 | |
11fdf7f2 TL |
1331 | ~RGWPSDataSyncModule() override {} |
1332 | ||
9f95a23c TL |
1333 | void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { |
1334 | auto sync_env = sc->env; | |
1335 | PSManagerRef mgr = PSManager::get_shared(sc, env); | |
1336 | env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr); | |
11fdf7f2 TL |
1337 | } |
1338 | ||
9f95a23c TL |
1339 | RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { |
1340 | ldout(sc->cct, 5) << conf->id << ": start" << dendl; | |
1341 | return new RGWPSInitEnvCBCR(sc, env); | |
11fdf7f2 | 1342 | } |
eafe8130 | 1343 | |
9f95a23c | 1344 | RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, |
eafe8130 | 1345 | rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 1346 | ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << |
eafe8130 | 1347 | " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; |
9f95a23c | 1348 | return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch); |
11fdf7f2 | 1349 | } |
eafe8130 | 1350 | |
9f95a23c | 1351 | RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, |
eafe8130 | 1352 | rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 1353 | ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << |
eafe8130 | 1354 | " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
9f95a23c | 1355 | return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); |
11fdf7f2 | 1356 | } |
eafe8130 | 1357 | |
9f95a23c | 1358 | RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, |
eafe8130 | 1359 | rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 1360 | ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << |
eafe8130 | 1361 | " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
9f95a23c | 1362 | return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); |
11fdf7f2 TL |
1363 | } |
1364 | ||
1365 | PSConfigRef& get_conf() { return conf; } | |
1366 | }; | |
1367 | ||
1368 | RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) | |
1369 | { | |
1370 | data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config)); | |
9f95a23c | 1371 | const std::string jconf = json_str("conf", *data_handler->get_conf()); |
11fdf7f2 TL |
1372 | JSONParser p; |
1373 | if (!p.parse(jconf.c_str(), jconf.size())) { | |
eafe8130 | 1374 | ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; |
11fdf7f2 TL |
1375 | effective_conf = config; |
1376 | } else { | |
1377 | effective_conf.decode_json(&p); | |
1378 | } | |
1379 | } | |
1380 | ||
1381 | RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler() | |
1382 | { | |
1383 | return data_handler.get(); | |
1384 | } | |
1385 | ||
1386 | RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { | |
1387 | if (dialect != RGW_REST_S3) { | |
1388 | return orig; | |
1389 | } | |
eafe8130 TL |
1390 | return new RGWRESTMgr_PubSub(); |
1391 | } | |
1392 | ||
1393 | bool RGWPSSyncModuleInstance::should_full_sync() const { | |
1394 | return data_handler->get_conf()->start_with_full_sync; | |
11fdf7f2 TL |
1395 | } |
1396 | ||
1397 | int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { | |
1398 | instance->reset(new RGWPSSyncModuleInstance(cct, config)); | |
1399 | return 0; | |
1400 | } | |
1401 | ||
eafe8130 | 1402 |