]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_pubsub.cc
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_sync_module_pubsub.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "services/svc_zone.h"
11fdf7f2
TL
5#include "rgw_common.h"
6#include "rgw_coroutine.h"
7#include "rgw_sync_module.h"
8#include "rgw_data_sync.h"
9#include "rgw_sync_module_pubsub.h"
10#include "rgw_sync_module_pubsub_rest.h"
11#include "rgw_rest_conn.h"
12#include "rgw_cr_rados.h"
13#include "rgw_cr_rest.h"
14#include "rgw_cr_tools.h"
15#include "rgw_op.h"
16#include "rgw_pubsub.h"
17#include "rgw_pubsub_push.h"
eafe8130 18#include "rgw_notify_event_type.h"
11fdf7f2 19#include "rgw_perf_counters.h"
eafe8130
TL
20#ifdef WITH_RADOSGW_AMQP_ENDPOINT
21#include "rgw_amqp.h"
22#endif
9f95a23c
TL
23#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
24#include "rgw_kafka.h"
25#endif
11fdf7f2 26
eafe8130 27#include <boost/algorithm/hex.hpp>
11fdf7f2
TL
28#include <boost/asio/yield.hpp>
29
30#define dout_subsys ceph_subsys_rgw
31
32
33#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
34
35/*
36
37config:
38
39{
40 "tenant": <tenant>, # default: <empty>
41 "uid": <uid>, # default: "pubsub"
42 "data_bucket_prefix": <prefix> # default: "pubsub-"
43 "data_oid_prefix": <prefix> #
eafe8130
TL
44 "events_retention_days": <int> # default: 7
45 "start_with_full_sync": <bool> # default: false
11fdf7f2
TL
46
47 # non-dynamic config
48 "notifications": [
49 {
50 "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
51 # or a prefix if it ends with a wildcard
52 "topic": <topic-name>
53 },
54 ...
55 ],
56 "subscriptions": [
57 {
58 "name": <subscription-name>,
59 "topic": <topic>,
60 "push_endpoint": <endpoint>,
eafe8130 61 "push_endpoint_args": <arg list>, # any push endpoint specific args (include all args)
11fdf7f2
TL
62 "data_bucket": <bucket>, # override name of bucket where subscription data will be stored
63 "data_oid_prefix": <prefix> # set prefix for subscription data object ids
eafe8130 64 "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
11fdf7f2
TL
65 },
66 ...
67 ]
68}
69
70*/
71
72// utility function to convert the args list from string format
73// (ampresend separated with equal sign) to prased structure
74RGWHTTPArgs string_to_args(const std::string& str_args) {
75 RGWHTTPArgs args;
76 args.set(str_args);
77 args.parse();
78 return args;
79}
80
eafe8130
TL
81struct PSSubConfig {
82 std::string name;
83 std::string topic;
84 std::string push_endpoint_name;
85 std::string push_endpoint_args;
86 std::string data_bucket_name;
87 std::string data_oid_prefix;
88 std::string s3_id;
89 std::string arn_topic;
11fdf7f2
TL
90 RGWPubSubEndpoint::Ptr push_endpoint;
91
11fdf7f2
TL
92 void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
93 name = uc.name;
94 topic = uc.topic;
95 push_endpoint_name = uc.dest.push_endpoint;
96 data_bucket_name = uc.dest.bucket_name;
97 data_oid_prefix = uc.dest.oid_prefix;
eafe8130
TL
98 s3_id = uc.s3_id;
99 arn_topic = uc.dest.arn_topic;
100 if (!push_endpoint_name.empty()) {
11fdf7f2
TL
101 push_endpoint_args = uc.dest.push_endpoint_args;
102 try {
eafe8130
TL
103 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
104 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
11fdf7f2 105 } catch (const RGWPubSubEndpoint::configuration_error& e) {
eafe8130 106 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
11fdf7f2
TL
107 << push_endpoint_name << " due to: " << e.what() << dendl;
108 }
109 }
110 }
111
112 void dump(Formatter *f) const {
113 encode_json("name", name, f);
114 encode_json("topic", topic, f);
115 encode_json("push_endpoint", push_endpoint_name, f);
eafe8130 116 encode_json("push_endpoint_args", push_endpoint_args, f);
11fdf7f2
TL
117 encode_json("data_bucket_name", data_bucket_name, f);
118 encode_json("data_oid_prefix", data_oid_prefix, f);
eafe8130 119 encode_json("s3_id", s3_id, f);
11fdf7f2
TL
120 }
121
122 void init(CephContext *cct, const JSONFormattable& config,
123 const string& data_bucket_prefix,
124 const string& default_oid_prefix) {
125 name = config["name"];
126 topic = config["topic"];
127 push_endpoint_name = config["push_endpoint"];
128 string default_bucket_name = data_bucket_prefix + name;
129 data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
130 data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
eafe8130
TL
131 s3_id = config["s3_id"];
132 arn_topic = config["arn_topic"];
11fdf7f2
TL
133 if (!push_endpoint_name.empty()) {
134 push_endpoint_args = config["push_endpoint_args"];
135 try {
eafe8130
TL
136 push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
137 ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
11fdf7f2 138 } catch (const RGWPubSubEndpoint::configuration_error& e) {
eafe8130 139 ldout(cct, 1) << "ERROR: failed to create push endpoint: "
11fdf7f2
TL
140 << push_endpoint_name << " due to: " << e.what() << dendl;
141 }
142 }
143 }
144};
145
146using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
147
148struct PSTopicConfig {
eafe8130
TL
149 std::string name;
150 std::set<std::string> subs;
9f95a23c 151 std::string opaque_data;
11fdf7f2
TL
152
153 void dump(Formatter *f) const {
154 encode_json("name", name, f);
155 encode_json("subs", subs, f);
9f95a23c 156 encode_json("opaque", opaque_data, f);
11fdf7f2
TL
157 }
158};
159
160struct PSNotificationConfig {
161 uint64_t id{0};
162 string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
163 string topic;
164 bool is_prefix{false};
165
166
167 void dump(Formatter *f) const {
168 encode_json("id", id, f);
169 encode_json("path", path, f);
170 encode_json("topic", topic, f);
171 encode_json("is_prefix", is_prefix, f);
172 }
173
174 void init(CephContext *cct, const JSONFormattable& config) {
175 path = config["path"];
176 if (!path.empty() && path[path.size() - 1] == '*') {
177 path = path.substr(0, path.size() - 1);
178 is_prefix = true;
179 }
180 topic = config["topic"];
181 }
182};
183
184template<class T>
185static string json_str(const char *name, const T& obj, bool pretty = false)
186{
187 stringstream ss;
188 JSONFormatter f(pretty);
189
190 encode_json(name, obj, &f);
191 f.flush(ss);
192
193 return ss.str();
194}
195
196using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
eafe8130 197using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
11fdf7f2
TL
198
199struct PSConfig {
9f95a23c 200 const std::string id{"pubsub"};
11fdf7f2 201 rgw_user user;
9f95a23c
TL
202 std::string data_bucket_prefix;
203 std::string data_oid_prefix;
11fdf7f2
TL
204
205 int events_retention_days{0};
206
207 uint64_t sync_instance{0};
208 uint64_t max_id{0};
209
210 /* FIXME: no hard coded buckets, we'll have configurable topics */
eafe8130
TL
211 std::map<std::string, PSSubConfigRef> subs;
212 std::map<std::string, PSTopicConfigRef> topics;
213 std::multimap<std::string, PSNotificationConfig> notifications;
214
215 bool start_with_full_sync{false};
11fdf7f2
TL
216
217 void dump(Formatter *f) const {
218 encode_json("id", id, f);
219 encode_json("user", user, f);
220 encode_json("data_bucket_prefix", data_bucket_prefix, f);
221 encode_json("data_oid_prefix", data_oid_prefix, f);
222 encode_json("events_retention_days", events_retention_days, f);
223 encode_json("sync_instance", sync_instance, f);
224 encode_json("max_id", max_id, f);
225 {
226 Formatter::ArraySection section(*f, "subs");
227 for (auto& sub : subs) {
228 encode_json("sub", *sub.second, f);
229 }
230 }
231 {
232 Formatter::ArraySection section(*f, "topics");
233 for (auto& topic : topics) {
234 encode_json("topic", *topic.second, f);
235 }
236 }
237 {
238 Formatter::ObjectSection section(*f, "notifications");
9f95a23c 239 std::string last;
11fdf7f2
TL
240 for (auto& notif : notifications) {
241 const string& n = notif.first;
242 if (n != last) {
243 if (!last.empty()) {
244 f->close_section();
245 }
246 f->open_array_section(n.c_str());
247 }
248 last = n;
249 encode_json("notifications", notif.second, f);
250 }
251 if (!last.empty()) {
252 f->close_section();
253 }
254 }
eafe8130 255 encode_json("start_with_full_sync", start_with_full_sync, f);
11fdf7f2
TL
256 }
257
258 void init(CephContext *cct, const JSONFormattable& config) {
259 string uid = config["uid"]("pubsub");
260 user = rgw_user(config["tenant"], uid);
261 data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
262 data_oid_prefix = config["data_oid_prefix"];
263 events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
264
265 for (auto& c : config["notifications"].array()) {
266 PSNotificationConfig nc;
267 nc.id = ++max_id;
268 nc.init(cct, c);
269 notifications.insert(std::make_pair(nc.path, nc));
270
271 PSTopicConfig topic_config = { .name = nc.topic };
272 topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
273 }
274 for (auto& c : config["subscriptions"].array()) {
275 auto sc = std::make_shared<PSSubConfig>();
276 sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
277 subs[sc->name] = sc;
278 auto iter = topics.find(sc->topic);
279 if (iter != topics.end()) {
280 iter->second->subs.insert(sc->name);
281 }
282 }
eafe8130 283 start_with_full_sync = config["start_with_full_sync"](false);
11fdf7f2
TL
284
285 ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
286 }
287
288 void init_instance(const RGWRealm& realm, uint64_t instance_id) {
289 sync_instance = instance_id;
290 }
291
292 void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
9f95a23c 293 const std::string path = bucket.name + "/" + key.name;
11fdf7f2
TL
294
295 auto iter = notifications.upper_bound(path);
296 if (iter == notifications.begin()) {
297 return;
298 }
299
300 do {
301 --iter;
302 if (iter->first.size() > path.size()) {
303 break;
304 }
305 if (path.compare(0, iter->first.size(), iter->first) != 0) {
306 break;
307 }
308
309 PSNotificationConfig& target = iter->second;
310
311 if (!target.is_prefix &&
312 path.size() != iter->first.size()) {
313 continue;
314 }
315
316 auto topic = topics.find(target.topic);
317 if (topic == topics.end()) {
318 continue;
319 }
320
eafe8130
TL
321 ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
322 " target_path=" << target.path << ", topic=" << target.topic << dendl;
11fdf7f2
TL
323 (*result)->push_back(topic->second);
324 } while (iter != notifications.begin());
325 }
326
327 bool find_sub(const string& name, PSSubConfigRef *ref) {
328 auto iter = subs.find(name);
329 if (iter != subs.end()) {
330 *ref = iter->second;
331 return true;
332 }
333 return false;
334 }
335};
336
11fdf7f2 337using PSConfigRef = std::shared_ptr<PSConfig>;
eafe8130
TL
338template<typename EventType>
339using EventRef = std::shared_ptr<EventType>;
11fdf7f2
TL
340
341struct objstore_event {
342 string id;
343 const rgw_bucket& bucket;
344 const rgw_obj_key& key;
345 const ceph::real_time& mtime;
346 const std::vector<std::pair<std::string, std::string> > *attrs;
347
348 objstore_event(const rgw_bucket& _bucket,
349 const rgw_obj_key& _key,
350 const ceph::real_time& _mtime,
351 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
352 key(_key),
353 mtime(_mtime),
354 attrs(_attrs) {}
355
356 string get_hash() {
357 string etag;
358 RGWMD5Etag hash;
359 hash.update(bucket.bucket_id);
360 hash.update(key.name);
361 hash.update(key.instance);
362 hash.finish(&etag);
363
364 assert(etag.size() > 8);
365
366 return etag.substr(0, 8);
367 }
368
369 void dump(Formatter *f) const {
370 {
371 Formatter::ObjectSection s(*f, "bucket");
372 encode_json("name", bucket.name, f);
373 encode_json("tenant", bucket.tenant, f);
374 encode_json("bucket_id", bucket.bucket_id, f);
375 }
376 {
377 Formatter::ObjectSection s(*f, "key");
378 encode_json("name", key.name, f);
379 encode_json("instance", key.instance, f);
380 }
381 utime_t mt(mtime);
382 encode_json("mtime", mt, f);
383 Formatter::ObjectSection s(*f, "attrs");
384 if (attrs) {
385 for (auto& attr : *attrs) {
386 encode_json(attr.first.c_str(), attr.second.c_str(), f);
387 }
388 }
389 }
390};
391
392static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
393 const rgw_obj_key& key,
394 const ceph::real_time& mtime,
395 const std::vector<std::pair<std::string, std::string> > *attrs,
eafe8130
TL
396 rgw::notify::EventType event_type,
397 EventRef<rgw_pubsub_event> *event) {
11fdf7f2
TL
398 *event = std::make_shared<rgw_pubsub_event>();
399
eafe8130
TL
400 EventRef<rgw_pubsub_event>& e = *event;
401 e->event_name = rgw::notify::to_ceph_string(event_type);
11fdf7f2
TL
402 e->source = bucket.name + "/" + key.name;
403 e->timestamp = real_clock::now();
404
405 objstore_event oevent(bucket, key, mtime, attrs);
406
eafe8130
TL
407 const utime_t ts(e->timestamp);
408 set_event_id(e->id, oevent.get_hash(), ts);
11fdf7f2
TL
409
410 encode_json("info", oevent, &e->info);
411}
412
eafe8130
TL
413static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
414 const rgw_user& owner,
415 const rgw_obj_key& key,
416 const ceph::real_time& mtime,
417 const std::vector<std::pair<std::string, std::string> > *attrs,
418 rgw::notify::EventType event_type,
419 EventRef<rgw_pubsub_s3_record> *record) {
420 *record = std::make_shared<rgw_pubsub_s3_record>();
421
422 EventRef<rgw_pubsub_s3_record>& r = *record;
eafe8130
TL
423 r->eventTime = mtime;
424 r->eventName = rgw::notify::to_string(event_type);
92f5a8d4
TL
425 // userIdentity: not supported in sync module
426 // x_amz_request_id: not supported in sync module
427 // x_amz_id_2: not supported in sync module
eafe8130
TL
428 // configurationId is filled from subscription configuration
429 r->bucket_name = bucket.name;
430 r->bucket_ownerIdentity = owner.to_str();
431 r->bucket_arn = to_string(rgw::ARN(bucket));
432 r->bucket_id = bucket.bucket_id; // rgw extension
433 r->object_key = key.name;
92f5a8d4 434 // object_size not supported in sync module
eafe8130
TL
435 objstore_event oevent(bucket, key, mtime, attrs);
436 r->object_etag = oevent.get_hash();
437 r->object_versionId = key.instance;
438
439 // use timestamp as per key sequence id (hex encoded)
440 const utime_t ts(real_clock::now());
441 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
442 std::back_inserter(r->object_sequencer));
443
eafe8130
TL
444 set_event_id(r->id, r->object_etag, ts);
445}
446
11fdf7f2
TL
447class PSManager;
448using PSManagerRef = std::shared_ptr<PSManager>;
449
450struct PSEnv {
451 PSConfigRef conf;
452 shared_ptr<RGWUserInfo> data_user_info;
453 PSManagerRef manager;
454
455 PSEnv() : conf(make_shared<PSConfig>()),
456 data_user_info(make_shared<RGWUserInfo>()) {}
457
458 void init(CephContext *cct, const JSONFormattable& config) {
459 conf->init(cct, config);
460 }
461
462 void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
463};
464
465using PSEnvRef = std::shared_ptr<PSEnv>;
466
eafe8130 467template<typename EventType>
11fdf7f2 468class PSEvent {
eafe8130 469 const EventRef<EventType> event;
11fdf7f2
TL
470
471public:
eafe8130 472 PSEvent(const EventRef<EventType>& _event) : event(_event) {}
11fdf7f2
TL
473
474 void format(bufferlist *bl) const {
92f5a8d4 475 bl->append(json_str("", *event));
11fdf7f2
TL
476 }
477
478 void encode_event(bufferlist& bl) const {
479 encode(*event, bl);
480 }
481
482 const string& id() const {
483 return event->id;
484 }
485};
486
/*
 * RGWSingletonCR<T>: a coroutine whose operate() runs once even when several
 * coroutines ask for it.
 *
 * - The first execute() starts it as a child of that caller.
 * - Callers arriving while it is still running are parked as waiters: a
 *   reference is taken on this coroutine and the caller is put to sleep.
 * - When operate() finishes, operate_wrapper() wakes every waiter with the
 *   final retcode, hands it the result via return_result(), and drops the
 *   reference taken for it.
 * - Callers arriving after completion get retcode/result immediately.
 *
 * NOTE(review): the first caller's `result` pointer is not recorded, so it
 * never receives return_result(); subclasses appear to deliver that caller's
 * result through their own members instead — confirm for each subclass.
 */
