]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_pubsub.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub.cc
CommitLineData
11fdf7f2
TL
1#include "rgw_common.h"
2#include "rgw_coroutine.h"
3#include "rgw_sync_module.h"
4#include "rgw_data_sync.h"
5#include "rgw_sync_module_pubsub.h"
6#include "rgw_sync_module_pubsub_rest.h"
7#include "rgw_rest_conn.h"
8#include "rgw_cr_rados.h"
9#include "rgw_cr_rest.h"
10#include "rgw_cr_tools.h"
11#include "rgw_op.h"
12#include "rgw_pubsub.h"
13#include "rgw_pubsub_push.h"
eafe8130 14#include "rgw_notify_event_type.h"
11fdf7f2 15#include "rgw_perf_counters.h"
eafe8130
TL
16#ifdef WITH_RADOSGW_AMQP_ENDPOINT
17#include "rgw_amqp.h"
18#endif
11fdf7f2 19
eafe8130 20#include <boost/algorithm/hex.hpp>
11fdf7f2
TL
21#include <boost/asio/yield.hpp>
22
23#define dout_subsys ceph_subsys_rgw
24
25
26#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
27
28/*
29
30config:
31
32{
33 "tenant": <tenant>, # default: <empty>
34 "uid": <uid>, # default: "pubsub"
 "data_bucket_prefix": <prefix>,       # default: "pubsub-"
 "data_oid_prefix": <prefix>,          # default: <empty>
eafe8130
TL
 "events_retention_days": <int>,       # default: 7
 "start_with_full_sync": <bool>,       # default: false
11fdf7f2
TL
39
40 # non-dynamic config
41 "notifications": [
42 {
43 "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
44 # or a prefix if it ends with a wildcard
45 "topic": <topic-name>
46 },
47 ...
48 ],
49 "subscriptions": [
50 {
51 "name": <subscription-name>,
52 "topic": <topic>,
53 "push_endpoint": <endpoint>,
      "push_endpoint_args": <arg list>,  # endpoint-specific args, as one string of '&'-separated key=value pairs
11fdf7f2
TL
      "data_bucket": <bucket>,           # override name of bucket where subscription data will be stored
      "data_oid_prefix": <prefix>,       # set prefix for subscription data object ids
eafe8130 57 "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
11fdf7f2
TL
58 },
59 ...
60 ]
61}
62
63*/
64
65// utility function to convert the args list from string format
66// (ampresend separated with equal sign) to prased structure
67RGWHTTPArgs string_to_args(const std::string& str_args) {
68 RGWHTTPArgs args;
69 args.set(str_args);
70 args.parse();
71 return args;
72}
73
eafe8130
TL
74struct PSSubConfig {
75 std::string name;
76 std::string topic;
77 std::string push_endpoint_name;
78 std::string push_endpoint_args;
79 std::string data_bucket_name;
80 std::string data_oid_prefix;
81 std::string s3_id;
82 std::string arn_topic;
11fdf7f2
TL
83 RGWPubSubEndpoint::Ptr push_endpoint;
84
11fdf7f2
TL
85 void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
86 name = uc.name;
87 topic = uc.topic;
88 push_endpoint_name = uc.dest.push_endpoint;
89 data_bucket_name = uc.dest.bucket_name;
90 data_oid_prefix = uc.dest.oid_prefix;
eafe8130
TL
91 s3_id = uc.s3_id;
92 arn_topic = uc.dest.arn_topic;
93 if (!push_endpoint_name.empty()) {
11fdf7f2
TL
94 push_endpoint_args = uc.dest.push_endpoint_args;
95 try {
eafe8130
TL
96 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
97 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
11fdf7f2 98 } catch (const RGWPubSubEndpoint::configuration_error& e) {
eafe8130 99 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
11fdf7f2
TL
100 << push_endpoint_name << " due to: " << e.what() << dendl;
101 }
102 }
103 }
104
105 void dump(Formatter *f) const {
106 encode_json("name", name, f);
107 encode_json("topic", topic, f);
108 encode_json("push_endpoint", push_endpoint_name, f);
eafe8130 109 encode_json("push_endpoint_args", push_endpoint_args, f);
11fdf7f2
TL
110 encode_json("data_bucket_name", data_bucket_name, f);
111 encode_json("data_oid_prefix", data_oid_prefix, f);
eafe8130 112 encode_json("s3_id", s3_id, f);
11fdf7f2
TL
113 }
114
115 void init(CephContext *cct, const JSONFormattable& config,
116 const string& data_bucket_prefix,
117 const string& default_oid_prefix) {
118 name = config["name"];
119 topic = config["topic"];
120 push_endpoint_name = config["push_endpoint"];
121 string default_bucket_name = data_bucket_prefix + name;
122 data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
123 data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
eafe8130
TL
124 s3_id = config["s3_id"];
125 arn_topic = config["arn_topic"];
11fdf7f2
TL
126 if (!push_endpoint_name.empty()) {
127 push_endpoint_args = config["push_endpoint_args"];
128 try {
eafe8130
TL
129 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
130 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
11fdf7f2 131 } catch (const RGWPubSubEndpoint::configuration_error& e) {
eafe8130 132 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
11fdf7f2
TL
133 << push_endpoint_name << " due to: " << e.what() << dendl;
134 }
135 }
136 }
137};
138
139using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
140
141struct PSTopicConfig {
eafe8130
TL
142 std::string name;
143 std::set<std::string> subs;
11fdf7f2
TL
144
145 void dump(Formatter *f) const {
146 encode_json("name", name, f);
147 encode_json("subs", subs, f);
148 }
149};
150
151struct PSNotificationConfig {
152 uint64_t id{0};
153 string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
154 string topic;
155 bool is_prefix{false};
156
157
158 void dump(Formatter *f) const {
159 encode_json("id", id, f);
160 encode_json("path", path, f);
161 encode_json("topic", topic, f);
162 encode_json("is_prefix", is_prefix, f);
163 }
164
165 void init(CephContext *cct, const JSONFormattable& config) {
166 path = config["path"];
167 if (!path.empty() && path[path.size() - 1] == '*') {
168 path = path.substr(0, path.size() - 1);
169 is_prefix = true;
170 }
171 topic = config["topic"];
172 }
173};
174
175template<class T>
176static string json_str(const char *name, const T& obj, bool pretty = false)
177{
178 stringstream ss;
179 JSONFormatter f(pretty);
180
181 encode_json(name, obj, &f);
182 f.flush(ss);
183
184 return ss.str();
185}
186
187using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
eafe8130 188using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
11fdf7f2
TL
189
190struct PSConfig {
191 string id{"pubsub"};
192 rgw_user user;
193 string data_bucket_prefix;
194 string data_oid_prefix;
195
196 int events_retention_days{0};
197
198 uint64_t sync_instance{0};
199 uint64_t max_id{0};
200
201 /* FIXME: no hard coded buckets, we'll have configurable topics */
eafe8130
TL
202 std::map<std::string, PSSubConfigRef> subs;
203 std::map<std::string, PSTopicConfigRef> topics;
204 std::multimap<std::string, PSNotificationConfig> notifications;
205
206 bool start_with_full_sync{false};
11fdf7f2
TL
207
208 void dump(Formatter *f) const {
209 encode_json("id", id, f);
210 encode_json("user", user, f);
211 encode_json("data_bucket_prefix", data_bucket_prefix, f);
212 encode_json("data_oid_prefix", data_oid_prefix, f);
213 encode_json("events_retention_days", events_retention_days, f);
214 encode_json("sync_instance", sync_instance, f);
215 encode_json("max_id", max_id, f);
216 {
217 Formatter::ArraySection section(*f, "subs");
218 for (auto& sub : subs) {
219 encode_json("sub", *sub.second, f);
220 }
221 }
222 {
223 Formatter::ArraySection section(*f, "topics");
224 for (auto& topic : topics) {
225 encode_json("topic", *topic.second, f);
226 }
227 }
228 {
229 Formatter::ObjectSection section(*f, "notifications");
230 string last;
231 for (auto& notif : notifications) {
232 const string& n = notif.first;
233 if (n != last) {
234 if (!last.empty()) {
235 f->close_section();
236 }
237 f->open_array_section(n.c_str());
238 }
239 last = n;
240 encode_json("notifications", notif.second, f);
241 }
242 if (!last.empty()) {
243 f->close_section();
244 }
245 }
eafe8130 246 encode_json("start_with_full_sync", start_with_full_sync, f);
11fdf7f2
TL
247 }
248
249 void init(CephContext *cct, const JSONFormattable& config) {
250 string uid = config["uid"]("pubsub");
251 user = rgw_user(config["tenant"], uid);
252 data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
253 data_oid_prefix = config["data_oid_prefix"];
254 events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
255
256 for (auto& c : config["notifications"].array()) {
257 PSNotificationConfig nc;
258 nc.id = ++max_id;
259 nc.init(cct, c);
260 notifications.insert(std::make_pair(nc.path, nc));
261
262 PSTopicConfig topic_config = { .name = nc.topic };
263 topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
264 }
265 for (auto& c : config["subscriptions"].array()) {
266 auto sc = std::make_shared<PSSubConfig>();
267 sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
268 subs[sc->name] = sc;
269 auto iter = topics.find(sc->topic);
270 if (iter != topics.end()) {
271 iter->second->subs.insert(sc->name);
272 }
273 }
eafe8130 274 start_with_full_sync = config["start_with_full_sync"](false);
11fdf7f2
TL
275
276 ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
277 }
278
279 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
280 sync_instance = instance_id;
281 }
282
283 void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
284 string path = bucket.name + "/" + key.name;
285
286 auto iter = notifications.upper_bound(path);
287 if (iter == notifications.begin()) {
288 return;
289 }
290
291 do {
292 --iter;
293 if (iter->first.size() > path.size()) {
294 break;
295 }
296 if (path.compare(0, iter->first.size(), iter->first) != 0) {
297 break;
298 }
299
300 PSNotificationConfig& target = iter->second;
301
302 if (!target.is_prefix &&
303 path.size() != iter->first.size()) {
304 continue;
305 }
306
307 auto topic = topics.find(target.topic);
308 if (topic == topics.end()) {
309 continue;
310 }
311
eafe8130
TL
312 ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
313 " target_path=" << target.path << ", topic=" << target.topic << dendl;
11fdf7f2
TL
314 (*result)->push_back(topic->second);
315 } while (iter != notifications.begin());
316 }
317
318 bool find_sub(const string& name, PSSubConfigRef *ref) {
319 auto iter = subs.find(name);
320 if (iter != subs.end()) {
321 *ref = iter->second;
322 return true;
323 }
324 return false;
325 }
326};
327
11fdf7f2 328using PSConfigRef = std::shared_ptr<PSConfig>;
eafe8130
TL
329template<typename EventType>
330using EventRef = std::shared_ptr<EventType>;
11fdf7f2
TL
331
332struct objstore_event {
333 string id;
334 const rgw_bucket& bucket;
335 const rgw_obj_key& key;
336 const ceph::real_time& mtime;
337 const std::vector<std::pair<std::string, std::string> > *attrs;
338
339 objstore_event(const rgw_bucket& _bucket,
340 const rgw_obj_key& _key,
341 const ceph::real_time& _mtime,
342 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
343 key(_key),
344 mtime(_mtime),
345 attrs(_attrs) {}
346
347 string get_hash() {
348 string etag;
349 RGWMD5Etag hash;
350 hash.update(bucket.bucket_id);
351 hash.update(key.name);
352 hash.update(key.instance);
353 hash.finish(&etag);
354
355 assert(etag.size() > 8);
356
357 return etag.substr(0, 8);
358 }
359
360 void dump(Formatter *f) const {
361 {
362 Formatter::ObjectSection s(*f, "bucket");
363 encode_json("name", bucket.name, f);
364 encode_json("tenant", bucket.tenant, f);
365 encode_json("bucket_id", bucket.bucket_id, f);
366 }
367 {
368 Formatter::ObjectSection s(*f, "key");
369 encode_json("name", key.name, f);
370 encode_json("instance", key.instance, f);
371 }
372 utime_t mt(mtime);
373 encode_json("mtime", mt, f);
374 Formatter::ObjectSection s(*f, "attrs");
375 if (attrs) {
376 for (auto& attr : *attrs) {
377 encode_json(attr.first.c_str(), attr.second.c_str(), f);
378 }
379 }
380 }
381};
382
383static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
384 const rgw_obj_key& key,
385 const ceph::real_time& mtime,
386 const std::vector<std::pair<std::string, std::string> > *attrs,
eafe8130
TL
387 rgw::notify::EventType event_type,
388 EventRef<rgw_pubsub_event> *event) {
11fdf7f2
TL
389 *event = std::make_shared<rgw_pubsub_event>();
390
eafe8130
TL
391 EventRef<rgw_pubsub_event>& e = *event;
392 e->event_name = rgw::notify::to_ceph_string(event_type);
11fdf7f2
TL
393 e->source = bucket.name + "/" + key.name;
394 e->timestamp = real_clock::now();
395
396 objstore_event oevent(bucket, key, mtime, attrs);
397
eafe8130
TL
398 const utime_t ts(e->timestamp);
399 set_event_id(e->id, oevent.get_hash(), ts);
11fdf7f2
TL
400
401 encode_json("info", oevent, &e->info);
402}
403
eafe8130
TL
404static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
405 const rgw_user& owner,
406 const rgw_obj_key& key,
407 const ceph::real_time& mtime,
408 const std::vector<std::pair<std::string, std::string> > *attrs,
409 rgw::notify::EventType event_type,
410 EventRef<rgw_pubsub_s3_record> *record) {
411 *record = std::make_shared<rgw_pubsub_s3_record>();
412
413 EventRef<rgw_pubsub_s3_record>& r = *record;
eafe8130
TL
414 r->eventTime = mtime;
415 r->eventName = rgw::notify::to_string(event_type);
92f5a8d4
TL
416 // userIdentity: not supported in sync module
417 // x_amz_request_id: not supported in sync module
418 // x_amz_id_2: not supported in sync module
eafe8130
TL
419 // configurationId is filled from subscription configuration
420 r->bucket_name = bucket.name;
421 r->bucket_ownerIdentity = owner.to_str();
422 r->bucket_arn = to_string(rgw::ARN(bucket));
423 r->bucket_id = bucket.bucket_id; // rgw extension
424 r->object_key = key.name;
92f5a8d4 425 // object_size not supported in sync module
eafe8130
TL
426 objstore_event oevent(bucket, key, mtime, attrs);
427 r->object_etag = oevent.get_hash();
428 r->object_versionId = key.instance;
429
430 // use timestamp as per key sequence id (hex encoded)
431 const utime_t ts(real_clock::now());
432 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
433 std::back_inserter(r->object_sequencer));
434
eafe8130
TL
435 set_event_id(r->id, r->object_etag, ts);
436}
437
11fdf7f2
TL
438class PSManager;
439using PSManagerRef = std::shared_ptr<PSManager>;
440
441struct PSEnv {
442 PSConfigRef conf;
443 shared_ptr<RGWUserInfo> data_user_info;
444 PSManagerRef manager;
445
446 PSEnv() : conf(make_shared<PSConfig>()),
447 data_user_info(make_shared<RGWUserInfo>()) {}
448
449 void init(CephContext *cct, const JSONFormattable& config) {
450 conf->init(cct, config);
451 }
452
453 void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
454};
455
456using PSEnvRef = std::shared_ptr<PSEnv>;
457
eafe8130 458template<typename EventType>
11fdf7f2 459class PSEvent {
eafe8130 460 const EventRef<EventType> event;
11fdf7f2
TL
461
462public:
eafe8130 463 PSEvent(const EventRef<EventType>& _event) : event(_event) {}
11fdf7f2
TL
464
465 void format(bufferlist *bl) const {
92f5a8d4 466 bl->append(json_str("", *event));
11fdf7f2
TL
467 }
468
469 void encode_event(bufferlist& bl) const {
470 encode(*event, bl);
471 }
472
473 const string& id() const {
474 return event->id;
475 }
476};
477
/* A coroutine that runs once and can be awaited by several callers.
 *
 * The first execute() starts it as a child of that caller. Callers that
 * execute() while it is still running are queued as waiters and put to
 * sleep. When it finishes, operate_wrapper() wakes each waiter, copies
 * retcode to it and hands it the result through return_result(). Callers
 * arriving after completion get retcode and the result immediately.
 */
