1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_pubsub.h"
10 #include "rgw_sync_module_pubsub_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rados.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_cr_tools.h"
16 #include "rgw_pubsub.h"
17 #include "rgw_pubsub_push.h"
18 #include "rgw_notify_event_type.h"
19 #include "rgw_perf_counters.h"
20 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
23 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
24 #include "rgw_kafka.h"
27 #include <boost/algorithm/hex.hpp>
28 #include <boost/asio/yield.hpp>
30 #define dout_subsys ceph_subsys_rgw
33 #define PUBSUB_EVENTS_RETENTION_DEFAULT 7
40 "tenant": <tenant>, # default: <empty>
41 "uid": <uid>, # default: "pubsub"
42 "data_bucket_prefix": <prefix> # default: "pubsub-"
43 "data_oid_prefix": <prefix> # default: <empty>
44 "events_retention_days": <int> # default: 7
45 "start_with_full_sync": <bool> # default: false
50 "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
51 # or a prefix if it ends with a wildcard
58 "name": <subscription-name>,
60 "push_endpoint": <endpoint>,
61 "push_endpoint_args": <arg list>, # any push endpoint specific args (include all args)
62 "data_bucket": <bucket>, # override name of bucket where subscription data will be stored
63 "data_oid_prefix": <prefix> # set prefix for subscription data object ids
64 "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
72 // utility function to convert the args list from string format
73 // (ampersand-separated, with an equal sign between key and value) to a parsed structure
74 RGWHTTPArgs
string_to_args(const std::string
& str_args
) {
84 std::string push_endpoint_name
;
85 std::string push_endpoint_args
;
86 std::string data_bucket_name
;
87 std::string data_oid_prefix
;
89 std::string arn_topic
;
90 RGWPubSubEndpoint::Ptr push_endpoint
;
92 void from_user_conf(CephContext
*cct
, const rgw_pubsub_sub_config
& uc
) {
95 push_endpoint_name
= uc
.dest
.push_endpoint
;
96 data_bucket_name
= uc
.dest
.bucket_name
;
97 data_oid_prefix
= uc
.dest
.oid_prefix
;
99 arn_topic
= uc
.dest
.arn_topic
;
100 if (!push_endpoint_name
.empty()) {
101 push_endpoint_args
= uc
.dest
.push_endpoint_args
;
103 push_endpoint
= RGWPubSubEndpoint::create(push_endpoint_name
, arn_topic
, string_to_args(push_endpoint_args
), cct
);
104 ldout(cct
, 20) << "push endpoint created: " << push_endpoint
->to_str() << dendl
;
105 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
106 ldout(cct
, 1) << "ERROR: failed to create push endpoint: "
107 << push_endpoint_name
<< " due to: " << e
.what() << dendl
;
112 void dump(Formatter
*f
) const {
113 encode_json("name", name
, f
);
114 encode_json("topic", topic
, f
);
115 encode_json("push_endpoint", push_endpoint_name
, f
);
116 encode_json("push_endpoint_args", push_endpoint_args
, f
);
117 encode_json("data_bucket_name", data_bucket_name
, f
);
118 encode_json("data_oid_prefix", data_oid_prefix
, f
);
119 encode_json("s3_id", s3_id
, f
);
122 void init(CephContext
*cct
, const JSONFormattable
& config
,
123 const string
& data_bucket_prefix
,
124 const string
& default_oid_prefix
) {
125 name
= config
["name"];
126 topic
= config
["topic"];
127 push_endpoint_name
= config
["push_endpoint"];
128 string default_bucket_name
= data_bucket_prefix
+ name
;
129 data_bucket_name
= config
["data_bucket"](default_bucket_name
.c_str());
130 data_oid_prefix
= config
["data_oid_prefix"](default_oid_prefix
.c_str());
131 s3_id
= config
["s3_id"];
132 arn_topic
= config
["arn_topic"];
133 if (!push_endpoint_name
.empty()) {
134 push_endpoint_args
= config
["push_endpoint_args"];
136 push_endpoint
= RGWPubSubEndpoint::create(push_endpoint_name
, arn_topic
, string_to_args(push_endpoint_args
), cct
);
137 ldout(cct
, 20) << "push endpoint created: " << push_endpoint
->to_str() << dendl
;
138 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
139 ldout(cct
, 1) << "ERROR: failed to create push endpoint: "
140 << push_endpoint_name
<< " due to: " << e
.what() << dendl
;
// Shared-ownership handle to a PSSubConfig, i.e. one subscription's parsed
// settings (name, topic, push endpoint, data bucket/oid prefix, s3_id).
146 using PSSubConfigRef
= std::shared_ptr
<PSSubConfig
>;
148 struct PSTopicConfig
{
150 std::set
<std::string
> subs
;
151 std::string opaque_data
;
153 void dump(Formatter
*f
) const {
154 encode_json("name", name
, f
);
155 encode_json("subs", subs
, f
);
156 encode_json("opaque", opaque_data
, f
);
160 struct PSNotificationConfig
{
162 string path
; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
164 bool is_prefix
{false};
167 void dump(Formatter
*f
) const {
168 encode_json("id", id
, f
);
169 encode_json("path", path
, f
);
170 encode_json("topic", topic
, f
);
171 encode_json("is_prefix", is_prefix
, f
);
174 void init(CephContext
*cct
, const JSONFormattable
& config
) {
175 path
= config
["path"];
176 if (!path
.empty() && path
[path
.size() - 1] == '*') {
177 path
= path
.substr(0, path
.size() - 1);
180 topic
= config
["topic"];
185 static string
json_str(const char *name
, const T
& obj
, bool pretty
= false)
188 JSONFormatter
f(pretty
);
190 encode_json(name
, obj
, &f
);
// Shared-ownership handle to a PSTopicConfig (topic name, its subscriptions,
// and opaque data).
196 using PSTopicConfigRef
= std::shared_ptr
<PSTopicConfig
>;
// Shared list of topic handles; used as the output of topic lookups, which
// push_back() matching topics into it.
197 using TopicsRef
= std::shared_ptr
<std::vector
<PSTopicConfigRef
>>;
200 const std::string id
{"pubsub"};
202 std::string data_bucket_prefix
;
203 std::string data_oid_prefix
;
205 int events_retention_days
{0};
207 uint64_t sync_instance
{0};
210 /* FIXME: no hard coded buckets, we'll have configurable topics */
211 std::map
<std::string
, PSSubConfigRef
> subs
;
212 std::map
<std::string
, PSTopicConfigRef
> topics
;
213 std::multimap
<std::string
, PSNotificationConfig
> notifications
;
215 bool start_with_full_sync
{false};
217 void dump(Formatter
*f
) const {
218 encode_json("id", id
, f
);
219 encode_json("user", user
, f
);
220 encode_json("data_bucket_prefix", data_bucket_prefix
, f
);
221 encode_json("data_oid_prefix", data_oid_prefix
, f
);
222 encode_json("events_retention_days", events_retention_days
, f
);
223 encode_json("sync_instance", sync_instance
, f
);
224 encode_json("max_id", max_id
, f
);
226 Formatter::ArraySection
section(*f
, "subs");
227 for (auto& sub
: subs
) {
228 encode_json("sub", *sub
.second
, f
);
232 Formatter::ArraySection
section(*f
, "topics");
233 for (auto& topic
: topics
) {
234 encode_json("topic", *topic
.second
, f
);
238 Formatter::ObjectSection
section(*f
, "notifications");
240 for (auto& notif
: notifications
) {
241 const string
& n
= notif
.first
;
246 f
->open_array_section(n
.c_str());
249 encode_json("notifications", notif
.second
, f
);
255 encode_json("start_with_full_sync", start_with_full_sync
, f
);
258 void init(CephContext
*cct
, const JSONFormattable
& config
) {
259 string uid
= config
["uid"]("pubsub");
260 user
= rgw_user(config
["tenant"], uid
);
261 data_bucket_prefix
= config
["data_bucket_prefix"]("pubsub-");
262 data_oid_prefix
= config
["data_oid_prefix"];
263 events_retention_days
= config
["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT
);
265 for (auto& c
: config
["notifications"].array()) {
266 PSNotificationConfig nc
;
269 notifications
.insert(std::make_pair(nc
.path
, nc
));
271 PSTopicConfig topic_config
= { .name
= nc
.topic
};
272 topics
[nc
.topic
] = make_shared
<PSTopicConfig
>(topic_config
);
274 for (auto& c
: config
["subscriptions"].array()) {
275 auto sc
= std::make_shared
<PSSubConfig
>();
276 sc
->init(cct
, c
, data_bucket_prefix
, data_oid_prefix
);
278 auto iter
= topics
.find(sc
->topic
);
279 if (iter
!= topics
.end()) {
280 iter
->second
->subs
.insert(sc
->name
);
283 start_with_full_sync
= config
["start_with_full_sync"](false);
285 ldout(cct
, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl
;
288 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
) {
289 sync_instance
= instance_id
;
292 void get_topics(CephContext
*cct
, const rgw_bucket
& bucket
, const rgw_obj_key
& key
, TopicsRef
*result
) {
293 const std::string path
= bucket
.name
+ "/" + key
.name
;
295 auto iter
= notifications
.upper_bound(path
);
296 if (iter
== notifications
.begin()) {
302 if (iter
->first
.size() > path
.size()) {
305 if (path
.compare(0, iter
->first
.size(), iter
->first
) != 0) {
309 PSNotificationConfig
& target
= iter
->second
;
311 if (!target
.is_prefix
&&
312 path
.size() != iter
->first
.size()) {
316 auto topic
= topics
.find(target
.topic
);
317 if (topic
== topics
.end()) {
321 ldout(cct
, 20) << ": found topic for path=" << bucket
<< "/" << key
<< ": id=" << target
.id
<<
322 " target_path=" << target
.path
<< ", topic=" << target
.topic
<< dendl
;
323 (*result
)->push_back(topic
->second
);
324 } while (iter
!= notifications
.begin());
327 bool find_sub(const string
& name
, PSSubConfigRef
*ref
) {
328 auto iter
= subs
.find(name
);
329 if (iter
!= subs
.end()) {
// Shared-ownership handle to the module-wide PSConfig.
337 using PSConfigRef
= std::shared_ptr
<PSConfig
>;
// Shared-ownership handle to an event object; instantiated below with
// rgw_pubsub_event and rgw_pubsub_s3_record.
338 template<typename EventType
>
339 using EventRef
= std::shared_ptr
<EventType
>;
341 struct objstore_event
{
343 const rgw_bucket
& bucket
;
344 const rgw_obj_key
& key
;
345 const ceph::real_time
& mtime
;
346 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
;
348 objstore_event(const rgw_bucket
& _bucket
,
349 const rgw_obj_key
& _key
,
350 const ceph::real_time
& _mtime
,
351 const std::vector
<std::pair
<std::string
, std::string
> > *_attrs
) : bucket(_bucket
),
359 hash
.update(bucket
.bucket_id
);
360 hash
.update(key
.name
);
361 hash
.update(key
.instance
);
364 assert(etag
.size() > 8);
366 return etag
.substr(0, 8);
369 void dump(Formatter
*f
) const {
371 Formatter::ObjectSection
s(*f
, "bucket");
372 encode_json("name", bucket
.name
, f
);
373 encode_json("tenant", bucket
.tenant
, f
);
374 encode_json("bucket_id", bucket
.bucket_id
, f
);
377 Formatter::ObjectSection
s(*f
, "key");
378 encode_json("name", key
.name
, f
);
379 encode_json("instance", key
.instance
, f
);
382 encode_json("mtime", mt
, f
);
383 Formatter::ObjectSection
s(*f
, "attrs");
385 for (auto& attr
: *attrs
) {
386 encode_json(attr
.first
.c_str(), attr
.second
.c_str(), f
);
392 static void make_event_ref(CephContext
*cct
, const rgw_bucket
& bucket
,
393 const rgw_obj_key
& key
,
394 const ceph::real_time
& mtime
,
395 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
,
396 rgw::notify::EventType event_type
,
397 EventRef
<rgw_pubsub_event
> *event
) {
398 *event
= std::make_shared
<rgw_pubsub_event
>();
400 EventRef
<rgw_pubsub_event
>& e
= *event
;
401 e
->event_name
= rgw::notify::to_ceph_string(event_type
);
402 e
->source
= bucket
.name
+ "/" + key
.name
;
403 e
->timestamp
= real_clock::now();
405 objstore_event
oevent(bucket
, key
, mtime
, attrs
);
407 const utime_t
ts(e
->timestamp
);
408 set_event_id(e
->id
, oevent
.get_hash(), ts
);
410 encode_json("info", oevent
, &e
->info
);
413 static void make_s3_record_ref(CephContext
*cct
, const rgw_bucket
& bucket
,
414 const rgw_user
& owner
,
415 const rgw_obj_key
& key
,
416 const ceph::real_time
& mtime
,
417 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
,
418 rgw::notify::EventType event_type
,
419 EventRef
<rgw_pubsub_s3_record
> *record
) {
420 *record
= std::make_shared
<rgw_pubsub_s3_record
>();
422 EventRef
<rgw_pubsub_s3_record
>& r
= *record
;
423 r
->eventTime
= mtime
;
424 r
->eventName
= rgw::notify::to_string(event_type
);
425 // userIdentity: not supported in sync module
426 // x_amz_request_id: not supported in sync module
427 // x_amz_id_2: not supported in sync module
428 // configurationId is filled from subscription configuration
429 r
->bucket_name
= bucket
.name
;
430 r
->bucket_ownerIdentity
= owner
.to_str();
431 r
->bucket_arn
= to_string(rgw::ARN(bucket
));
432 r
->bucket_id
= bucket
.bucket_id
; // rgw extension
433 r
->object_key
= key
.name
;
434 // object_size not supported in sync module
435 objstore_event
oevent(bucket
, key
, mtime
, attrs
);
436 r
->object_etag
= oevent
.get_hash();
437 r
->object_versionId
= key
.instance
;
439 // use timestamp as per key sequence id (hex encoded)
440 const utime_t
ts(real_clock::now());
441 boost::algorithm::hex((const char*)&ts
, (const char*)&ts
+ sizeof(utime_t
),
442 std::back_inserter(r
->object_sequencer
));
444 set_event_id(r
->id
, r
->object_etag
, ts
);
// Shared-ownership handle to the PSManager (held by PSEnv as `manager`).
448 using PSManagerRef
= std::shared_ptr
<PSManager
>;
452 shared_ptr
<RGWUserInfo
> data_user_info
;
453 PSManagerRef manager
;
455 PSEnv() : conf(make_shared
<PSConfig
>()),
456 data_user_info(make_shared
<RGWUserInfo
>()) {}
458 void init(CephContext
*cct
, const JSONFormattable
& config
) {
459 conf
->init(cct
, config
);
462 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
, PSManagerRef
& mgr
);
// Shared-ownership handle to the PSEnv (module config, data user info, manager).
465 using PSEnvRef
= std::shared_ptr
<PSEnv
>;
467 template<typename EventType
>
469 const EventRef
<EventType
> event
;
472 PSEvent(const EventRef
<EventType
>& _event
) : event(_event
) {}
474 void format(bufferlist
*bl
) const {
475 bl
->append(json_str("", *event
));
478 void encode_event(bufferlist
& bl
) const {
482 const string
& id() const {
488 class RGWSingletonCR
: public RGWCoroutine
{
489 friend class WrapperCR
;
491 boost::asio::coroutine wrapper_state
;
496 RGWCoroutine
*cr
{nullptr};
499 using WaiterInfoRef
= std::shared_ptr
<WaiterInfo
>;
501 deque
<WaiterInfoRef
> waiters
;
503 void add_waiter(RGWCoroutine
*cr
, T
*result
) {
504 auto waiter
= std::make_shared
<WaiterInfo
>();
506 waiter
->result
= result
;
507 waiters
.push_back(waiter
);
510 bool get_next_waiter(WaiterInfoRef
*waiter
) {
511 if (waiters
.empty()) {
516 *waiter
= waiters
.front();
521 int operate_wrapper() override
{
522 reenter(&wrapper_state
) {
524 ldout(cct
, 20) << __func__
<< "(): operate_wrapper() -> operate()" << dendl
;
525 operate_ret
= operate();
526 if (operate_ret
< 0) {
527 ldout(cct
, 20) << *this << ": operate() returned r=" << operate_ret
<< dendl
;
534 ldout(cct
, 20) << __func__
<< "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters
.size() << " waiters" << dendl
;
535 /* we're done, can't yield anymore */
537 WaiterInfoRef waiter
;
538 while (get_next_waiter(&waiter
)) {
539 ldout(cct
, 20) << __func__
<< "(): RGWSingletonCR: waking up waiter" << dendl
;
540 waiter
->cr
->set_retcode(retcode
);
541 waiter
->cr
->set_sleeping(false);
542 return_result(waiter
->result
);
551 virtual void return_result(T
*result
) {}
554 RGWSingletonCR(CephContext
*_cct
)
555 : RGWCoroutine(_cct
) {}
557 int execute(RGWCoroutine
*caller
, T
*result
= nullptr) {
559 ldout(cct
, 20) << __func__
<< "(): singleton not started, starting" << dendl
;
563 } else if (!is_done()) {
564 ldout(cct
, 20) << __func__
<< "(): singleton not done yet, registering as waiter" << dendl
;
566 add_waiter(caller
, result
);
567 caller
->set_sleeping(true);
571 ldout(cct
, 20) << __func__
<< "(): singleton done, returning retcode=" << retcode
<< dendl
;
572 caller
->set_retcode(retcode
);
573 return_result(result
);
// Forward declaration so the handle alias can be used inside PSSubscription's
// own definition (e.g. by its nested init coroutine).
579 class PSSubscription
;
// Shared-ownership handle to a PSSubscription instance.
580 using PSSubscriptionRef
= std::shared_ptr
<PSSubscription
>;
582 class PSSubscription
{
585 friend class RGWPSHandleObjEventCR
;
588 RGWDataSyncEnv
*sync_env
;
590 PSSubConfigRef sub_conf
;
591 std::shared_ptr
<rgw_get_bucket_info_result
> get_bucket_info_result
;
592 RGWBucketInfo
*bucket_info
{nullptr};
593 RGWDataAccessRef data_access
;
594 RGWDataAccess::BucketRef bucket
;
596 InitCR
*init_cr
{nullptr};
598 class InitBucketLifecycleCR
: public RGWCoroutine
{
600 RGWDataSyncEnv
*sync_env
;
606 rgw_bucket_lifecycle_config_params lc_config
;
609 InitBucketLifecycleCR(RGWDataSyncCtx
*_sc
,
611 RGWBucketInfo
& _bucket_info
,
612 std::map
<string
, bufferlist
>& _bucket_attrs
) : RGWCoroutine(_sc
->cct
),
613 sc(_sc
), sync_env(_sc
->env
),
615 lc_config
.bucket_info
= _bucket_info
;
616 lc_config
.bucket_attrs
= _bucket_attrs
;
617 retention_days
= conf
->events_retention_days
;
620 int operate() override
{
623 rule
.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days
);
626 /* maybe we already have it configured? */
627 RGWLifecycleConfiguration old_config
;
628 auto aiter
= lc_config
.bucket_attrs
.find(RGW_ATTR_LC
);
629 if (aiter
!= lc_config
.bucket_attrs
.end()) {
630 bufferlist::const_iterator iter
{&aiter
->second
};
632 old_config
.decode(iter
);
633 } catch (const buffer::error
& e
) {
634 ldpp_dout(sync_env
->dpp
, 0) << __func__
<< "(): decode life cycle config failed" << dendl
;
638 auto old_rules
= old_config
.get_rule_map();
639 for (auto ori
: old_rules
) {
640 auto& old_rule
= ori
.second
;
642 if (old_rule
.get_prefix().empty() &&
643 old_rule
.get_expiration().get_days() == retention_days
&&
644 old_rule
.is_enabled()) {
645 ldpp_dout(sync_env
->dpp
, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl
;
646 return set_cr_done();
651 lc_config
.config
.add_rule(rule
);
652 yield
call(new RGWBucketLifecycleConfigCR(sync_env
->async_rados
,
657 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode
<< dendl
;
658 return set_cr_error(retcode
);
661 return set_cr_done();
667 class InitCR
: public RGWSingletonCR
<bool> {
669 RGWDataSyncEnv
*sync_env
;
670 PSSubscriptionRef sub
;
671 rgw_get_bucket_info_params get_bucket_info
;
672 rgw_bucket_create_local_params create_bucket
;
674 PSSubConfigRef
& sub_conf
;
678 InitCR(RGWDataSyncCtx
*_sc
,
679 PSSubscriptionRef
& _sub
) : RGWSingletonCR
<bool>(_sc
->cct
),
680 sc(_sc
), sync_env(_sc
->env
),
681 sub(_sub
), conf(sub
->env
->conf
),
682 sub_conf(sub
->sub_conf
) {
685 int operate() override
{
687 get_bucket_info
.tenant
= conf
->user
.tenant
;
688 get_bucket_info
.bucket_name
= sub_conf
->data_bucket_name
;
689 sub
->get_bucket_info_result
= make_shared
<rgw_get_bucket_info_result
>();
691 for (i
= 0; i
< 2; ++i
) {
692 yield
call(new RGWGetBucketInfoCR(sync_env
->async_rados
,
695 sub
->get_bucket_info_result
));
696 if (retcode
< 0 && retcode
!= -ENOENT
) {
697 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to geting bucket info: " << "tenant="
698 << get_bucket_info
.tenant
<< " name=" << get_bucket_info
.bucket_name
<< ": ret=" << retcode
<< dendl
;
702 auto& result
= sub
->get_bucket_info_result
;
703 sub
->bucket_info
= &result
->bucket_info
;
705 int ret
= sub
->data_access
->get_bucket(result
->bucket_info
, result
->attrs
, &sub
->bucket
);
707 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: data_access.get_bucket() bucket=" << result
->bucket_info
.bucket
<< " failed, ret=" << ret
<< dendl
;
708 return set_cr_error(ret
);
712 yield
call(new InitBucketLifecycleCR(sc
, conf
,
713 sub
->get_bucket_info_result
->bucket_info
,
714 sub
->get_bucket_info_result
->attrs
));
716 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf
->data_bucket_name
<< ") ret=" << retcode
<< dendl
;
717 return set_cr_error(retcode
);
720 return set_cr_done();
723 create_bucket
.user_info
= sub
->env
->data_user_info
;
724 create_bucket
.bucket_name
= sub_conf
->data_bucket_name
;
725 ldpp_dout(sync_env
->dpp
, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub
->env
->data_user_info
, true) << dendl
;
726 yield
call(new RGWBucketCreateLocalCR(sync_env
->async_rados
,
731 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create bucket: " << "tenant="
732 << get_bucket_info
.tenant
<< " name=" << get_bucket_info
.bucket_name
<< ": ret=" << retcode
<< dendl
;
733 return set_cr_error(retcode
);
736 /* second iteration: we got -ENOENT and created a bucket */
739 /* failed twice on -ENOENT, unexpected */
740 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info
.tenant
741 << " name=" << get_bucket_info
.bucket_name
<< dendl
;
742 return set_cr_error(-EIO
);
748 template<typename EventType
>
749 class StoreEventCR
: public RGWCoroutine
{
750 RGWDataSyncCtx
* const sc
;
751 RGWDataSyncEnv
* const sync_env
;
752 const PSSubscriptionRef sub
;
753 const PSEvent
<EventType
> pse
;
754 const string oid_prefix
;
757 StoreEventCR(RGWDataSyncCtx
* const _sc
,
758 const PSSubscriptionRef
& _sub
,
759 const EventRef
<EventType
>& _event
) : RGWCoroutine(_sc
->cct
),
760 sc(_sc
), sync_env(_sc
->env
),
763 oid_prefix(sub
->sub_conf
->data_oid_prefix
) {
766 int operate() override
{
767 rgw_object_simple_put_params put_obj
;
770 put_obj
.bucket
= sub
->bucket
;
771 put_obj
.key
= rgw_obj_key(oid_prefix
+ pse
.id());
773 pse
.format(&put_obj
.data
);
777 pse
.encode_event(bl
);
779 bl
.encode_base64(bl64
);
780 put_obj
.user_data
= bl64
.to_str();
783 yield
call(new RGWObjectSimplePutCR(sync_env
->async_rados
,
788 ldpp_dout(sync_env
->dpp
, 10) << "failed to store event: " << put_obj
.bucket
<< "/" << put_obj
.key
<< " ret=" << retcode
<< dendl
;
789 return set_cr_error(retcode
);
791 ldpp_dout(sync_env
->dpp
, 20) << "event stored: " << put_obj
.bucket
<< "/" << put_obj
.key
<< dendl
;
794 return set_cr_done();
800 template<typename EventType
>
801 class PushEventCR
: public RGWCoroutine
{
802 RGWDataSyncCtx
* const sc
;
803 RGWDataSyncEnv
* const sync_env
;
804 const EventRef
<EventType
> event
;
805 const PSSubConfigRef
& sub_conf
;
808 PushEventCR(RGWDataSyncCtx
* const _sc
,
809 const PSSubscriptionRef
& _sub
,
810 const EventRef
<EventType
>& _event
) : RGWCoroutine(_sc
->cct
),
811 sc(_sc
), sync_env(_sc
->env
),
813 sub_conf(_sub
->sub_conf
) {
816 int operate() override
{
818 ceph_assert(sub_conf
->push_endpoint
);
819 yield
call(sub_conf
->push_endpoint
->send_to_completion_async(*event
.get(), sync_env
));
822 ldout(sync_env
->cct
, 10) << "failed to push event: " << event
->id
<<
823 " to endpoint: " << sub_conf
->push_endpoint_name
<< " ret=" << retcode
<< dendl
;
824 return set_cr_error(retcode
);
827 ldout(sync_env
->cct
, 20) << "event: " << event
->id
<<
828 " pushed to endpoint: " << sub_conf
->push_endpoint_name
<< dendl
;
829 return set_cr_done();
836 PSSubscription(RGWDataSyncCtx
*_sc
,
838 PSSubConfigRef
& _sub_conf
) : sc(_sc
), sync_env(_sc
->env
),
841 data_access(std::make_shared
<RGWDataAccess
>(sync_env
->store
)) {}
843 PSSubscription(RGWDataSyncCtx
*_sc
,
845 rgw_pubsub_sub_config
& user_sub_conf
) : sc(_sc
), sync_env(_sc
->env
),
847 sub_conf(std::make_shared
<PSSubConfig
>()),
848 data_access(std::make_shared
<RGWDataAccess
>(sync_env
->store
)) {
849 sub_conf
->from_user_conf(sync_env
->cct
, user_sub_conf
);
851 virtual ~PSSubscription() {
858 static PSSubscriptionRef
get_shared(RGWDataSyncCtx
*_sc
,
861 auto sub
= std::make_shared
<PSSubscription
>(_sc
, _env
, _sub_conf
);
862 sub
->init_cr
= new InitCR(_sc
, sub
);
867 int call_init_cr(RGWCoroutine
*caller
) {
868 return init_cr
->execute(caller
);
871 template<typename EventType
>
872 static RGWCoroutine
*store_event_cr(RGWDataSyncCtx
* const sc
, const PSSubscriptionRef
& sub
, const EventRef
<EventType
>& event
) {
873 return new StoreEventCR
<EventType
>(sc
, sub
, event
);
876 template<typename EventType
>
877 static RGWCoroutine
*push_event_cr(RGWDataSyncCtx
* const sc
, const PSSubscriptionRef
& sub
, const EventRef
<EventType
>& event
) {
878 return new PushEventCR
<EventType
>(sc
, sub
, event
);
886 RGWDataSyncEnv
*sync_env
;
889 std::map
<string
, PSSubscriptionRef
> subs
;
891 class GetSubCR
: public RGWSingletonCR
<PSSubscriptionRef
> {
893 RGWDataSyncEnv
*sync_env
;
898 PSSubscriptionRef
*ref
;
902 PSSubConfigRef sub_conf
;
903 rgw_pubsub_sub_config user_sub_conf
;
906 GetSubCR(RGWDataSyncCtx
*_sc
,
908 const rgw_user
& _owner
,
909 const string
& _sub_name
,
910 PSSubscriptionRef
*_ref
) : RGWSingletonCR
<PSSubscriptionRef
>(_sc
->cct
),
911 sc(_sc
), sync_env(_sc
->env
),
916 conf(mgr
->env
->conf
) {
920 int operate() override
{
923 if (!conf
->find_sub(sub_name
, &sub_conf
)) {
924 ldout(sync_env
->cct
, 10) << "failed to find subscription config: name=" << sub_name
<< dendl
;
925 mgr
->remove_get_sub(owner
, sub_name
);
926 return set_cr_error(-ENOENT
);
929 *ref
= PSSubscription::get_shared(sc
, mgr
->env
, sub_conf
);
931 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_sub_config
>;
933 RGWUserPubSub
ups(sync_env
->store
, owner
);
935 ups
.get_sub_meta_obj(sub_name
, &obj
);
936 bool empty_on_enoent
= false;
937 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
939 &user_sub_conf
, empty_on_enoent
));
942 mgr
->remove_get_sub(owner
, sub_name
);
943 return set_cr_error(retcode
);
946 *ref
= PSSubscription::get_shared(sc
, mgr
->env
, user_sub_conf
);
949 yield (*ref
)->call_init_cr(this);
951 ldout(sync_env
->cct
, 10) << "failed to init subscription" << dendl
;
952 mgr
->remove_get_sub(owner
, sub_name
);
953 return set_cr_error(retcode
);
957 mgr
->subs
[sub_name
] = *ref
;
959 mgr
->remove_get_sub(owner
, sub_name
);
961 return set_cr_done();
966 void return_result(PSSubscriptionRef
*result
) override
{
967 ldout(cct
, 20) << __func__
<< "(): returning result: retcode=" << retcode
<< " resultp=" << (void *)result
<< dendl
;
974 string
sub_id(const rgw_user
& owner
, const string
& sub_name
) {
976 if (!owner
.empty()) {
977 owner_prefix
= owner
.to_str() + "/";
980 return owner_prefix
+ sub_name
;
983 std::map
<std::string
, GetSubCR
*> get_subs
;
985 GetSubCR
*& get_get_subs(const rgw_user
& owner
, const string
& name
) {
986 return get_subs
[sub_id(owner
, name
)];
989 void remove_get_sub(const rgw_user
& owner
, const string
& name
) {
990 get_subs
.erase(sub_id(owner
, name
));
993 bool find_sub_instance(const rgw_user
& owner
, const string
& sub_name
, PSSubscriptionRef
*sub
) {
994 auto iter
= subs
.find(sub_id(owner
, sub_name
));
995 if (iter
!= subs
.end()) {
1002 PSManager(RGWDataSyncCtx
*_sc
,
1003 PSEnvRef _env
) : sc(_sc
), sync_env(_sc
->env
),
1007 static PSManagerRef
get_shared(RGWDataSyncCtx
*_sc
,
1009 return std::shared_ptr
<PSManager
>(new PSManager(_sc
, _env
));
1012 static int call_get_subscription_cr(RGWDataSyncCtx
*sc
, PSManagerRef
& mgr
,
1013 RGWCoroutine
*caller
, const rgw_user
& owner
, const string
& sub_name
, PSSubscriptionRef
*ref
) {
1014 if (mgr
->find_sub_instance(owner
, sub_name
, ref
)) {
1015 /* found it! nothing to execute */
1016 ldout(sc
->cct
, 20) << __func__
<< "(): found sub instance" << dendl
;
1018 auto& gs
= mgr
->get_get_subs(owner
, sub_name
);
1020 ldout(sc
->cct
, 20) << __func__
<< "(): first get subs" << dendl
;
1021 gs
= new GetSubCR(sc
, mgr
, owner
, sub_name
, ref
);
1023 ldout(sc
->cct
, 20) << __func__
<< "(): executing get subs" << dendl
;
1024 return gs
->execute(caller
, ref
);
1027 friend class GetSubCR
;
1030 void PSEnv::init_instance(const RGWRealm
& realm
, uint64_t instance_id
, PSManagerRef
& mgr
) {
1032 conf
->init_instance(realm
, instance_id
);
1035 class RGWPSInitEnvCBCR
: public RGWCoroutine
{
1037 RGWDataSyncEnv
*sync_env
;
1041 rgw_user_create_params create_user
;
1042 rgw_get_user_info_params get_user_info
;
1044 RGWPSInitEnvCBCR(RGWDataSyncCtx
*_sc
,
1045 PSEnvRef
& _env
) : RGWCoroutine(_sc
->cct
),
1046 sc(_sc
), sync_env(_sc
->env
),
1047 env(_env
), conf(env
->conf
) {}
1048 int operate() override
{
1050 ldpp_dout(sync_env
->dpp
, 1) << ": init pubsub config zone=" << sc
->source_zone
<< dendl
;
1052 /* make sure the pubsub data user exists, then load its info */
1053 create_user
.user
= conf
->user
;
1054 create_user
.max_buckets
= 0; /* unlimited */
1055 create_user
.display_name
= "pubsub";
1056 create_user
.generate_key
= false;
1057 yield
call(new RGWUserCreateCR(sync_env
->async_rados
, sync_env
->store
, create_user
, sync_env
->dpp
));
1058 if (retcode
< 0 && retcode
!= -ERR_USER_EXIST
) {
1059 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create rgw user: ret=" << retcode
<< dendl
;
1060 return set_cr_error(retcode
);
1063 get_user_info
.user
= conf
->user
;
1064 yield
call(new RGWGetUserInfoCR(sync_env
->async_rados
, sync_env
->store
, get_user_info
, env
->data_user_info
));
1066 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create rgw user: ret=" << retcode
<< dendl
;
1067 return set_cr_error(retcode
);
1070 ldpp_dout(sync_env
->dpp
, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env
->data_user_info
, true) << dendl
;
1073 return set_cr_done();
1079 bool match(const rgw_pubsub_topic_filter
& filter
, const std::string
& key_name
, rgw::notify::EventType event_type
) {
1080 if (!match(filter
.events
, event_type
)) {
1083 if (!match(filter
.s3_filter
.key_filter
, key_name
)) {
1089 class RGWPSFindBucketTopicsCR
: public RGWCoroutine
{
1091 RGWDataSyncEnv
*sync_env
;
1096 rgw::notify::EventType event_type
;
1100 rgw_raw_obj bucket_obj
;
1101 rgw_raw_obj user_obj
;
1102 rgw_pubsub_bucket_topics bucket_topics
;
1103 rgw_pubsub_user_topics user_topics
;
1106 RGWPSFindBucketTopicsCR(RGWDataSyncCtx
*_sc
,
1108 const rgw_user
& _owner
,
1109 const rgw_bucket
& _bucket
,
1110 const rgw_obj_key
& _key
,
1111 rgw::notify::EventType _event_type
,
1112 TopicsRef
*_topics
) : RGWCoroutine(_sc
->cct
),
1113 sc(_sc
), sync_env(_sc
->env
),
1118 event_type(_event_type
),
1119 ups(sync_env
->store
, owner
),
1121 *topics
= std::make_shared
<vector
<PSTopicConfigRef
> >();
1123 int operate() override
{
1125 ups
.get_bucket_meta_obj(bucket
, &bucket_obj
);
1126 ups
.get_user_meta_obj(&user_obj
);
1128 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_bucket_topics
>;
1130 bool empty_on_enoent
= true;
1131 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
1133 &bucket_topics
, empty_on_enoent
));
1135 if (retcode
< 0 && retcode
!= -ENOENT
) {
1136 return set_cr_error(retcode
);
1139 ldout(sync_env
->cct
, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics
.topics
.size() << " topics for bucket " << bucket
<< dendl
;
1141 if (!bucket_topics
.topics
.empty()) {
1142 using ReadUserTopicsInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_user_topics
>;
1144 bool empty_on_enoent
= true;
1145 call(new ReadUserTopicsInfoCR(sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
1147 &user_topics
, empty_on_enoent
));
1149 if (retcode
< 0 && retcode
!= -ENOENT
) {
1150 return set_cr_error(retcode
);
1154 for (auto& titer
: bucket_topics
.topics
) {
1155 auto& topic_filter
= titer
.second
;
1156 auto& info
= topic_filter
.topic
;
1157 if (!match(topic_filter
, key
.name
, event_type
)) {
1160 std::shared_ptr
<PSTopicConfig
> tc
= std::make_shared
<PSTopicConfig
>();
1161 tc
->name
= info
.name
;
1162 tc
->subs
= user_topics
.topics
[info
.name
].subs
;
1163 tc
->opaque_data
= info
.opaque_data
;
1164 (*topics
)->push_back(tc
);
1167 env
->conf
->get_topics(sync_env
->cct
, bucket
, key
, topics
);
1168 return set_cr_done();
1174 class RGWPSHandleObjEventCR
: public RGWCoroutine
{
1175 RGWDataSyncCtx
* const sc
;
1177 const rgw_user
& owner
;
1178 const EventRef
<rgw_pubsub_event
> event
;
1179 const EventRef
<rgw_pubsub_s3_record
> record
;
1180 const TopicsRef topics
;
1181 const std::array
<rgw_user
, 2> owners
;
1182 bool has_subscriptions
;
1184 bool sub_conf_found
;
1185 PSSubscriptionRef sub
;
1186 std::array
<rgw_user
, 2>::const_iterator oiter
;
1187 std::vector
<PSTopicConfigRef
>::const_iterator titer
;
1188 std::set
<std::string
>::const_iterator siter
;
1189 int last_sub_conf_error
;
1192 RGWPSHandleObjEventCR(RGWDataSyncCtx
* const _sc
,
1193 const PSEnvRef _env
,
1194 const rgw_user
& _owner
,
1195 const EventRef
<rgw_pubsub_event
>& _event
,
1196 const EventRef
<rgw_pubsub_s3_record
>& _record
,
1197 const TopicsRef
& _topics
) : RGWCoroutine(_sc
->cct
),
1204 owners({owner
, rgw_user
{}}),
1205 has_subscriptions(false),
1206 event_handled(false) {}
1208 int operate() override
{
1210 ldout(sc
->cct
, 20) << ": handle event: obj: z=" << sc
->source_zone
1211 << " event=" << json_str("event", *event
, false)
1212 << " owner=" << owner
<< dendl
;
1214 ldout(sc
->cct
, 20) << "pubsub: " << topics
->size() << " topics found for path" << dendl
;
1216 // outside caller should check that
1217 ceph_assert(!topics
->empty());
1219 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_triggered
);
1221 // loop over all topics related to the bucket/object
1222 for (titer
= topics
->begin(); titer
!= topics
->end(); ++titer
) {
1223 ldout(sc
->cct
, 20) << ": notification for " << event
->source
<< ": topic=" <<
1224 (*titer
)->name
<< ", has " << (*titer
)->subs
.size() << " subscriptions" << dendl
;
1225 // loop over all subscriptions of the topic
1226 for (siter
= (*titer
)->subs
.begin(); siter
!= (*titer
)->subs
.end(); ++siter
) {
1227 ldout(sc
->cct
, 20) << ": subscription: " << *siter
<< dendl
;
1228 has_subscriptions
= true;
1229 sub_conf_found
= false;
1230 // try to read subscription configuration from global/user cond
1231 // configuration is considered missing only if does not exist in either
1232 for (oiter
= owners
.begin(); oiter
!= owners
.end(); ++oiter
) {
1233 yield
PSManager::call_get_subscription_cr(sc
, env
->manager
, this, *oiter
, *siter
, &sub
);
1235 if (sub_conf_found
) {
1236 // not a real issue, sub conf already found
1239 last_sub_conf_error
= retcode
;
1242 sub_conf_found
= true;
1243 if (sub
->sub_conf
->s3_id
.empty()) {
1244 // subscription was not made by S3 compatible API
1245 ldout(sc
->cct
, 20) << "storing event for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1246 yield
call(PSSubscription::store_event_cr(sc
, sub
, event
));
1248 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_fail
);
1249 ldout(sc
->cct
, 1) << "ERROR: failed to store event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1251 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_ok
);
1252 event_handled
= true;
1254 if (sub
->sub_conf
->push_endpoint
) {
1255 ldout(sc
->cct
, 20) << "push event for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1256 yield
call(PSSubscription::push_event_cr(sc
, sub
, event
));
1258 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
1259 ldout(sc
->cct
, 1) << "ERROR: failed to push event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1261 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
1262 event_handled
= true;
1266 // subscription was made by S3 compatible API
1267 ldout(sc
->cct
, 20) << "storing record for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1268 record
->configurationId
= sub
->sub_conf
->s3_id
;
1269 record
->opaque_data
= (*titer
)->opaque_data
;
1270 yield
call(PSSubscription::store_event_cr(sc
, sub
, record
));
1272 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_fail
);
1273 ldout(sc
->cct
, 1) << "ERROR: failed to store record for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1275 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_ok
);
1276 event_handled
= true;
1278 if (sub
->sub_conf
->push_endpoint
) {
1279 ldout(sc
->cct
, 20) << "push record for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1280 yield
call(PSSubscription::push_event_cr(sc
, sub
, record
));
1282 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
1283 ldout(sc
->cct
, 1) << "ERROR: failed to push record for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1285 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
1286 event_handled
= true;
1291 if (!sub_conf_found
) {
1292 // could not find conf for subscription at user or global levels
1293 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_missing_conf
);
1294 ldout(sc
->cct
, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1295 << " ret=" << last_sub_conf_error
<< dendl
;
1296 if (retcode
== -ENOENT
) {
1297 // missing subscription info should be reflected back as invalid argument
1298 // and not as missing object
1304 if (has_subscriptions
&& !event_handled
) {
1305 // event is considered "lost" of it has subscriptions on any of its topics
1306 // but it was not stored in, or pushed to, any of them
1307 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_lost
);
1310 return set_cr_error(retcode
);
1312 return set_cr_done();
1318 // coroutine invoked on remote object creation
1319 class RGWPSHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
1321 rgw_bucket_sync_pipe sync_pipe
;
1323 std::optional
<uint64_t> versioned_epoch
;
1324 EventRef
<rgw_pubsub_event
> event
;
1325 EventRef
<rgw_pubsub_s3_record
> record
;
1328 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1329 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1330 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
,
1331 TopicsRef
& _topics
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1333 sync_pipe(_sync_pipe
),
1335 versioned_epoch(_versioned_epoch
),
1338 int operate() override
{
1340 ldout(sc
->cct
, 20) << ": stat of remote obj: z=" << sc
->source_zone
1341 << " b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " size=" << size
<< " mtime=" << mtime
1342 << " attrs=" << attrs
<< dendl
;
1344 std::vector
<std::pair
<std::string
, std::string
> > attrs
;
1345 for (auto& attr
: attrs
) {
1346 std::string k
= attr
.first
;
1347 if (boost::algorithm::starts_with(k
, RGW_ATTR_PREFIX
)) {
1348 k
= k
.substr(sizeof(RGW_ATTR_PREFIX
) - 1);
1350 attrs
.push_back(std::make_pair(k
, attr
.second
));
1352 // at this point we don't know whether we need the ceph event or S3 record
1353 // this is why both are created here, once we have information about the
1354 // subscription, we will store/push only the relevant ones
1355 make_event_ref(sc
->cct
,
1356 sync_pipe
.info
.source_bs
.bucket
, key
,
1358 rgw::notify::ObjectCreated
, &event
);
1359 make_s3_record_ref(sc
->cct
,
1360 sync_pipe
.info
.source_bs
.bucket
, sync_pipe
.dest_bucket_info
.owner
, key
,
1362 rgw::notify::ObjectCreated
, &record
);
1365 yield
call(new RGWPSHandleObjEventCR(sc
, env
, sync_pipe
.source_bucket_info
.owner
, event
, record
, topics
));
1367 return set_cr_error(retcode
);
1369 return set_cr_done();
1375 class RGWPSHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
1376 rgw_bucket_sync_pipe sync_pipe
;
1378 std::optional
<uint64_t> versioned_epoch
;
1381 RGWPSHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
1382 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1383 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
,
1384 TopicsRef
& _topics
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1385 sync_pipe(_sync_pipe
),
1386 env(_env
), versioned_epoch(_versioned_epoch
),
1390 ~RGWPSHandleRemoteObjCR() override
{}
1392 RGWStatRemoteObjCBCR
*allocate_callback() override
{
1393 return new RGWPSHandleRemoteObjCBCR(sc
, sync_pipe
, key
, env
, versioned_epoch
, topics
);
1397 class RGWPSHandleObjCreateCR
: public RGWCoroutine
{
1399 rgw_bucket_sync_pipe sync_pipe
;
1402 std::optional
<uint64_t> versioned_epoch
;
1405 RGWPSHandleObjCreateCR(RGWDataSyncCtx
*_sc
,
1406 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1407 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
) : RGWCoroutine(_sc
->cct
),
1409 sync_pipe(_sync_pipe
),
1412 versioned_epoch(_versioned_epoch
) {
1415 ~RGWPSHandleObjCreateCR() override
{}
1417 int operate() override
{
1419 yield
call(new RGWPSFindBucketTopicsCR(sc
, env
, sync_pipe
.dest_bucket_info
.owner
,
1420 sync_pipe
.info
.source_bs
.bucket
, key
,
1421 rgw::notify::ObjectCreated
,
1424 ldout(sc
->cct
, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode
<< dendl
;
1425 return set_cr_error(retcode
);
1427 if (topics
->empty()) {
1428 ldout(sc
->cct
, 20) << "no topics found for " << sync_pipe
.info
.source_bs
.bucket
<< "/" << key
<< dendl
;
1429 return set_cr_done();
1431 yield
call(new RGWPSHandleRemoteObjCR(sc
, sync_pipe
, key
, env
, versioned_epoch
, topics
));
1433 return set_cr_error(retcode
);
1435 return set_cr_done();
1441 // coroutine invoked on remote object deletion
1442 class RGWPSGenericObjEventCBCR
: public RGWCoroutine
{
1448 ceph::real_time mtime
;
1449 rgw::notify::EventType event_type
;
1450 EventRef
<rgw_pubsub_event
> event
;
1451 EventRef
<rgw_pubsub_s3_record
> record
;
1454 RGWPSGenericObjEventCBCR(RGWDataSyncCtx
*_sc
,
1456 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
1457 rgw::notify::EventType _event_type
) : RGWCoroutine(_sc
->cct
),
1460 owner(_sync_pipe
.dest_bucket_info
.owner
),
1461 bucket(_sync_pipe
.dest_bucket_info
.bucket
),
1463 mtime(_mtime
), event_type(_event_type
) {}
1464 int operate() override
{
1466 ldout(sc
->cct
, 20) << ": remove remote obj: z=" << sc
->source_zone
1467 << " b=" << bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
1468 yield
call(new RGWPSFindBucketTopicsCR(sc
, env
, owner
, bucket
, key
, event_type
, &topics
));
1470 ldout(sc
->cct
, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode
<< dendl
;
1471 return set_cr_error(retcode
);
1473 if (topics
->empty()) {
1474 ldout(sc
->cct
, 20) << "no topics found for " << bucket
<< "/" << key
<< dendl
;
1475 return set_cr_done();
1477 // at this point we don't know whether we need the ceph event or S3 record
1478 // this is why both are created here, once we have information about the
1479 // subscription, we will store/push only the relevant ones
1480 make_event_ref(sc
->cct
,
1483 event_type
, &event
);
1484 make_s3_record_ref(sc
->cct
,
1487 event_type
, &record
);
1488 yield
call(new RGWPSHandleObjEventCR(sc
, env
, owner
, event
, record
, topics
));
1490 return set_cr_error(retcode
);
1492 return set_cr_done();
1499 class RGWPSDataSyncModule
: public RGWDataSyncModule
{
1504 RGWPSDataSyncModule(CephContext
*cct
, const JSONFormattable
& config
) : env(std::make_shared
<PSEnv
>()), conf(env
->conf
) {
1505 env
->init(cct
, config
);
1508 ~RGWPSDataSyncModule() override
{}
1510 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
1511 auto sync_env
= sc
->env
;
1512 PSManagerRef mgr
= PSManager::get_shared(sc
, env
);
1513 env
->init_instance(sync_env
->svc
->zone
->get_realm(), instance_id
, mgr
);
1516 RGWCoroutine
*start_sync(RGWDataSyncCtx
*sc
) override
{
1517 ldout(sc
->cct
, 5) << conf
->id
<< ": start" << dendl
;
1518 return new RGWPSInitEnvCBCR(sc
, env
);
1521 RGWCoroutine
*sync_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1522 rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1523 ldout(sc
->cct
, 10) << conf
->id
<< ": sync_object: b=" << sync_pipe
<<
1524 " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
1525 return new RGWPSHandleObjCreateCR(sc
, sync_pipe
, key
, env
, versioned_epoch
);
1528 RGWCoroutine
*remove_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1529 rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1530 ldout(sc
->cct
, 10) << conf
->id
<< ": rm_object: b=" << sync_pipe
<<
1531 " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1532 return new RGWPSGenericObjEventCBCR(sc
, env
, sync_pipe
, key
, mtime
, rgw::notify::ObjectRemovedDelete
);
1535 RGWCoroutine
*create_delete_marker(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1536 rgw_obj_key
& key
, real_time
& mtime
, rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1537 ldout(sc
->cct
, 10) << conf
->id
<< ": create_delete_marker: b=" << sync_pipe
<<
1538 " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1539 return new RGWPSGenericObjEventCBCR(sc
, env
, sync_pipe
, key
, mtime
, rgw::notify::ObjectRemovedDeleteMarkerCreated
);
1542 PSConfigRef
& get_conf() { return conf
; }
1545 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext
*cct
, const JSONFormattable
& config
)
1547 data_handler
= std::unique_ptr
<RGWPSDataSyncModule
>(new RGWPSDataSyncModule(cct
, config
));
1548 const std::string jconf
= json_str("conf", *data_handler
->get_conf());
1550 if (!p
.parse(jconf
.c_str(), jconf
.size())) {
1551 ldout(cct
, 1) << "ERROR: failed to parse sync module effective conf: " << jconf
<< dendl
;
1552 effective_conf
= config
;
1554 effective_conf
.decode_json(&p
);
1556 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
1557 if (!rgw::amqp::init(cct
)) {
1558 ldout(cct
, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl
;
1561 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
1562 if (!rgw::kafka::init(cct
)) {
1563 ldout(cct
, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl
;
1568 RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
1569 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
1570 rgw::amqp::shutdown();
1572 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
1573 rgw::kafka::shutdown();
1577 RGWDataSyncModule
*RGWPSSyncModuleInstance::get_data_handler()
1579 return data_handler
.get();
1582 RGWRESTMgr
*RGWPSSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
1583 if (dialect
!= RGW_REST_S3
) {
1586 return new RGWRESTMgr_PubSub();
1589 bool RGWPSSyncModuleInstance::should_full_sync() const {
1590 return data_handler
->get_conf()->start_with_full_sync
;
1593 int RGWPSSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) {
1594 instance
->reset(new RGWPSSyncModuleInstance(cct
, config
));