template <class T>
class RGWSingletonCR : public RGWCoroutine {
  friend class WrapperCR;

  boost::asio::coroutine wrapper_state;  // resume point of operate_wrapper()
  bool started{false};                   // set by the first execute()
  int operate_ret{0};                    // last value returned by operate()

  // A caller parked until this coroutine completes.
  struct WaiterInfo {
    RGWCoroutine *cr{nullptr};  // caller to wake up
    T *result;                  // output slot passed to return_result(); may be nullptr
  };
  using WaiterInfoRef = std::shared_ptr<WaiterInfo>;

  deque<WaiterInfoRef> waiters;

  void add_waiter(RGWCoroutine *cr, T *result) {
    auto waiter = std::make_shared<WaiterInfo>();
    waiter->cr = cr;
    waiter->result = result;
    waiters.push_back(waiter);
  };

  // Pop the oldest waiter into *waiter; resets *waiter and returns false when none remain.
  bool get_next_waiter(WaiterInfoRef *waiter) {
    if (waiters.empty()) {
      waiter->reset();
      return false;
    }

    *waiter = waiters.front();
    waiters.pop_front();
    return true;
  }

  // Drives operate() until done (yielding between steps), then wakes all waiters.
  int operate_wrapper() override {
    reenter(&wrapper_state) {
      while (!is_done()) {
        ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
        operate_ret = operate();
        if (operate_ret < 0) {
          ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
        }
        if (!is_done()) {
          yield;
        }
      }

      ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
      /* we're done, can't yield anymore */

      WaiterInfoRef waiter;
      while (get_next_waiter(&waiter)) {
        ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
        waiter->cr->set_retcode(retcode);
        waiter->cr->set_sleeping(false);
        return_result(waiter->result);
        // balances the get() taken in execute() when this waiter registered
        put();
      }

      return retcode;
    }
    return 0;
  }

