]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_pubsub.cc
0dbe3400e69ab28847a09b70f25e16b9072385e3
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "services/svc_zone.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_pubsub.h"
10 #include "rgw_sync_module_pubsub_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rados.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_cr_tools.h"
15 #include "rgw_op.h"
16 #include "rgw_pubsub.h"
17 #include "rgw_pubsub_push.h"
18 #include "rgw_notify_event_type.h"
19 #include "rgw_perf_counters.h"
20 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
21 #include "rgw_amqp.h"
22 #endif
23 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
24 #include "rgw_kafka.h"
25 #endif
26
27 #include <boost/algorithm/hex.hpp>
28 #include <boost/asio/yield.hpp>
29
30 #define dout_subsys ceph_subsys_rgw
31
32
33 #define PUBSUB_EVENTS_RETENTION_DEFAULT 7
34
35 /*
36
37 config:
38
39 {
40 "tenant": <tenant>, # default: <empty>
41 "uid": <uid>, # default: "pubsub"
42 "data_bucket_prefix": <prefix> # default: "pubsub-"
43 "data_oid_prefix": <prefix> #
44 "events_retention_days": <int> # default: 7
"start_with_full_sync": <bool> # default: false
46
47 # non-dynamic config
48 "notifications": [
49 {
50 "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
51 # or a prefix if it ends with a wildcard
52 "topic": <topic-name>
53 },
54 ...
55 ],
56 "subscriptions": [
57 {
58 "name": <subscription-name>,
59 "topic": <topic>,
60 "push_endpoint": <endpoint>,
"push_endpoint_args": <arg list>, # any push endpoint specific args (include all args)
"data_bucket": <bucket>, # override name of bucket where subscription data will be stored
63 "data_oid_prefix": <prefix> # set prefix for subscription data object ids
64 "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
65 },
66 ...
67 ]
68 }
69
70 */
71
72 // utility function to convert the args list from string format
73 // (ampresend separated with equal sign) to prased structure
74 RGWHTTPArgs string_to_args(const std::string& str_args) {
75 RGWHTTPArgs args;
76 args.set(str_args);
77 args.parse();
78 return args;
79 }
80
81 struct PSSubConfig {
82 std::string name;
83 std::string topic;
84 std::string push_endpoint_name;
85 std::string push_endpoint_args;
86 std::string data_bucket_name;
87 std::string data_oid_prefix;
88 std::string s3_id;
89 std::string arn_topic;
90 RGWPubSubEndpoint::Ptr push_endpoint;
91
92 void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
93 name = uc.name;
94 topic = uc.topic;
95 push_endpoint_name = uc.dest.push_endpoint;
96 data_bucket_name = uc.dest.bucket_name;
97 data_oid_prefix = uc.dest.oid_prefix;
98 s3_id = uc.s3_id;
99 arn_topic = uc.dest.arn_topic;
100 if (!push_endpoint_name.empty()) {
101 push_endpoint_args = uc.dest.push_endpoint_args;
102 try {
103 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
104 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
105 } catch (const RGWPubSubEndpoint::configuration_error& e) {
106 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
107 << push_endpoint_name << " due to: " << e.what() << dendl;
108 }
109 }
110 }
111
112 void dump(Formatter *f) const {
113 encode_json("name", name, f);
114 encode_json("topic", topic, f);
115 encode_json("push_endpoint", push_endpoint_name, f);
116 encode_json("push_endpoint_args", push_endpoint_args, f);
117 encode_json("data_bucket_name", data_bucket_name, f);
118 encode_json("data_oid_prefix", data_oid_prefix, f);
119 encode_json("s3_id", s3_id, f);
120 }
121
122 void init(CephContext *cct, const JSONFormattable& config,
123 const string& data_bucket_prefix,
124 const string& default_oid_prefix) {
125 name = config["name"];
126 topic = config["topic"];
127 push_endpoint_name = config["push_endpoint"];
128 string default_bucket_name = data_bucket_prefix + name;
129 data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
130 data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
131 s3_id = config["s3_id"];
132 arn_topic = config["arn_topic"];
133 if (!push_endpoint_name.empty()) {
134 push_endpoint_args = config["push_endpoint_args"];
135 try {
136 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
137 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
138 } catch (const RGWPubSubEndpoint::configuration_error& e) {
139 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
140 << push_endpoint_name << " due to: " << e.what() << dendl;
141 }
142 }
143 }
144 };
145
146 using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
147
148 struct PSTopicConfig {
149 std::string name;
150 std::set<std::string> subs;
151 std::string opaque_data;
152
153 void dump(Formatter *f) const {
154 encode_json("name", name, f);
155 encode_json("subs", subs, f);
156 encode_json("opaque", opaque_data, f);
157 }
158 };
159
160 struct PSNotificationConfig {
161 uint64_t id{0};
162 string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
163 string topic;
164 bool is_prefix{false};
165
166
167 void dump(Formatter *f) const {
168 encode_json("id", id, f);
169 encode_json("path", path, f);
170 encode_json("topic", topic, f);
171 encode_json("is_prefix", is_prefix, f);
172 }
173
174 void init(CephContext *cct, const JSONFormattable& config) {
175 path = config["path"];
176 if (!path.empty() && path[path.size() - 1] == '*') {
177 path = path.substr(0, path.size() - 1);
178 is_prefix = true;
179 }
180 topic = config["topic"];
181 }
182 };
183
184 template<class T>
185 static string json_str(const char *name, const T& obj, bool pretty = false)
186 {
187 stringstream ss;
188 JSONFormatter f(pretty);
189
190 encode_json(name, obj, &f);
191 f.flush(ss);
192
193 return ss.str();
194 }
195
196 using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
197 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
198
199 struct PSConfig {
200 const std::string id{"pubsub"};
201 rgw_user user;
202 std::string data_bucket_prefix;
203 std::string data_oid_prefix;
204
205 int events_retention_days{0};
206
207 uint64_t sync_instance{0};
208 uint64_t max_id{0};
209
210 /* FIXME: no hard coded buckets, we'll have configurable topics */
211 std::map<std::string, PSSubConfigRef> subs;
212 std::map<std::string, PSTopicConfigRef> topics;
213 std::multimap<std::string, PSNotificationConfig> notifications;
214
215 bool start_with_full_sync{false};
216
217 void dump(Formatter *f) const {
218 encode_json("id", id, f);
219 encode_json("user", user, f);
220 encode_json("data_bucket_prefix", data_bucket_prefix, f);
221 encode_json("data_oid_prefix", data_oid_prefix, f);
222 encode_json("events_retention_days", events_retention_days, f);
223 encode_json("sync_instance", sync_instance, f);
224 encode_json("max_id", max_id, f);
225 {
226 Formatter::ArraySection section(*f, "subs");
227 for (auto& sub : subs) {
228 encode_json("sub", *sub.second, f);
229 }
230 }
231 {
232 Formatter::ArraySection section(*f, "topics");
233 for (auto& topic : topics) {
234 encode_json("topic", *topic.second, f);
235 }
236 }
237 {
238 Formatter::ObjectSection section(*f, "notifications");
239 std::string last;
240 for (auto& notif : notifications) {
241 const string& n = notif.first;
242 if (n != last) {
243 if (!last.empty()) {
244 f->close_section();
245 }
246 f->open_array_section(n.c_str());
247 }
248 last = n;
249 encode_json("notifications", notif.second, f);
250 }
251 if (!last.empty()) {
252 f->close_section();
253 }
254 }
255 encode_json("start_with_full_sync", start_with_full_sync, f);
256 }
257
258 void init(CephContext *cct, const JSONFormattable& config) {
259 string uid = config["uid"]("pubsub");
260 user = rgw_user(config["tenant"], uid);
261 data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
262 data_oid_prefix = config["data_oid_prefix"];
263 events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
264
265 for (auto& c : config["notifications"].array()) {
266 PSNotificationConfig nc;
267 nc.id = ++max_id;
268 nc.init(cct, c);
269 notifications.insert(std::make_pair(nc.path, nc));
270
271 PSTopicConfig topic_config = { .name = nc.topic };
272 topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
273 }
274 for (auto& c : config["subscriptions"].array()) {
275 auto sc = std::make_shared<PSSubConfig>();
276 sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
277 subs[sc->name] = sc;
278 auto iter = topics.find(sc->topic);
279 if (iter != topics.end()) {
280 iter->second->subs.insert(sc->name);
281 }
282 }
283 start_with_full_sync = config["start_with_full_sync"](false);
284
285 ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
286 }
287
288 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
289 sync_instance = instance_id;
290 }
291
292 void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
293 const std::string path = bucket.name + "/" + key.name;
294
295 auto iter = notifications.upper_bound(path);
296 if (iter == notifications.begin()) {
297 return;
298 }
299
300 do {
301 --iter;
302 if (iter->first.size() > path.size()) {
303 break;
304 }
305 if (path.compare(0, iter->first.size(), iter->first) != 0) {
306 break;
307 }
308
309 PSNotificationConfig& target = iter->second;
310
311 if (!target.is_prefix &&
312 path.size() != iter->first.size()) {
313 continue;
314 }
315
316 auto topic = topics.find(target.topic);
317 if (topic == topics.end()) {
318 continue;
319 }
320
321 ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
322 " target_path=" << target.path << ", topic=" << target.topic << dendl;
323 (*result)->push_back(topic->second);
324 } while (iter != notifications.begin());
325 }
326
327 bool find_sub(const string& name, PSSubConfigRef *ref) {
328 auto iter = subs.find(name);
329 if (iter != subs.end()) {
330 *ref = iter->second;
331 return true;
332 }
333 return false;
334 }
335 };
336
337 using PSConfigRef = std::shared_ptr<PSConfig>;
338 template<typename EventType>
339 using EventRef = std::shared_ptr<EventType>;
340
341 struct objstore_event {
342 string id;
343 const rgw_bucket& bucket;
344 const rgw_obj_key& key;
345 const ceph::real_time& mtime;
346 const std::vector<std::pair<std::string, std::string> > *attrs;
347
348 objstore_event(const rgw_bucket& _bucket,
349 const rgw_obj_key& _key,
350 const ceph::real_time& _mtime,
351 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
352 key(_key),
353 mtime(_mtime),
354 attrs(_attrs) {}
355
356 string get_hash() {
357 string etag;
358 RGWMD5Etag hash;
359 hash.update(bucket.bucket_id);
360 hash.update(key.name);
361 hash.update(key.instance);
362 hash.finish(&etag);
363
364 assert(etag.size() > 8);
365
366 return etag.substr(0, 8);
367 }
368
369 void dump(Formatter *f) const {
370 {
371 Formatter::ObjectSection s(*f, "bucket");
372 encode_json("name", bucket.name, f);
373 encode_json("tenant", bucket.tenant, f);
374 encode_json("bucket_id", bucket.bucket_id, f);
375 }
376 {
377 Formatter::ObjectSection s(*f, "key");
378 encode_json("name", key.name, f);
379 encode_json("instance", key.instance, f);
380 }
381 utime_t mt(mtime);
382 encode_json("mtime", mt, f);
383 Formatter::ObjectSection s(*f, "attrs");
384 if (attrs) {
385 for (auto& attr : *attrs) {
386 encode_json(attr.first.c_str(), attr.second.c_str(), f);
387 }
388 }
389 }
390 };
391
392 static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
393 const rgw_obj_key& key,
394 const ceph::real_time& mtime,
395 const std::vector<std::pair<std::string, std::string> > *attrs,
396 rgw::notify::EventType event_type,
397 EventRef<rgw_pubsub_event> *event) {
398 *event = std::make_shared<rgw_pubsub_event>();
399
400 EventRef<rgw_pubsub_event>& e = *event;
401 e->event_name = rgw::notify::to_ceph_string(event_type);
402 e->source = bucket.name + "/" + key.name;
403 e->timestamp = real_clock::now();
404
405 objstore_event oevent(bucket, key, mtime, attrs);
406
407 const utime_t ts(e->timestamp);
408 set_event_id(e->id, oevent.get_hash(), ts);
409
410 encode_json("info", oevent, &e->info);
411 }
412
413 static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
414 const rgw_user& owner,
415 const rgw_obj_key& key,
416 const ceph::real_time& mtime,
417 const std::vector<std::pair<std::string, std::string> > *attrs,
418 rgw::notify::EventType event_type,
419 EventRef<rgw_pubsub_s3_record> *record) {
420 *record = std::make_shared<rgw_pubsub_s3_record>();
421
422 EventRef<rgw_pubsub_s3_record>& r = *record;
423 r->eventTime = mtime;
424 r->eventName = rgw::notify::to_string(event_type);
425 // userIdentity: not supported in sync module
426 // x_amz_request_id: not supported in sync module
427 // x_amz_id_2: not supported in sync module
428 // configurationId is filled from subscription configuration
429 r->bucket_name = bucket.name;
430 r->bucket_ownerIdentity = owner.to_str();
431 r->bucket_arn = to_string(rgw::ARN(bucket));
432 r->bucket_id = bucket.bucket_id; // rgw extension
433 r->object_key = key.name;
434 // object_size not supported in sync module
435 objstore_event oevent(bucket, key, mtime, attrs);
436 r->object_etag = oevent.get_hash();
437 r->object_versionId = key.instance;
438
439 // use timestamp as per key sequence id (hex encoded)
440 const utime_t ts(real_clock::now());
441 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
442 std::back_inserter(r->object_sequencer));
443
444 set_event_id(r->id, r->object_etag, ts);
445 }
446
447 class PSManager;
448 using PSManagerRef = std::shared_ptr<PSManager>;
449
450 struct PSEnv {
451 PSConfigRef conf;
452 shared_ptr<RGWUserInfo> data_user_info;
453 PSManagerRef manager;
454
455 PSEnv() : conf(make_shared<PSConfig>()),
456 data_user_info(make_shared<RGWUserInfo>()) {}
457
458 void init(CephContext *cct, const JSONFormattable& config) {
459 conf->init(cct, config);
460 }
461
462 void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
463 };
464
465 using PSEnvRef = std::shared_ptr<PSEnv>;
466
467 template<typename EventType>
468 class PSEvent {
469 const EventRef<EventType> event;
470
471 public:
472 PSEvent(const EventRef<EventType>& _event) : event(_event) {}
473
474 void format(bufferlist *bl) const {
475 bl->append(json_str("", *event));
476 }
477
478 void encode_event(bufferlist& bl) const {
479 encode(*event, bl);
480 }
481
482 const string& id() const {
483 return event->id;
484 }
485 };
486
487 template <class T>
488 class RGWSingletonCR : public RGWCoroutine {
489 friend class WrapperCR;
490
491 boost::asio::coroutine wrapper_state;
492 bool started{false};
493 int operate_ret{0};
494
495 struct WaiterInfo {
496 RGWCoroutine *cr{nullptr};
497 T *result;
498 };
499 using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
500
501 deque<WaiterInfoRef> waiters;
502
503 void add_waiter(RGWCoroutine *cr, T *result) {
504 auto waiter = std::make_shared<WaiterInfo>();
505 waiter->cr = cr;
506 waiter->result = result;
507 waiters.push_back(waiter);
508 };
509
510 bool get_next_waiter(WaiterInfoRef *waiter) {
511 if (waiters.empty()) {
512 waiter->reset();
513 return false;
514 }
515
516 *waiter = waiters.front();
517 waiters.pop_front();
518 return true;
519 }
520
521 int operate_wrapper() override {
522 reenter(&wrapper_state) {
523 while (!is_done()) {
524 ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
525 operate_ret = operate();
526 if (operate_ret < 0) {
527 ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
528 }
529 if (!is_done()) {
530 yield;
531 }
532 }
533
534 ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
535 /* we're done, can't yield anymore */
536
537 WaiterInfoRef waiter;
538 while (get_next_waiter(&waiter)) {
539 ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
540 waiter->cr->set_retcode(retcode);
541 waiter->cr->set_sleeping(false);
542 return_result(waiter->result);
543 put();
544 }
545
546 return retcode;
547 }
548 return 0;
549 }
550
551 virtual void return_result(T *result) {}
552
553 public:
554 RGWSingletonCR(CephContext *_cct)
555 : RGWCoroutine(_cct) {}
556
557 int execute(RGWCoroutine *caller, T *result = nullptr) {
558 if (!started) {
559 ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
560 started = true;
561 caller->call(this);
562 return 0;
563 } else if (!is_done()) {
564 ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
565 get();
566 add_waiter(caller, result);
567 caller->set_sleeping(true);
568 return 0;
569 }
570
571 ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
572 caller->set_retcode(retcode);
573 return_result(result);
574 return retcode;
575 }
576 };
577
578
579 class PSSubscription;
580 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
581
582 class PSSubscription {
583 class InitCR;
584 friend class InitCR;
585 friend class RGWPSHandleObjEventCR;
586
587 RGWDataSyncCtx *sc;
588 RGWDataSyncEnv *sync_env;
589 PSEnvRef env;
590 PSSubConfigRef sub_conf;
591 std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
592 RGWBucketInfo *bucket_info{nullptr};
593 RGWDataAccessRef data_access;
594 RGWDataAccess::BucketRef bucket;
595
596 InitCR *init_cr{nullptr};
597
598 class InitBucketLifecycleCR : public RGWCoroutine {
599 RGWDataSyncCtx *sc;
600 RGWDataSyncEnv *sync_env;
601 PSConfigRef& conf;
602 LCRule rule;
603
604 int retention_days;
605
606 rgw_bucket_lifecycle_config_params lc_config;
607
608 public:
609 InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
610 PSConfigRef& _conf,
611 RGWBucketInfo& _bucket_info,
612 std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sc->cct),
613 sc(_sc), sync_env(_sc->env),
614 conf(_conf) {
615 lc_config.bucket_info = _bucket_info;
616 lc_config.bucket_attrs = _bucket_attrs;
617 retention_days = conf->events_retention_days;
618 }
619
620 int operate() override {
621 reenter(this) {
622
623 rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
624
625 {
626 /* maybe we already have it configured? */
627 RGWLifecycleConfiguration old_config;
628 auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
629 if (aiter != lc_config.bucket_attrs.end()) {
630 bufferlist::const_iterator iter{&aiter->second};
631 try {
632 old_config.decode(iter);
633 } catch (const buffer::error& e) {
634 ldpp_dout(sync_env->dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl;
635 }
636 }
637
638 auto old_rules = old_config.get_rule_map();
639 for (auto ori : old_rules) {
640 auto& old_rule = ori.second;
641
642 if (old_rule.get_prefix().empty() &&
643 old_rule.get_expiration().get_days() == retention_days &&
644 old_rule.is_enabled()) {
645 ldpp_dout(sync_env->dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
646 return set_cr_done();
647 }
648 }
649 }
650
651 lc_config.config.add_rule(rule);
652 yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
653 sync_env->store,
654 lc_config,
655 sync_env->dpp));
656 if (retcode < 0) {
657 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
658 return set_cr_error(retcode);
659 }
660
661 return set_cr_done();
662 }
663 return 0;
664 }
665 };
666
667 class InitCR : public RGWSingletonCR<bool> {
668 RGWDataSyncCtx *sc;
669 RGWDataSyncEnv *sync_env;
670 PSSubscriptionRef sub;
671 rgw_get_bucket_info_params get_bucket_info;
672 rgw_bucket_create_local_params create_bucket;
673 PSConfigRef& conf;
674 PSSubConfigRef& sub_conf;
675 int i;
676
677 public:
678 InitCR(RGWDataSyncCtx *_sc,
679 PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct),
680 sc(_sc), sync_env(_sc->env),
681 sub(_sub), conf(sub->env->conf),
682 sub_conf(sub->sub_conf) {
683 }
684
685 int operate() override {
686 reenter(this) {
687 get_bucket_info.tenant = conf->user.tenant;
688 get_bucket_info.bucket_name = sub_conf->data_bucket_name;
689 sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
690
691 for (i = 0; i < 2; ++i) {
692 yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
693 sync_env->store,
694 get_bucket_info,
695 sub->get_bucket_info_result));
696 if (retcode < 0 && retcode != -ENOENT) {
697 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant="
698 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
699 }
700 if (retcode == 0) {
701 {
702 auto& result = sub->get_bucket_info_result;
703 sub->bucket_info = &result->bucket_info;
704
705 int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
706 if (ret < 0) {
707 ldpp_dout(sync_env->dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
708 return set_cr_error(ret);
709 }
710 }
711
712 yield call(new InitBucketLifecycleCR(sc, conf,
713 sub->get_bucket_info_result->bucket_info,
714 sub->get_bucket_info_result->attrs));
715 if (retcode < 0) {
716 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
717 return set_cr_error(retcode);
718 }
719
720 return set_cr_done();
721 }
722
723 create_bucket.user_info = sub->env->data_user_info;
724 create_bucket.bucket_name = sub_conf->data_bucket_name;
725 ldpp_dout(sync_env->dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
726 yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
727 sync_env->store,
728 create_bucket,
729 sync_env->dpp));
730 if (retcode < 0) {
731 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
732 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
733 return set_cr_error(retcode);
734 }
735
736 /* second iteration: we got -ENOENT and created a bucket */
737 }
738
739 /* failed twice on -ENOENT, unexpected */
740 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
741 << " name=" << get_bucket_info.bucket_name << dendl;
742 return set_cr_error(-EIO);
743 }
744 return 0;
745 }
746 };
747
748 template<typename EventType>
749 class StoreEventCR : public RGWCoroutine {
750 RGWDataSyncCtx* const sc;
751 RGWDataSyncEnv* const sync_env;
752 const PSSubscriptionRef sub;
753 const PSEvent<EventType> pse;
754 const string oid_prefix;
755
756 public:
757 StoreEventCR(RGWDataSyncCtx* const _sc,
758 const PSSubscriptionRef& _sub,
759 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
760 sc(_sc), sync_env(_sc->env),
761 sub(_sub),
762 pse(_event),
763 oid_prefix(sub->sub_conf->data_oid_prefix) {
764 }
765
766 int operate() override {
767 rgw_object_simple_put_params put_obj;
768 reenter(this) {
769
770 put_obj.bucket = sub->bucket;
771 put_obj.key = rgw_obj_key(oid_prefix + pse.id());
772
773 pse.format(&put_obj.data);
774
775 {
776 bufferlist bl;
777 pse.encode_event(bl);
778 bufferlist bl64;
779 bl.encode_base64(bl64);
780 put_obj.user_data = bl64.to_str();
781 }
782
783 yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
784 sync_env->store,
785 put_obj,
786 sync_env->dpp));
787 if (retcode < 0) {
788 ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
789 return set_cr_error(retcode);
790 } else {
791 ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
792 }
793
794 return set_cr_done();
795 }
796 return 0;
797 }
798 };
799
800 template<typename EventType>
801 class PushEventCR : public RGWCoroutine {
802 RGWDataSyncCtx* const sc;
803 RGWDataSyncEnv* const sync_env;
804 const EventRef<EventType> event;
805 const PSSubConfigRef& sub_conf;
806
807 public:
808 PushEventCR(RGWDataSyncCtx* const _sc,
809 const PSSubscriptionRef& _sub,
810 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
811 sc(_sc), sync_env(_sc->env),
812 event(_event),
813 sub_conf(_sub->sub_conf) {
814 }
815
816 int operate() override {
817 reenter(this) {
818 ceph_assert(sub_conf->push_endpoint);
819 yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
820
821 if (retcode < 0) {
822 ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
823 " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
824 return set_cr_error(retcode);
825 }
826
827 ldout(sync_env->cct, 20) << "event: " << event->id <<
828 " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
829 return set_cr_done();
830 }
831 return 0;
832 }
833 };
834
835 public:
836 PSSubscription(RGWDataSyncCtx *_sc,
837 PSEnvRef _env,
838 PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
839 env(_env),
840 sub_conf(_sub_conf),
841 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
842
843 PSSubscription(RGWDataSyncCtx *_sc,
844 PSEnvRef _env,
845 rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
846 env(_env),
847 sub_conf(std::make_shared<PSSubConfig>()),
848 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
849 sub_conf->from_user_conf(sync_env->cct, user_sub_conf);
850 }
851 virtual ~PSSubscription() {
852 if (init_cr) {
853 init_cr->put();
854 }
855 }
856
857 template <class C>
858 static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
859 PSEnvRef _env,
860 C& _sub_conf) {
861 auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf);
862 sub->init_cr = new InitCR(_sc, sub);
863 sub->init_cr->get();
864 return sub;
865 }
866
867 int call_init_cr(RGWCoroutine *caller) {
868 return init_cr->execute(caller);
869 }
870
871 template<typename EventType>
872 static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
873 return new StoreEventCR<EventType>(sc, sub, event);
874 }
875
876 template<typename EventType>
877 static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
878 return new PushEventCR<EventType>(sc, sub, event);
879 }
880 friend class InitCR;
881 };
882
883 class PSManager
884 {
885 RGWDataSyncCtx *sc;
886 RGWDataSyncEnv *sync_env;
887 PSEnvRef env;
888
889 std::map<string, PSSubscriptionRef> subs;
890
891 class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
892 RGWDataSyncCtx *sc;
893 RGWDataSyncEnv *sync_env;
894 PSManagerRef mgr;
895 rgw_user owner;
896 string sub_name;
897 string sub_id;
898 PSSubscriptionRef *ref;
899
900 PSConfigRef conf;
901
902 PSSubConfigRef sub_conf;
903 rgw_pubsub_sub_config user_sub_conf;
904
905 public:
906 GetSubCR(RGWDataSyncCtx *_sc,
907 PSManagerRef& _mgr,
908 const rgw_user& _owner,
909 const string& _sub_name,
910 PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct),
911 sc(_sc), sync_env(_sc->env),
912 mgr(_mgr),
913 owner(_owner),
914 sub_name(_sub_name),
915 ref(_ref),
916 conf(mgr->env->conf) {
917 }
918 ~GetSubCR() { }
919
920 int operate() override {
921 reenter(this) {
922 if (owner.empty()) {
923 if (!conf->find_sub(sub_name, &sub_conf)) {
924 ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
925 mgr->remove_get_sub(owner, sub_name);
926 return set_cr_error(-ENOENT);
927 }
928
929 *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
930 } else {
931 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
932 yield {
933 RGWPubSub ps(sync_env->store, owner.tenant);
934 rgw_raw_obj obj;
935 ps.get_sub_meta_obj(sub_name, &obj);
936 bool empty_on_enoent = false;
937 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
938 obj,
939 &user_sub_conf, empty_on_enoent));
940 }
941 if (retcode < 0) {
942 mgr->remove_get_sub(owner, sub_name);
943 return set_cr_error(retcode);
944 }
945
946 *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
947 }
948
949 yield (*ref)->call_init_cr(this);
950 if (retcode < 0) {
951 ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
952 mgr->remove_get_sub(owner, sub_name);
953 return set_cr_error(retcode);
954 }
955
956 if (owner.empty()) {
957 mgr->subs[sub_name] = *ref;
958 }
959 mgr->remove_get_sub(owner, sub_name);
960
961 return set_cr_done();
962 }
963 return 0;
964 }
965
966 void return_result(PSSubscriptionRef *result) override {
967 ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
968 if (retcode >= 0) {
969 *result = *ref;
970 }
971 }
972 };
973
974 string sub_id(const rgw_user& owner, const string& sub_name) {
975 string owner_prefix;
976 if (!owner.empty()) {
977 owner_prefix = owner.to_str() + "/";
978 }
979
980 return owner_prefix + sub_name;
981 }
982
983 std::map<std::string, GetSubCR *> get_subs;
984
985 GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
986 return get_subs[sub_id(owner, name)];
987 }
988
989 void remove_get_sub(const rgw_user& owner, const string& name) {
990 get_subs.erase(sub_id(owner, name));
991 }
992
993 bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
994 auto iter = subs.find(sub_id(owner, sub_name));
995 if (iter != subs.end()) {
996 *sub = iter->second;
997 return true;
998 }
999 return false;
1000 }
1001
1002 PSManager(RGWDataSyncCtx *_sc,
1003 PSEnvRef _env) : sc(_sc), sync_env(_sc->env),
1004 env(_env) {}
1005
1006 public:
1007 static PSManagerRef get_shared(RGWDataSyncCtx *_sc,
1008 PSEnvRef _env) {
1009 return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
1010 }
1011
1012 static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr,
1013 RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
1014 if (mgr->find_sub_instance(owner, sub_name, ref)) {
1015 /* found it! nothing to execute */
1016 ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl;
1017 }
1018 auto& gs = mgr->get_get_subs(owner, sub_name);
1019 if (!gs) {
1020 ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl;
1021 gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
1022 }
1023 ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl;
1024 return gs->execute(caller, ref);
1025 }
1026
1027 friend class GetSubCR;
1028 };
1029
1030 void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
1031 manager = mgr;
1032 conf->init_instance(realm, instance_id);
1033 }
1034
1035 class RGWPSInitEnvCBCR : public RGWCoroutine {
1036 RGWDataSyncCtx *sc;
1037 RGWDataSyncEnv *sync_env;
1038 PSEnvRef env;
1039 PSConfigRef& conf;
1040
1041 rgw_user_create_params create_user;
1042 rgw_get_user_info_params get_user_info;
1043 public:
1044 RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc,
1045 PSEnvRef& _env) : RGWCoroutine(_sc->cct),
1046 sc(_sc), sync_env(_sc->env),
1047 env(_env), conf(env->conf) {}
1048 int operate() override {
1049 reenter(this) {
1050 ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl;
1051
1052 /* nothing to do here right now */
1053 create_user.user = conf->user;
1054 create_user.max_buckets = 0; /* unlimited */
1055 create_user.display_name = "pubsub";
1056 create_user.generate_key = false;
1057 yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, sync_env->dpp));
1058 if (retcode < 0 && retcode != -ERR_USER_EXIST) {
1059 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
1060 return set_cr_error(retcode);
1061 }
1062
1063 get_user_info.user = conf->user;
1064 yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
1065 if (retcode < 0) {
1066 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
1067 return set_cr_error(retcode);
1068 }
1069
1070 ldpp_dout(sync_env->dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
1071
1072
1073 return set_cr_done();
1074 }
1075 return 0;
1076 }
1077 };
1078
1079 bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
1080 if (!match(filter.events, event_type)) {
1081 return false;
1082 }
1083 if (!match(filter.s3_filter.key_filter, key_name)) {
1084 return false;
1085 }
1086 return true;
1087 }
1088
1089 class RGWPSFindBucketTopicsCR : public RGWCoroutine {
1090 RGWDataSyncCtx *sc;
1091 RGWDataSyncEnv *sync_env;
1092 PSEnvRef env;
1093 rgw_user owner;
1094 rgw_bucket bucket;
1095 rgw_obj_key key;
1096 rgw::notify::EventType event_type;
1097
1098 RGWPubSub ps;
1099
1100 rgw_raw_obj bucket_obj;
1101 rgw_raw_obj user_obj;
1102 rgw_pubsub_bucket_topics bucket_topics;
1103 rgw_pubsub_topics user_topics;
1104 TopicsRef *topics;
1105 public:
1106 RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
1107 PSEnvRef& _env,
1108 const rgw_user& _owner,
1109 const rgw_bucket& _bucket,
1110 const rgw_obj_key& _key,
1111 rgw::notify::EventType _event_type,
1112 TopicsRef *_topics) : RGWCoroutine(_sc->cct),
1113 sc(_sc), sync_env(_sc->env),
1114 env(_env),
1115 owner(_owner),
1116 bucket(_bucket),
1117 key(_key),
1118 event_type(_event_type),
1119 ps(sync_env->store, owner.tenant),
1120 topics(_topics) {
1121 *topics = std::make_shared<vector<PSTopicConfigRef> >();
1122 }
1123 int operate() override {
1124 reenter(this) {
1125 ps.get_bucket_meta_obj(bucket, &bucket_obj);
1126 ps.get_meta_obj(&user_obj);
1127
1128 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
1129 yield {
1130 bool empty_on_enoent = true;
1131 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
1132 bucket_obj,
1133 &bucket_topics, empty_on_enoent));
1134 }
1135 if (retcode < 0 && retcode != -ENOENT) {
1136 return set_cr_error(retcode);
1137 }
1138
1139 ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
1140
1141 if (!bucket_topics.topics.empty()) {
1142 using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
1143 yield {
1144 bool empty_on_enoent = true;
1145 call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
1146 user_obj,
1147 &user_topics, empty_on_enoent));
1148 }
1149 if (retcode < 0 && retcode != -ENOENT) {
1150 return set_cr_error(retcode);
1151 }
1152 }
1153
1154 for (auto& titer : bucket_topics.topics) {
1155 auto& topic_filter = titer.second;
1156 auto& info = topic_filter.topic;
1157 if (!match(topic_filter, key.name, event_type)) {
1158 continue;
1159 }
1160 std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
1161 tc->name = info.name;
1162 tc->subs = user_topics.topics[info.name].subs;
1163 tc->opaque_data = info.opaque_data;
1164 (*topics)->push_back(tc);
1165 }
1166
1167 env->conf->get_topics(sync_env->cct, bucket, key, topics);
1168 return set_cr_done();
1169 }
1170 return 0;
1171 }
1172 };
1173
1174 class RGWPSHandleObjEventCR : public RGWCoroutine {
1175 RGWDataSyncCtx* const sc;
1176 const PSEnvRef env;
1177 const rgw_user& owner;
1178 const EventRef<rgw_pubsub_event> event;
1179 const EventRef<rgw_pubsub_s3_record> record;
1180 const TopicsRef topics;
1181 const std::array<rgw_user, 2> owners;
1182 bool has_subscriptions;
1183 bool event_handled;
1184 bool sub_conf_found;
1185 PSSubscriptionRef sub;
1186 std::array<rgw_user, 2>::const_iterator oiter;
1187 std::vector<PSTopicConfigRef>::const_iterator titer;
1188 std::set<std::string>::const_iterator siter;
1189 int last_sub_conf_error;
1190
1191 public:
1192 RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
1193 const PSEnvRef _env,
1194 const rgw_user& _owner,
1195 const EventRef<rgw_pubsub_event>& _event,
1196 const EventRef<rgw_pubsub_s3_record>& _record,
1197 const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
1198 sc(_sc),
1199 env(_env),
1200 owner(_owner),
1201 event(_event),
1202 record(_record),
1203 topics(_topics),
1204 owners({owner, rgw_user{}}),
1205 has_subscriptions(false),
1206 event_handled(false) {}
1207
1208 int operate() override {
1209 reenter(this) {
1210 ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone
1211 << " event=" << json_str("event", *event, false)
1212 << " owner=" << owner << dendl;
1213
1214 ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
1215
1216 // outside caller should check that
1217 ceph_assert(!topics->empty());
1218
1219 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
1220
1221 // loop over all topics related to the bucket/object
1222 for (titer = topics->begin(); titer != topics->end(); ++titer) {
1223 ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" <<
1224 (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
1225 // loop over all subscriptions of the topic
1226 for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
1227 ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
1228 has_subscriptions = true;
1229 sub_conf_found = false;
1230 // try to read subscription configuration from global/user cond
1231 // configuration is considered missing only if does not exist in either
1232 for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
1233 yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
1234 if (retcode < 0) {
1235 if (sub_conf_found) {
1236 // not a real issue, sub conf already found
1237 retcode = 0;
1238 }
1239 last_sub_conf_error = retcode;
1240 continue;
1241 }
1242 sub_conf_found = true;
1243 if (sub->sub_conf->s3_id.empty()) {
1244 // subscription was not made by S3 compatible API
1245 ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1246 yield call(PSSubscription::store_event_cr(sc, sub, event));
1247 if (retcode < 0) {
1248 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1249 ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
1250 } else {
1251 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1252 event_handled = true;
1253 }
1254 if (sub->sub_conf->push_endpoint) {
1255 ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1256 yield call(PSSubscription::push_event_cr(sc, sub, event));
1257 if (retcode < 0) {
1258 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1259 ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
1260 } else {
1261 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1262 event_handled = true;
1263 }
1264 }
1265 } else {
1266 // subscription was made by S3 compatible API
1267 ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1268 record->configurationId = sub->sub_conf->s3_id;
1269 record->opaque_data = (*titer)->opaque_data;
1270 yield call(PSSubscription::store_event_cr(sc, sub, record));
1271 if (retcode < 0) {
1272 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1273 ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
1274 } else {
1275 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1276 event_handled = true;
1277 }
1278 if (sub->sub_conf->push_endpoint) {
1279 ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1280 yield call(PSSubscription::push_event_cr(sc, sub, record));
1281 if (retcode < 0) {
1282 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1283 ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
1284 } else {
1285 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1286 event_handled = true;
1287 }
1288 }
1289 }
1290 }
1291 if (!sub_conf_found) {
1292 // could not find conf for subscription at user or global levels
1293 if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
1294 ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1295 << " ret=" << last_sub_conf_error << dendl;
1296 if (retcode == -ENOENT) {
1297 // missing subscription info should be reflected back as invalid argument
1298 // and not as missing object
1299 retcode = -EINVAL;
1300 }
1301 }
1302 }
1303 }
1304 if (has_subscriptions && !event_handled) {
1305 // event is considered "lost" of it has subscriptions on any of its topics
1306 // but it was not stored in, or pushed to, any of them
1307 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
1308 }
1309 if (retcode < 0) {
1310 return set_cr_error(retcode);
1311 }
1312 return set_cr_done();
1313 }
1314 return 0;
1315 }
1316 };
1317
1318 // coroutine invoked on remote object creation
1319 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
1320 RGWDataSyncCtx *sc;
1321 rgw_bucket_sync_pipe sync_pipe;
1322 PSEnvRef env;
1323 std::optional<uint64_t> versioned_epoch;
1324 EventRef<rgw_pubsub_event> event;
1325 EventRef<rgw_pubsub_s3_record> record;
1326 TopicsRef topics;
1327 public:
1328 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1329 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1330 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1331 TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1332 sc(_sc),
1333 sync_pipe(_sync_pipe),
1334 env(_env),
1335 versioned_epoch(_versioned_epoch),
1336 topics(_topics) {
1337 }
1338 int operate() override {
1339 reenter(this) {
1340 ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone
1341 << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
1342 << " attrs=" << attrs << dendl;
1343 {
1344 std::vector<std::pair<std::string, std::string> > attrs;
1345 for (auto& attr : attrs) {
1346 std::string k = attr.first;
1347 if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
1348 k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
1349 }
1350 attrs.push_back(std::make_pair(k, attr.second));
1351 }
1352 // at this point we don't know whether we need the ceph event or S3 record
1353 // this is why both are created here, once we have information about the
1354 // subscription, we will store/push only the relevant ones
1355 make_event_ref(sc->cct,
1356 sync_pipe.info.source_bs.bucket, key,
1357 mtime, &attrs,
1358 rgw::notify::ObjectCreated, &event);
1359 make_s3_record_ref(sc->cct,
1360 sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
1361 mtime, &attrs,
1362 rgw::notify::ObjectCreated, &record);
1363 }
1364
1365 yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
1366 if (retcode < 0) {
1367 return set_cr_error(retcode);
1368 }
1369 return set_cr_done();
1370 }
1371 return 0;
1372 }
1373 };
1374
1375 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
1376 rgw_bucket_sync_pipe sync_pipe;
1377 PSEnvRef env;
1378 std::optional<uint64_t> versioned_epoch;
1379 TopicsRef topics;
1380 public:
1381 RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1382 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1383 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1384 TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1385 sync_pipe(_sync_pipe),
1386 env(_env), versioned_epoch(_versioned_epoch),
1387 topics(_topics) {
1388 }
1389
1390 ~RGWPSHandleRemoteObjCR() override {}
1391
1392 RGWStatRemoteObjCBCR *allocate_callback() override {
1393 return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
1394 }
1395 };
1396
1397 class RGWPSHandleObjCreateCR : public RGWCoroutine {
1398 RGWDataSyncCtx *sc;
1399 rgw_bucket_sync_pipe sync_pipe;
1400 rgw_obj_key key;
1401 PSEnvRef env;
1402 std::optional<uint64_t> versioned_epoch;
1403 TopicsRef topics;
1404 public:
1405 RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
1406 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1407 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct),
1408 sc(_sc),
1409 sync_pipe(_sync_pipe),
1410 key(_key),
1411 env(_env),
1412 versioned_epoch(_versioned_epoch) {
1413 }
1414
1415 ~RGWPSHandleObjCreateCR() override {}
1416
1417 int operate() override {
1418 reenter(this) {
1419 yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
1420 sync_pipe.info.source_bs.bucket, key,
1421 rgw::notify::ObjectCreated,
1422 &topics));
1423 if (retcode < 0) {
1424 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
1425 return set_cr_error(retcode);
1426 }
1427 if (topics->empty()) {
1428 ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
1429 return set_cr_done();
1430 }
1431 yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
1432 if (retcode < 0) {
1433 return set_cr_error(retcode);
1434 }
1435 return set_cr_done();
1436 }
1437 return 0;
1438 }
1439 };
1440
1441 // coroutine invoked on remote object deletion
1442 class RGWPSGenericObjEventCBCR : public RGWCoroutine {
1443 RGWDataSyncCtx *sc;
1444 PSEnvRef env;
1445 rgw_user owner;
1446 rgw_bucket bucket;
1447 rgw_obj_key key;
1448 ceph::real_time mtime;
1449 rgw::notify::EventType event_type;
1450 EventRef<rgw_pubsub_event> event;
1451 EventRef<rgw_pubsub_s3_record> record;
1452 TopicsRef topics;
1453 public:
1454 RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
1455 PSEnvRef _env,
1456 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1457 rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct),
1458 sc(_sc),
1459 env(_env),
1460 owner(_sync_pipe.dest_bucket_info.owner),
1461 bucket(_sync_pipe.dest_bucket_info.bucket),
1462 key(_key),
1463 mtime(_mtime), event_type(_event_type) {}
1464 int operate() override {
1465 reenter(this) {
1466 ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone
1467 << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
1468 yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics));
1469 if (retcode < 0) {
1470 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
1471 return set_cr_error(retcode);
1472 }
1473 if (topics->empty()) {
1474 ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
1475 return set_cr_done();
1476 }
1477 // at this point we don't know whether we need the ceph event or S3 record
1478 // this is why both are created here, once we have information about the
1479 // subscription, we will store/push only the relevant ones
1480 make_event_ref(sc->cct,
1481 bucket, key,
1482 mtime, nullptr,
1483 event_type, &event);
1484 make_s3_record_ref(sc->cct,
1485 bucket, owner, key,
1486 mtime, nullptr,
1487 event_type, &record);
1488 yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
1489 if (retcode < 0) {
1490 return set_cr_error(retcode);
1491 }
1492 return set_cr_done();
1493 }
1494 return 0;
1495 }
1496
1497 };
1498
1499 class RGWPSDataSyncModule : public RGWDataSyncModule {
1500 PSEnvRef env;
1501 PSConfigRef& conf;
1502
1503 public:
1504 RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
1505 env->init(cct, config);
1506 }
1507
1508 ~RGWPSDataSyncModule() override {}
1509
1510 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1511 auto sync_env = sc->env;
1512 PSManagerRef mgr = PSManager::get_shared(sc, env);
1513 env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
1514 }
1515
1516 RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
1517 ldout(sc->cct, 5) << conf->id << ": start" << dendl;
1518 return new RGWPSInitEnvCBCR(sc, env);
1519 }
1520
1521 RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1522 rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
1523 ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
1524 " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1525 return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
1526 }
1527
1528 RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1529 rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1530 ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
1531 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1532 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
1533 }
1534
1535 RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1536 rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1537 ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
1538 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1539 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
1540 }
1541
1542 PSConfigRef& get_conf() { return conf; }
1543 };
1544
1545 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
1546 {
1547 data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
1548 const std::string jconf = json_str("conf", *data_handler->get_conf());
1549 JSONParser p;
1550 if (!p.parse(jconf.c_str(), jconf.size())) {
1551 ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
1552 effective_conf = config;
1553 } else {
1554 effective_conf.decode_json(&p);
1555 }
1556 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
1557 if (!rgw::amqp::init(cct)) {
1558 ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
1559 }
1560 #endif
1561 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
1562 if (!rgw::kafka::init(cct)) {
1563 ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
1564 }
1565 #endif
1566 }
1567
1568 RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
1569 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
1570 rgw::amqp::shutdown();
1571 #endif
1572 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
1573 rgw::kafka::shutdown();
1574 #endif
1575 }
1576
1577 RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
1578 {
1579 return data_handler.get();
1580 }
1581
1582 RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
1583 if (dialect != RGW_REST_S3) {
1584 return orig;
1585 }
1586 return new RGWRESTMgr_PubSub();
1587 }
1588
1589 bool RGWPSSyncModuleInstance::should_full_sync() const {
1590 return data_handler->get_conf()->start_with_full_sync;
1591 }
1592
1593 int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
1594 instance->reset(new RGWPSSyncModuleInstance(cct, config));
1595 return 0;
1596 }
1597
1598