template <class T>
class RGWSingletonCR : public RGWCoroutine {
  friend class WrapperCR;

  boost::asio::coroutine wrapper_state;
  bool started{false};
  int operate_ret{0};

  /* a caller blocked on this coroutine, and where its result should go
   * (nullptr when execute() was called without a result pointer) */
  struct WaiterInfo {
    RGWCoroutine *cr{nullptr};
    T *result;
  };
  using WaiterInfoRef = std::shared_ptr<WaiterInfo>;

  /* FIFO of blocked callers */
  deque<WaiterInfoRef> waiters;

  void add_waiter(RGWCoroutine *cr, T *result) {
    auto waiter = std::make_shared<WaiterInfo>();
    waiter->cr = cr;
    waiter->result = result;
    waiters.push_back(waiter);
  };

  /* Pop the oldest waiter into *waiter. When none are left, reset *waiter
   * and return false. */
  bool get_next_waiter(WaiterInfoRef *waiter) {
    if (waiters.empty()) {
      waiter->reset();
      return false;
    }

    *waiter = waiters.front();
    waiters.pop_front();
    return true;
  }

  /* Drive operate() until the coroutine is done, yielding between steps.
   * Once done, wake every waiter: propagate retcode, clear its sleeping
   * state, deliver the result, and drop the reference that execute() took
   * when registering it. */
  int operate_wrapper() override {
    reenter(&wrapper_state) {
      while (!is_done()) {
        ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
        operate_ret = operate();
        if (operate_ret < 0) {
          ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
        }
        if (!is_done()) {
          yield;
        }
      }

      ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
      /* we're done, can't yield anymore */

      WaiterInfoRef waiter;
      while (get_next_waiter(&waiter)) {
        ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
        waiter->cr->set_retcode(retcode);
        waiter->cr->set_sleeping(false);
        return_result(waiter->result);
        put(); /* balances the get() in execute() */
      }

      return retcode;
    }
    return 0;
  }

