]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_pubsub.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "services/svc_zone.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_pubsub.h"
10 #include "rgw_sync_module_pubsub_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rados.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_cr_tools.h"
15 #include "rgw_op.h"
16 #include "rgw_pubsub.h"
17 #include "rgw_pubsub_push.h"
18 #include "rgw_notify_event_type.h"
19 #include "rgw_perf_counters.h"
20
21 #include <boost/algorithm/hex.hpp>
22 #include <boost/asio/yield.hpp>
23
24 #define dout_subsys ceph_subsys_rgw
25
26
27 #define PUBSUB_EVENTS_RETENTION_DEFAULT 7
28
29 using namespace std;
30
31 /*
32
33 config:
34
35 {
36 "tenant": <tenant>, # default: <empty>
37 "uid": <uid>, # default: "pubsub"
38 "data_bucket_prefix": <prefix> # default: "pubsub-"
39 "data_oid_prefix": <prefix> #
40 "events_retention_days": <int> # default: 7
41 "start_with_full_sync" <bool> # default: false
42 }
43
44 */
45
46 // utility function to convert the args list from string format
47 // (ampresend separated with equal sign) to prased structure
48 RGWHTTPArgs string_to_args(const std::string& str_args, const DoutPrefixProvider *dpp) {
49 RGWHTTPArgs args;
50 args.set(str_args);
51 args.parse(dpp);
52 return args;
53 }
54
55 struct PSSubConfig {
56 std::string name;
57 std::string topic;
58 std::string push_endpoint_name;
59 std::string push_endpoint_args;
60 std::string data_bucket_name;
61 std::string data_oid_prefix;
62 std::string s3_id;
63 std::string arn_topic;
64 RGWPubSubEndpoint::Ptr push_endpoint;
65
66 void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc, const DoutPrefixProvider *dpp) {
67 name = uc.name;
68 topic = uc.topic;
69 push_endpoint_name = uc.dest.push_endpoint;
70 data_bucket_name = uc.dest.bucket_name;
71 data_oid_prefix = uc.dest.oid_prefix;
72 s3_id = uc.s3_id;
73 arn_topic = uc.dest.arn_topic;
74 if (!push_endpoint_name.empty()) {
75 push_endpoint_args = uc.dest.push_endpoint_args;
76 try {
77 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args, dpp), cct);
78 ldpp_dout(dpp, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
79 } catch (const RGWPubSubEndpoint::configuration_error& e) {
80 ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
81 << push_endpoint_name << " due to: " << e.what() << dendl;
82 }
83 }
84 }
85
86 void dump(Formatter *f) const {
87 encode_json("name", name, f);
88 encode_json("topic", topic, f);
89 encode_json("push_endpoint", push_endpoint_name, f);
90 encode_json("push_endpoint_args", push_endpoint_args, f);
91 encode_json("data_bucket_name", data_bucket_name, f);
92 encode_json("data_oid_prefix", data_oid_prefix, f);
93 encode_json("s3_id", s3_id, f);
94 }
95
96 };
97
98 using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
99
100 struct PSTopicConfig {
101 std::string name;
102 std::set<std::string> subs;
103 std::string opaque_data;
104
105 void dump(Formatter *f) const {
106 encode_json("name", name, f);
107 encode_json("subs", subs, f);
108 encode_json("opaque", opaque_data, f);
109 }
110 };
111
112 struct PSNotificationConfig {
113 uint64_t id{0};
114 string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
115 string topic;
116 bool is_prefix{false};
117
118
119 void dump(Formatter *f) const {
120 encode_json("id", id, f);
121 encode_json("path", path, f);
122 encode_json("topic", topic, f);
123 encode_json("is_prefix", is_prefix, f);
124 }
125
126 void init(CephContext *cct, const JSONFormattable& config) {
127 path = config["path"];
128 if (!path.empty() && path[path.size() - 1] == '*') {
129 path = path.substr(0, path.size() - 1);
130 is_prefix = true;
131 }
132 topic = config["topic"];
133 }
134 };
135
136 template<class T>
137 static string json_str(const char *name, const T& obj, bool pretty = false)
138 {
139 stringstream ss;
140 JSONFormatter f(pretty);
141
142 encode_json(name, obj, &f);
143 f.flush(ss);
144
145 return ss.str();
146 }
147
148 using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
149 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
150
151 // global pubsub configuration
152 struct PSConfig {
153 const std::string id{"pubsub"};
154 rgw_user user;
155 std::string data_bucket_prefix;
156 std::string data_oid_prefix;
157 int events_retention_days{0};
158 uint64_t sync_instance{0};
159 bool start_with_full_sync{false};
160
161 void dump(Formatter *f) const {
162 encode_json("id", id, f);
163 encode_json("user", user, f);
164 encode_json("data_bucket_prefix", data_bucket_prefix, f);
165 encode_json("data_oid_prefix", data_oid_prefix, f);
166 encode_json("events_retention_days", events_retention_days, f);
167 encode_json("sync_instance", sync_instance, f);
168 encode_json("start_with_full_sync", start_with_full_sync, f);
169 }
170
171 void init(CephContext *cct, const JSONFormattable& config) {
172 string uid = config["uid"]("pubsub");
173 user = rgw_user(config["tenant"], uid);
174 data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
175 data_oid_prefix = config["data_oid_prefix"];
176 events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
177 start_with_full_sync = config["start_with_full_sync"](false);
178
179 ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
180 }
181
182 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
183 sync_instance = instance_id;
184 }
185 };
186
187 using PSConfigRef = std::shared_ptr<PSConfig>;
188 template<typename EventType>
189 using EventRef = std::shared_ptr<EventType>;
190
191 struct objstore_event {
192 string id;
193 const rgw_bucket& bucket;
194 const rgw_obj_key& key;
195 const ceph::real_time& mtime;
196 const std::vector<std::pair<std::string, std::string> > *attrs;
197
198 objstore_event(const rgw_bucket& _bucket,
199 const rgw_obj_key& _key,
200 const ceph::real_time& _mtime,
201 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
202 key(_key),
203 mtime(_mtime),
204 attrs(_attrs) {}
205
206 string get_hash() {
207 string etag;
208 RGWMD5Etag hash;
209 hash.update(bucket.bucket_id);
210 hash.update(key.name);
211 hash.update(key.instance);
212 hash.finish(&etag);
213
214 assert(etag.size() > 8);
215
216 return etag.substr(0, 8);
217 }
218
219 void dump(Formatter *f) const {
220 {
221 Formatter::ObjectSection s(*f, "bucket");
222 encode_json("name", bucket.name, f);
223 encode_json("tenant", bucket.tenant, f);
224 encode_json("bucket_id", bucket.bucket_id, f);
225 }
226 {
227 Formatter::ObjectSection s(*f, "key");
228 encode_json("name", key.name, f);
229 encode_json("instance", key.instance, f);
230 }
231 utime_t mt(mtime);
232 encode_json("mtime", mt, f);
233 Formatter::ObjectSection s(*f, "attrs");
234 if (attrs) {
235 for (auto& attr : *attrs) {
236 encode_json(attr.first.c_str(), attr.second.c_str(), f);
237 }
238 }
239 }
240 };
241
242 static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
243 const rgw_obj_key& key,
244 const ceph::real_time& mtime,
245 const std::vector<std::pair<std::string, std::string> > *attrs,
246 rgw::notify::EventType event_type,
247 EventRef<rgw_pubsub_event> *event) {
248 *event = std::make_shared<rgw_pubsub_event>();
249
250 EventRef<rgw_pubsub_event>& e = *event;
251 e->event_name = rgw::notify::to_ceph_string(event_type);
252 e->source = bucket.name + "/" + key.name;
253 e->timestamp = real_clock::now();
254
255 objstore_event oevent(bucket, key, mtime, attrs);
256
257 const utime_t ts(e->timestamp);
258 set_event_id(e->id, oevent.get_hash(), ts);
259
260 encode_json("info", oevent, &e->info);
261 }
262
263 static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
264 const rgw_user& owner,
265 const rgw_obj_key& key,
266 const ceph::real_time& mtime,
267 const std::vector<std::pair<std::string, std::string>>* attrs,
268 rgw::notify::EventType event_type,
269 EventRef<rgw_pubsub_s3_event>* event) {
270 *event = std::make_shared<rgw_pubsub_s3_event>();
271
272 EventRef<rgw_pubsub_s3_event>& e = *event;
273 e->eventTime = mtime;
274 e->eventName = rgw::notify::to_event_string(event_type);
275 // userIdentity: not supported in sync module
276 // x_amz_request_id: not supported in sync module
277 // x_amz_id_2: not supported in sync module
278 // configurationId is filled from subscription configuration
279 e->bucket_name = bucket.name;
280 e->bucket_ownerIdentity = owner.to_str();
281 e->bucket_arn = to_string(rgw::ARN(bucket));
282 e->bucket_id = bucket.bucket_id; // rgw extension
283 e->object_key = key.name;
284 // object_size not supported in sync module
285 objstore_event oevent(bucket, key, mtime, attrs);
286 e->object_etag = oevent.get_hash();
287 e->object_versionId = key.instance;
288
289 // use timestamp as per key sequence id (hex encoded)
290 const utime_t ts(real_clock::now());
291 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
292 std::back_inserter(e->object_sequencer));
293
294 set_event_id(e->id, e->object_etag, ts);
295 }
296
297 class PSManager;
298 using PSManagerRef = std::shared_ptr<PSManager>;
299
300 struct PSEnv {
301 PSConfigRef conf;
302 shared_ptr<RGWUserInfo> data_user_info;
303 PSManagerRef manager;
304
305 PSEnv() : conf(make_shared<PSConfig>()),
306 data_user_info(make_shared<RGWUserInfo>()) {}
307
308 void init(CephContext *cct, const JSONFormattable& config) {
309 conf->init(cct, config);
310 }
311
312 void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
313 };
314
315 using PSEnvRef = std::shared_ptr<PSEnv>;
316
317 template<typename EventType>
318 class PSEvent {
319 const EventRef<EventType> event;
320
321 public:
322 PSEvent(const EventRef<EventType>& _event) : event(_event) {}
323
324 void format(bufferlist *bl) const {
325 bl->append(json_str("", *event));
326 }
327
328 void encode_event(bufferlist& bl) const {
329 encode(*event, bl);
330 }
331
332 const string& id() const {
333 return event->id;
334 }
335 };
336
337 template <class T>
338 class RGWSingletonCR : public RGWCoroutine {
339 friend class WrapperCR;
340
341 boost::asio::coroutine wrapper_state;
342 bool started{false};
343 int operate_ret{0};
344
345 struct WaiterInfo {
346 RGWCoroutine *cr{nullptr};
347 T *result;
348 };
349 using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
350
351 deque<WaiterInfoRef> waiters;
352
353 void add_waiter(RGWCoroutine *cr, T *result) {
354 auto waiter = std::make_shared<WaiterInfo>();
355 waiter->cr = cr;
356 waiter->result = result;
357 waiters.push_back(waiter);
358 };
359
360 bool get_next_waiter(WaiterInfoRef *waiter) {
361 if (waiters.empty()) {
362 waiter->reset();
363 return false;
364 }
365
366 *waiter = waiters.front();
367 waiters.pop_front();
368 return true;
369 }
370
371 int operate_wrapper(const DoutPrefixProvider *dpp) override {
372 reenter(&wrapper_state) {
373 while (!is_done()) {
374 ldpp_dout(dpp, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
375 operate_ret = operate(dpp);
376 if (operate_ret < 0) {
377 ldpp_dout(dpp, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
378 }
379 if (!is_done()) {
380 yield;
381 }
382 }
383
384 ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
385 /* we're done, can't yield anymore */
386
387 WaiterInfoRef waiter;
388 while (get_next_waiter(&waiter)) {
389 ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
390 waiter->cr->set_retcode(retcode);
391 waiter->cr->set_sleeping(false);
392 return_result(dpp, waiter->result);
393 put();
394 }
395
396 return retcode;
397 }
398 return 0;
399 }
400
401 virtual void return_result(const DoutPrefixProvider *dpp, T *result) {}
402
403 public:
404 RGWSingletonCR(CephContext *_cct)
405 : RGWCoroutine(_cct) {}
406
407 int execute(const DoutPrefixProvider *dpp, RGWCoroutine *caller, T *result = nullptr) {
408 if (!started) {
409 ldpp_dout(dpp, 20) << __func__ << "(): singleton not started, starting" << dendl;
410 started = true;
411 caller->call(this);
412 return 0;
413 } else if (!is_done()) {
414 ldpp_dout(dpp, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
415 get();
416 add_waiter(caller, result);
417 caller->set_sleeping(true);
418 return 0;
419 }
420
421 ldpp_dout(dpp, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
422 caller->set_retcode(retcode);
423 return_result(dpp, result);
424 return retcode;
425 }
426 };
427
428
429 class PSSubscription;
430 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
431
432 class PSSubscription {
433 class InitCR;
434 friend class InitCR;
435 friend class RGWPSHandleObjEventCR;
436
437 RGWDataSyncCtx *sc;
438 RGWDataSyncEnv *sync_env;
439 PSEnvRef env;
440 PSSubConfigRef sub_conf;
441 std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
442 RGWBucketInfo *bucket_info{nullptr};
443 RGWDataAccessRef data_access;
444 RGWDataAccess::BucketRef bucket;
445
446 InitCR *init_cr{nullptr};
447
448 class InitBucketLifecycleCR : public RGWCoroutine {
449 RGWDataSyncCtx *sc;
450 RGWDataSyncEnv *sync_env;
451 PSConfigRef& conf;
452 LCRule rule;
453
454 int retention_days;
455
456 rgw_bucket_lifecycle_config_params lc_config;
457
458 public:
459 InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
460 PSConfigRef& _conf,
461 rgw::sal::Bucket* _bucket) : RGWCoroutine(_sc->cct),
462 sc(_sc), sync_env(_sc->env),
463 conf(_conf) {
464 lc_config.bucket = _bucket;
465 lc_config.bucket_attrs = _bucket->get_attrs();
466 retention_days = conf->events_retention_days;
467 }
468
469 int operate(const DoutPrefixProvider *dpp) override {
470 reenter(this) {
471
472 rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
473
474 {
475 /* maybe we already have it configured? */
476 RGWLifecycleConfiguration old_config;
477 auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
478 if (aiter != lc_config.bucket_attrs.end()) {
479 bufferlist::const_iterator iter{&aiter->second};
480 try {
481 old_config.decode(iter);
482 } catch (const buffer::error& e) {
483 ldpp_dout(dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl;
484 }
485 }
486
487 auto old_rules = old_config.get_rule_map();
488 for (auto ori : old_rules) {
489 auto& old_rule = ori.second;
490
491 if (old_rule.get_prefix().empty() &&
492 old_rule.get_expiration().get_days() == retention_days &&
493 old_rule.is_enabled()) {
494 ldpp_dout(dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
495 return set_cr_done();
496 }
497 }
498 }
499
500 lc_config.config.add_rule(rule);
501 yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
502 sync_env->store,
503 lc_config,
504 dpp));
505 if (retcode < 0) {
506 ldpp_dout(dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
507 return set_cr_error(retcode);
508 }
509
510 return set_cr_done();
511 }
512 return 0;
513 }
514 };
515
516 class InitCR : public RGWSingletonCR<bool> {
517 RGWDataSyncCtx *sc;
518 RGWDataSyncEnv *sync_env;
519 PSSubscriptionRef sub;
520 rgw_get_bucket_info_params get_bucket_info;
521 rgw_bucket_create_local_params create_bucket;
522 PSConfigRef& conf;
523 PSSubConfigRef& sub_conf;
524 int i;
525
526 public:
527 InitCR(RGWDataSyncCtx *_sc,
528 PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct),
529 sc(_sc), sync_env(_sc->env),
530 sub(_sub), conf(sub->env->conf),
531 sub_conf(sub->sub_conf) {
532 }
533
534 int operate(const DoutPrefixProvider *dpp) override {
535 reenter(this) {
536 get_bucket_info.tenant = conf->user.tenant;
537 get_bucket_info.bucket_name = sub_conf->data_bucket_name;
538 sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
539
540 for (i = 0; i < 2; ++i) {
541 yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
542 sync_env->store,
543 get_bucket_info,
544 sub->get_bucket_info_result,
545 dpp));
546 if (retcode < 0 && retcode != -ENOENT) {
547 ldpp_dout(dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant="
548 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
549 }
550 if (retcode == 0) {
551 {
552 auto& result = sub->get_bucket_info_result;
553 sub->bucket_info = &result->bucket->get_info();
554
555 int ret = sub->data_access->get_bucket(result->bucket->get_info(), result->bucket->get_attrs(), &sub->bucket);
556 if (ret < 0) {
557 ldpp_dout(dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket << " failed, ret=" << ret << dendl;
558 return set_cr_error(ret);
559 }
560 }
561
562 yield call(new InitBucketLifecycleCR(sc, conf,
563 sub->get_bucket_info_result->bucket.get()));
564 if (retcode < 0) {
565 ldpp_dout(dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
566 return set_cr_error(retcode);
567 }
568
569 return set_cr_done();
570 }
571
572 create_bucket.user_info = sub->env->data_user_info;
573 create_bucket.bucket_name = sub_conf->data_bucket_name;
574 ldpp_dout(dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
575 yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
576 sync_env->store,
577 create_bucket,
578 dpp));
579 if (retcode < 0) {
580 ldpp_dout(dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
581 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
582 return set_cr_error(retcode);
583 }
584
585 /* second iteration: we got -ENOENT and created a bucket */
586 }
587
588 /* failed twice on -ENOENT, unexpected */
589 ldpp_dout(dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
590 << " name=" << get_bucket_info.bucket_name << dendl;
591 return set_cr_error(-EIO);
592 }
593 return 0;
594 }
595 };
596
597 template<typename EventType>
598 class StoreEventCR : public RGWCoroutine {
599 RGWDataSyncCtx* const sc;
600 RGWDataSyncEnv* const sync_env;
601 const PSSubscriptionRef sub;
602 const PSEvent<EventType> pse;
603 const string oid_prefix;
604
605 public:
606 StoreEventCR(RGWDataSyncCtx* const _sc,
607 const PSSubscriptionRef& _sub,
608 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
609 sc(_sc), sync_env(_sc->env),
610 sub(_sub),
611 pse(_event),
612 oid_prefix(sub->sub_conf->data_oid_prefix) {
613 }
614
615 int operate(const DoutPrefixProvider *dpp) override {
616 rgw_object_simple_put_params put_obj;
617 reenter(this) {
618
619 put_obj.bucket = sub->bucket;
620 put_obj.key = rgw_obj_key(oid_prefix + pse.id());
621
622 pse.format(&put_obj.data);
623
624 {
625 bufferlist bl;
626 pse.encode_event(bl);
627 bufferlist bl64;
628 bl.encode_base64(bl64);
629 put_obj.user_data = bl64.to_str();
630 }
631
632 yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
633 sync_env->store,
634 put_obj,
635 dpp));
636 if (retcode < 0) {
637 ldpp_dout(dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
638 return set_cr_error(retcode);
639 } else {
640 ldpp_dout(dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
641 }
642
643 return set_cr_done();
644 }
645 return 0;
646 }
647 };
648
649 template<typename EventType>
650 class PushEventCR : public RGWCoroutine {
651 RGWDataSyncCtx* const sc;
652 RGWDataSyncEnv* const sync_env;
653 const EventRef<EventType> event;
654 const PSSubConfigRef& sub_conf;
655
656 public:
657 PushEventCR(RGWDataSyncCtx* const _sc,
658 const PSSubscriptionRef& _sub,
659 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
660 sc(_sc), sync_env(_sc->env),
661 event(_event),
662 sub_conf(_sub->sub_conf) {
663 }
664
665 int operate(const DoutPrefixProvider *dpp) override {
666 reenter(this) {
667 ceph_assert(sub_conf->push_endpoint);
668 yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
669
670 if (retcode < 0) {
671 ldpp_dout(dpp, 10) << "failed to push event: " << event->id <<
672 " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
673 return set_cr_error(retcode);
674 }
675
676 ldpp_dout(dpp, 20) << "event: " << event->id <<
677 " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
678 return set_cr_done();
679 }
680 return 0;
681 }
682 };
683
684 public:
685 PSSubscription(RGWDataSyncCtx *_sc,
686 PSEnvRef _env,
687 PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
688 env(_env),
689 sub_conf(_sub_conf),
690 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
691
692 PSSubscription(RGWDataSyncCtx *_sc,
693 PSEnvRef _env,
694 rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
695 env(_env),
696 sub_conf(std::make_shared<PSSubConfig>()),
697 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
698 sub_conf->from_user_conf(sync_env->cct, user_sub_conf, sync_env->dpp);
699 }
700 virtual ~PSSubscription() {
701 if (init_cr) {
702 init_cr->put();
703 }
704 }
705
706 template <class C>
707 static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
708 PSEnvRef _env,
709 C& _sub_conf) {
710 auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf);
711 sub->init_cr = new InitCR(_sc, sub);
712 sub->init_cr->get();
713 return sub;
714 }
715
716 int call_init_cr(const DoutPrefixProvider *dpp, RGWCoroutine *caller) {
717 return init_cr->execute(dpp, caller);
718 }
719
720 template<typename EventType>
721 static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
722 return new StoreEventCR<EventType>(sc, sub, event);
723 }
724
725 template<typename EventType>
726 static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
727 return new PushEventCR<EventType>(sc, sub, event);
728 }
729 friend class InitCR;
730 };
731
732 class PSManager
733 {
734 RGWDataSyncCtx *sc;
735 RGWDataSyncEnv *sync_env;
736 PSEnvRef env;
737
738 std::map<string, PSSubscriptionRef> subs;
739
740 class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
741 RGWDataSyncCtx *sc;
742 RGWDataSyncEnv *sync_env;
743 PSManagerRef mgr;
744 rgw_user owner;
745 string sub_name;
746 string sub_id;
747 PSSubscriptionRef *ref;
748
749 PSConfigRef conf;
750
751 PSSubConfigRef sub_conf;
752 rgw_pubsub_sub_config user_sub_conf;
753
754 public:
755 GetSubCR(RGWDataSyncCtx *_sc,
756 PSManagerRef& _mgr,
757 const rgw_user& _owner,
758 const string& _sub_name,
759 PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct),
760 sc(_sc), sync_env(_sc->env),
761 mgr(_mgr),
762 owner(_owner),
763 sub_name(_sub_name),
764 ref(_ref),
765 conf(mgr->env->conf) {
766 }
767 ~GetSubCR() { }
768
769 int operate(const DoutPrefixProvider *dpp) override {
770 reenter(this) {
771 if (owner.empty()) {
772 ldpp_dout(dpp, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
773 mgr->remove_get_sub(owner, sub_name);
774 return set_cr_error(-EINVAL);
775 } else {
776 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
777 yield {
778 RGWPubSub ps(sync_env->store, owner.tenant);
779 rgw_raw_obj obj;
780 ps.get_sub_meta_obj(sub_name, &obj);
781 bool empty_on_enoent = false;
782 call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
783 obj,
784 &user_sub_conf, empty_on_enoent));
785 }
786 if (retcode < 0) {
787 mgr->remove_get_sub(owner, sub_name);
788 return set_cr_error(retcode);
789 }
790
791 *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
792 }
793
794 yield (*ref)->call_init_cr(dpp, this);
795 if (retcode < 0) {
796 ldpp_dout(dpp, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
797 mgr->remove_get_sub(owner, sub_name);
798 return set_cr_error(retcode);
799 }
800
801 mgr->remove_get_sub(owner, sub_name);
802
803 return set_cr_done();
804 }
805 return 0;
806 }
807
808 void return_result(const DoutPrefixProvider *dpp, PSSubscriptionRef *result) override {
809 ldpp_dout(dpp, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
810 if (retcode >= 0) {
811 *result = *ref;
812 }
813 }
814 };
815
816 string sub_id(const rgw_user& owner, const string& sub_name) {
817 string owner_prefix;
818 if (!owner.empty()) {
819 owner_prefix = owner.to_str() + "/";
820 }
821
822 return owner_prefix + sub_name;
823 }
824
825 std::map<std::string, GetSubCR *> get_subs;
826
827 GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
828 return get_subs[sub_id(owner, name)];
829 }
830
831 void remove_get_sub(const rgw_user& owner, const string& name) {
832 get_subs.erase(sub_id(owner, name));
833 }
834
835 bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
836 auto iter = subs.find(sub_id(owner, sub_name));
837 if (iter != subs.end()) {
838 *sub = iter->second;
839 return true;
840 }
841 return false;
842 }
843
844 PSManager(RGWDataSyncCtx *_sc,
845 PSEnvRef _env) : sc(_sc), sync_env(_sc->env),
846 env(_env) {}
847
848 public:
849 static PSManagerRef get_shared(RGWDataSyncCtx *_sc,
850 PSEnvRef _env) {
851 return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
852 }
853
854 static int call_get_subscription_cr(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, PSManagerRef& mgr,
855 RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
856 if (mgr->find_sub_instance(owner, sub_name, ref)) {
857 /* found it! nothing to execute */
858 ldpp_dout(dpp, 20) << __func__ << "(): found sub instance" << dendl;
859 }
860 auto& gs = mgr->get_get_subs(owner, sub_name);
861 if (!gs) {
862 ldpp_dout(dpp, 20) << __func__ << "(): first get subs" << dendl;
863 gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
864 }
865 ldpp_dout(dpp, 20) << __func__ << "(): executing get subs" << dendl;
866 return gs->execute(dpp, caller, ref);
867 }
868
869 friend class GetSubCR;
870 };
871
872 void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
873 manager = mgr;
874 conf->init_instance(realm, instance_id);
875 }
876
877 class RGWPSInitEnvCBCR : public RGWCoroutine {
878 RGWDataSyncCtx *sc;
879 RGWDataSyncEnv *sync_env;
880 PSEnvRef env;
881 PSConfigRef& conf;
882
883 rgw_user_create_params create_user;
884 rgw_get_user_info_params get_user_info;
885 public:
886 RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc,
887 PSEnvRef& _env) : RGWCoroutine(_sc->cct),
888 sc(_sc), sync_env(_sc->env),
889 env(_env), conf(env->conf) {}
890 int operate(const DoutPrefixProvider *dpp) override {
891 reenter(this) {
892 ldpp_dout(dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl;
893
894 /* nothing to do here right now */
895 create_user.user = conf->user;
896 create_user.max_buckets = 0; /* unlimited */
897 create_user.display_name = "pubsub";
898 create_user.generate_key = false;
899 yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, dpp));
900 if (retcode < 0 && retcode != -ERR_USER_EXIST) {
901 ldpp_dout(dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
902 return set_cr_error(retcode);
903 }
904
905 get_user_info.user = conf->user;
906 yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info, dpp));
907 if (retcode < 0) {
908 ldpp_dout(dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
909 return set_cr_error(retcode);
910 }
911
912 ldpp_dout(dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
913
914
915 return set_cr_done();
916 }
917 return 0;
918 }
919 };
920
921 bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
922 if (!match(filter.events, event_type)) {
923 return false;
924 }
925 if (!match(filter.s3_filter.key_filter, key_name)) {
926 return false;
927 }
928 return true;
929 }
930
931 class RGWPSFindBucketTopicsCR : public RGWCoroutine {
932 RGWDataSyncCtx *sc;
933 RGWDataSyncEnv *sync_env;
934 PSEnvRef env;
935 rgw_user owner;
936 rgw_bucket bucket;
937 rgw_obj_key key;
938 rgw::notify::EventType event_type;
939
940 RGWPubSub ps;
941
942 rgw_raw_obj bucket_obj;
943 rgw_raw_obj user_obj;
944 rgw_pubsub_bucket_topics bucket_topics;
945 rgw_pubsub_topics user_topics;
946 TopicsRef *topics;
947 public:
948 RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
949 PSEnvRef& _env,
950 const rgw_user& _owner,
951 const rgw_bucket& _bucket,
952 const rgw_obj_key& _key,
953 rgw::notify::EventType _event_type,
954 TopicsRef *_topics) : RGWCoroutine(_sc->cct),
955 sc(_sc), sync_env(_sc->env),
956 env(_env),
957 owner(_owner),
958 bucket(_bucket),
959 key(_key),
960 event_type(_event_type),
961 ps(sync_env->store, owner.tenant),
962 topics(_topics) {
963 *topics = std::make_shared<vector<PSTopicConfigRef> >();
964 }
965 int operate(const DoutPrefixProvider *dpp) override {
966 reenter(this) {
967 ps.get_bucket_meta_obj(bucket, &bucket_obj);
968 ps.get_meta_obj(&user_obj);
969
970 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
971 yield {
972 bool empty_on_enoent = true;
973 call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
974 bucket_obj,
975 &bucket_topics, empty_on_enoent));
976 }
977 if (retcode < 0 && retcode != -ENOENT) {
978 return set_cr_error(retcode);
979 }
980
981 ldpp_dout(dpp, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
982
983 if (!bucket_topics.topics.empty()) {
984 using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
985 yield {
986 bool empty_on_enoent = true;
987 call(new ReadUserTopicsInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
988 user_obj,
989 &user_topics, empty_on_enoent));
990 }
991 if (retcode < 0 && retcode != -ENOENT) {
992 return set_cr_error(retcode);
993 }
994 }
995
996 for (auto& titer : bucket_topics.topics) {
997 auto& topic_filter = titer.second;
998 auto& info = topic_filter.topic;
999 if (!match(topic_filter, key.name, event_type)) {
1000 continue;
1001 }
1002 std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
1003 tc->name = info.name;
1004 tc->subs = user_topics.topics[info.name].subs;
1005 tc->opaque_data = info.opaque_data;
1006 (*topics)->push_back(tc);
1007 }
1008
1009 return set_cr_done();
1010 }
1011 return 0;
1012 }
1013 };
1014
1015 class RGWPSHandleObjEventCR : public RGWCoroutine {
1016 RGWDataSyncCtx* const sc;
1017 const PSEnvRef env;
1018 const rgw_user owner;
1019 const EventRef<rgw_pubsub_event> event;
1020 const EventRef<rgw_pubsub_s3_event> s3_event;
1021 const TopicsRef topics;
1022 bool has_subscriptions;
1023 bool event_handled;
1024 PSSubscriptionRef sub;
1025 std::vector<PSTopicConfigRef>::const_iterator titer;
1026 std::set<std::string>::const_iterator siter;
1027
1028 public:
1029 RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
1030 const PSEnvRef _env,
1031 const rgw_user& _owner,
1032 const EventRef<rgw_pubsub_event>& _event,
1033 const EventRef<rgw_pubsub_s3_event>& _s3_event,
1034 const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
1035 sc(_sc),
1036 env(_env),
1037 owner(_owner),
1038 event(_event),
1039 s3_event(_s3_event),
1040 topics(_topics),
1041 has_subscriptions(false),
1042 event_handled(false) {}
1043
1044 int operate(const DoutPrefixProvider *dpp) override {
1045 reenter(this) {
1046 ldpp_dout(dpp, 20) << ": handle event: obj: z=" << sc->source_zone
1047 << " event=" << json_str("event", *event, false)
1048 << " owner=" << owner << dendl;
1049
1050 ldpp_dout(dpp, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
1051
1052 // outside caller should check that
1053 ceph_assert(!topics->empty());
1054
1055 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
1056
1057 // loop over all topics related to the bucket/object
1058 for (titer = topics->begin(); titer != topics->end(); ++titer) {
1059 ldpp_dout(dpp, 20) << ": notification for " << event->source << ": topic=" <<
1060 (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
1061 // loop over all subscriptions of the topic
1062 for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
1063 ldpp_dout(dpp, 20) << ": subscription: " << *siter << dendl;
1064 has_subscriptions = true;
1065 // try to read subscription configuration
1066 yield PSManager::call_get_subscription_cr(dpp, sc, env->manager, this, owner, *siter, &sub);
1067 if (retcode < 0) {
1068 if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
1069 ldpp_dout(dpp, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1070 << " ret=" << retcode << dendl;
1071 if (retcode == -ENOENT) {
1072 // missing subscription info should be reflected back as invalid argument
1073 // and not as missing object
1074 retcode = -EINVAL;
1075 }
1076 // try the next subscription
1077 continue;
1078 }
1079 if (sub->sub_conf->s3_id.empty()) {
1080 // subscription was not made by S3 compatible API
1081 ldpp_dout(dpp, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1082 yield call(PSSubscription::store_event_cr(sc, sub, event));
1083 if (retcode < 0) {
1084 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1085 ldpp_dout(dpp, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
1086 } else {
1087 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1088 event_handled = true;
1089 }
1090 if (sub->sub_conf->push_endpoint) {
1091 ldpp_dout(dpp, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1092 yield call(PSSubscription::push_event_cr(sc, sub, event));
1093 if (retcode < 0) {
1094 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1095 ldpp_dout(dpp, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
1096 } else {
1097 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1098 event_handled = true;
1099 }
1100 }
1101 } else {
1102 // subscription was made by S3 compatible API
1103 ldpp_dout(dpp, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1104 s3_event->configurationId = sub->sub_conf->s3_id;
1105 s3_event->opaque_data = (*titer)->opaque_data;
1106 yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
1107 if (retcode < 0) {
1108 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1109 ldpp_dout(dpp, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
1110 } else {
1111 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1112 event_handled = true;
1113 }
1114 if (sub->sub_conf->push_endpoint) {
1115 ldpp_dout(dpp, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
1116 yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
1117 if (retcode < 0) {
1118 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1119 ldpp_dout(dpp, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
1120 } else {
1121 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1122 event_handled = true;
1123 }
1124 }
1125 }
1126 }
1127 }
1128 if (has_subscriptions && !event_handled) {
1129 // event is considered "lost" of it has subscriptions on any of its topics
1130 // but it was not stored in, or pushed to, any of them
1131 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
1132 }
1133 if (retcode < 0) {
1134 return set_cr_error(retcode);
1135 }
1136 return set_cr_done();
1137 }
1138 return 0;
1139 }
1140 };
1141
1142 // coroutine invoked on remote object creation
1143 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
1144 RGWDataSyncCtx *sc;
1145 rgw_bucket_sync_pipe sync_pipe;
1146 PSEnvRef env;
1147 std::optional<uint64_t> versioned_epoch;
1148 EventRef<rgw_pubsub_event> event;
1149 EventRef<rgw_pubsub_s3_event> s3_event;
1150 TopicsRef topics;
1151 public:
1152 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1153 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1154 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1155 TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1156 sc(_sc),
1157 sync_pipe(_sync_pipe),
1158 env(_env),
1159 versioned_epoch(_versioned_epoch),
1160 topics(_topics) {
1161 }
1162 int operate(const DoutPrefixProvider *dpp) override {
1163 reenter(this) {
1164 ldpp_dout(dpp, 20) << ": stat of remote obj: z=" << sc->source_zone
1165 << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
1166 << " attrs=" << attrs << dendl;
1167 {
1168 std::vector<std::pair<std::string, std::string> > attrs;
1169 for (auto& attr : attrs) {
1170 std::string k = attr.first;
1171 if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
1172 k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
1173 }
1174 attrs.push_back(std::make_pair(k, attr.second));
1175 }
1176 // at this point we don't know whether we need the ceph event or S3 event
1177 // this is why both are created here, once we have information about the
1178 // subscription, we will store/push only the relevant ones
1179 make_event_ref(sc->cct,
1180 sync_pipe.info.source_bs.bucket, key,
1181 mtime, &attrs,
1182 rgw::notify::ObjectCreated, &event);
1183 make_s3_event_ref(sc->cct,
1184 sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
1185 mtime, &attrs,
1186 rgw::notify::ObjectCreated, &s3_event);
1187 }
1188
1189 yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
1190 if (retcode < 0) {
1191 return set_cr_error(retcode);
1192 }
1193 return set_cr_done();
1194 }
1195 return 0;
1196 }
1197 };
1198
1199 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
1200 rgw_bucket_sync_pipe sync_pipe;
1201 PSEnvRef env;
1202 std::optional<uint64_t> versioned_epoch;
1203 TopicsRef topics;
1204 public:
1205 RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1206 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1207 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1208 TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1209 sync_pipe(_sync_pipe),
1210 env(_env), versioned_epoch(_versioned_epoch),
1211 topics(_topics) {
1212 }
1213
1214 ~RGWPSHandleRemoteObjCR() override {}
1215
1216 RGWStatRemoteObjCBCR *allocate_callback() override {
1217 return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
1218 }
1219 };
1220
1221 class RGWPSHandleObjCreateCR : public RGWCoroutine {
1222 RGWDataSyncCtx *sc;
1223 rgw_bucket_sync_pipe sync_pipe;
1224 rgw_obj_key key;
1225 PSEnvRef env;
1226 std::optional<uint64_t> versioned_epoch;
1227 TopicsRef topics;
1228 public:
1229 RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
1230 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1231 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct),
1232 sc(_sc),
1233 sync_pipe(_sync_pipe),
1234 key(_key),
1235 env(_env),
1236 versioned_epoch(_versioned_epoch) {
1237 }
1238
1239 ~RGWPSHandleObjCreateCR() override {}
1240
1241 int operate(const DoutPrefixProvider *dpp) override {
1242 reenter(this) {
1243 yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
1244 sync_pipe.info.source_bs.bucket, key,
1245 rgw::notify::ObjectCreated,
1246 &topics));
1247 if (retcode < 0) {
1248 ldpp_dout(dpp, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
1249 return set_cr_error(retcode);
1250 }
1251 if (topics->empty()) {
1252 ldpp_dout(dpp, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
1253 return set_cr_done();
1254 }
1255 yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
1256 if (retcode < 0) {
1257 return set_cr_error(retcode);
1258 }
1259 return set_cr_done();
1260 }
1261 return 0;
1262 }
1263 };
1264
1265 // coroutine invoked on remote object deletion
1266 class RGWPSGenericObjEventCBCR : public RGWCoroutine {
1267 RGWDataSyncCtx *sc;
1268 PSEnvRef env;
1269 rgw_user owner;
1270 rgw_bucket bucket;
1271 rgw_obj_key key;
1272 ceph::real_time mtime;
1273 rgw::notify::EventType event_type;
1274 EventRef<rgw_pubsub_event> event;
1275 EventRef<rgw_pubsub_s3_event> s3_event;
1276 TopicsRef topics;
1277 public:
1278 RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
1279 PSEnvRef _env,
1280 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1281 rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct),
1282 sc(_sc),
1283 env(_env),
1284 owner(_sync_pipe.dest_bucket_info.owner),
1285 bucket(_sync_pipe.dest_bucket_info.bucket),
1286 key(_key),
1287 mtime(_mtime), event_type(_event_type) {}
1288 int operate(const DoutPrefixProvider *dpp) override {
1289 reenter(this) {
1290 ldpp_dout(dpp, 20) << ": remove remote obj: z=" << sc->source_zone
1291 << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
1292 yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics));
1293 if (retcode < 0) {
1294 ldpp_dout(dpp, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
1295 return set_cr_error(retcode);
1296 }
1297 if (topics->empty()) {
1298 ldpp_dout(dpp, 20) << "no topics found for " << bucket << "/" << key << dendl;
1299 return set_cr_done();
1300 }
1301 // at this point we don't know whether we need the ceph event or S3 event
1302 // this is why both are created here, once we have information about the
1303 // subscription, we will store/push only the relevant ones
1304 make_event_ref(sc->cct,
1305 bucket, key,
1306 mtime, nullptr,
1307 event_type, &event);
1308 make_s3_event_ref(sc->cct,
1309 bucket, owner, key,
1310 mtime, nullptr,
1311 event_type, &s3_event);
1312 yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
1313 if (retcode < 0) {
1314 return set_cr_error(retcode);
1315 }
1316 return set_cr_done();
1317 }
1318 return 0;
1319 }
1320
1321 };
1322
1323 class RGWPSDataSyncModule : public RGWDataSyncModule {
1324 PSEnvRef env;
1325 PSConfigRef& conf;
1326
1327 public:
1328 RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
1329 env->init(cct, config);
1330 }
1331
1332 ~RGWPSDataSyncModule() override {}
1333
1334 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1335 auto sync_env = sc->env;
1336 PSManagerRef mgr = PSManager::get_shared(sc, env);
1337 env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
1338 }
1339
1340 RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
1341 ldpp_dout(dpp, 5) << conf->id << ": start" << dendl;
1342 return new RGWPSInitEnvCBCR(sc, env);
1343 }
1344
1345 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1346 rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
1347 ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
1348 " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1349 return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
1350 }
1351
1352 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1353 rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1354 ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
1355 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1356 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
1357 }
1358
1359 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1360 rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1361 ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
1362 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1363 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
1364 }
1365
1366 PSConfigRef& get_conf() { return conf; }
1367 };
1368
1369 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config)
1370 {
1371 data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
1372 const std::string jconf = json_str("conf", *data_handler->get_conf());
1373 JSONParser p;
1374 if (!p.parse(jconf.c_str(), jconf.size())) {
1375 ldpp_dout(dpp, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
1376 effective_conf = config;
1377 } else {
1378 effective_conf.decode_json(&p);
1379 }
1380 }
1381
1382 RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
1383 {
1384 return data_handler.get();
1385 }
1386
1387 RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
1388 if (dialect != RGW_REST_S3) {
1389 return orig;
1390 }
1391 return new RGWRESTMgr_PubSub();
1392 }
1393
1394 bool RGWPSSyncModuleInstance::should_full_sync() const {
1395 return data_handler->get_conf()->start_with_full_sync;
1396 }
1397
1398 int RGWPSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
1399 instance->reset(new RGWPSSyncModuleInstance(dpp, cct, config));
1400 return 0;
1401 }
1402
1403