ceph.git: ceph/src/rgw/rgw_sync_module_pubsub.cc (ceph 15.2.14)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "services/svc_zone.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_pubsub.h"
10 #include "rgw_sync_module_pubsub_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rados.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_cr_tools.h"
15 #include "rgw_op.h"
16 #include "rgw_pubsub.h"
17 #include "rgw_pubsub_push.h"
18 #include "rgw_notify_event_type.h"
19 #include "rgw_perf_counters.h"
20
21 #include <boost/algorithm/hex.hpp>
22 #include <boost/asio/yield.hpp>
23
24 #define dout_subsys ceph_subsys_rgw
25
26
27 #define PUBSUB_EVENTS_RETENTION_DEFAULT 7
28
29 /*
30
31 config:
32
33 {
34 "tenant": <tenant>, # default: <empty>
35 "uid": <uid>, # default: "pubsub"
36 "data_bucket_prefix": <prefix> # default: "pubsub-"
37 "data_oid_prefix": <prefix> #
38 "events_retention_days": <int> # default: 7
39 "start_with_full_sync" <bool> # default: false
40
41 # non-dynamic config
42 "notifications": [
43 {
44 "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
45 # or a prefix if it ends with a wildcard
46 "topic": <topic-name>
47 },
48 ...
49 ],
50 "subscriptions": [
51 {
52 "name": <subscription-name>,
53 "topic": <topic>,
54 "push_endpoint": <endpoint>,
55 "push_endpoint_args:" <arg list>. # any push endpoint specific args (include all args)
56 "data_bucket": <bucket>, # override name of bucket where subscription data will be store
57 "data_oid_prefix": <prefix> # set prefix for subscription data object ids
58 "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
59 },
60 ...
61 ]
62 }
63
64 */
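// An illustrative example of such a configuration (all values are made up and
// only show the expected shape; omitted fields fall back to the defaults
// documented above):
//
// {
//   "tenant": "",
//   "uid": "pubsub",
//   "data_bucket_prefix": "pubsub-",
//   "events_retention_days": 7,
//   "start_with_full_sync": false,
//   "notifications": [
//     { "path": "buck/images*", "topic": "topic1" }
//   ],
//   "subscriptions": [
//     {
//       "name": "sub1",
//       "topic": "topic1",
//       "push_endpoint": "http://127.0.0.1:9001",
//       "push_endpoint_args": "verify-ssl=false",
//       "data_bucket": "pubsub-sub1",
//       "data_oid_prefix": "sub1-"
//     }
//   ]
// }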
65
66 // utility function to convert the args list from string format
67 // (ampersand-separated key=value pairs) to a parsed structure
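// Example (illustrative values): an input such as "verify-ssl=false&ack-level=none"
// is parsed into the key/value pairs {"verify-ssl": "false", "ack-level": "none"}.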
68 RGWHTTPArgs string_to_args(const std::string& str_args) {
69 RGWHTTPArgs args;
70 args.set(str_args);
71 args.parse();
72 return args;
73 }
74
75 struct PSSubConfig {
76 std::string name;
77 std::string topic;
78 std::string push_endpoint_name;
79 std::string push_endpoint_args;
80 std::string data_bucket_name;
81 std::string data_oid_prefix;
82 std::string s3_id;
83 std::string arn_topic;
84 RGWPubSubEndpoint::Ptr push_endpoint;
85
86 void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
87 name = uc.name;
88 topic = uc.topic;
89 push_endpoint_name = uc.dest.push_endpoint;
90 data_bucket_name = uc.dest.bucket_name;
91 data_oid_prefix = uc.dest.oid_prefix;
92 s3_id = uc.s3_id;
93 arn_topic = uc.dest.arn_topic;
94 if (!push_endpoint_name.empty()) {
95 push_endpoint_args = uc.dest.push_endpoint_args;
96 try {
97 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
98 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
99 } catch (const RGWPubSubEndpoint::configuration_error& e) {
100 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
101 << push_endpoint_name << " due to: " << e.what() << dendl;
102 }
103 }
104 }
105
106 void dump(Formatter *f) const {
107 encode_json("name", name, f);
108 encode_json("topic", topic, f);
109 encode_json("push_endpoint", push_endpoint_name, f);
110 encode_json("push_endpoint_args", push_endpoint_args, f);
111 encode_json("data_bucket_name", data_bucket_name, f);
112 encode_json("data_oid_prefix", data_oid_prefix, f);
113 encode_json("s3_id", s3_id, f);
114 }
115
116 void init(CephContext *cct, const JSONFormattable& config,
117 const string& data_bucket_prefix,
118 const string& default_oid_prefix) {
119 name = config["name"];
120 topic = config["topic"];
121 push_endpoint_name = config["push_endpoint"];
122 string default_bucket_name = data_bucket_prefix + name;
123 data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
124 data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
125 s3_id = config["s3_id"];
126 arn_topic = config["arn_topic"];
127 if (!push_endpoint_name.empty()) {
128 push_endpoint_args = config["push_endpoint_args"];
129 try {
130 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
131 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
132 } catch (const RGWPubSubEndpoint::configuration_error& e) {
133 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
134 << push_endpoint_name << " due to: " << e.what() << dendl;
135 }
136 }
137 }
138 };
139
140 using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
141
142 struct PSTopicConfig {
143 std::string name;
144 std::set<std::string> subs;
145 std::string opaque_data;
146
147 void dump(Formatter *f) const {
148 encode_json("name", name, f);
149 encode_json("subs", subs, f);
150 encode_json("opaque", opaque_data, f);
151 }
152 };
153
154 struct PSNotificationConfig {
155 uint64_t id{0};
156 string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
157 string topic;
158 bool is_prefix{false};
159
160
161 void dump(Formatter *f) const {
162 encode_json("id", id, f);
163 encode_json("path", path, f);
164 encode_json("topic", topic, f);
165 encode_json("is_prefix", is_prefix, f);
166 }
167
168 void init(CephContext *cct, const JSONFormattable& config) {
169 path = config["path"];
170 if (!path.empty() && path[path.size() - 1] == '*') {
171 path = path.substr(0, path.size() - 1);
172 is_prefix = true;
173 }
174 topic = config["topic"];
175 }
176 };
177
178 template<class T>
179 static string json_str(const char *name, const T& obj, bool pretty = false)
180 {
181 stringstream ss;
182 JSONFormatter f(pretty);
183
184 encode_json(name, obj, &f);
185 f.flush(ss);
186
187 return ss.str();
188 }
189
190 using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
191 using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
192
193 struct PSConfig {
194 const std::string id{"pubsub"};
195 rgw_user user;
196 std::string data_bucket_prefix;
197 std::string data_oid_prefix;
198
199 int events_retention_days{0};
200
201 uint64_t sync_instance{0};
202 uint64_t max_id{0};
203
204 /* FIXME: no hard coded buckets, we'll have configurable topics */
205 std::map<std::string, PSSubConfigRef> subs;
206 std::map<std::string, PSTopicConfigRef> topics;
207 std::multimap<std::string, PSNotificationConfig> notifications;
208
209 bool start_with_full_sync{false};
210
211 void dump(Formatter *f) const {
212 encode_json("id", id, f);
213 encode_json("user", user, f);
214 encode_json("data_bucket_prefix", data_bucket_prefix, f);
215 encode_json("data_oid_prefix", data_oid_prefix, f);
216 encode_json("events_retention_days", events_retention_days, f);
217 encode_json("sync_instance", sync_instance, f);
218 encode_json("max_id", max_id, f);
219 {
220 Formatter::ArraySection section(*f, "subs");
221 for (auto& sub : subs) {
222 encode_json("sub", *sub.second, f);
223 }
224 }
225 {
226 Formatter::ArraySection section(*f, "topics");
227 for (auto& topic : topics) {
228 encode_json("topic", *topic.second, f);
229 }
230 }
231 {
232 Formatter::ObjectSection section(*f, "notifications");
233 std::string last;
234 for (auto& notif : notifications) {
235 const string& n = notif.first;
236 if (n != last) {
237 if (!last.empty()) {
238 f->close_section();
239 }
240 f->open_array_section(n.c_str());
241 }
242 last = n;
243 encode_json("notifications", notif.second, f);
244 }
245 if (!last.empty()) {
246 f->close_section();
247 }
248 }
249 encode_json("start_with_full_sync", start_with_full_sync, f);
250 }
251
252 void init(CephContext *cct, const JSONFormattable& config) {
253 string uid = config["uid"]("pubsub");
254 user = rgw_user(config["tenant"], uid);
255 data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
256 data_oid_prefix = config["data_oid_prefix"];
257 events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
258
259 for (auto& c : config["notifications"].array()) {
260 PSNotificationConfig nc;
261 nc.id = ++max_id;
262 nc.init(cct, c);
263 notifications.insert(std::make_pair(nc.path, nc));
264
265 PSTopicConfig topic_config = { .name = nc.topic };
266 topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
267 }
268 for (auto& c : config["subscriptions"].array()) {
269 auto sc = std::make_shared<PSSubConfig>();
270 sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
271 subs[sc->name] = sc;
272 auto iter = topics.find(sc->topic);
273 if (iter != topics.end()) {
274 iter->second->subs.insert(sc->name);
275 }
276 }
277 start_with_full_sync = config["start_with_full_sync"](false);
278
279 ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
280 }
281
282 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
283 sync_instance = instance_id;
284 }
285
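// Matching example (illustrative names): for object "obj1" in bucket "buck" the
// lookup path below is "buck/obj1"; it is matched by a notification whose path
// is exactly "buck/obj1", or by a wildcard path such as "buck/ob*" (stored as
// prefix "buck/ob" with is_prefix set).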
286 void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
287 const std::string path = bucket.name + "/" + key.name;
288
289 auto iter = notifications.upper_bound(path);
290 if (iter == notifications.begin()) {
291 return;
292 }
293
294 do {
295 --iter;
296 if (iter->first.size() > path.size()) {
297 break;
298 }
299 if (path.compare(0, iter->first.size(), iter->first) != 0) {
300 break;
301 }
302
303 PSNotificationConfig& target = iter->second;
304
305 if (!target.is_prefix &&
306 path.size() != iter->first.size()) {
307 continue;
308 }
309
310 auto topic = topics.find(target.topic);
311 if (topic == topics.end()) {
312 continue;
313 }
314
315 ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
316 " target_path=" << target.path << ", topic=" << target.topic << dendl;
317 (*result)->push_back(topic->second);
318 } while (iter != notifications.begin());
319 }
320
321 bool find_sub(const string& name, PSSubConfigRef *ref) {
322 auto iter = subs.find(name);
323 if (iter != subs.end()) {
324 *ref = iter->second;
325 return true;
326 }
327 return false;
328 }
329 };
330
331 using PSConfigRef = std::shared_ptr<PSConfig>;
332 template<typename EventType>
333 using EventRef = std::shared_ptr<EventType>;
334
335 struct objstore_event {
336 string id;
337 const rgw_bucket& bucket;
338 const rgw_obj_key& key;
339 const ceph::real_time& mtime;
340 const std::vector<std::pair<std::string, std::string> > *attrs;
341
342 objstore_event(const rgw_bucket& _bucket,
343 const rgw_obj_key& _key,
344 const ceph::real_time& _mtime,
345 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
346 key(_key),
347 mtime(_mtime),
348 attrs(_attrs) {}
349
350 string get_hash() {
351 string etag;
352 RGWMD5Etag hash;
353 hash.update(bucket.bucket_id);
354 hash.update(key.name);
355 hash.update(key.instance);
356 hash.finish(&etag);
357
358 assert(etag.size() > 8);
359
360 return etag.substr(0, 8);
361 }
362
363 void dump(Formatter *f) const {
364 {
365 Formatter::ObjectSection s(*f, "bucket");
366 encode_json("name", bucket.name, f);
367 encode_json("tenant", bucket.tenant, f);
368 encode_json("bucket_id", bucket.bucket_id, f);
369 }
370 {
371 Formatter::ObjectSection s(*f, "key");
372 encode_json("name", key.name, f);
373 encode_json("instance", key.instance, f);
374 }
375 utime_t mt(mtime);
376 encode_json("mtime", mt, f);
377 Formatter::ObjectSection s(*f, "attrs");
378 if (attrs) {
379 for (auto& attr : *attrs) {
380 encode_json(attr.first.c_str(), attr.second.c_str(), f);
381 }
382 }
383 }
384 };
385
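// Builds a ceph-style pubsub event for the given object: event_name is the
// ceph string of the event type, source is "<bucket>/<key>", the id is derived
// from a hash of the object and the current time, and info holds the
// JSON-encoded objstore_event defined above.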
386 static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
387 const rgw_obj_key& key,
388 const ceph::real_time& mtime,
389 const std::vector<std::pair<std::string, std::string> > *attrs,
390 rgw::notify::EventType event_type,
391 EventRef<rgw_pubsub_event> *event) {
392 *event = std::make_shared<rgw_pubsub_event>();
393
394 EventRef<rgw_pubsub_event>& e = *event;
395 e->event_name = rgw::notify::to_ceph_string(event_type);
396 e->source = bucket.name + "/" + key.name;
397 e->timestamp = real_clock::now();
398
399 objstore_event oevent(bucket, key, mtime, attrs);
400
401 const utime_t ts(e->timestamp);
402 set_event_id(e->id, oevent.get_hash(), ts);
403
404 encode_json("info", oevent, &e->info);
405 }
406
407 static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
408 const rgw_user& owner,
409 const rgw_obj_key& key,
410 const ceph::real_time& mtime,
411 const std::vector<std::pair<std::string, std::string> > *attrs,
412 rgw::notify::EventType event_type,
413 EventRef<rgw_pubsub_s3_record> *record) {
414 *record = std::make_shared<rgw_pubsub_s3_record>();
415
416 EventRef<rgw_pubsub_s3_record>& r = *record;
417 r->eventTime = mtime;
418 r->eventName = rgw::notify::to_string(event_type);
419 // userIdentity: not supported in sync module
420 // x_amz_request_id: not supported in sync module
421 // x_amz_id_2: not supported in sync module
422 // configurationId is filled from subscription configuration
423 r->bucket_name = bucket.name;
424 r->bucket_ownerIdentity = owner.to_str();
425 r->bucket_arn = to_string(rgw::ARN(bucket));
426 r->bucket_id = bucket.bucket_id; // rgw extension
427 r->object_key = key.name;
428 // object_size not supported in sync module
429 objstore_event oevent(bucket, key, mtime, attrs);
430 r->object_etag = oevent.get_hash();
431 r->object_versionId = key.instance;
432
433 // use timestamp as the per-key sequence id (hex encoded)
434 const utime_t ts(real_clock::now());
435 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
436 std::back_inserter(r->object_sequencer));
437
438 set_event_id(r->id, r->object_etag, ts);
439 }
440
441 class PSManager;
442 using PSManagerRef = std::shared_ptr<PSManager>;
443
444 struct PSEnv {
445 PSConfigRef conf;
446 shared_ptr<RGWUserInfo> data_user_info;
447 PSManagerRef manager;
448
449 PSEnv() : conf(make_shared<PSConfig>()),
450 data_user_info(make_shared<RGWUserInfo>()) {}
451
452 void init(CephContext *cct, const JSONFormattable& config) {
453 conf->init(cct, config);
454 }
455
456 void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
457 };
458
459 using PSEnvRef = std::shared_ptr<PSEnv>;
460
461 template<typename EventType>
462 class PSEvent {
463 const EventRef<EventType> event;
464
465 public:
466 PSEvent(const EventRef<EventType>& _event) : event(_event) {}
467
468 void format(bufferlist *bl) const {
469 bl->append(json_str("", *event));
470 }
471
472 void encode_event(bufferlist& bl) const {
473 encode(*event, bl);
474 }
475
476 const string& id() const {
477 return event->id;
478 }
479 };
480
481 template <class T>
482 class RGWSingletonCR : public RGWCoroutine {
483 friend class WrapperCR;
484
485 boost::asio::coroutine wrapper_state;
486 bool started{false};
487 int operate_ret{0};
488
489 struct WaiterInfo {
490 RGWCoroutine *cr{nullptr};
491 T *result;
492 };
493 using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
494
495 deque<WaiterInfoRef> waiters;
496
497 void add_waiter(RGWCoroutine *cr, T *result) {
498 auto waiter = std::make_shared<WaiterInfo>();
499 waiter->cr = cr;
500 waiter->result = result;
501 waiters.push_back(waiter);
502 };
503
504 bool get_next_waiter(WaiterInfoRef *waiter) {
505 if (waiters.empty()) {
506 waiter->reset();
507 return false;
508 }
509
510 *waiter = waiters.front();
511 waiters.pop_front();
512 return true;
513 }
514
515 int operate_wrapper() override {
516 reenter(&wrapper_state) {
517 while (!is_done()) {
518 ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
519 operate_ret = operate();
520 if (operate_ret < 0) {
521 ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
522 }
523 if (!is_done()) {
524 yield;
525 }
526 }
527
528 ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
529 /* we're done, can't yield anymore */
530
531 WaiterInfoRef waiter;
532 while (get_next_waiter(&waiter)) {
533 ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
534 waiter->cr->set_retcode(retcode);
535 waiter->cr->set_sleeping(false);
536 return_result(waiter->result);
537 put();
538 }
539
540 return retcode;
541 }
542 return 0;
543 }
544
545 virtual void return_result(T *result) {}
546
547 public:
548 RGWSingletonCR(CephContext *_cct)
549 : RGWCoroutine(_cct) {}
550
551 int execute(RGWCoroutine *caller, T *result = nullptr) {
552 if (!started) {
553 ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
554 started = true;
555 caller->call(this);
556 return 0;
557 } else if (!is_done()) {
558 ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
559 get();
560 add_waiter(caller, result);
561 caller->set_sleeping(true);
562 return 0;
563 }
564
565 ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
566 caller->set_retcode(retcode);
567 return_result(result);
568 return retcode;
569 }
570 };
571
572
573 class PSSubscription;
574 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
575
576 class PSSubscription {
577 class InitCR;
578 friend class InitCR;
579 friend class RGWPSHandleObjEventCR;
580
581 RGWDataSyncCtx *sc;
582 RGWDataSyncEnv *sync_env;
583 PSEnvRef env;
584 PSSubConfigRef sub_conf;
585 std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
586 RGWBucketInfo *bucket_info{nullptr};
587 RGWDataAccessRef data_access;
588 RGWDataAccess::BucketRef bucket;
589
590 InitCR *init_cr{nullptr};
591
592 class InitBucketLifecycleCR : public RGWCoroutine {
593 RGWDataSyncCtx *sc;
594 RGWDataSyncEnv *sync_env;
595 PSConfigRef& conf;
596 LCRule rule;
597
598 int retention_days;
599
600 rgw_bucket_lifecycle_config_params lc_config;
601
602 public:
603 InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
604 PSConfigRef& _conf,
605 RGWBucketInfo& _bucket_info,
606 std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sc->cct),
607 sc(_sc), sync_env(_sc->env),
608 conf(_conf) {
609 lc_config.bucket_info = _bucket_info;
610 lc_config.bucket_attrs = _bucket_attrs;
611 retention_days = conf->events_retention_days;
612 }
613
614 int operate() override {
615 reenter(this) {
616
617 rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
618
619 {
620 /* maybe we already have it configured? */
621 RGWLifecycleConfiguration old_config;
622 auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
623 if (aiter != lc_config.bucket_attrs.end()) {
624 bufferlist::const_iterator iter{&aiter->second};
625 try {
626 old_config.decode(iter);
627 } catch (const buffer::error& e) {
628 ldpp_dout(sync_env->dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl;
629 }
630 }
631
632 auto old_rules = old_config.get_rule_map();
633 for (auto ori : old_rules) {
634 auto& old_rule = ori.second;
635
636 if (old_rule.get_prefix().empty() &&
637 old_rule.get_expiration().get_days() == retention_days &&
638 old_rule.is_enabled()) {
639 ldpp_dout(sync_env->dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
640 return set_cr_done();
641 }
642 }
643 }
644
645 lc_config.config.add_rule(rule);
646 yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
647 sync_env->store,
648 lc_config,
649 sync_env->dpp));
650 if (retcode < 0) {
651 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
652 return set_cr_error(retcode);
653 }
654
655 return set_cr_done();
656 }
657 return 0;
658 }
659 };
660
661 class InitCR : public RGWSingletonCR<bool> {
662 RGWDataSyncCtx *sc;
663 RGWDataSyncEnv *sync_env;
664 PSSubscriptionRef sub;
665 rgw_get_bucket_info_params get_bucket_info;
666 rgw_bucket_create_local_params create_bucket;
667 PSConfigRef& conf;
668 PSSubConfigRef& sub_conf;
669 int i;
670
671 public:
672 InitCR(RGWDataSyncCtx *_sc,
673 PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct),
674 sc(_sc), sync_env(_sc->env),
675 sub(_sub), conf(sub->env->conf),
676 sub_conf(sub->sub_conf) {
677 }
678
679 int operate() override {
680 reenter(this) {
681 get_bucket_info.tenant = conf->user.tenant;
682 get_bucket_info.bucket_name = sub_conf->data_bucket_name;
683 sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
684
685 for (i = 0; i < 2; ++i) {
686 yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
687 sync_env->store,
688 get_bucket_info,
689 sub->get_bucket_info_result));
690 if (retcode < 0 && retcode != -ENOENT) {
691 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to get bucket info: " << "tenant="
692 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
693 }
694 if (retcode == 0) {
695 {
696 auto& result = sub->get_bucket_info_result;
697 sub->bucket_info = &result->bucket_info;
698
699 int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
700 if (ret < 0) {
701 ldpp_dout(sync_env->dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
702 return set_cr_error(ret);
703 }
704 }
705
706 yield call(new InitBucketLifecycleCR(sc, conf,
707 sub->get_bucket_info_result->bucket_info,
708 sub->get_bucket_info_result->attrs));
709 if (retcode < 0) {
710 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
711 return set_cr_error(retcode);
712 }
713
714 return set_cr_done();
715 }
716
717 create_bucket.user_info = sub->env->data_user_info;
718 create_bucket.bucket_name = sub_conf->data_bucket_name;
719 ldpp_dout(sync_env->dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
720 yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
721 sync_env->store,
722 create_bucket,
723 sync_env->dpp));
724 if (retcode < 0) {
725 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
726 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
727 return set_cr_error(retcode);
728 }
729
730 /* second iteration: we got -ENOENT and created a bucket */
731 }
732
733 /* failed twice on -ENOENT, unexpected */
734 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
735 << " name=" << get_bucket_info.bucket_name << dendl;
736 return set_cr_error(-EIO);
737 }
738 return 0;
739 }
740 };
741
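// Stores a single event as an object in the subscription's data bucket: the
// object key is <data_oid_prefix><event-id>, the object data is the
// JSON-formatted event, and user_data carries a base64 of the binary-encoded
// event.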
742 template<typename EventType>
743 class StoreEventCR : public RGWCoroutine {
744 RGWDataSyncCtx* const sc;
745 RGWDataSyncEnv* const sync_env;
746 const PSSubscriptionRef sub;
747 const PSEvent<EventType> pse;
748 const string oid_prefix;
749
750 public:
751 StoreEventCR(RGWDataSyncCtx* const _sc,
752 const PSSubscriptionRef& _sub,
753 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
754 sc(_sc), sync_env(_sc->env),
755 sub(_sub),
756 pse(_event),
757 oid_prefix(sub->sub_conf->data_oid_prefix) {
758 }
759
760 int operate() override {
761 rgw_object_simple_put_params put_obj;
762 reenter(this) {
763
764 put_obj.bucket = sub->bucket;
765 put_obj.key = rgw_obj_key(oid_prefix + pse.id());
766
767 pse.format(&put_obj.data);
768
769 {
770 bufferlist bl;
771 pse.encode_event(bl);
772 bufferlist bl64;
773 bl.encode_base64(bl64);
774 put_obj.user_data = bl64.to_str();
775 }
776
777 yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
778 sync_env->store,
779 put_obj,
780 sync_env->dpp));
781 if (retcode < 0) {
782 ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
783 return set_cr_error(retcode);
784 } else {
785 ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
786 }
787
788 return set_cr_done();
789 }
790 return 0;
791 }
792 };
793
794 template<typename EventType>
795 class PushEventCR : public RGWCoroutine {
796 RGWDataSyncCtx* const sc;
797 RGWDataSyncEnv* const sync_env;
798 const EventRef<EventType> event;
799 const PSSubConfigRef& sub_conf;
800
801 public:
802 PushEventCR(RGWDataSyncCtx* const _sc,
803 const PSSubscriptionRef& _sub,
804 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
805 sc(_sc), sync_env(_sc->env),
806 event(_event),
807 sub_conf(_sub->sub_conf) {
808 }
809
810 int operate() override {
811 reenter(this) {
812 ceph_assert(sub_conf->push_endpoint);
813 yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
814
815 if (retcode < 0) {
816 ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
817 " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
818 return set_cr_error(retcode);
819 }
820
821 ldout(sync_env->cct, 20) << "event: " << event->id <<
822 " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
823 return set_cr_done();
824 }
825 return 0;
826 }
827 };
828
829 public:
830 PSSubscription(RGWDataSyncCtx *_sc,
831 PSEnvRef _env,
832 PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
833 env(_env),
834 sub_conf(_sub_conf),
835 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
836
837 PSSubscription(RGWDataSyncCtx *_sc,
838 PSEnvRef _env,
839 rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
840 env(_env),
841 sub_conf(std::make_shared<PSSubConfig>()),
842 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
843 sub_conf->from_user_conf(sync_env->cct, user_sub_conf);
844 }
845 virtual ~PSSubscription() {
846 if (init_cr) {
847 init_cr->put();
848 }
849 }
850
851 template <class C>
852 static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
853 PSEnvRef _env,
854 C& _sub_conf) {
855 auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf);
856 sub->init_cr = new InitCR(_sc, sub);
857 sub->init_cr->get();
858 return sub;
859 }
860
861 int call_init_cr(RGWCoroutine *caller) {
862 return init_cr->execute(caller);
863 }
864
865 template<typename EventType>
866 static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
867 return new StoreEventCR<EventType>(sc, sub, event);
868 }
869
870 template<typename EventType>
871 static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
872 return new PushEventCR<EventType>(sc, sub, event);
873 }
874 friend class InitCR;
875 };
876
877 class PSManager
878 {
879 RGWDataSyncCtx *sc;
880 RGWDataSyncEnv *sync_env;
881 PSEnvRef env;
882
883 std::map<string, PSSubscriptionRef> subs;
884
885 class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
886 RGWDataSyncCtx *sc;
887 RGWDataSyncEnv *sync_env;
888 PSManagerRef mgr;
889 rgw_user owner;
890 string sub_name;
891 string sub_id;
892 PSSubscriptionRef *ref;
893
894 PSConfigRef conf;
895
896 PSSubConfigRef sub_conf;
897 rgw_pubsub_sub_config user_sub_conf;
898
899 public:
900 GetSubCR(RGWDataSyncCtx *_sc,
901 PSManagerRef& _mgr,
902 const rgw_user& _owner,
903 const string& _sub_name,
904 PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct),
905 sc(_sc), sync_env(_sc->env),
906 mgr(_mgr),
907 owner(_owner),
908 sub_name(_sub_name),
909 ref(_ref),
910 conf(mgr->env->conf) {
911 }
912 ~GetSubCR() { }
913
914 int operate() override {
915 reenter(this) {
916 if (owner.empty()) {
917 if (!conf->find_sub(sub_name, &sub_conf)) {
918 ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
919 mgr->remove_get_sub(owner, sub_name);
920 return set_cr_error(-ENOENT);
921 }
922
923 *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
924 } else {
925 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
926 yield {
927 RGWPubSub ps(sync_env->store, owner.tenant);
928 rgw_raw_obj obj;
929 ps.get_sub_meta_obj(sub_name, &obj);
930 bool empty_on_enoent = false;
931 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
932 obj,
933 &user_sub_conf, empty_on_enoent));
934 }
935 if (retcode < 0) {
936 mgr->remove_get_sub(owner, sub_name);
937 return set_cr_error(retcode);
938 }
939
940 *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
941 }
942
943 yield (*ref)->call_init_cr(this);
944 if (retcode < 0) {
945 ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
946 mgr->remove_get_sub(owner, sub_name);
947 return set_cr_error(retcode);
948 }
949
950 if (owner.empty()) {
951 mgr->subs[sub_name] = *ref;
952 }
953 mgr->remove_get_sub(owner, sub_name);
954
955 return set_cr_done();
956 }
957 return 0;
958 }
959
960 void return_result(PSSubscriptionRef *result) override {
961 ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
962 if (retcode >= 0) {
963 *result = *ref;
964 }
965 }
966 };
967
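// Subscription lookup key (illustrative): owner "mytenant$user1" and
// subscription "sub1" map to "mytenant$user1/sub1"; an empty owner
// (subscriptions defined in the module configuration) maps to just "sub1".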
968 string sub_id(const rgw_user& owner, const string& sub_name) {
969 string owner_prefix;
970 if (!owner.empty()) {
971 owner_prefix = owner.to_str() + "/";
972 }
973
974 return owner_prefix + sub_name;
975 }
976
977 std::map<std::string, GetSubCR *> get_subs;
978
979 GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
980 return get_subs[sub_id(owner, name)];
981 }
982
983 void remove_get_sub(const rgw_user& owner, const string& name) {
984 get_subs.erase(sub_id(owner, name));
985 }
986
987 bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
988 auto iter = subs.find(sub_id(owner, sub_name));
989 if (iter != subs.end()) {
990 *sub = iter->second;
991 return true;
992 }
993 return false;
994 }
995
996 PSManager(RGWDataSyncCtx *_sc,
997 PSEnvRef _env) : sc(_sc), sync_env(_sc->env),
998 env(_env) {}
999
1000 public:
1001 static PSManagerRef get_shared(RGWDataSyncCtx *_sc,
1002 PSEnvRef _env) {
1003 return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
1004 }
1005
1006 static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr,
1007 RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
1008 if (mgr->find_sub_instance(owner, sub_name, ref)) {
1009 /* found it! nothing to execute */
1010 ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl;
1011 }
1012 auto& gs = mgr->get_get_subs(owner, sub_name);
1013 if (!gs) {
1014 ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl;
1015 gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
1016 }
1017 ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl;
1018 return gs->execute(caller, ref);
1019 }
1020
1021 friend class GetSubCR;
1022 };
1023
1024 void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
1025 manager = mgr;
1026 conf->init_instance(realm, instance_id);
1027 }
1028
1029 class RGWPSInitEnvCBCR : public RGWCoroutine {
1030 RGWDataSyncCtx *sc;
1031 RGWDataSyncEnv *sync_env;
1032 PSEnvRef env;
1033 PSConfigRef& conf;
1034
1035 rgw_user_create_params create_user;
1036 rgw_get_user_info_params get_user_info;
1037 public:
1038 RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc,
1039 PSEnvRef& _env) : RGWCoroutine(_sc->cct),
1040 sc(_sc), sync_env(_sc->env),
1041 env(_env), conf(env->conf) {}
1042 int operate() override {
1043 reenter(this) {
1044 ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl;
1045
1046 /* nothing to do here right now */
1047 create_user.user = conf->user;
1048 create_user.max_buckets = 0; /* unlimited */
1049 create_user.display_name = "pubsub";
1050 create_user.generate_key = false;
1051 yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, sync_env->dpp));
1052 if (retcode < 0 && retcode != -ERR_USER_EXIST) {
1053 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
1054 return set_cr_error(retcode);
1055 }
1056
1057 get_user_info.user = conf->user;
1058 yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
1059 if (retcode < 0) {
1060 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to get rgw user info: ret=" << retcode << dendl;
1061 return set_cr_error(retcode);
1062 }
1063
1064 ldpp_dout(sync_env->dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
1065
1066
1067 return set_cr_done();
1068 }
1069 return 0;
1070 }
1071 };
1072
1073 bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
1074 if (!match(filter.events, event_type)) {
1075 return false;
1076 }
1077 if (!match(filter.s3_filter.key_filter, key_name)) {
1078 return false;
1079 }
1080 return true;
1081 }
1082
1083 class RGWPSFindBucketTopicsCR : public RGWCoroutine {
1084 RGWDataSyncCtx *sc;
1085 RGWDataSyncEnv *sync_env;
1086 PSEnvRef env;
1087 rgw_user owner;
1088 rgw_bucket bucket;
1089 rgw_obj_key key;
1090 rgw::notify::EventType event_type;
1091
1092 RGWPubSub ps;
1093
1094 rgw_raw_obj bucket_obj;
1095 rgw_raw_obj user_obj;
1096 rgw_pubsub_bucket_topics bucket_topics;
1097 rgw_pubsub_topics user_topics;
1098 TopicsRef *topics;
1099 public:
1100 RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
1101 PSEnvRef& _env,
1102 const rgw_user& _owner,
1103 const rgw_bucket& _bucket,
1104 const rgw_obj_key& _key,
1105 rgw::notify::EventType _event_type,
1106 TopicsRef *_topics) : RGWCoroutine(_sc->cct),
1107 sc(_sc), sync_env(_sc->env),
1108 env(_env),
1109 owner(_owner),
1110 bucket(_bucket),
1111 key(_key),
1112 event_type(_event_type),
1113 ps(sync_env->store, owner.tenant),
1114 topics(_topics) {
1115 *topics = std::make_shared<vector<PSTopicConfigRef> >();
1116 }
1117 int operate() override {
1118 reenter(this) {
1119 ps.get_bucket_meta_obj(bucket, &bucket_obj);
1120 ps.get_meta_obj(&user_obj);
1121
1122 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
1123 yield {
1124 bool empty_on_enoent = true;
1125 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
1126 bucket_obj,
1127 &bucket_topics, empty_on_enoent));
1128 }
1129 if (retcode < 0 && retcode != -ENOENT) {
1130 return set_cr_error(retcode);
1131 }
1132
1133 ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
1134
1135 if (!bucket_topics.topics.empty()) {
1136 using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
1137 yield {
1138 bool empty_on_enoent = true;
1139 call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
1140 user_obj,
1141 &user_topics, empty_on_enoent));
1142 }
1143 if (retcode < 0 && retcode != -ENOENT) {
1144 return set_cr_error(retcode);
1145 }
1146 }
1147
1148 for (auto& titer : bucket_topics.topics) {
1149 auto& topic_filter = titer.second;
1150 auto& info = topic_filter.topic;
1151 if (!match(topic_filter, key.name, event_type)) {
1152 continue;
1153 }
1154 std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
1155 tc->name = info.name;
1156 tc->subs = user_topics.topics[info.name].subs;
1157 tc->opaque_data = info.opaque_data;
1158 (*topics)->push_back(tc);
1159 }
1160
1161 env->conf->get_topics(sync_env->cct, bucket, key, topics);
1162 return set_cr_done();
1163 }
1164 return 0;
1165 }
1166 };
1167
1168 class RGWPSHandleObjEventCR : public RGWCoroutine {
1169 RGWDataSyncCtx* const sc;
1170 const PSEnvRef env;
1171 const rgw_user& owner;
1172 const EventRef<rgw_pubsub_event> event;
1173 const EventRef<rgw_pubsub_s3_record> record;
1174 const TopicsRef topics;
1175 const std::array<rgw_user, 2> owners;
1176 bool has_subscriptions;
1177 bool event_handled;
1178 bool sub_conf_found;
1179 PSSubscriptionRef sub;
1180 std::array<rgw_user, 2>::const_iterator oiter;
1181 std::vector<PSTopicConfigRef>::const_iterator titer;
1182 std::set<std::string>::const_iterator siter;
1183 int last_sub_conf_error;
1184
1185 public:
1186 RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
1187 const PSEnvRef _env,
1188 const rgw_user& _owner,
1189 const EventRef<rgw_pubsub_event>& _event,
1190 const EventRef<rgw_pubsub_s3_record>& _record,
1191 const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
1192 sc(_sc),
1193 env(_env),
1194 owner(_owner),
1195 event(_event),
1196 record(_record),
1197 topics(_topics),
1198 owners({owner, rgw_user{}}),
1199 has_subscriptions(false),
1200 event_handled(false) {}
1201
1202 int operate() override {
1203 reenter(this) {
1204 ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone
1205 << " event=" << json_str("event", *event, false)
1206 << " owner=" << owner << dendl;
1207
1208 ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
1209
1210 // outside caller should check that
1211 ceph_assert(!topics->empty());
1212
1213 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
1214
1215 // loop over all topics related to the bucket/object
1216 for (titer = topics->begin(); titer != topics->end(); ++titer) {
1217 ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" <<
1218 (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
1219 // loop over all subscriptions of the topic
1220 for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
1221 ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
1222 has_subscriptions = true;
1223 sub_conf_found = false;
1224 // try to read subscription configuration from global/user configuration
1225 // configuration is considered missing only if it does not exist in either
1226 for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
1227 yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
1228 if (retcode < 0) {
1229 if (sub_conf_found) {
1230 // not a real issue, sub conf already found
1231 retcode = 0;
1232 }
1233 last_sub_conf_error = retcode;
1234 continue;
1235 }
1236 sub_conf_found = true;
1237 if (sub->sub_conf->s3_id.empty()) {
1238 // subscription was not made by S3 compatible API
1239 ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1240 yield call(PSSubscription::store_event_cr(sc, sub, event));
1241 if (retcode < 0) {
1242 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1243 ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
1244 } else {
1245 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1246 event_handled = true;
1247 }
1248 if (sub->sub_conf->push_endpoint) {
1249 ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1250 yield call(PSSubscription::push_event_cr(sc, sub, event));
1251 if (retcode < 0) {
1252 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1253 ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
1254 } else {
1255 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1256 event_handled = true;
1257 }
1258 }
1259 } else {
1260 // subscription was made by S3 compatible API
1261 ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1262 record->configurationId = sub->sub_conf->s3_id;
1263 record->opaque_data = (*titer)->opaque_data;
1264 yield call(PSSubscription::store_event_cr(sc, sub, record));
1265 if (retcode < 0) {
1266 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
1267 ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
1268 } else {
1269 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1270 event_handled = true;
1271 }
1272 if (sub->sub_conf->push_endpoint) {
1273 ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1274 yield call(PSSubscription::push_event_cr(sc, sub, record));
1275 if (retcode < 0) {
1276 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
1277 ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
1278 } else {
1279 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1280 event_handled = true;
1281 }
1282 }
1283 }
1284 }
1285 if (!sub_conf_found) {
1286 // could not find conf for subscription at user or global levels
1287 if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
1288 ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1289 << " ret=" << last_sub_conf_error << dendl;
1290 if (retcode == -ENOENT) {
1291 // missing subscription info should be reflected back as invalid argument
1292 // and not as missing object
1293 retcode = -EINVAL;
1294 }
1295 }
1296 }
1297 }
1298 if (has_subscriptions && !event_handled) {
1299 // event is considered "lost" if it has subscriptions on any of its topics
1300 // but it was not stored in, or pushed to, any of them
1301 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
1302 }
1303 if (retcode < 0) {
1304 return set_cr_error(retcode);
1305 }
1306 return set_cr_done();
1307 }
1308 return 0;
1309 }
1310 };
1311
1312 // coroutine invoked on remote object creation
1313 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
1314 RGWDataSyncCtx *sc;
1315 rgw_bucket_sync_pipe sync_pipe;
1316 PSEnvRef env;
1317 std::optional<uint64_t> versioned_epoch;
1318 EventRef<rgw_pubsub_event> event;
1319 EventRef<rgw_pubsub_s3_record> record;
1320 TopicsRef topics;
1321 public:
1322 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1323 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1324 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1325 TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1326 sc(_sc),
1327 sync_pipe(_sync_pipe),
1328 env(_env),
1329 versioned_epoch(_versioned_epoch),
1330 topics(_topics) {
1331 }
1332 int operate() override {
1333 reenter(this) {
1334 ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone
1335 << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
1336 << " attrs=" << attrs << dendl;
1337 {
1338 std::vector<std::pair<std::string, std::string> > event_attrs;
1339 for (auto& attr : attrs) { // attrs fetched by the remote stat (RGWStatRemoteObjCBCR)
1340 std::string k = attr.first;
1341 if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
1342 k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
1343 }
1344 event_attrs.push_back(std::make_pair(k, attr.second.to_str()));
1345 }
1346 // at this point we don't know whether we need the ceph event or S3 record
1347 // this is why both are created here, once we have information about the
1348 // subscription, we will store/push only the relevant ones
1349 make_event_ref(sc->cct,
1350 sync_pipe.info.source_bs.bucket, key,
1351 mtime, &event_attrs,
1352 rgw::notify::ObjectCreated, &event);
1353 make_s3_record_ref(sc->cct,
1354 sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
1355 mtime, &event_attrs,
1356 rgw::notify::ObjectCreated, &record);
1357 }
1358
1359 yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
1360 if (retcode < 0) {
1361 return set_cr_error(retcode);
1362 }
1363 return set_cr_done();
1364 }
1365 return 0;
1366 }
1367 };
1368
1369 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
1370 rgw_bucket_sync_pipe sync_pipe;
1371 PSEnvRef env;
1372 std::optional<uint64_t> versioned_epoch;
1373 TopicsRef topics;
1374 public:
1375 RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1376 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1377 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
1378 TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1379 sync_pipe(_sync_pipe),
1380 env(_env), versioned_epoch(_versioned_epoch),
1381 topics(_topics) {
1382 }
1383
1384 ~RGWPSHandleRemoteObjCR() override {}
1385
1386 RGWStatRemoteObjCBCR *allocate_callback() override {
1387 return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
1388 }
1389 };
1390
1391 class RGWPSHandleObjCreateCR : public RGWCoroutine {
1392 RGWDataSyncCtx *sc;
1393 rgw_bucket_sync_pipe sync_pipe;
1394 rgw_obj_key key;
1395 PSEnvRef env;
1396 std::optional<uint64_t> versioned_epoch;
1397 TopicsRef topics;
1398 public:
1399 RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
1400 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1401 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct),
1402 sc(_sc),
1403 sync_pipe(_sync_pipe),
1404 key(_key),
1405 env(_env),
1406 versioned_epoch(_versioned_epoch) {
1407 }
1408
1409 ~RGWPSHandleObjCreateCR() override {}
1410
1411 int operate() override {
1412 reenter(this) {
1413 yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
1414 sync_pipe.info.source_bs.bucket, key,
1415 rgw::notify::ObjectCreated,
1416 &topics));
1417 if (retcode < 0) {
1418 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
1419 return set_cr_error(retcode);
1420 }
1421 if (topics->empty()) {
1422 ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
1423 return set_cr_done();
1424 }
1425 yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
1426 if (retcode < 0) {
1427 return set_cr_error(retcode);
1428 }
1429 return set_cr_done();
1430 }
1431 return 0;
1432 }
1433 };
1434
1435 // coroutine invoked on remote object deletion
1436 class RGWPSGenericObjEventCBCR : public RGWCoroutine {
1437 RGWDataSyncCtx *sc;
1438 PSEnvRef env;
1439 rgw_user owner;
1440 rgw_bucket bucket;
1441 rgw_obj_key key;
1442 ceph::real_time mtime;
1443 rgw::notify::EventType event_type;
1444 EventRef<rgw_pubsub_event> event;
1445 EventRef<rgw_pubsub_s3_record> record;
1446 TopicsRef topics;
1447 public:
1448 RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
1449 PSEnvRef _env,
1450 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1451 rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct),
1452 sc(_sc),
1453 env(_env),
1454 owner(_sync_pipe.dest_bucket_info.owner),
1455 bucket(_sync_pipe.dest_bucket_info.bucket),
1456 key(_key),
1457 mtime(_mtime), event_type(_event_type) {}
1458 int operate() override {
1459 reenter(this) {
1460 ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone
1461 << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
1462 yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics));
1463 if (retcode < 0) {
1464 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
1465 return set_cr_error(retcode);
1466 }
1467 if (topics->empty()) {
1468 ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
1469 return set_cr_done();
1470 }
1471 // at this point we don't know whether we need the ceph event or S3 record
1472 // this is why both are created here, once we have information about the
1473 // subscription, we will store/push only the relevant ones
1474 make_event_ref(sc->cct,
1475 bucket, key,
1476 mtime, nullptr,
1477 event_type, &event);
1478 make_s3_record_ref(sc->cct,
1479 bucket, owner, key,
1480 mtime, nullptr,
1481 event_type, &record);
1482 yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
1483 if (retcode < 0) {
1484 return set_cr_error(retcode);
1485 }
1486 return set_cr_done();
1487 }
1488 return 0;
1489 }
1490
1491 };
1492
1493 class RGWPSDataSyncModule : public RGWDataSyncModule {
1494 PSEnvRef env;
1495 PSConfigRef& conf;
1496
1497 public:
1498 RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
1499 env->init(cct, config);
1500 }
1501
1502 ~RGWPSDataSyncModule() override {}
1503
1504 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1505 auto sync_env = sc->env;
1506 PSManagerRef mgr = PSManager::get_shared(sc, env);
1507 env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
1508 }
1509
1510 RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
1511 ldout(sc->cct, 5) << conf->id << ": start" << dendl;
1512 return new RGWPSInitEnvCBCR(sc, env);
1513 }
1514
1515 RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1516 rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
1517 ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
1518 " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1519 return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
1520 }
1521
1522 RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1523 rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1524 ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
1525 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1526 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
1527 }
1528
1529 RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
1530 rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
1531 ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
1532 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1533 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
1534 }
1535
1536 PSConfigRef& get_conf() { return conf; }
1537 };
1538
1539 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
1540 {
1541 data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
1542 const std::string jconf = json_str("conf", *data_handler->get_conf());
1543 JSONParser p;
1544 if (!p.parse(jconf.c_str(), jconf.size())) {
1545 ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
1546 effective_conf = config;
1547 } else {
1548 effective_conf.decode_json(&p);
1549 }
1550 }
1551
1552 RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
1553 {
1554 return data_handler.get();
1555 }
1556
1557 RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
1558 if (dialect != RGW_REST_S3) {
1559 return orig;
1560 }
1561 return new RGWRESTMgr_PubSub();
1562 }
1563
1564 bool RGWPSSyncModuleInstance::should_full_sync() const {
1565 return data_handler->get_conf()->start_with_full_sync;
1566 }
1567
1568 int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
1569 instance->reset(new RGWPSSyncModuleInstance(cct, config));
1570 return 0;
1571 }
1572
1573