  // Hook for subclasses to copy their result into *result; default does nothing.
  virtual void return_result(T *result) {}

public:
  RGWSingletonCR(CephContext *_cct)
    : RGWCoroutine(_cct) {}

  // Run (or join) the singleton on behalf of `caller`. Returns 0 when the
  // caller has been scheduled or parked; once done, returns the final retcode.
  int execute(RGWCoroutine *caller, T *result = nullptr) {
    if (!started) {
      ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
      started = true;
      caller->call(this);
      return 0;
    } else if (!is_done()) {
      ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
      get();
      add_waiter(caller, result);
      caller->set_sleeping(true);
      return 0;
    }

    ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
    caller->set_retcode(retcode);
    return_result(result);
    return retcode;
  }
};
577
578
579class PSSubscription;
580using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
581
582class PSSubscription {
583 class InitCR;
584 friend class InitCR;
585 friend class RGWPSHandleObjEventCR;
586
9f95a23c 587 RGWDataSyncCtx *sc;
11fdf7f2
TL
588 RGWDataSyncEnv *sync_env;
589 PSEnvRef env;
590 PSSubConfigRef sub_conf;
eafe8130 591 std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
11fdf7f2
TL
592 RGWBucketInfo *bucket_info{nullptr};
593 RGWDataAccessRef data_access;
594 RGWDataAccess::BucketRef bucket;
595
11fdf7f2
TL
596 InitCR *init_cr{nullptr};
597
598 class InitBucketLifecycleCR : public RGWCoroutine {
9f95a23c 599 RGWDataSyncCtx *sc;
11fdf7f2
TL
600 RGWDataSyncEnv *sync_env;
601 PSConfigRef& conf;
602 LCRule rule;
603
604 int retention_days;
605
606 rgw_bucket_lifecycle_config_params lc_config;
607
608 public:
9f95a23c 609 InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
610 PSConfigRef& _conf,
611 RGWBucketInfo& _bucket_info,
9f95a23c
TL
612 std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sc->cct),
613 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
614 conf(_conf) {
615 lc_config.bucket_info = _bucket_info;
616 lc_config.bucket_attrs = _bucket_attrs;
617 retention_days = conf->events_retention_days;
618 }
619
620 int operate() override {
621 reenter(this) {
622
623 rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
624
625 {
626 /* maybe we already have it configured? */
627 RGWLifecycleConfiguration old_config;
628 auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
629 if (aiter != lc_config.bucket_attrs.end()) {
630 bufferlist::const_iterator iter{&aiter->second};
631 try {
632 old_config.decode(iter);
633 } catch (const buffer::error& e) {
9f95a23c 634 ldpp_dout(sync_env->dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl;
11fdf7f2
TL
635 }
636 }
637
638 auto old_rules = old_config.get_rule_map();
639 for (auto ori : old_rules) {
640 auto& old_rule = ori.second;
641
642 if (old_rule.get_prefix().empty() &&
643 old_rule.get_expiration().get_days() == retention_days &&
644 old_rule.is_enabled()) {
9f95a23c 645 ldpp_dout(sync_env->dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
11fdf7f2
TL
646 return set_cr_done();
647 }
648 }
649 }
650
651 lc_config.config.add_rule(rule);
652 yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
653 sync_env->store,
9f95a23c
TL
654 lc_config,
655 sync_env->dpp));
11fdf7f2 656 if (retcode < 0) {
9f95a23c 657 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
11fdf7f2
TL
658 return set_cr_error(retcode);
659 }
660
661 return set_cr_done();
662 }
663 return 0;
664 }
665 };
666
667 class InitCR : public RGWSingletonCR<bool> {
9f95a23c 668 RGWDataSyncCtx *sc;
11fdf7f2
TL
669 RGWDataSyncEnv *sync_env;
670 PSSubscriptionRef sub;
671 rgw_get_bucket_info_params get_bucket_info;
672 rgw_bucket_create_local_params create_bucket;
673 PSConfigRef& conf;
674 PSSubConfigRef& sub_conf;
675 int i;
676
677 public:
9f95a23c
TL
678 InitCR(RGWDataSyncCtx *_sc,
679 PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct),
680 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
681 sub(_sub), conf(sub->env->conf),
682 sub_conf(sub->sub_conf) {
683 }
684
685 int operate() override {
686 reenter(this) {
687 get_bucket_info.tenant = conf->user.tenant;
688 get_bucket_info.bucket_name = sub_conf->data_bucket_name;
689 sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
690
691 for (i = 0; i < 2; ++i) {
692 yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
693 sync_env->store,
694 get_bucket_info,
695 sub->get_bucket_info_result));
696 if (retcode < 0 && retcode != -ENOENT) {
9f95a23c 697 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant="
11fdf7f2
TL
698 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
699 }
700 if (retcode == 0) {
701 {
702 auto& result = sub->get_bucket_info_result;
703 sub->bucket_info = &result->bucket_info;
704
705 int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
706 if (ret < 0) {
9f95a23c 707 ldpp_dout(sync_env->dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
11fdf7f2
TL
708 return set_cr_error(ret);
709 }
710 }
711
9f95a23c 712 yield call(new InitBucketLifecycleCR(sc, conf,
11fdf7f2
TL
713 sub->get_bucket_info_result->bucket_info,
714 sub->get_bucket_info_result->attrs));
715 if (retcode < 0) {
9f95a23c 716 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
11fdf7f2
TL
717 return set_cr_error(retcode);
718 }
719
720 return set_cr_done();
721 }
722
723 create_bucket.user_info = sub->env->data_user_info;
724 create_bucket.bucket_name = sub_conf->data_bucket_name;
9f95a23c 725 ldpp_dout(sync_env->dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
11fdf7f2
TL
726 yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
727 sync_env->store,
9f95a23c
TL
728 create_bucket,
729 sync_env->dpp));
11fdf7f2 730 if (retcode < 0) {
9f95a23c 731 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
11fdf7f2
TL
732 << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
733 return set_cr_error(retcode);
734 }
735
736 /* second iteration: we got -ENOENT and created a bucket */
737 }
738
739 /* failed twice on -ENOENT, unexpected */
9f95a23c 740 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
11fdf7f2
TL
741 << " name=" << get_bucket_info.bucket_name << dendl;
742 return set_cr_error(-EIO);
743 }
744 return 0;
745 }
746 };
747
eafe8130 748 template<typename EventType>
11fdf7f2 749 class StoreEventCR : public RGWCoroutine {
9f95a23c 750 RGWDataSyncCtx* const sc;
11fdf7f2
TL
751 RGWDataSyncEnv* const sync_env;
752 const PSSubscriptionRef sub;
eafe8130 753 const PSEvent<EventType> pse;
11fdf7f2
TL
754 const string oid_prefix;
755
756 public:
9f95a23c 757 StoreEventCR(RGWDataSyncCtx* const _sc,
11fdf7f2 758 const PSSubscriptionRef& _sub,
9f95a23c
TL
759 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
760 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
761 sub(_sub),
762 pse(_event),
763 oid_prefix(sub->sub_conf->data_oid_prefix) {
764 }
765
766 int operate() override {
11fdf7f2
TL
767 rgw_object_simple_put_params put_obj;
768 reenter(this) {
769
770 put_obj.bucket = sub->bucket;
771 put_obj.key = rgw_obj_key(oid_prefix + pse.id());
772
773 pse.format(&put_obj.data);
774
775 {
776 bufferlist bl;
777 pse.encode_event(bl);
778 bufferlist bl64;
779 bl.encode_base64(bl64);
780 put_obj.user_data = bl64.to_str();
781 }
782
783 yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
784 sync_env->store,
9f95a23c
TL
785 put_obj,
786 sync_env->dpp));
11fdf7f2 787 if (retcode < 0) {
eafe8130 788 ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
11fdf7f2
TL
789 return set_cr_error(retcode);
790 } else {
eafe8130 791 ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
11fdf7f2
TL
792 }
793
794 return set_cr_done();
795 }
796 return 0;
797 }
798 };
799
eafe8130 800 template<typename EventType>
11fdf7f2 801 class PushEventCR : public RGWCoroutine {
9f95a23c 802 RGWDataSyncCtx* const sc;
11fdf7f2 803 RGWDataSyncEnv* const sync_env;
eafe8130 804 const EventRef<EventType> event;
11fdf7f2
TL
805 const PSSubConfigRef& sub_conf;
806
807 public:
9f95a23c 808 PushEventCR(RGWDataSyncCtx* const _sc,
11fdf7f2 809 const PSSubscriptionRef& _sub,
9f95a23c
TL
810 const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct),
811 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
812 event(_event),
813 sub_conf(_sub->sub_conf) {
814 }
815
816 int operate() override {
817 reenter(this) {
818 ceph_assert(sub_conf->push_endpoint);
819 yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
820
821 if (retcode < 0) {
eafe8130 822 ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
11fdf7f2 823 " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
11fdf7f2
TL
824 return set_cr_error(retcode);
825 }
826
eafe8130 827 ldout(sync_env->cct, 20) << "event: " << event->id <<
11fdf7f2 828 " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
11fdf7f2
TL
829 return set_cr_done();
830 }
831 return 0;
832 }
833 };
834
835public:
9f95a23c 836 PSSubscription(RGWDataSyncCtx *_sc,
11fdf7f2 837 PSEnvRef _env,
9f95a23c 838 PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
839 env(_env),
840 sub_conf(_sub_conf),
841 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
842
9f95a23c 843 PSSubscription(RGWDataSyncCtx *_sc,
11fdf7f2 844 PSEnvRef _env,
9f95a23c 845 rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
846 env(_env),
847 sub_conf(std::make_shared<PSSubConfig>()),
848 data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
849 sub_conf->from_user_conf(sync_env->cct, user_sub_conf);
850 }
851 virtual ~PSSubscription() {
852 if (init_cr) {
853 init_cr->put();
854 }
855 }
856
857 template <class C>
9f95a23c 858 static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
11fdf7f2
TL
859 PSEnvRef _env,
860 C& _sub_conf) {
9f95a23c
TL
861 auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf);
862 sub->init_cr = new InitCR(_sc, sub);
11fdf7f2
TL
863 sub->init_cr->get();
864 return sub;
865 }
866
867 int call_init_cr(RGWCoroutine *caller) {
868 return init_cr->execute(caller);
869 }
870
eafe8130 871 template<typename EventType>
9f95a23c
TL
872 static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
873 return new StoreEventCR<EventType>(sc, sub, event);
11fdf7f2
TL
874 }
875
eafe8130 876 template<typename EventType>
9f95a23c
TL
877 static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
878 return new PushEventCR<EventType>(sc, sub, event);
11fdf7f2
TL
879 }
880 friend class InitCR;
881};
882
11fdf7f2
TL
883class PSManager
884{
9f95a23c 885 RGWDataSyncCtx *sc;
11fdf7f2
TL
886 RGWDataSyncEnv *sync_env;
887 PSEnvRef env;
888
eafe8130 889 std::map<string, PSSubscriptionRef> subs;
11fdf7f2
TL
890
891 class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
9f95a23c 892 RGWDataSyncCtx *sc;
11fdf7f2
TL
893 RGWDataSyncEnv *sync_env;
894 PSManagerRef mgr;
895 rgw_user owner;
896 string sub_name;
897 string sub_id;
898 PSSubscriptionRef *ref;
899
900 PSConfigRef conf;
901
902 PSSubConfigRef sub_conf;
903 rgw_pubsub_sub_config user_sub_conf;
eafe8130 904
11fdf7f2 905 public:
9f95a23c 906 GetSubCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
907 PSManagerRef& _mgr,
908 const rgw_user& _owner,
909 const string& _sub_name,
9f95a23c
TL
910 PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct),
911 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
912 mgr(_mgr),
913 owner(_owner),
914 sub_name(_sub_name),
915 ref(_ref),
916 conf(mgr->env->conf) {
917 }
eafe8130 918 ~GetSubCR() { }
11fdf7f2
TL
919
920 int operate() override {
921 reenter(this) {
922 if (owner.empty()) {
923 if (!conf->find_sub(sub_name, &sub_conf)) {
eafe8130 924 ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
11fdf7f2
TL
925 mgr->remove_get_sub(owner, sub_name);
926 return set_cr_error(-ENOENT);
927 }
928
9f95a23c 929 *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf);
11fdf7f2
TL
930 } else {
931 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
932 yield {
933 RGWUserPubSub ups(sync_env->store, owner);
934 rgw_raw_obj obj;
935 ups.get_sub_meta_obj(sub_name, &obj);
936 bool empty_on_enoent = false;
9f95a23c 937 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
11fdf7f2
TL
938 obj,
939 &user_sub_conf, empty_on_enoent));
940 }
941 if (retcode < 0) {
942 mgr->remove_get_sub(owner, sub_name);
943 return set_cr_error(retcode);
944 }
945
9f95a23c 946 *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
11fdf7f2
TL
947 }
948
949 yield (*ref)->call_init_cr(this);
950 if (retcode < 0) {
eafe8130 951 ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
11fdf7f2
TL
952 mgr->remove_get_sub(owner, sub_name);
953 return set_cr_error(retcode);
954 }
955
956 if (owner.empty()) {
957 mgr->subs[sub_name] = *ref;
958 }
959 mgr->remove_get_sub(owner, sub_name);
960
961 return set_cr_done();
962 }
963 return 0;
964 }
965
966 void return_result(PSSubscriptionRef *result) override {
967 ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
968 if (retcode >= 0) {
969 *result = *ref;
970 }
971 }
972 };
973
974 string sub_id(const rgw_user& owner, const string& sub_name) {
975 string owner_prefix;
976 if (!owner.empty()) {
977 owner_prefix = owner.to_str() + "/";
978 }
979
980 return owner_prefix + sub_name;
981 }
982
eafe8130 983 std::map<std::string, GetSubCR *> get_subs;
11fdf7f2
TL
984
985 GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
986 return get_subs[sub_id(owner, name)];
987 }
988
989 void remove_get_sub(const rgw_user& owner, const string& name) {
990 get_subs.erase(sub_id(owner, name));
991 }
992
993 bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
994 auto iter = subs.find(sub_id(owner, sub_name));
995 if (iter != subs.end()) {
996 *sub = iter->second;
997 return true;
998 }
999 return false;
1000 }
1001
9f95a23c
TL
1002 PSManager(RGWDataSyncCtx *_sc,
1003 PSEnvRef _env) : sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
1004 env(_env) {}
1005
1006public:
9f95a23c 1007 static PSManagerRef get_shared(RGWDataSyncCtx *_sc,
11fdf7f2 1008 PSEnvRef _env) {
9f95a23c 1009 return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
11fdf7f2
TL
1010 }
1011
9f95a23c 1012 static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr,
11fdf7f2
TL
1013 RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
1014 if (mgr->find_sub_instance(owner, sub_name, ref)) {
1015 /* found it! nothing to execute */
9f95a23c 1016 ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl;
11fdf7f2
TL
1017 }
1018 auto& gs = mgr->get_get_subs(owner, sub_name);
1019 if (!gs) {
9f95a23c
TL
1020 ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl;
1021 gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
11fdf7f2 1022 }
9f95a23c 1023 ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl;
11fdf7f2
TL
1024 return gs->execute(caller, ref);
1025 }
1026
1027 friend class GetSubCR;
1028};
1029
1030void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
1031 manager = mgr;
1032 conf->init_instance(realm, instance_id);
1033}
1034
1035class RGWPSInitEnvCBCR : public RGWCoroutine {
9f95a23c 1036 RGWDataSyncCtx *sc;
11fdf7f2
TL
1037 RGWDataSyncEnv *sync_env;
1038 PSEnvRef env;
1039 PSConfigRef& conf;
1040
1041 rgw_user_create_params create_user;
1042 rgw_get_user_info_params get_user_info;
1043public:
9f95a23c
TL
1044 RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc,
1045 PSEnvRef& _env) : RGWCoroutine(_sc->cct),
1046 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
1047 env(_env), conf(env->conf) {}
1048 int operate() override {
1049 reenter(this) {
9f95a23c 1050 ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl;
11fdf7f2
TL
1051
1052 /* nothing to do here right now */
1053 create_user.user = conf->user;
1054 create_user.max_buckets = 0; /* unlimited */
1055 create_user.display_name = "pubsub";
1056 create_user.generate_key = false;
9f95a23c 1057 yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, sync_env->dpp));
1911f103 1058 if (retcode < 0 && retcode != -ERR_USER_EXIST) {
9f95a23c 1059 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
11fdf7f2
TL
1060 return set_cr_error(retcode);
1061 }
1062
1063 get_user_info.user = conf->user;
1064 yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
1065 if (retcode < 0) {
9f95a23c 1066 ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
11fdf7f2
TL
1067 return set_cr_error(retcode);
1068 }
1069
9f95a23c 1070 ldpp_dout(sync_env->dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
11fdf7f2
TL
1071
1072
1073 return set_cr_done();
1074 }
1075 return 0;
1076 }
1077};
1078
eafe8130
TL
1079bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
1080 if (!match(filter.events, event_type)) {
1081 return false;
1082 }
1083 if (!match(filter.s3_filter.key_filter, key_name)) {
1084 return false;
1085 }
1086 return true;
1087}
1088
11fdf7f2 1089class RGWPSFindBucketTopicsCR : public RGWCoroutine {
9f95a23c 1090 RGWDataSyncCtx *sc;
11fdf7f2
TL
1091 RGWDataSyncEnv *sync_env;
1092 PSEnvRef env;
1093 rgw_user owner;
1094 rgw_bucket bucket;
1095 rgw_obj_key key;
eafe8130 1096 rgw::notify::EventType event_type;
11fdf7f2
TL
1097
1098 RGWUserPubSub ups;
1099
1100 rgw_raw_obj bucket_obj;
1101 rgw_raw_obj user_obj;
1102 rgw_pubsub_bucket_topics bucket_topics;
1103 rgw_pubsub_user_topics user_topics;
1104 TopicsRef *topics;
1105public:
9f95a23c 1106 RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
11fdf7f2
TL
1107 PSEnvRef& _env,
1108 const rgw_user& _owner,
1109 const rgw_bucket& _bucket,
1110 const rgw_obj_key& _key,
eafe8130 1111 rgw::notify::EventType _event_type,
9f95a23c
TL
1112 TopicsRef *_topics) : RGWCoroutine(_sc->cct),
1113 sc(_sc), sync_env(_sc->env),
11fdf7f2
TL
1114 env(_env),
1115 owner(_owner),
1116 bucket(_bucket),
1117 key(_key),
eafe8130 1118 event_type(_event_type),
9f95a23c 1119 ups(sync_env->store, owner),
11fdf7f2
TL
1120 topics(_topics) {
1121 *topics = std::make_shared<vector<PSTopicConfigRef> >();
1122 }
1123 int operate() override {
1124 reenter(this) {
1125 ups.get_bucket_meta_obj(bucket, &bucket_obj);
1126 ups.get_user_meta_obj(&user_obj);
1127
1128 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
1129 yield {
1130 bool empty_on_enoent = true;
9f95a23c 1131 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
11fdf7f2
TL
1132 bucket_obj,
1133 &bucket_topics, empty_on_enoent));
1134 }
1135 if (retcode < 0 && retcode != -ENOENT) {
1136 return set_cr_error(retcode);
1137 }
1138
1139 ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
1140
1141 if (!bucket_topics.topics.empty()) {
1142 using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
1143 yield {
1144 bool empty_on_enoent = true;
9f95a23c 1145 call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
11fdf7f2
TL
1146 user_obj,
1147 &user_topics, empty_on_enoent));
1148 }
1149 if (retcode < 0 && retcode != -ENOENT) {
1150 return set_cr_error(retcode);
1151 }
1152 }
1153
1154 for (auto& titer : bucket_topics.topics) {
1155 auto& topic_filter = titer.second;
1156 auto& info = topic_filter.topic;
eafe8130 1157 if (!match(topic_filter, key.name, event_type)) {
11fdf7f2
TL
1158 continue;
1159 }
eafe8130 1160 std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
11fdf7f2
TL
1161 tc->name = info.name;
1162 tc->subs = user_topics.topics[info.name].subs;
9f95a23c 1163 tc->opaque_data = info.opaque_data;
11fdf7f2
TL
1164 (*topics)->push_back(tc);
1165 }
1166
1167 env->conf->get_topics(sync_env->cct, bucket, key, topics);
1168 return set_cr_done();
1169 }
1170 return 0;
1171 }
1172};
1173
1174class RGWPSHandleObjEventCR : public RGWCoroutine {
9f95a23c 1175 RGWDataSyncCtx* const sc;
11fdf7f2
TL
1176 const PSEnvRef env;
1177 const rgw_user& owner;
eafe8130
TL
1178 const EventRef<rgw_pubsub_event> event;
1179 const EventRef<rgw_pubsub_s3_record> record;
11fdf7f2
TL
1180 const TopicsRef topics;
1181 const std::array<rgw_user, 2> owners;
1182 bool has_subscriptions;
1183 bool event_handled;
1184 bool sub_conf_found;
1185 PSSubscriptionRef sub;
1186 std::array<rgw_user, 2>::const_iterator oiter;
eafe8130 1187 std::vector<PSTopicConfigRef>::const_iterator titer;
9f95a23c 1188 std::set<std::string>::const_iterator siter;
11fdf7f2
TL
1189 int last_sub_conf_error;
1190
1191public:
9f95a23c 1192 RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
11fdf7f2
TL
1193 const PSEnvRef _env,
1194 const rgw_user& _owner,
eafe8130
TL
1195 const EventRef<rgw_pubsub_event>& _event,
1196 const EventRef<rgw_pubsub_s3_record>& _record,
9f95a23c
TL
1197 const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
1198 sc(_sc),
11fdf7f2
TL
1199 env(_env),
1200 owner(_owner),
1201 event(_event),
eafe8130 1202 record(_record),
11fdf7f2
TL
1203 topics(_topics),
1204 owners({owner, rgw_user{}}),
1205 has_subscriptions(false),
1206 event_handled(false) {}
1207
1208 int operate() override {
1209 reenter(this) {
9f95a23c 1210 ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone
11fdf7f2
TL
1211 << " event=" << json_str("event", *event, false)
1212 << " owner=" << owner << dendl;
1213
9f95a23c 1214 ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
eafe8130
TL
1215
1216 // outside caller should check that
1217 ceph_assert(!topics->empty());
11fdf7f2
TL
1218
1219 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
1220
eafe8130 1221 // loop over all topics related to the bucket/object
11fdf7f2 1222 for (titer = topics->begin(); titer != topics->end(); ++titer) {
9f95a23c 1223 ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" <<
11fdf7f2 1224 (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
eafe8130 1225 // loop over all subscriptions of the topic
11fdf7f2 1226 for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
9f95a23c 1227 ldout(sc->cct, 20) << ": subscription: " << *siter << dendl;
11fdf7f2
TL
1228 has_subscriptions = true;
1229 sub_conf_found = false;
eafe8130
TL
1230 // try to read subscription configuration from global/user cond
1231 // configuration is considered missing only if does not exist in either
11fdf7f2 1232 for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
9f95a23c 1233 yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub);
11fdf7f2 1234 if (retcode < 0) {
eafe8130
TL
1235 if (sub_conf_found) {
1236 // not a real issue, sub conf already found
1237 retcode = 0;
1238 }
11fdf7f2
TL
1239 last_sub_conf_error = retcode;
1240 continue;
1241 }
1242 sub_conf_found = true;
eafe8130
TL
1243 if (sub->sub_conf->s3_id.empty()) {
1244 // subscription was not made by S3 compatible API
9f95a23c
TL
1245 ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1246 yield call(PSSubscription::store_event_cr(sc, sub, event));
eafe8130
TL
1247 if (retcode < 0) {
1248 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
9f95a23c 1249 ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
eafe8130
TL
1250 } else {
1251 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
1252 event_handled = true;
1253 }
1254 if (sub->sub_conf->push_endpoint) {
9f95a23c
TL
1255 ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1256 yield call(PSSubscription::push_event_cr(sc, sub, event));
eafe8130
TL
1257 if (retcode < 0) {
1258 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
9f95a23c 1259 ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
eafe8130
TL
1260 } else {
1261 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1262 event_handled = true;
1263 }
1264 }
1265 } else {
1266 // subscription was made by S3 compatible API
9f95a23c 1267 ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
eafe8130 1268 record->configurationId = sub->sub_conf->s3_id;
9f95a23c
TL
1269 record->opaque_data = (*titer)->opaque_data;
1270 yield call(PSSubscription::store_event_cr(sc, sub, record));
11fdf7f2 1271 if (retcode < 0) {
eafe8130 1272 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
9f95a23c 1273 ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
11fdf7f2 1274 } else {
eafe8130 1275 if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
11fdf7f2
TL
1276 event_handled = true;
1277 }
eafe8130 1278 if (sub->sub_conf->push_endpoint) {
9f95a23c
TL
1279 ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
1280 yield call(PSSubscription::push_event_cr(sc, sub, record));
eafe8130
TL
1281 if (retcode < 0) {
1282 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
9f95a23c 1283 ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
eafe8130
TL
1284 } else {
1285 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
1286 event_handled = true;
1287 }
1288 }
11fdf7f2
TL
1289 }
1290 }
1291 if (!sub_conf_found) {
1292 // could not find conf for subscription at user or global levels
eafe8130 1293 if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
9f95a23c 1294 ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
11fdf7f2 1295 << " ret=" << last_sub_conf_error << dendl;
eafe8130
TL
1296 if (retcode == -ENOENT) {
1297 // missing subscription info should be reflected back as invalid argument
1298 // and not as missing object
1299 retcode = -EINVAL;
1300 }
11fdf7f2
TL
1301 }
1302 }
1303 }
1304 if (has_subscriptions && !event_handled) {
1305 // event is considered "lost" of it has subscriptions on any of its topics
1306 // but it was not stored in, or pushed to, any of them
1307 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
1308 }
1309 if (retcode < 0) {
1310 return set_cr_error(retcode);
1311 }
1312 return set_cr_done();
1313 }
1314 return 0;
1315 }
1316};
1317
eafe8130 1318// coroutine invoked on remote object creation
11fdf7f2 1319class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
9f95a23c
TL
1320 RGWDataSyncCtx *sc;
1321 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1322 PSEnvRef env;
1323 std::optional<uint64_t> versioned_epoch;
eafe8130
TL
1324 EventRef<rgw_pubsub_event> event;
1325 EventRef<rgw_pubsub_s3_record> record;
11fdf7f2
TL
1326 TopicsRef topics;
1327public:
9f95a23c
TL
1328 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
1329 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
11fdf7f2 1330 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
9f95a23c
TL
1331 TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1332 sc(_sc),
1333 sync_pipe(_sync_pipe),
11fdf7f2
TL
1334 env(_env),
1335 versioned_epoch(_versioned_epoch),
1336 topics(_topics) {
1337 }
1338 int operate() override {
1339 reenter(this) {
9f95a23c
TL
1340 ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone
1341 << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
11fdf7f2
TL
1342 << " attrs=" << attrs << dendl;
1343 {
1344 std::vector<std::pair<std::string, std::string> > attrs;
1345 for (auto& attr : attrs) {
9f95a23c 1346 std::string k = attr.first;
11fdf7f2
TL
1347 if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
1348 k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
1349 }
1350 attrs.push_back(std::make_pair(k, attr.second));
eafe8130
TL
1351 }
1352 // at this point we don't know whether we need the ceph event or S3 record
1353 // this is why both are created here, once we have information about the
1354 // subscription, we will store/push only the relevant ones
9f95a23c
TL
1355 make_event_ref(sc->cct,
1356 sync_pipe.info.source_bs.bucket, key,
11fdf7f2 1357 mtime, &attrs,
eafe8130 1358 rgw::notify::ObjectCreated, &event);
9f95a23c
TL
1359 make_s3_record_ref(sc->cct,
1360 sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
eafe8130
TL
1361 mtime, &attrs,
1362 rgw::notify::ObjectCreated, &record);
11fdf7f2
TL
1363 }
1364
9f95a23c 1365 yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
11fdf7f2
TL
1366 if (retcode < 0) {
1367 return set_cr_error(retcode);
1368 }
1369 return set_cr_done();
1370 }
1371 return 0;
1372 }
1373};
1374
1375class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
9f95a23c 1376 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1377 PSEnvRef env;
1378 std::optional<uint64_t> versioned_epoch;
1379 TopicsRef topics;
1380public:
9f95a23c
TL
1381 RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
1382 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
11fdf7f2 1383 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
9f95a23c
TL
1384 TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
1385 sync_pipe(_sync_pipe),
11fdf7f2
TL
1386 env(_env), versioned_epoch(_versioned_epoch),
1387 topics(_topics) {
1388 }
1389
1390 ~RGWPSHandleRemoteObjCR() override {}
1391
1392 RGWStatRemoteObjCBCR *allocate_callback() override {
9f95a23c 1393 return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
11fdf7f2
TL
1394 }
1395};
1396
1397class RGWPSHandleObjCreateCR : public RGWCoroutine {
9f95a23c
TL
1398 RGWDataSyncCtx *sc;
1399 rgw_bucket_sync_pipe sync_pipe;
11fdf7f2
TL
1400 rgw_obj_key key;
1401 PSEnvRef env;
1402 std::optional<uint64_t> versioned_epoch;
1403 TopicsRef topics;
1404public:
9f95a23c
TL
1405 RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
1406 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
1407 PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct),
1408 sc(_sc),
1409 sync_pipe(_sync_pipe),
11fdf7f2
TL
1410 key(_key),
1411 env(_env),
1412 versioned_epoch(_versioned_epoch) {
1413 }
1414
1415 ~RGWPSHandleObjCreateCR() override {}
1416
1417 int operate() override {
1418 reenter(this) {
9f95a23c
TL
1419 yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
1420 sync_pipe.info.source_bs.bucket, key,
eafe8130 1421 rgw::notify::ObjectCreated,
11fdf7f2
TL
1422 &topics));
1423 if (retcode < 0) {
9f95a23c 1424 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
11fdf7f2
TL
1425 return set_cr_error(retcode);
1426 }
1427 if (topics->empty()) {
9f95a23c 1428 ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
11fdf7f2
TL
1429 return set_cr_done();
1430 }
9f95a23c 1431 yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));
11fdf7f2
TL
1432 if (retcode < 0) {
1433 return set_cr_error(retcode);
1434 }
1435 return set_cr_done();
1436 }
1437 return 0;
1438 }
1439};
1440
eafe8130 1441// coroutine invoked on remote object deletion
11fdf7f2 1442class RGWPSGenericObjEventCBCR : public RGWCoroutine {
9f95a23c 1443 RGWDataSyncCtx *sc;
11fdf7f2
TL
1444 PSEnvRef env;
1445 rgw_user owner;
1446 rgw_bucket bucket;
1447 rgw_obj_key key;
1448 ceph::real_time mtime;
eafe8130
TL
1449 rgw::notify::EventType event_type;
1450 EventRef<rgw_pubsub_event> event;
1451 EventRef<rgw_pubsub_s3_record> record;
11fdf7f2
TL
1452 TopicsRef topics;
1453public:
9f95a23c 1454 RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
11fdf7f2 1455 PSEnvRef _env,
9f95a23c
TL
1456 rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
1457 rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct),
1458 sc(_sc),
11fdf7f2 1459 env(_env),
9f95a23c
TL
1460 owner(_sync_pipe.dest_bucket_info.owner),
1461 bucket(_sync_pipe.dest_bucket_info.bucket),
11fdf7f2 1462 key(_key),
eafe8130 1463 mtime(_mtime), event_type(_event_type) {}
11fdf7f2
TL
1464 int operate() override {
1465 reenter(this) {
9f95a23c 1466 ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone
11fdf7f2 1467 << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
9f95a23c 1468 yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics));
11fdf7f2 1469 if (retcode < 0) {
9f95a23c 1470 ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
11fdf7f2
TL
1471 return set_cr_error(retcode);
1472 }
1473 if (topics->empty()) {
9f95a23c 1474 ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
11fdf7f2
TL
1475 return set_cr_done();
1476 }
eafe8130
TL
1477 // at this point we don't know whether we need the ceph event or S3 record
1478 // this is why both are created here, once we have information about the
1479 // subscription, we will store/push only the relevant ones
9f95a23c 1480 make_event_ref(sc->cct,
eafe8130
TL
1481 bucket, key,
1482 mtime, nullptr,
1483 event_type, &event);
9f95a23c 1484 make_s3_record_ref(sc->cct,
eafe8130
TL
1485 bucket, owner, key,
1486 mtime, nullptr,
1487 event_type, &record);
9f95a23c 1488 yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
11fdf7f2
TL
1489 if (retcode < 0) {
1490 return set_cr_error(retcode);
1491 }
1492 return set_cr_done();
1493 }
1494 return 0;
1495 }
1496
1497};
1498
1499class RGWPSDataSyncModule : public RGWDataSyncModule {
1500 PSEnvRef env;
1501 PSConfigRef& conf;
eafe8130 1502
11fdf7f2
TL
1503public:
1504 RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
1505 env->init(cct, config);
1506 }
eafe8130 1507
11fdf7f2
TL
1508 ~RGWPSDataSyncModule() override {}
1509
9f95a23c
TL
1510 void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
1511 auto sync_env = sc->env;
1512 PSManagerRef mgr = PSManager::get_shared(sc, env);
1513 env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
11fdf7f2
TL
1514 }
1515
9f95a23c
TL
1516 RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
1517 ldout(sc->cct, 5) << conf->id << ": start" << dendl;
1518 return new RGWPSInitEnvCBCR(sc, env);
11fdf7f2 1519 }
eafe8130 1520
9f95a23c 1521 RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
eafe8130 1522 rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
9f95a23c 1523 ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
eafe8130 1524 " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
9f95a23c 1525 return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
11fdf7f2 1526 }
eafe8130 1527
9f95a23c 1528 RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
eafe8130 1529 rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
9f95a23c 1530 ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
eafe8130 1531 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
9f95a23c 1532 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
11fdf7f2 1533 }
eafe8130 1534
9f95a23c 1535 RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
eafe8130 1536 rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
9f95a23c 1537 ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
eafe8130 1538 " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
9f95a23c 1539 return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
11fdf7f2
TL
1540 }
1541
1542 PSConfigRef& get_conf() { return conf; }
1543};
1544
1545RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
1546{
1547 data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
9f95a23c 1548 const std::string jconf = json_str("conf", *data_handler->get_conf());
11fdf7f2
TL
1549 JSONParser p;
1550 if (!p.parse(jconf.c_str(), jconf.size())) {
eafe8130 1551 ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
11fdf7f2
TL
1552 effective_conf = config;
1553 } else {
1554 effective_conf.decode_json(&p);
1555 }
eafe8130
TL
1556#ifdef WITH_RADOSGW_AMQP_ENDPOINT
1557 if (!rgw::amqp::init(cct)) {
1558 ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
1559 }
1560#endif
9f95a23c
TL
1561#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
1562 if (!rgw::kafka::init(cct)) {
1563 ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
1564 }
1565#endif
eafe8130
TL
1566}
1567
// Shuts down the endpoint managers that the constructor initialized (AMQP
// and/or Kafka, depending on the build options). Shutdown is called even if
// the corresponding init() reported a failure.
RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  rgw::amqp::shutdown();
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
  rgw::kafka::shutdown();
#endif
}
1576
1577RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
1578{
1579 return data_handler.get();
1580}
1581
1582RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
1583 if (dialect != RGW_REST_S3) {
1584 return orig;
1585 }
eafe8130
TL
1586 return new RGWRESTMgr_PubSub();
1587}
1588
1589bool RGWPSSyncModuleInstance::should_full_sync() const {
1590 return data_handler->get_conf()->start_with_full_sync;
11fdf7f2
TL
1591}
1592
1593int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
1594 instance->reset(new RGWPSSyncModuleInstance(cct, config));
1595 return 0;
1596}
1597
eafe8130 1598