  /* Hook for subclasses to publish their result to a caller; the default
   * does nothing. NOTE(review): result may be nullptr (execute()'s default
   * argument), so overrides should check it. */
  virtual void return_result(T *result) {}

public:
  RGWSingletonCR(CephContext *_cct)
    : RGWCoroutine(_cct) {}

  /* Run, or wait for, this coroutine on behalf of `caller`.
   * - first call: starts it as a child of caller; returns 0
   * - while running: registers caller as a waiter (taking a ref on this),
   *   puts caller to sleep; returns 0
   * - after completion: sets caller's retcode, delivers the result and
   *   returns retcode */
  int execute(RGWCoroutine *caller, T *result = nullptr) {
    if (!started) {
      ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
      started = true;
      caller->call(this);
      return 0;
    } else if (!is_done()) {
      ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
      get();
      add_waiter(caller, result);
      caller->set_sleeping(true);
      return 0;
    }

    ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
    caller->set_retcode(retcode);
    return_result(result);
    return retcode;
  }
};
568
569
570class PSSubscription;
571using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
572
573class PSSubscription {
574 class InitCR;
575 friend class InitCR;
576 friend class RGWPSHandleObjEventCR;
577
578 RGWDataSyncEnv *sync_env;
579 PSEnvRef env;
580 PSSubConfigRef sub_conf;
eafe8130 581 std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
11fdf7f2
TL
582 RGWBucketInfo *bucket_info{nullptr};
583 RGWDataAccessRef data_access;
584 RGWDataAccess::BucketRef bucket;
585
11fdf7f2
TL
586 InitCR *init_cr{nullptr};
587
588 class InitBucketLifecycleCR : public RGWCoroutine {
589 RGWDataSyncEnv *sync_env;
590 PSConfigRef& conf;
591 LCRule rule;
592
593 int retention_days;
594
595 rgw_bucket_lifecycle_config_params lc_config;
596
597 public:
598 InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
599 PSConfigRef& _conf,
600 RGWBucketInfo& _bucket_info,
eafe8130 601 std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct),
11fdf7f2
TL
602 sync_env(_sync_env),
603 conf(_conf) {
604 lc_config.bucket_info = _bucket_info;
605 lc_config.bucket_attrs = _bucket_attrs;
606 retention_days = conf->events_retention_days;
607 }
608
609 int operate() override {
610 reenter(this) {
611
612 rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
613
614 {
615 /* maybe we already have it configured? */
616 RGWLifecycleConfiguration old_config;
617 auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
618 if (aiter != lc_config.bucket_attrs.end()) {
619 bufferlist::const_iterator iter{&aiter->second};
620 try {
621 old_config.decode(iter);
622 } catch (const buffer::error& e) {
623 ldout(cct, 0) << __func__ << "(): decode life cycle config failed" << dendl;
624 }
625 }
626
627 auto old_rules = old_config.get_rule_map();
628 for (auto ori : old_rules) {
629 auto& old_rule = ori.second;
630
631 if (old_rule.get_prefix().empty() &&
632 old_rule.get_expiration().get_days() == retention_days &&
633 old_rule.is_enabled()) {
634 ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl;
635 return set_cr_done();
636 }
637 }
638 }
639
640 lc_config.config.add_rule(rule);
641 yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
642 sync_env->store,
643 lc_config));
644 if (retcode < 0) {
645 ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
646 return set_cr_error(retcode);
647 }
648
649 return set_cr_done();
650 }
651 return 0;
652 }
653 };
654
655 class InitCR : public RGWSingletonCR<bool> {
656 RGWDataSyncEnv *sync_env;
657 PSSubscriptionRef sub;
658 rgw_get_bucket_info_params get_bucket_info;
659 rgw_bucket_create_local_params create_bucket;
660 PSConfigRef& conf;
661 PSSubConfigRef& sub_conf;
662 int i;
663
664 public:
665 InitCR(RGWDataSyncEnv *_sync_env,
666 PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
667 sync_env(_sync_env),
668 sub(_sub), conf(sub->env->conf),
669 sub_conf(sub->sub_conf) {
670 }
671
672 int operate() override {
673 reenter(this) {
674 get_bucket_info.tenant = conf->user.tenant;
675 get_bucket_info.bucket_name = sub_conf->data_bucket_name;
676 sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
677
678 for (i = 0; i < 2; ++i) {
679 yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
680 sync_env->store,
681 get_bucket_info,
682 sub->get_bucket_info_result));
683 if (retcode < 0 && retcode != -ENOENT) {
684 ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant="
685 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
686 }
687 if (retcode == 0) {
688 {
689 auto& result = sub->get_bucket_info_result;
690 sub->bucket_info = &result->bucket_info;
691
692 int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
693 if (ret < 0) {
694 ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
695 return set_cr_error(ret);
696 }
697 }
698
699 yield call(new InitBucketLifecycleCR(sync_env, conf,
700 sub->get_bucket_info_result->bucket_info,
701 sub->get_bucket_info_result->attrs));
702 if (retcode < 0) {
703 ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
704 return set_cr_error(retcode);
705 }
706
707 return set_cr_done();
708 }
709
710 create_bucket.user_info = sub->env->data_user_info;
711 create_bucket.bucket_name = sub_conf->data_bucket_name;
712 ldout(sync_env->cct, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
713 yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
714 sync_env->store,
715 create_bucket));
716 if (retcode < 0) {
717 ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant="
718 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
719 return set_cr_error(retcode);
720 }
721
722 /* second iteration: we got -ENOENT and created a bucket */
723 }
724
725 /* failed twice on -ENOENT, unexpected */
726 ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
727 << " name=" << get_bucket_info.bucket_name << dendl;
728 return set_cr_error(-EIO);
729 }
730 return 0;
731 }
732 };
733
eafe8130 734 template<typename EventType>
11fdf7f2
TL
735 class StoreEventCR : public RGWCoroutine {
736 RGWDataSyncEnv* const sync_env;
737 const PSSubscriptionRef sub;
eafe8130 738 const PSEvent<EventType> pse;
11fdf7f2
TL
739 const string oid_prefix;
740
741 public:
742 StoreEventCR(RGWDataSyncEnv* const _sync_env,
743 const PSSubscriptionRef& _sub,
eafe8130 744 const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct),
11fdf7f2
TL
745 sync_env(_sync_env),
746 sub(_sub),
747 pse(_event),
748 oid_prefix(sub->sub_conf->data_oid_prefix) {
749 }
750
751 int operate() override {
11fdf7f2
TL
752 rgw_object_simple_put_params put_obj;
753 reenter(this) {
754
755 put_obj.bucket = sub->bucket;
756 put_obj.key = rgw_obj_key(oid_prefix + pse.id());
757
758 pse.format(&put_obj.data);
759
760 {
761 bufferlist bl;
762 pse.encode_event(bl);
763 bufferlist bl64;
764 bl.encode_base64(bl64);
765 put_obj.user_data = bl64.to_str();
766 }
767
768 yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
769 sync_env->store,
770 put_obj));
771 if (retcode < 0) {
eafe8130 772 ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
11fdf7f2
TL
773 return set_cr_error(retcode);
774 } else {
eafe8130 775 ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
11fdf7f2
TL
776 }
777
778 return set_cr_done();
779 }
780 return 0;
781 }
782 };
783
eafe8130 784 template<typename EventType>
11fdf7f2
TL
785 class PushEventCR : public RGWCoroutine {
786 RGWDataSyncEnv* const sync_env;
eafe8130 787 const EventRef<EventType> event;
11fdf7f2
TL
788 const PSSubConfigRef& sub_conf;
789
790 public:
791 PushEventCR(RGWDataSyncEnv* const _sync_env,
792 const PSSubscriptionRef& _sub,
eafe8130 793 const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct),
11fdf7f2
TL
794 sync_env(_sync_env),
795 event(_event),
796 sub_conf(_sub->sub_conf) {
797 }
798
799 int operate() override {
800 reenter(this) {
801 ceph_assert(sub_conf->push_endpoint);
802 yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
803
804 if (retcode < 0) {
eafe8130 805 ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
11fdf7f2 806 " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
11fdf7f2
TL
807 return set_cr_error(retcode);
808 }
809
eafe8130 810 ldout(sync_env->cct, 20) << "event: " << event->id <<
11fdf7f2 811 " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
11fdf7f2
TL
812 return set_cr_done();
813 }
814 return 0;
815 }
816 };
817
818public:
819 PSSubscription(RGWDataSyncEnv *_sync_env,
820 PSEnvRef _env,
821 PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
822 env(_env),
823 sub_conf(_sub_conf),
824 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
825
826 PSSubscription(RGWDataSyncEnv *_sync_env,
827 PSEnvRef _env,
828 rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env),
829 env(_env),
830 sub_conf(std::make_shared<PSSubConfig>()),
831 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
832 sub_conf->from_user_conf(sync_env->cct, user_sub_conf);
833 }
834 virtual ~PSSubscription() {
835 if (init_cr) {
836 init_cr->put();
837 }
838 }
839
840 template <class C>
841 static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env,
842 PSEnvRef _env,
843 C& _sub_conf) {
844 auto sub = std::make_shared<PSSubscription>(_sync_env, _env, _sub_conf);
845 sub->init_cr = new InitCR(_sync_env, sub);
846 sub->init_cr->get();
847 return sub;
848 }
849
850 int call_init_cr(RGWCoroutine *caller) {
851 return init_cr->execute(caller);
852 }
853
eafe8130
TL
854 template<typename EventType>
855 static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
856 return new StoreEventCR<EventType>(sync_env, sub, event);
11fdf7f2
TL
857 }
858
eafe8130
TL
859 template<typename EventType>
860 static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
861 return new PushEventCR<EventType>(sync_env, sub, event);
11fdf7f2
TL
862 }
863 friend class InitCR;
864};
865
11fdf7f2
TL
866class PSManager
867{
868 RGWDataSyncEnv *sync_env;
869 PSEnvRef env;
870
eafe8130 871 std::map<string, PSSubscriptionRef> subs;
11fdf7f2
TL
872
873 class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
874 RGWDataSyncEnv *sync_env;
875 PSManagerRef mgr;
876 rgw_user owner;
877 string sub_name;
878 string sub_id;
879 PSSubscriptionRef *ref;
880
881 PSConfigRef conf;
882
883 PSSubConfigRef sub_conf;
884 rgw_pubsub_sub_config user_sub_conf;
eafe8130 885
11fdf7f2
TL
886 public:
887 GetSubCR(RGWDataSyncEnv *_sync_env,
888 PSManagerRef& _mgr,
889 const rgw_user& _owner,
890 const string& _sub_name,
891 PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
892 sync_env(_sync_env),
893 mgr(_mgr),
894 owner(_owner),
895 sub_name(_sub_name),
896 ref(_ref),
897 conf(mgr->env->conf) {
898 }
eafe8130 899 ~GetSubCR() { }
11fdf7f2
TL
900
901 int operate() override {
902 reenter(this) {
903 if (owner.empty()) {
904 if (!conf->find_sub(sub_name, &sub_conf)) {
eafe8130 905 ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
11fdf7f2
TL
906 mgr->remove_get_sub(owner, sub_name);
907 return set_cr_error(-ENOENT);
908 }
909
910 *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
911 } else {
912 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
913 yield {
914 RGWUserPubSub ups(sync_env->store, owner);
915 rgw_raw_obj obj;
916 ups.get_sub_meta_obj(sub_name, &obj);
917 bool empty_on_enoent = false;
918 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
919 obj,
920 &user_sub_conf, empty_on_enoent));
921 }
922 if (retcode < 0) {
923 mgr->remove_get_sub(owner, sub_name);
924 return set_cr_error(retcode);
925 }
926
927 *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf);
928 }
929
930 yield (*ref)->call_init_cr(this);
931 if (retcode < 0) {
eafe8130 932 ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
11fdf7f2
TL
933 mgr->remove_get_sub(owner, sub_name);
934 return set_cr_error(retcode);
935 }
936
937 if (owner.empty()) {
938 mgr->subs[sub_name] = *ref;
939 }
940 mgr->remove_get_sub(owner, sub_name);
941
942 return set_cr_done();
943 }
944 return 0;
945 }
946
947 void return_result(PSSubscriptionRef *result) override {
948 ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
949 if (retcode >= 0) {
950 *result = *ref;
951 }
952 }
953 };
954
955 string sub_id(const rgw_user& owner, const string& sub_name) {
956 string owner_prefix;
957 if (!owner.empty()) {
958 owner_prefix = owner.to_str() + "/";
959 }
960
961 return owner_prefix + sub_name;
962 }
963
eafe8130 964 std::map<std::string, GetSubCR *> get_subs;
11fdf7f2
TL
965
966 GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
967 return get_subs[sub_id(owner, name)];
968 }
969
970 void remove_get_sub(const rgw_user& owner, const string& name) {
971 get_subs.erase(sub_id(owner, name));
972 }
973
974 bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
975 auto iter = subs.find(sub_id(owner, sub_name));
976 if (iter != subs.end()) {
977 *sub = iter->second;
978 return true;
979 }
980 return false;
981 }
982
983 PSManager(RGWDataSyncEnv *_sync_env,
984 PSEnvRef _env) : sync_env(_sync_env),
985 env(_env) {}
986
987public:
988 static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env,
989 PSEnvRef _env) {
990 return std::shared_ptr<PSManager>(new PSManager(_sync_env, _env));
991 }
992
993 static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr,
994 RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
995 if (mgr->find_sub_instance(owner, sub_name, ref)) {
996 /* found it! nothing to execute */
997 ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
998 }
999 auto& gs = mgr->get_get_subs(owner, sub_name);
1000 if (!gs) {
1001 ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
1002 gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref);
1003 }
1004 ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
1005 return gs->execute(caller, ref);
1006 }
1007
1008 friend class GetSubCR;
1009};
1010
1011void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
1012 manager = mgr;
1013 conf->init_instance(realm, instance_id);
1014}
1015
1016class RGWPSInitEnvCBCR : public RGWCoroutine {
1017 RGWDataSyncEnv *sync_env;
1018 PSEnvRef env;
1019 PSConfigRef& conf;
1020
1021 rgw_user_create_params create_user;
1022 rgw_get_user_info_params get_user_info;
1023public:
1024 RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env,
1025 PSEnvRef& _env) : RGWCoroutine(_sync_env->cct),
1026 sync_env(_sync_env),
1027 env(_env), conf(env->conf) {}
1028 int operate() override {
1029 reenter(this) {
1030 ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
1031
1032 /* nothing to do here right now */
1033 create_user.user = conf->user;
1034 create_user.max_buckets = 0; /* unlimited */
1035 create_user.display_name = "pubsub";
1036 create_user.generate_key = false;
1037 yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user));
1038 if (retcode < 0) {
1039 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
1040 return set_cr_error(retcode);
1041 }
1042
1043 get_user_info.user = conf->user;
1044 yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
1045 if (retcode < 0) {
1046 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
1047 return set_cr_error(retcode);
1048 }
1049
1050 ldout(sync_env->cct, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
1051
1052
1053 return set_cr_done();
1054 }
1055 return 0;
1056 }
1057};
1058
eafe8130
TL
1059bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
1060 if (!match(filter.events, event_type)) {
1061 return false;
1062 }
1063 if (!match(filter.s3_filter.key_filter, key_name)) {
1064 return false;
1065 }
1066 return true;
1067}
1068
11fdf7f2
TL
1069class RGWPSFindBucketTopicsCR : public RGWCoroutine {
1070 RGWDataSyncEnv *sync_env;
1071 PSEnvRef env;
1072 rgw_user owner;
1073 rgw_bucket bucket;
1074 rgw_obj_key key;
eafe8130 1075 rgw::notify::EventType event_type;
11fdf7f2
TL
1076
1077 RGWUserPubSub ups;
1078
1079 rgw_raw_obj bucket_obj;
1080 rgw_raw_obj user_obj;
1081 rgw_pubsub_bucket_topics bucket_topics;
1082 rgw_pubsub_user_topics user_topics;
1083 TopicsRef *topics;
1084public:
1085 RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
1086 PSEnvRef& _env,
1087 const rgw_user& _owner,
1088 const rgw_bucket& _bucket,
1089 const rgw_obj_key& _key,
eafe8130 1090 rgw::notify::EventType _event_type,
11fdf7f2
TL
1091 TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
1092 sync_env(_sync_env),
1093 env(_env),
1094 owner(_owner),
1095 bucket(_bucket),
1096 key(_key),
eafe8130 1097 event_type(_event_type),
11fdf7f2
TL
1098 ups(_sync_env->store, owner),
1099 topics(_topics) {
1100 *topics = std::make_shared<vector<PSTopicConfigRef> >();
1101 }
1102 int operate() override {
1103 reenter(this) {
1104 ups.get_bucket_meta_obj(bucket, &bucket_obj);
1105 ups.get_user_meta_obj(&user_obj);
1106
1107 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
1108 yield {
1109 bool empty_on_enoent = true;
1110 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
1111 bucket_obj,
1112 &bucket_topics, empty_on_enoent));
1113 }
1114 if (retcode < 0 && retcode != -ENOENT) {
1115 return set_cr_error(retcode);
1116 }
1117
1118 ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
1119
1120 if (!bucket_topics.topics.empty()) {
1121 using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
1122 yield {
1123 bool empty_on_enoent = true;
1124 call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
1125 user_obj,
1126 &user_topics, empty_on_enoent));
1127 }
1128 if (retcode < 0 && retcode != -ENOENT) {
1129 return set_cr_error(retcode);
1130 }
1131 }
1132
1133 for (auto& titer : bucket_topics.topics) {
1134 auto& topic_filter = titer.second;
1135 auto& info = topic_filter.topic;
eafe8130 1136 if (!match(topic_filter, key.name, event_type)) {
11fdf7f2
TL
1137 continue;
1138 }
eafe8130 1139 std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
11fdf7f2
TL
1140 tc->name = info.name;
1141 tc->subs = user_topics.topics[info.name].subs;
1142 (*topics)->push_back(tc);
1143 }
1144
1145 env->conf->get_topics(sync_env->cct, bucket, key, topics);
1146 return set_cr_done();
1147 }
1148 return 0;
1149 }
1150};
1151
1152class RGWPSHandleObjEventCR : public RGWCoroutine {
1153 RGWDataSyncEnv* const sync_env;
1154 const PSEnvRef env;
1155 const rgw_user& owner;
eafe8130
TL
1156 const EventRef<rgw_pubsub_event> event;
1157 const EventRef<rgw_pubsub_s3_record> record;
11fdf7f2
TL
1158 const TopicsRef topics;
1159 const std::array<rgw_user, 2> owners;
1160 bool has_subscriptions;
1161 bool event_handled;
1162 bool sub_conf_found;
1163 PSSubscriptionRef sub;
1164 std::array<rgw_user, 2>::const_iterator oiter;
eafe8130
TL
1165 std::vector<PSTopicConfigRef>::const_iterator titer;
1166 std::set<string>::const_iterator siter;
11fdf7f2
TL
1167 int last_sub_conf_error;
1168
1169public:
1170 RGWPSHandleObjEventCR(RGWDataSyncEnv* const _sync_env,
1171 const PSEnvRef _env,
1172 const rgw_user& _owner,
eafe8130
TL
1173 const EventRef<rgw_pubsub_event>& _event,
1174 const EventRef<rgw_pubsub_s3_record>& _record,
11fdf7f2
TL
1175 const TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
1176 sync_env(_sync_env),
1177 env(_env),
1178 owner(_owner),
1179 event(_event),
eafe8130 1180 record(_record),
11fdf7f2
TL
1181 topics(_topics),
1182 owners({owner, rgw_user{}}),
1183 has_subscriptions(false),
1184 event_handled(false) {}
1185
1186 int operate() override {
1187 reenter(this) {
eafe8130 1188 ldout(sync_env->cct, 20) << ": handle event: obj: z=" << sync_env->source_zone
11fdf7f2
TL
1189 << " event=" << json_str("event", *event, false)
1190 << " owner=" << owner << dendl;
1191
1192 ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
eafe8130
TL
1193
1194 // outside caller should check that
1195 ceph_assert(!topics->empty());
11fdf7f2
TL
1196
1197 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
1198
eafe8130 1199 // loop over all topics related to the bucket/object
11fdf7f2 1200 for (titer = topics->begin(); titer != topics->end(); ++titer) {
eafe8130 1201 ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" <<
11fdf7f2 1202 (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
eafe8130 1203 // loop over all subscriptions of the topic
11fdf7f2 1204 for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
eafe8130 1205 ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl;
11fdf7f2
TL
1206 has_subscriptions = true;
1207 sub_conf_found = false;
eafe8130
TL
1208 // try to read subscription configuration from global/user cond
1209 // configuration is considered missing only if does not exist in either
11fdf7f2 1210 for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
11fdf7f2
TL
1211 yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
1212 if (retcode < 0) {
eafe8130
TL
1213 if (sub_conf_found) {
1214 // not a real issue, sub conf already found
1215 retcode = 0;
1216 }
11fdf7f2
TL
1217 last_sub_conf_error = retcode;
1218 continue;
1219 }
1220 sub_conf_found = true;
eafe8130
TL
1221 if (sub->sub_conf->s3_id.empty()) {
1222 // subscription was not made by S3 compatible API
1223 ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1224 yield call(PSSubscription::store_event_cr(sync_env, sub, event));
1225 if (retcode < 0) {
1226 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1227 ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
1228 } else {
1229 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1230 event_handled = true;
1231 }
1232 if (sub->sub_conf->push_endpoint) {
11fdf7f2 1233 ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
eafe8130
TL
1234 yield call(PSSubscription::push_event_cr(sync_env, sub, event));
1235 if (retcode < 0) {
1236 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1237 ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
1238 } else {
1239 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1240 event_handled = true;
1241 }
1242 }
1243 } else {
1244 // subscription was made by S3 compatible API
1245 ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1246 record->configurationId = sub->sub_conf->s3_id;
1247 yield call(PSSubscription::store_event_cr(sync_env, sub, record));
11fdf7f2 1248 if (retcode < 0) {
eafe8130
TL
1249 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1250 ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
11fdf7f2 1251 } else {
eafe8130 1252 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
11fdf7f2
TL
1253 event_handled = true;
1254 }
eafe8130
TL
1255 if (sub->sub_conf->push_endpoint) {
1256 ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1257 yield call(PSSubscription::push_event_cr(sync_env, sub, record));
1258 if (retcode < 0) {
1259 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1260 ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
1261 } else {
1262 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1263 event_handled = true;
1264 }
1265 }
11fdf7f2
TL
1266 }
1267 }
1268 if (!sub_conf_found) {
1269 // could not find conf for subscription at user or global levels
eafe8130
TL
1270 if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
1271 ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
11fdf7f2 1272 << " ret=" << last_sub_conf_error << dendl;
eafe8130
TL
1273 if (retcode == -ENOENT) {
1274 // missing subscription info should be reflected back as invalid argument
1275 // and not as missing object
1276 retcode = -EINVAL;
1277 }
11fdf7f2
TL
1278 }
1279 }
1280 }
1281 if (has_subscriptions && !event_handled) {
1282 // event is considered "lost" of it has subscriptions on any of its topics
1283 // but it was not stored in, or pushed to, any of them
1284 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
1285 }
1286 if (retcode < 0) {
1287 return set_cr_error(retcode);
1288 }
1289 return set_cr_done();
1290 }
1291 return 0;
1292 }
1293};
1294
eafe8130 1295// coroutine invoked on remote object creation
11fdf7f2
TL
1296class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
1297 RGWDataSyncEnv *sync_env;
1298 PSEnvRef env;
1299 std::optional<uint64_t> versioned_epoch;
eafe8130
TL
1300 EventRef<rgw_pubsub_event> event;
1301 EventRef<rgw_pubsub_s3_record> record;
11fdf7f2
TL
1302 TopicsRef topics;
1303public:
1304 RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
1305 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
1306 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1307 TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
1308 sync_env(_sync_env),
1309 env(_env),
1310 versioned_epoch(_versioned_epoch),
1311 topics(_topics) {
1312 }
1313 int operate() override {
1314 reenter(this) {
eafe8130 1315 ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone
11fdf7f2
TL
1316 << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
1317 << " attrs=" << attrs << dendl;
1318 {
1319 std::vector<std::pair<std::string, std::string> > attrs;
1320 for (auto& attr : attrs) {
1321 string k = attr.first;
1322 if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
1323 k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
1324 }
1325 attrs.push_back(std::make_pair(k, attr.second));
eafe8130
TL
1326 }
1327 // at this point we don't know whether we need the ceph event or S3 record
1328 // this is why both are created here, once we have information about the
1329 // subscription, we will store/push only the relevant ones
11fdf7f2
TL
1330 make_event_ref(sync_env->cct,
1331 bucket_info.bucket, key,
1332 mtime, &attrs,
eafe8130
TL
1333 rgw::notify::ObjectCreated, &event);
1334 make_s3_record_ref(sync_env->cct,
1335 bucket_info.bucket, bucket_info.owner, key,
1336 mtime, &attrs,
1337 rgw::notify::ObjectCreated, &record);
11fdf7f2
TL
1338 }
1339
eafe8130 1340 yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
11fdf7f2
TL
1341 if (retcode < 0) {
1342 return set_cr_error(retcode);
1343 }
1344 return set_cr_done();
1345 }
1346 return 0;
1347 }
1348};
1349
1350class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
1351 PSEnvRef env;
1352 std::optional<uint64_t> versioned_epoch;
1353 TopicsRef topics;
1354public:
1355 RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
1356 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
1357 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1358 TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
1359 env(_env), versioned_epoch(_versioned_epoch),
1360 topics(_topics) {
1361 }
1362
1363 ~RGWPSHandleRemoteObjCR() override {}
1364
1365 RGWStatRemoteObjCBCR *allocate_callback() override {
1366 return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics);
1367 }
1368};
1369
1370class RGWPSHandleObjCreateCR : public RGWCoroutine {
1371
1372 RGWDataSyncEnv *sync_env;
1373 RGWBucketInfo bucket_info;
1374 rgw_obj_key key;
1375 PSEnvRef env;
1376 std::optional<uint64_t> versioned_epoch;
1377 TopicsRef topics;
1378public:
1379 RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
1380 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
1381 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sync_env->cct),
1382 sync_env(_sync_env),
1383 bucket_info(_bucket_info),
1384 key(_key),
1385 env(_env),
1386 versioned_epoch(_versioned_epoch) {
1387 }
1388
1389 ~RGWPSHandleObjCreateCR() override {}
1390
1391 int operate() override {
1392 reenter(this) {
1393 yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
1394 bucket_info.bucket, key,
eafe8130 1395 rgw::notify::ObjectCreated,
11fdf7f2
TL
1396 &topics));
1397 if (retcode < 0) {
eafe8130 1398 ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
11fdf7f2
TL
1399 return set_cr_error(retcode);
1400 }
1401 if (topics->empty()) {
1402 ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
1403 return set_cr_done();
1404 }
1405 yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
1406 if (retcode < 0) {
1407 return set_cr_error(retcode);
1408 }
1409 return set_cr_done();
1410 }
1411 return 0;
1412 }
1413};
1414
eafe8130 1415// coroutine invoked on remote object deletion
11fdf7f2
TL
1416class RGWPSGenericObjEventCBCR : public RGWCoroutine {
1417 RGWDataSyncEnv *sync_env;
1418 PSEnvRef env;
1419 rgw_user owner;
1420 rgw_bucket bucket;
1421 rgw_obj_key key;
1422 ceph::real_time mtime;
eafe8130
TL
1423 rgw::notify::EventType event_type;
1424 EventRef<rgw_pubsub_event> event;
1425 EventRef<rgw_pubsub_s3_record> record;
11fdf7f2
TL
1426 TopicsRef topics;
1427public:
1428 RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
1429 PSEnvRef _env,
1430 RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
eafe8130 1431 rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
11fdf7f2
TL
1432 sync_env(_sync_env),
1433 env(_env),
1434 owner(_bucket_info.owner),
1435 bucket(_bucket_info.bucket),
1436 key(_key),
eafe8130 1437 mtime(_mtime), event_type(_event_type) {}
11fdf7f2
TL
1438 int operate() override {
1439 reenter(this) {
eafe8130 1440 ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
11fdf7f2 1441 << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
eafe8130 1442 yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_type, &topics));
11fdf7f2 1443 if (retcode < 0) {
eafe8130 1444 ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
11fdf7f2
TL
1445 return set_cr_error(retcode);
1446 }
1447 if (topics->empty()) {
1448 ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
1449 return set_cr_done();
1450 }
eafe8130
TL
1451 // at this point we don't know whether we need the ceph event or S3 record
1452 // this is why both are created here, once we have information about the
1453 // subscription, we will store/push only the relevant ones
1454 make_event_ref(sync_env->cct,
1455 bucket, key,
1456 mtime, nullptr,
1457 event_type, &event);
1458 make_s3_record_ref(sync_env->cct,
1459 bucket, owner, key,
1460 mtime, nullptr,
1461 event_type, &record);
1462 yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics));
11fdf7f2
TL
1463 if (retcode < 0) {
1464 return set_cr_error(retcode);
1465 }
1466 return set_cr_done();
1467 }
1468 return 0;
1469 }
1470
1471};
1472
1473class RGWPSDataSyncModule : public RGWDataSyncModule {
1474 PSEnvRef env;
1475 PSConfigRef& conf;
eafe8130 1476
11fdf7f2
TL
1477public:
1478 RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
1479 env->init(cct, config);
1480 }
eafe8130 1481
11fdf7f2
TL
1482 ~RGWPSDataSyncModule() override {}
1483
1484 void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
1485 PSManagerRef mgr = PSManager::get_shared(sync_env, env);
1486 env->init_instance(sync_env->store->svc.zone->get_realm(), instance_id, mgr);
1487 }
1488
1489 RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
1490 ldout(sync_env->cct, 5) << conf->id << ": start" << dendl;
1491 return new RGWPSInitEnvCBCR(sync_env, env);
1492 }
eafe8130
TL
1493
1494 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
1495 rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
1496 ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket <<
1497 " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
11fdf7f2
TL
1498 return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
1499 }
eafe8130
TL
1500
1501 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
1502 rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1503 ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket <<
1504 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1505 return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete);
11fdf7f2 1506 }
eafe8130
TL
1507
1508 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
1509 rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1510 ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket <<
1511 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1512 return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
11fdf7f2
TL
1513 }
1514
1515 PSConfigRef& get_conf() { return conf; }
1516};
1517
1518RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
1519{
1520 data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
1521 string jconf = json_str("conf", *data_handler->get_conf());
1522 JSONParser p;
1523 if (!p.parse(jconf.c_str(), jconf.size())) {
eafe8130 1524 ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
11fdf7f2
TL
1525 effective_conf = config;
1526 } else {
1527 effective_conf.decode_json(&p);
1528 }
eafe8130
TL
1529#ifdef WITH_RADOSGW_AMQP_ENDPOINT
1530 if (!rgw::amqp::init(cct)) {
1531 ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
1532 }
1533#endif
1534}
1535
1536RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
1537#ifdef WITH_RADOSGW_AMQP_ENDPOINT
1538 rgw::amqp::shutdown();
1539#endif
11fdf7f2
TL
1540}
1541
1542RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
1543{
1544 return data_handler.get();
1545}
1546
1547RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
1548 if (dialect != RGW_REST_S3) {
1549 return orig;
1550 }
eafe8130
TL
1551 return new RGWRESTMgr_PubSub();
1552}
1553
1554bool RGWPSSyncModuleInstance::should_full_sync() const {
1555 return data_handler->get_conf()->start_with_full_sync;
11fdf7f2
TL
1556}
1557
1558int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
1559 instance->reset(new RGWPSSyncModuleInstance(cct, config));
1560 return 0;
1561}
1562
eafe8130 1563