1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_pubsub.h"
10 #include "rgw_sync_module_pubsub_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rados.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_cr_tools.h"
16 #include "rgw_pubsub.h"
17 #include "rgw_pubsub_push.h"
18 #include "rgw_notify_event_type.h"
19 #include "rgw_perf_counters.h"
21 #include <boost/algorithm/hex.hpp>
22 #include <boost/asio/yield.hpp>
24 #define dout_subsys ceph_subsys_rgw
27 #define PUBSUB_EVENTS_RETENTION_DEFAULT 7
34 "tenant": <tenant>, # default: <empty>
35 "uid": <uid>, # default: "pubsub"
36 "data_bucket_prefix": <prefix> # default: "pubsub-"
37 "data_oid_prefix": <prefix> #
38 "events_retention_days": <int> # default: 7
39 "start_with_full_sync": <bool> # default: false
44 "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
45 # or a prefix if it ends with a wildcard
52 "name": <subscription-name>,
54 "push_endpoint": <endpoint>,
55 "push_endpoint_args": <arg list>, # any push endpoint specific args (include all args)
56 "data_bucket": <bucket>, # override name of bucket where subscription data will be stored
57 "data_oid_prefix": <prefix> # set prefix for subscription data object ids
58 "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
66 // utility function to convert the args list from string format
67 // (ampersand-separated, with equal signs) to a parsed structure
68 RGWHTTPArgs
string_to_args(const std::string
& str_args
) {
78 std::string push_endpoint_name
;
79 std::string push_endpoint_args
;
80 std::string data_bucket_name
;
81 std::string data_oid_prefix
;
83 std::string arn_topic
;
84 RGWPubSubEndpoint::Ptr push_endpoint
;
86 void from_user_conf(CephContext
*cct
, const rgw_pubsub_sub_config
& uc
) {
89 push_endpoint_name
= uc
.dest
.push_endpoint
;
90 data_bucket_name
= uc
.dest
.bucket_name
;
91 data_oid_prefix
= uc
.dest
.oid_prefix
;
93 arn_topic
= uc
.dest
.arn_topic
;
94 if (!push_endpoint_name
.empty()) {
95 push_endpoint_args
= uc
.dest
.push_endpoint_args
;
97 push_endpoint
= RGWPubSubEndpoint::create(push_endpoint_name
, arn_topic
, string_to_args(push_endpoint_args
), cct
);
98 ldout(cct
, 20) << "push endpoint created: " << push_endpoint
->to_str() << dendl
;
99 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
100 ldout(cct
, 1) << "ERROR: failed to create push endpoint: "
101 << push_endpoint_name
<< " due to: " << e
.what() << dendl
;
106 void dump(Formatter
*f
) const {
107 encode_json("name", name
, f
);
108 encode_json("topic", topic
, f
);
109 encode_json("push_endpoint", push_endpoint_name
, f
);
110 encode_json("push_endpoint_args", push_endpoint_args
, f
);
111 encode_json("data_bucket_name", data_bucket_name
, f
);
112 encode_json("data_oid_prefix", data_oid_prefix
, f
);
113 encode_json("s3_id", s3_id
, f
);
116 void init(CephContext
*cct
, const JSONFormattable
& config
,
117 const string
& data_bucket_prefix
,
118 const string
& default_oid_prefix
) {
119 name
= config
["name"];
120 topic
= config
["topic"];
121 push_endpoint_name
= config
["push_endpoint"];
122 string default_bucket_name
= data_bucket_prefix
+ name
;
123 data_bucket_name
= config
["data_bucket"](default_bucket_name
.c_str());
124 data_oid_prefix
= config
["data_oid_prefix"](default_oid_prefix
.c_str());
125 s3_id
= config
["s3_id"];
126 arn_topic
= config
["arn_topic"];
127 if (!push_endpoint_name
.empty()) {
128 push_endpoint_args
= config
["push_endpoint_args"];
130 push_endpoint
= RGWPubSubEndpoint::create(push_endpoint_name
, arn_topic
, string_to_args(push_endpoint_args
), cct
);
131 ldout(cct
, 20) << "push endpoint created: " << push_endpoint
->to_str() << dendl
;
132 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
133 ldout(cct
, 1) << "ERROR: failed to create push endpoint: "
134 << push_endpoint_name
<< " due to: " << e
.what() << dendl
;
140 using PSSubConfigRef
= std::shared_ptr
<PSSubConfig
>;
142 struct PSTopicConfig
{
144 std::set
<std::string
> subs
;
145 std::string opaque_data
;
147 void dump(Formatter
*f
) const {
148 encode_json("name", name
, f
);
149 encode_json("subs", subs
, f
);
150 encode_json("opaque", opaque_data
, f
);
154 struct PSNotificationConfig
{
156 string path
; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
158 bool is_prefix
{false};
161 void dump(Formatter
*f
) const {
162 encode_json("id", id
, f
);
163 encode_json("path", path
, f
);
164 encode_json("topic", topic
, f
);
165 encode_json("is_prefix", is_prefix
, f
);
168 void init(CephContext
*cct
, const JSONFormattable
& config
) {
169 path
= config
["path"];
170 if (!path
.empty() && path
[path
.size() - 1] == '*') {
171 path
= path
.substr(0, path
.size() - 1);
174 topic
= config
["topic"];
179 static string
json_str(const char *name
, const T
& obj
, bool pretty
= false)
182 JSONFormatter
f(pretty
);
184 encode_json(name
, obj
, &f
);
190 using PSTopicConfigRef
= std::shared_ptr
<PSTopicConfig
>;
191 using TopicsRef
= std::shared_ptr
<std::vector
<PSTopicConfigRef
>>;
194 const std::string id
{"pubsub"};
196 std::string data_bucket_prefix
;
197 std::string data_oid_prefix
;
199 int events_retention_days
{0};
201 uint64_t sync_instance
{0};
204 /* FIXME: no hard coded buckets, we'll have configurable topics */
205 std::map
<std::string
, PSSubConfigRef
> subs
;
206 std::map
<std::string
, PSTopicConfigRef
> topics
;
207 std::multimap
<std::string
, PSNotificationConfig
> notifications
;
209 bool start_with_full_sync
{false};
211 void dump(Formatter
*f
) const {
212 encode_json("id", id
, f
);
213 encode_json("user", user
, f
);
214 encode_json("data_bucket_prefix", data_bucket_prefix
, f
);
215 encode_json("data_oid_prefix", data_oid_prefix
, f
);
216 encode_json("events_retention_days", events_retention_days
, f
);
217 encode_json("sync_instance", sync_instance
, f
);
218 encode_json("max_id", max_id
, f
);
220 Formatter::ArraySection
section(*f
, "subs");
221 for (auto& sub
: subs
) {
222 encode_json("sub", *sub
.second
, f
);
226 Formatter::ArraySection
section(*f
, "topics");
227 for (auto& topic
: topics
) {
228 encode_json("topic", *topic
.second
, f
);
232 Formatter::ObjectSection
section(*f
, "notifications");
234 for (auto& notif
: notifications
) {
235 const string
& n
= notif
.first
;
240 f
->open_array_section(n
.c_str());
243 encode_json("notifications", notif
.second
, f
);
249 encode_json("start_with_full_sync", start_with_full_sync
, f
);
252 void init(CephContext
*cct
, const JSONFormattable
& config
) {
253 string uid
= config
["uid"]("pubsub");
254 user
= rgw_user(config
["tenant"], uid
);
255 data_bucket_prefix
= config
["data_bucket_prefix"]("pubsub-");
256 data_oid_prefix
= config
["data_oid_prefix"];
257 events_retention_days
= config
["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT
);
259 for (auto& c
: config
["notifications"].array()) {
260 PSNotificationConfig nc
;
263 notifications
.insert(std::make_pair(nc
.path
, nc
));
265 PSTopicConfig topic_config
= { .name
= nc
.topic
};
266 topics
[nc
.topic
] = make_shared
<PSTopicConfig
>(topic_config
);
268 for (auto& c
: config
["subscriptions"].array()) {
269 auto sc
= std::make_shared
<PSSubConfig
>();
270 sc
->init(cct
, c
, data_bucket_prefix
, data_oid_prefix
);
272 auto iter
= topics
.find(sc
->topic
);
273 if (iter
!= topics
.end()) {
274 iter
->second
->subs
.insert(sc
->name
);
277 start_with_full_sync
= config
["start_with_full_sync"](false);
279 ldout(cct
, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl
;
282 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
) {
283 sync_instance
= instance_id
;
286 void get_topics(CephContext
*cct
, const rgw_bucket
& bucket
, const rgw_obj_key
& key
, TopicsRef
*result
) {
287 const std::string path
= bucket
.name
+ "/" + key
.name
;
289 auto iter
= notifications
.upper_bound(path
);
290 if (iter
== notifications
.begin()) {
296 if (iter
->first
.size() > path
.size()) {
299 if (path
.compare(0, iter
->first
.size(), iter
->first
) != 0) {
303 PSNotificationConfig
& target
= iter
->second
;
305 if (!target
.is_prefix
&&
306 path
.size() != iter
->first
.size()) {
310 auto topic
= topics
.find(target
.topic
);
311 if (topic
== topics
.end()) {
315 ldout(cct
, 20) << ": found topic for path=" << bucket
<< "/" << key
<< ": id=" << target
.id
<<
316 " target_path=" << target
.path
<< ", topic=" << target
.topic
<< dendl
;
317 (*result
)->push_back(topic
->second
);
318 } while (iter
!= notifications
.begin());
321 bool find_sub(const string
& name
, PSSubConfigRef
*ref
) {
322 auto iter
= subs
.find(name
);
323 if (iter
!= subs
.end()) {
331 using PSConfigRef
= std::shared_ptr
<PSConfig
>;
332 template<typename EventType
>
333 using EventRef
= std::shared_ptr
<EventType
>;
335 struct objstore_event
{
337 const rgw_bucket
& bucket
;
338 const rgw_obj_key
& key
;
339 const ceph::real_time
& mtime
;
340 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
;
342 objstore_event(const rgw_bucket
& _bucket
,
343 const rgw_obj_key
& _key
,
344 const ceph::real_time
& _mtime
,
345 const std::vector
<std::pair
<std::string
, std::string
> > *_attrs
) : bucket(_bucket
),
353 hash
.update(bucket
.bucket_id
);
354 hash
.update(key
.name
);
355 hash
.update(key
.instance
);
358 assert(etag
.size() > 8);
360 return etag
.substr(0, 8);
363 void dump(Formatter
*f
) const {
365 Formatter::ObjectSection
s(*f
, "bucket");
366 encode_json("name", bucket
.name
, f
);
367 encode_json("tenant", bucket
.tenant
, f
);
368 encode_json("bucket_id", bucket
.bucket_id
, f
);
371 Formatter::ObjectSection
s(*f
, "key");
372 encode_json("name", key
.name
, f
);
373 encode_json("instance", key
.instance
, f
);
376 encode_json("mtime", mt
, f
);
377 Formatter::ObjectSection
s(*f
, "attrs");
379 for (auto& attr
: *attrs
) {
380 encode_json(attr
.first
.c_str(), attr
.second
.c_str(), f
);
386 static void make_event_ref(CephContext
*cct
, const rgw_bucket
& bucket
,
387 const rgw_obj_key
& key
,
388 const ceph::real_time
& mtime
,
389 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
,
390 rgw::notify::EventType event_type
,
391 EventRef
<rgw_pubsub_event
> *event
) {
392 *event
= std::make_shared
<rgw_pubsub_event
>();
394 EventRef
<rgw_pubsub_event
>& e
= *event
;
395 e
->event_name
= rgw::notify::to_ceph_string(event_type
);
396 e
->source
= bucket
.name
+ "/" + key
.name
;
397 e
->timestamp
= real_clock::now();
399 objstore_event
oevent(bucket
, key
, mtime
, attrs
);
401 const utime_t
ts(e
->timestamp
);
402 set_event_id(e
->id
, oevent
.get_hash(), ts
);
404 encode_json("info", oevent
, &e
->info
);
407 static void make_s3_record_ref(CephContext
*cct
, const rgw_bucket
& bucket
,
408 const rgw_user
& owner
,
409 const rgw_obj_key
& key
,
410 const ceph::real_time
& mtime
,
411 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
,
412 rgw::notify::EventType event_type
,
413 EventRef
<rgw_pubsub_s3_record
> *record
) {
414 *record
= std::make_shared
<rgw_pubsub_s3_record
>();
416 EventRef
<rgw_pubsub_s3_record
>& r
= *record
;
417 r
->eventTime
= mtime
;
418 r
->eventName
= rgw::notify::to_string(event_type
);
419 // userIdentity: not supported in sync module
420 // x_amz_request_id: not supported in sync module
421 // x_amz_id_2: not supported in sync module
422 // configurationId is filled from subscription configuration
423 r
->bucket_name
= bucket
.name
;
424 r
->bucket_ownerIdentity
= owner
.to_str();
425 r
->bucket_arn
= to_string(rgw::ARN(bucket
));
426 r
->bucket_id
= bucket
.bucket_id
; // rgw extension
427 r
->object_key
= key
.name
;
428 // object_size not supported in sync module
429 objstore_event
oevent(bucket
, key
, mtime
, attrs
);
430 r
->object_etag
= oevent
.get_hash();
431 r
->object_versionId
= key
.instance
;
433 // use timestamp as per key sequence id (hex encoded)
434 const utime_t
ts(real_clock::now());
435 boost::algorithm::hex((const char*)&ts
, (const char*)&ts
+ sizeof(utime_t
),
436 std::back_inserter(r
->object_sequencer
));
438 set_event_id(r
->id
, r
->object_etag
, ts
);
442 using PSManagerRef
= std::shared_ptr
<PSManager
>;
446 shared_ptr
<RGWUserInfo
> data_user_info
;
447 PSManagerRef manager
;
// Default constructor: allocates a fresh PSConfig for `conf` and a fresh
// RGWUserInfo for `data_user_info`, each held through a shared_ptr.
449 PSEnv() : conf(make_shared
<PSConfig
>()),
450 data_user_info(make_shared
<RGWUserInfo
>()) {}
452 void init(CephContext
*cct
, const JSONFormattable
& config
) {
453 conf
->init(cct
, config
);
// Declaration only; the body is provided out of line further down in this
// file as PSEnv::init_instance(), which forwards realm and instance_id to
// conf->init_instance().
456 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
, PSManagerRef
& mgr
);
459 using PSEnvRef
= std::shared_ptr
<PSEnv
>;
461 template<typename EventType
>
463 const EventRef
<EventType
> event
;
// Wraps an existing event: copies the shared reference `_event` into the
// const `event` member. No other work is done here.
466 PSEvent(const EventRef
<EventType
>& _event
) : event(_event
) {}
468 void format(bufferlist
*bl
) const {
469 bl
->append(json_str("", *event
));
472 void encode_event(bufferlist
& bl
) const {
476 const string
& id() const {
482 class RGWSingletonCR
: public RGWCoroutine
{
483 friend class WrapperCR
;
485 boost::asio::coroutine wrapper_state
;
490 RGWCoroutine
*cr
{nullptr};
493 using WaiterInfoRef
= std::shared_ptr
<WaiterInfo
>;
495 deque
<WaiterInfoRef
> waiters
;
497 void add_waiter(RGWCoroutine
*cr
, T
*result
) {
498 auto waiter
= std::make_shared
<WaiterInfo
>();
500 waiter
->result
= result
;
501 waiters
.push_back(waiter
);
504 bool get_next_waiter(WaiterInfoRef
*waiter
) {
505 if (waiters
.empty()) {
510 *waiter
= waiters
.front();
515 int operate_wrapper() override
{
516 reenter(&wrapper_state
) {
518 ldout(cct
, 20) << __func__
<< "(): operate_wrapper() -> operate()" << dendl
;
519 operate_ret
= operate();
520 if (operate_ret
< 0) {
521 ldout(cct
, 20) << *this << ": operate() returned r=" << operate_ret
<< dendl
;
528 ldout(cct
, 20) << __func__
<< "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters
.size() << " waiters" << dendl
;
529 /* we're done, can't yield anymore */
531 WaiterInfoRef waiter
;
532 while (get_next_waiter(&waiter
)) {
533 ldout(cct
, 20) << __func__
<< "(): RGWSingletonCR: waking up waiter" << dendl
;
534 waiter
->cr
->set_retcode(retcode
);
535 waiter
->cr
->set_sleeping(false);
536 return_result(waiter
->result
);
// Hook for handing the singleton's result to a caller through `result`.
// The default implementation does nothing. It is called once per woken waiter
// from operate_wrapper() and from execute() when the singleton is already done.
545 virtual void return_result(T
*result
) {}
// Constructor: only forwards the CephContext to the RGWCoroutine base class.
548 RGWSingletonCR(CephContext
*_cct
)
549 : RGWCoroutine(_cct
) {}
551 int execute(RGWCoroutine
*caller
, T
*result
= nullptr) {
553 ldout(cct
, 20) << __func__
<< "(): singleton not started, starting" << dendl
;
557 } else if (!is_done()) {
558 ldout(cct
, 20) << __func__
<< "(): singleton not done yet, registering as waiter" << dendl
;
560 add_waiter(caller
, result
);
561 caller
->set_sleeping(true);
565 ldout(cct
, 20) << __func__
<< "(): singleton done, returning retcode=" << retcode
<< dendl
;
566 caller
->set_retcode(retcode
);
567 return_result(result
);
573 class PSSubscription
;
574 using PSSubscriptionRef
= std::shared_ptr
<PSSubscription
>;
576 class PSSubscription
{
579 friend class RGWPSHandleObjEventCR
;
582 RGWDataSyncEnv
*sync_env
;
584 PSSubConfigRef sub_conf
;
585 std::shared_ptr
<rgw_get_bucket_info_result
> get_bucket_info_result
;
586 RGWBucketInfo
*bucket_info
{nullptr};
587 RGWDataAccessRef data_access
;
588 RGWDataAccess::BucketRef bucket
;
590 InitCR
*init_cr
{nullptr};
592 class InitBucketLifecycleCR
: public RGWCoroutine
{
594 RGWDataSyncEnv
*sync_env
;
600 rgw_bucket_lifecycle_config_params lc_config
;
603 InitBucketLifecycleCR(RGWDataSyncCtx
*_sc
,
605 RGWBucketInfo
& _bucket_info
,
606 std::map
<string
, bufferlist
>& _bucket_attrs
) : RGWCoroutine(_sc
->cct
),
607 sc(_sc
), sync_env(_sc
->env
),
609 lc_config
.bucket_info
= _bucket_info
;
610 lc_config
.bucket_attrs
= _bucket_attrs
;
611 retention_days
= conf
->events_retention_days
;
614 int operate() override
{
617 rule
.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days
);
620 /* maybe we already have it configured? */
621 RGWLifecycleConfiguration old_config
;
622 auto aiter
= lc_config
.bucket_attrs
.find(RGW_ATTR_LC
);
623 if (aiter
!= lc_config
.bucket_attrs
.end()) {
624 bufferlist::const_iterator iter
{&aiter
->second
};
626 old_config
.decode(iter
);
627 } catch (const buffer::error
& e
) {
628 ldpp_dout(sync_env
->dpp
, 0) << __func__
<< "(): decode life cycle config failed" << dendl
;
632 auto old_rules
= old_config
.get_rule_map();
633 for (auto ori
: old_rules
) {
634 auto& old_rule
= ori
.second
;
636 if (old_rule
.get_prefix().empty() &&
637 old_rule
.get_expiration().get_days() == retention_days
&&
638 old_rule
.is_enabled()) {
639 ldpp_dout(sync_env
->dpp
, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl
;
640 return set_cr_done();
645 lc_config
.config
.add_rule(rule
);
646 yield
call(new RGWBucketLifecycleConfigCR(sync_env
->async_rados
,
651 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode
<< dendl
;
652 return set_cr_error(retcode
);
655 return set_cr_done();
661 class InitCR
: public RGWSingletonCR
<bool> {
663 RGWDataSyncEnv
*sync_env
;
664 PSSubscriptionRef sub
;
665 rgw_get_bucket_info_params get_bucket_info
;
666 rgw_bucket_create_local_params create_bucket
;
668 PSSubConfigRef
& sub_conf
;
672 InitCR(RGWDataSyncCtx
*_sc
,
673 PSSubscriptionRef
& _sub
) : RGWSingletonCR
<bool>(_sc
->cct
),
674 sc(_sc
), sync_env(_sc
->env
),
675 sub(_sub
), conf(sub
->env
->conf
),
676 sub_conf(sub
->sub_conf
) {
679 int operate() override
{
681 get_bucket_info
.tenant
= conf
->user
.tenant
;
682 get_bucket_info
.bucket_name
= sub_conf
->data_bucket_name
;
683 sub
->get_bucket_info_result
= make_shared
<rgw_get_bucket_info_result
>();
685 for (i
= 0; i
< 2; ++i
) {
686 yield
call(new RGWGetBucketInfoCR(sync_env
->async_rados
,
689 sub
->get_bucket_info_result
));
690 if (retcode
< 0 && retcode
!= -ENOENT
) {
691 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to geting bucket info: " << "tenant="
692 << get_bucket_info
.tenant
<< " name=" << get_bucket_info
.bucket_name
<< ": ret=" << retcode
<< dendl
;
696 auto& result
= sub
->get_bucket_info_result
;
697 sub
->bucket_info
= &result
->bucket_info
;
699 int ret
= sub
->data_access
->get_bucket(result
->bucket_info
, result
->attrs
, &sub
->bucket
);
701 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: data_access.get_bucket() bucket=" << result
->bucket_info
.bucket
<< " failed, ret=" << ret
<< dendl
;
702 return set_cr_error(ret
);
706 yield
call(new InitBucketLifecycleCR(sc
, conf
,
707 sub
->get_bucket_info_result
->bucket_info
,
708 sub
->get_bucket_info_result
->attrs
));
710 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf
->data_bucket_name
<< ") ret=" << retcode
<< dendl
;
711 return set_cr_error(retcode
);
714 return set_cr_done();
717 create_bucket
.user_info
= sub
->env
->data_user_info
;
718 create_bucket
.bucket_name
= sub_conf
->data_bucket_name
;
719 ldpp_dout(sync_env
->dpp
, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub
->env
->data_user_info
, true) << dendl
;
720 yield
call(new RGWBucketCreateLocalCR(sync_env
->async_rados
,
725 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create bucket: " << "tenant="
726 << get_bucket_info
.tenant
<< " name=" << get_bucket_info
.bucket_name
<< ": ret=" << retcode
<< dendl
;
727 return set_cr_error(retcode
);
730 /* second iteration: we got -ENOENT and created a bucket */
733 /* failed twice on -ENOENT, unexpected */
734 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info
.tenant
735 << " name=" << get_bucket_info
.bucket_name
<< dendl
;
736 return set_cr_error(-EIO
);
742 template<typename EventType
>
743 class StoreEventCR
: public RGWCoroutine
{
744 RGWDataSyncCtx
* const sc
;
745 RGWDataSyncEnv
* const sync_env
;
746 const PSSubscriptionRef sub
;
747 const PSEvent
<EventType
> pse
;
748 const string oid_prefix
;
751 StoreEventCR(RGWDataSyncCtx
* const _sc
,
752 const PSSubscriptionRef
& _sub
,
753 const EventRef
<EventType
>& _event
) : RGWCoroutine(_sc
->cct
),
754 sc(_sc
), sync_env(_sc
->env
),
757 oid_prefix(sub
->sub_conf
->data_oid_prefix
) {
760 int operate() override
{
761 rgw_object_simple_put_params put_obj
;
764 put_obj
.bucket
= sub
->bucket
;
765 put_obj
.key
= rgw_obj_key(oid_prefix
+ pse
.id());
767 pse
.format(&put_obj
.data
);
771 pse
.encode_event(bl
);
773 bl
.encode_base64(bl64
);
774 put_obj
.user_data
= bl64
.to_str();
777 yield
call(new RGWObjectSimplePutCR(sync_env
->async_rados
,
782 ldpp_dout(sync_env
->dpp
, 10) << "failed to store event: " << put_obj
.bucket
<< "/" << put_obj
.key
<< " ret=" << retcode
<< dendl
;
783 return set_cr_error(retcode
);
785 ldpp_dout(sync_env
->dpp
, 20) << "event stored: " << put_obj
.bucket
<< "/" << put_obj
.key
<< dendl
;
788 return set_cr_done();
794 template<typename EventType
>
795 class PushEventCR
: public RGWCoroutine
{
796 RGWDataSyncCtx
* const sc
;
797 RGWDataSyncEnv
* const sync_env
;
798 const EventRef
<EventType
> event
;
799 const PSSubConfigRef
& sub_conf
;
802 PushEventCR(RGWDataSyncCtx
* const _sc
,
803 const PSSubscriptionRef
& _sub
,
804 const EventRef
<EventType
>& _event
) : RGWCoroutine(_sc
->cct
),
805 sc(_sc
), sync_env(_sc
->env
),
807 sub_conf(_sub
->sub_conf
) {
810 int operate() override
{
812 ceph_assert(sub_conf
->push_endpoint
);
813 yield
call(sub_conf
->push_endpoint
->send_to_completion_async(*event
.get(), sync_env
));
816 ldout(sync_env
->cct
, 10) << "failed to push event: " << event
->id
<<
817 " to endpoint: " << sub_conf
->push_endpoint_name
<< " ret=" << retcode
<< dendl
;
818 return set_cr_error(retcode
);
821 ldout(sync_env
->cct
, 20) << "event: " << event
->id
<<
822 " pushed to endpoint: " << sub_conf
->push_endpoint_name
<< dendl
;
823 return set_cr_done();
830 PSSubscription(RGWDataSyncCtx
*_sc
,
832 PSSubConfigRef
& _sub_conf
) : sc(_sc
), sync_env(_sc
->env
),
835 data_access(std::make_shared
<RGWDataAccess
>(sync_env
->store
)) {}
837 PSSubscription(RGWDataSyncCtx
*_sc
,
839 rgw_pubsub_sub_config
& user_sub_conf
) : sc(_sc
), sync_env(_sc
->env
),
841 sub_conf(std::make_shared
<PSSubConfig
>()),
842 data_access(std::make_shared
<RGWDataAccess
>(sync_env
->store
)) {
843 sub_conf
->from_user_conf(sync_env
->cct
, user_sub_conf
);
845 virtual ~PSSubscription() {
852 static PSSubscriptionRef
get_shared(RGWDataSyncCtx
*_sc
,
855 auto sub
= std::make_shared
<PSSubscription
>(_sc
, _env
, _sub_conf
);
856 sub
->init_cr
= new InitCR(_sc
, sub
);
861 int call_init_cr(RGWCoroutine
*caller
) {
862 return init_cr
->execute(caller
);
865 template<typename EventType
>
866 static RGWCoroutine
*store_event_cr(RGWDataSyncCtx
* const sc
, const PSSubscriptionRef
& sub
, const EventRef
<EventType
>& event
) {
867 return new StoreEventCR
<EventType
>(sc
, sub
, event
);
870 template<typename EventType
>
871 static RGWCoroutine
*push_event_cr(RGWDataSyncCtx
* const sc
, const PSSubscriptionRef
& sub
, const EventRef
<EventType
>& event
) {
872 return new PushEventCR
<EventType
>(sc
, sub
, event
);
880 RGWDataSyncEnv
*sync_env
;
883 std::map
<string
, PSSubscriptionRef
> subs
;
885 class GetSubCR
: public RGWSingletonCR
<PSSubscriptionRef
> {
887 RGWDataSyncEnv
*sync_env
;
892 PSSubscriptionRef
*ref
;
896 PSSubConfigRef sub_conf
;
897 rgw_pubsub_sub_config user_sub_conf
;
900 GetSubCR(RGWDataSyncCtx
*_sc
,
902 const rgw_user
& _owner
,
903 const string
& _sub_name
,
904 PSSubscriptionRef
*_ref
) : RGWSingletonCR
<PSSubscriptionRef
>(_sc
->cct
),
905 sc(_sc
), sync_env(_sc
->env
),
910 conf(mgr
->env
->conf
) {
914 int operate() override
{
917 if (!conf
->find_sub(sub_name
, &sub_conf
)) {
918 ldout(sync_env
->cct
, 10) << "failed to find subscription config: name=" << sub_name
<< dendl
;
919 mgr
->remove_get_sub(owner
, sub_name
);
920 return set_cr_error(-ENOENT
);
923 *ref
= PSSubscription::get_shared(sc
, mgr
->env
, sub_conf
);
925 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_sub_config
>;
927 RGWPubSub
ps(sync_env
->store
, owner
.tenant
);
929 ps
.get_sub_meta_obj(sub_name
, &obj
);
930 bool empty_on_enoent
= false;
931 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
933 &user_sub_conf
, empty_on_enoent
));
936 mgr
->remove_get_sub(owner
, sub_name
);
937 return set_cr_error(retcode
);
940 *ref
= PSSubscription::get_shared(sc
, mgr
->env
, user_sub_conf
);
943 yield (*ref
)->call_init_cr(this);
945 ldout(sync_env
->cct
, 10) << "failed to init subscription" << dendl
;
946 mgr
->remove_get_sub(owner
, sub_name
);
947 return set_cr_error(retcode
);
951 mgr
->subs
[sub_name
] = *ref
;
953 mgr
->remove_get_sub(owner
, sub_name
);
955 return set_cr_done();
960 void return_result(PSSubscriptionRef
*result
) override
{
961 ldout(cct
, 20) << __func__
<< "(): returning result: retcode=" << retcode
<< " resultp=" << (void *)result
<< dendl
;
968 string
sub_id(const rgw_user
& owner
, const string
& sub_name
) {
970 if (!owner
.empty()) {
971 owner_prefix
= owner
.to_str() + "/";
974 return owner_prefix
+ sub_name
;
977 std::map
<std::string
, GetSubCR
*> get_subs
;
979 GetSubCR
*& get_get_subs(const rgw_user
& owner
, const string
& name
) {
980 return get_subs
[sub_id(owner
, name
)];
983 void remove_get_sub(const rgw_user
& owner
, const string
& name
) {
984 get_subs
.erase(sub_id(owner
, name
));
987 bool find_sub_instance(const rgw_user
& owner
, const string
& sub_name
, PSSubscriptionRef
*sub
) {
988 auto iter
= subs
.find(sub_id(owner
, sub_name
));
989 if (iter
!= subs
.end()) {
996 PSManager(RGWDataSyncCtx
*_sc
,
997 PSEnvRef _env
) : sc(_sc
), sync_env(_sc
->env
),
1001 static PSManagerRef
get_shared(RGWDataSyncCtx
*_sc
,
1003 return std::shared_ptr
<PSManager
>(new PSManager(_sc
, _env
));
1006 static int call_get_subscription_cr(RGWDataSyncCtx
*sc
, PSManagerRef
& mgr
,
1007 RGWCoroutine
*caller
, const rgw_user
& owner
, const string
& sub_name
, PSSubscriptionRef
*ref
) {
1008 if (mgr
->find_sub_instance(owner
, sub_name
, ref
)) {
1009 /* found it! nothing to execute */
1010 ldout(sc
->cct
, 20) << __func__
<< "(): found sub instance" << dendl
;
1012 auto& gs
= mgr
->get_get_subs(owner
, sub_name
);
1014 ldout(sc
->cct
, 20) << __func__
<< "(): first get subs" << dendl
;
1015 gs
= new GetSubCR(sc
, mgr
, owner
, sub_name
, ref
);
1017 ldout(sc
->cct
, 20) << __func__
<< "(): executing get subs" << dendl
;
1018 return gs
->execute(caller
, ref
);
1021 friend class GetSubCR
;
1024 void PSEnv::init_instance(const RGWRealm
& realm
, uint64_t instance_id
, PSManagerRef
& mgr
) {
1026 conf
->init_instance(realm
, instance_id
);
1029 class RGWPSInitEnvCBCR
: public RGWCoroutine
{
1031 RGWDataSyncEnv
*sync_env
;
1035 rgw_user_create_params create_user
;
1036 rgw_get_user_info_params get_user_info
;
// Constructor: builds the RGWCoroutine base from _sc->cct, then records the
// sync context (sc), its environment (sync_env = _sc->env), the pubsub
// environment (env) and that environment's configuration (conf = env->conf).
// The body is empty; all work happens in operate().
1038 RGWPSInitEnvCBCR(RGWDataSyncCtx
*_sc
,
1039 PSEnvRef
& _env
) : RGWCoroutine(_sc
->cct
),
1040 sc(_sc
), sync_env(_sc
->env
),
1041 env(_env
), conf(env
->conf
) {}
1042 int operate() override
{
1044 ldpp_dout(sync_env
->dpp
, 1) << ": init pubsub config zone=" << sc
->source_zone
<< dendl
;
1046 /* nothing to do here right now */
1047 create_user
.user
= conf
->user
;
1048 create_user
.max_buckets
= 0; /* unlimited */
1049 create_user
.display_name
= "pubsub";
1050 create_user
.generate_key
= false;
1051 yield
call(new RGWUserCreateCR(sync_env
->async_rados
, sync_env
->store
, create_user
, sync_env
->dpp
));
1052 if (retcode
< 0 && retcode
!= -ERR_USER_EXIST
) {
1053 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create rgw user: ret=" << retcode
<< dendl
;
1054 return set_cr_error(retcode
);
1057 get_user_info
.user
= conf
->user
;
1058 yield
call(new RGWGetUserInfoCR(sync_env
->async_rados
, sync_env
->store
, get_user_info
, env
->data_user_info
));
1060 ldpp_dout(sync_env
->dpp
, 1) << "ERROR: failed to create rgw user: ret=" << retcode
<< dendl
;
1061 return set_cr_error(retcode
);
1064 ldpp_dout(sync_env
->dpp
, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env
->data_user_info
, true) << dendl
;
1067 return set_cr_done();
1073 bool match(const rgw_pubsub_topic_filter
& filter
, const std::string
& key_name
, rgw::notify::EventType event_type
) {
1074 if (!match(filter
.events
, event_type
)) {
1077 if (!match(filter
.s3_filter
.key_filter
, key_name
)) {
1083 class RGWPSFindBucketTopicsCR
: public RGWCoroutine
{
1085 RGWDataSyncEnv
*sync_env
;
1090 rgw::notify::EventType event_type
;
1094 rgw_raw_obj bucket_obj
;
1095 rgw_raw_obj user_obj
;
1096 rgw_pubsub_bucket_topics bucket_topics
;
1097 rgw_pubsub_topics user_topics
;
1100 RGWPSFindBucketTopicsCR(RGWDataSyncCtx
*_sc
,
1102 const rgw_user
& _owner
,
1103 const rgw_bucket
& _bucket
,
1104 const rgw_obj_key
& _key
,
1105 rgw::notify::EventType _event_type
,
1106 TopicsRef
*_topics
) : RGWCoroutine(_sc
->cct
),
1107 sc(_sc
), sync_env(_sc
->env
),
1112 event_type(_event_type
),
1113 ps(sync_env
->store
, owner
.tenant
),
1115 *topics
= std::make_shared
<vector
<PSTopicConfigRef
> >();
1117 int operate() override
{
1119 ps
.get_bucket_meta_obj(bucket
, &bucket_obj
);
1120 ps
.get_meta_obj(&user_obj
);
1122 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_bucket_topics
>;
1124 bool empty_on_enoent
= true;
1125 call(new ReadInfoCR(sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
1127 &bucket_topics
, empty_on_enoent
));
1129 if (retcode
< 0 && retcode
!= -ENOENT
) {
1130 return set_cr_error(retcode
);
1133 ldout(sync_env
->cct
, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics
.topics
.size() << " topics for bucket " << bucket
<< dendl
;
1135 if (!bucket_topics
.topics
.empty()) {
1136 using ReadUserTopicsInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_topics
>;
1138 bool empty_on_enoent
= true;
1139 call(new ReadUserTopicsInfoCR(sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
1141 &user_topics
, empty_on_enoent
));
1143 if (retcode
< 0 && retcode
!= -ENOENT
) {
1144 return set_cr_error(retcode
);
1148 for (auto& titer
: bucket_topics
.topics
) {
1149 auto& topic_filter
= titer
.second
;
1150 auto& info
= topic_filter
.topic
;
1151 if (!match(topic_filter
, key
.name
, event_type
)) {
1154 std::shared_ptr
<PSTopicConfig
> tc
= std::make_shared
<PSTopicConfig
>();
1155 tc
->name
= info
.name
;
1156 tc
->subs
= user_topics
.topics
[info
.name
].subs
;
1157 tc
->opaque_data
= info
.opaque_data
;
1158 (*topics
)->push_back(tc
);
1161 env
->conf
->get_topics(sync_env
->cct
, bucket
, key
, topics
);
1162 return set_cr_done();
// Coroutine that delivers one object event to the subscriptions of a set of topics.
// For every topic in `topics`, and every subscription name in that topic's `subs`,
// the subscription is looked up through PSManager::call_get_subscription_cr once per
// entry of `owners`. Per the inline comments below, a subscription with an empty
// s3_id gets the `event` stored (and pushed when push_endpoint is set), while an
// S3-compatible subscription gets the `record` stored/pushed after its
// configurationId and opaque_data are filled in.
// NOTE(review): this listing skips some original lines (the embedded line numbers
// are not contiguous), so several branch conditions and closing braces are not
// visible here; the comments describe only what is shown.
1168 class RGWPSHandleObjEventCR
: public RGWCoroutine
{
1169 RGWDataSyncCtx
* const sc
;
// reference member: the referenced rgw_user must outlive this coroutine
// (presumably bound to _owner in the initializer lines not shown - confirm)
1171 const rgw_user
& owner
;
1172 const EventRef
<rgw_pubsub_event
> event
;
1173 const EventRef
<rgw_pubsub_s3_record
> record
;
1174 const TopicsRef topics
;
// the two identities under which a subscription configuration is looked up
1175 const std::array
<rgw_user
, 2> owners
;
// set once any topic has at least one subscription
1176 bool has_subscriptions
;
// per-subscription flag: reset before the owners loop, set when a lookup succeeds
1178 bool sub_conf_found
;
1179 PSSubscriptionRef sub
;
// loop cursors are members rather than locals - presumably so they keep their
// value when operate() is re-entered after a yield (confirm)
1180 std::array
<rgw_user
, 2>::const_iterator oiter
;
1181 std::vector
<PSTopicConfigRef
>::const_iterator titer
;
1182 std::set
<std::string
>::const_iterator siter
;
// retcode recorded from a subscription lookup; reported when no configuration is found
1183 int last_sub_conf_error
;
1186 RGWPSHandleObjEventCR(RGWDataSyncCtx
* const _sc
,
1187 const PSEnvRef _env
,
1188 const rgw_user
& _owner
,
1189 const EventRef
<rgw_pubsub_event
>& _event
,
1190 const EventRef
<rgw_pubsub_s3_record
>& _record
,
1191 const TopicsRef
& _topics
) : RGWCoroutine(_sc
->cct
),
// owners = { the given owner, a default-constructed rgw_user }
// NOTE(review): this reads the member `owner`, not the parameter `_owner`, so it
// relies on `owner` being initialised earlier in the initializer list - confirm
1198 owners({owner
, rgw_user
{}}),
1199 has_subscriptions(false),
1200 event_handled(false) {}
// Runs the delivery loops described on the class; finishes with
// set_cr_error(retcode) or set_cr_done().
1202 int operate() override
{
1204 ldout(sc
->cct
, 20) << ": handle event: obj: z=" << sc
->source_zone
1205 << " event=" << json_str("event", *event
, false)
1206 << " owner=" << owner
<< dendl
;
1208 ldout(sc
->cct
, 20) << "pubsub: " << topics
->size() << " topics found for path" << dendl
;
1210 // outside caller should check that
1211 ceph_assert(!topics
->empty());
1213 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_triggered
);
1215 // loop over all topics related to the bucket/object
1216 for (titer
= topics
->begin(); titer
!= topics
->end(); ++titer
) {
1217 ldout(sc
->cct
, 20) << ": notification for " << event
->source
<< ": topic=" <<
1218 (*titer
)->name
<< ", has " << (*titer
)->subs
.size() << " subscriptions" << dendl
;
1219 // loop over all subscriptions of the topic
1220 for (siter
= (*titer
)->subs
.begin(); siter
!= (*titer
)->subs
.end(); ++siter
) {
1221 ldout(sc
->cct
, 20) << ": subscription: " << *siter
<< dendl
;
1222 has_subscriptions
= true;
1223 sub_conf_found
= false;
1224 // try to read subscription configuration from global/user conf
1225 // configuration is considered missing only if does not exist in either
1226 for (oiter
= owners
.begin(); oiter
!= owners
.end(); ++oiter
) {
1227 yield
PSManager::call_get_subscription_cr(sc
, env
->manager
, this, *oiter
, *siter
, &sub
);
// NOTE(review): the condition guarding the lines below is not visible in this listing
1229 if (sub_conf_found
) {
1230 // not a real issue, sub conf already found
1233 last_sub_conf_error
= retcode
;
1236 sub_conf_found
= true;
1237 if (sub
->sub_conf
->s3_id
.empty()) {
1238 // subscription was not made by S3 compatible API
// the " ret=" field logs retcode as it stands before the store call is issued
1239 ldout(sc
->cct
, 20) << "storing event for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1240 yield
call(PSSubscription::store_event_cr(sc
, sub
, event
));
1242 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_fail
);
1243 ldout(sc
->cct
, 1) << "ERROR: failed to store event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1245 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_ok
);
1246 event_handled
= true;
// push is attempted only when the subscription has a push endpoint configured
1248 if (sub
->sub_conf
->push_endpoint
) {
1249 ldout(sc
->cct
, 20) << "push event for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1250 yield
call(PSSubscription::push_event_cr(sc
, sub
, event
));
1252 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
1253 ldout(sc
->cct
, 1) << "ERROR: failed to push event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1255 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
1256 event_handled
= true;
1260 // subscription was made by S3 compatible API
1261 ldout(sc
->cct
, 20) << "storing record for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
// the record is shared across subscriptions, so these two fields are rewritten
// for each S3-compatible subscription before it is stored/pushed
1262 record
->configurationId
= sub
->sub_conf
->s3_id
;
1263 record
->opaque_data
= (*titer
)->opaque_data
;
1264 yield
call(PSSubscription::store_event_cr(sc
, sub
, record
));
1266 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_fail
);
1267 ldout(sc
->cct
, 1) << "ERROR: failed to store record for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1269 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_ok
);
1270 event_handled
= true;
1272 if (sub
->sub_conf
->push_endpoint
) {
1273 ldout(sc
->cct
, 20) << "push record for subscription=" << *siter
<< " owner=" << *oiter
<< " ret=" << retcode
<< dendl
;
1274 yield
call(PSSubscription::push_event_cr(sc
, sub
, record
));
1276 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
1277 ldout(sc
->cct
, 1) << "ERROR: failed to push record for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1279 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
1280 event_handled
= true;
1285 if (!sub_conf_found
) {
1286 // could not find conf for subscription at user or global levels
1287 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_missing_conf
);
1288 ldout(sc
->cct
, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1289 << " ret=" << last_sub_conf_error
<< dendl
;
// NOTE(review): the log above reports last_sub_conf_error while this test reads
// retcode; the statement inside the branch is not visible in this listing
1290 if (retcode
== -ENOENT
) {
1291 // missing subscription info should be reflected back as invalid argument
1292 // and not as missing object
1298 if (has_subscriptions
&& !event_handled
) {
1299 // event is considered "lost" if it has subscriptions on any of its topics
1300 // but it was not stored in, or pushed to, any of them
1301 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_lost
);
1304 return set_cr_error(retcode
);
1306 return set_cr_done();
1312 // coroutine invoked on remote object creation
// Derives from RGWStatRemoteObjCBCR. operate() logs the stat result, builds both
// a pubsub event and an S3 record of type rgw::notify::ObjectCreated, and then
// calls RGWPSHandleObjEventCR with the source bucket's owner.
// NOTE(review): this listing skips some original lines (embedded line numbers are
// not contiguous); comments describe only what is shown.
1313 class RGWPSHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
1315 rgw_bucket_sync_pipe sync_pipe
;
1317 std::optional
<uint64_t> versioned_epoch
;
// output slots filled by make_event_ref / make_s3_record_ref in operate()
1318 EventRef
<rgw_pubsub_event
> event
;
1319 EventRef
<rgw_pubsub_s3_record
> record
;
// The base class is constructed with the source bucket taken from
// _sync_pipe.info.source_bs.bucket and with _key.
1322 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1323 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1324 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
,
1325 TopicsRef
& _topics
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1327 sync_pipe(_sync_pipe
),
1329 versioned_epoch(_versioned_epoch
),
// Finishes with set_cr_error(retcode) or set_cr_done().
1332 int operate() override
{
1334 ldout(sc
->cct
, 20) << ": stat of remote obj: z=" << sc
->source_zone
1335 << " b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " size=" << size
<< " mtime=" << mtime
1336 << " attrs=" << attrs
<< dendl
;
// NOTE(review): this local vector has the same name as the `attrs` streamed into
// the log above. From this declaration on, `attrs` names the new, empty local, so
// the range-for below iterates over (and pushes into) the empty local vector
// rather than the attributes that were logged. Looks like a shadowing bug -
// confirm against the complete file.
1338 std::vector
<std::pair
<std::string
, std::string
> > attrs
;
1339 for (auto& attr
: attrs
) {
1340 std::string k
= attr
.first
;
// drop the RGW_ATTR_PREFIX from the key; sizeof(...) - 1 presumably is the
// prefix length without the terminating NUL (assumes a string-literal macro)
1341 if (boost::algorithm::starts_with(k
, RGW_ATTR_PREFIX
)) {
1342 k
= k
.substr(sizeof(RGW_ATTR_PREFIX
) - 1);
1344 attrs
.push_back(std::make_pair(k
, attr
.second
));
1346 // at this point we don't know whether we need the ceph event or S3 record
1347 // this is why both are created here, once we have information about the
1348 // subscription, we will store/push only the relevant ones
1349 make_event_ref(sc
->cct
,
1350 sync_pipe
.info
.source_bs
.bucket
, key
,
1352 rgw::notify::ObjectCreated
, &event
);
// the S3 record additionally receives the destination bucket's owner
1353 make_s3_record_ref(sc
->cct
,
1354 sync_pipe
.info
.source_bs
.bucket
, sync_pipe
.dest_bucket_info
.owner
, key
,
1356 rgw::notify::ObjectCreated
, &record
);
// the event handler is given the *source* bucket's owner
1359 yield
call(new RGWPSHandleObjEventCR(sc
, env
, sync_pipe
.source_bucket_info
.owner
, event
, record
, topics
));
1361 return set_cr_error(retcode
);
1363 return set_cr_done();
// RGWCallStatRemoteObjCR specialisation whose allocate_callback() returns a new
// RGWPSHandleRemoteObjCBCR built from sc, sync_pipe, key, env, versioned_epoch
// and topics.
// NOTE(review): some member declarations and initializers are not visible in
// this listing (embedded line numbers are not contiguous).
1369 class RGWPSHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
1370 rgw_bucket_sync_pipe sync_pipe
;
1372 std::optional
<uint64_t> versioned_epoch
;
// The base class is constructed with the source bucket taken from
// _sync_pipe.info.source_bs.bucket and with _key.
1375 RGWPSHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
1376 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1377 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
,
1378 TopicsRef
& _topics
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1379 sync_pipe(_sync_pipe
),
1380 env(_env
), versioned_epoch(_versioned_epoch
),
1384 ~RGWPSHandleRemoteObjCR() override
{}
// Returns a heap-allocated callback coroutine; the caller receives a raw pointer.
1386 RGWStatRemoteObjCBCR
*allocate_callback() override
{
1387 return new RGWPSHandleRemoteObjCBCR(sc
, sync_pipe
, key
, env
, versioned_epoch
, topics
);
// Coroutine for object creation. operate() first calls RGWPSFindBucketTopicsCR
// for rgw::notify::ObjectCreated; when `topics` comes back empty it finishes with
// set_cr_done(), otherwise it calls RGWPSHandleRemoteObjCR.
// NOTE(review): some lines (member declarations, initializers, the error checks
// around the ERROR log) are not visible in this listing.
1391 class RGWPSHandleObjCreateCR
: public RGWCoroutine
{
1393 rgw_bucket_sync_pipe sync_pipe
;
1396 std::optional
<uint64_t> versioned_epoch
;
1399 RGWPSHandleObjCreateCR(RGWDataSyncCtx
*_sc
,
1400 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1401 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
) : RGWCoroutine(_sc
->cct
),
1403 sync_pipe(_sync_pipe
),
1406 versioned_epoch(_versioned_epoch
) {
1409 ~RGWPSHandleObjCreateCR() override
{}
// Finishes with set_cr_error(retcode) or set_cr_done().
1411 int operate() override
{
// topic lookup uses the destination bucket's owner and the source bucket
1413 yield
call(new RGWPSFindBucketTopicsCR(sc
, env
, sync_pipe
.dest_bucket_info
.owner
,
1414 sync_pipe
.info
.source_bs
.bucket
, key
,
1415 rgw::notify::ObjectCreated
,
1418 ldout(sc
->cct
, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode
<< dendl
;
1419 return set_cr_error(retcode
);
// nothing to notify: done without stat-ing the remote object
1421 if (topics
->empty()) {
1422 ldout(sc
->cct
, 20) << "no topics found for " << sync_pipe
.info
.source_bs
.bucket
<< "/" << key
<< dendl
;
1423 return set_cr_done();
1425 yield
call(new RGWPSHandleRemoteObjCR(sc
, sync_pipe
, key
, env
, versioned_epoch
, topics
));
1427 return set_cr_error(retcode
);
1429 return set_cr_done();
1435 // coroutine invoked on remote object deletion and delete-marker creation
// The event type is supplied by the caller through _event_type; within this file
// it is constructed with rgw::notify::ObjectRemovedDelete and
// rgw::notify::ObjectRemovedDeleteMarkerCreated. operate() looks up the topics for
// the bucket/key, builds both an event and an S3 record of that type, and calls
// RGWPSHandleObjEventCR.
// NOTE(review): some lines (member declarations, initializers, arguments of the
// make_*_ref calls, error checks) are not visible in this listing.
1436 class RGWPSGenericObjEventCBCR
: public RGWCoroutine
{
1442 ceph::real_time mtime
;
1443 rgw::notify::EventType event_type
;
1444 EventRef
<rgw_pubsub_event
> event
;
1445 EventRef
<rgw_pubsub_s3_record
> record
;
1448 RGWPSGenericObjEventCBCR(RGWDataSyncCtx
*_sc
,
1450 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
1451 rgw::notify::EventType _event_type
) : RGWCoroutine(_sc
->cct
),
// owner and bucket are both taken from the destination bucket info
1454 owner(_sync_pipe
.dest_bucket_info
.owner
),
1455 bucket(_sync_pipe
.dest_bucket_info
.bucket
),
1457 mtime(_mtime
), event_type(_event_type
) {}
// Finishes with set_cr_error(retcode) or set_cr_done().
1458 int operate() override
{
// the message says "remove remote obj" regardless of the actual event_type
1460 ldout(sc
->cct
, 20) << ": remove remote obj: z=" << sc
->source_zone
1461 << " b=" << bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
1462 yield
call(new RGWPSFindBucketTopicsCR(sc
, env
, owner
, bucket
, key
, event_type
, &topics
));
1464 ldout(sc
->cct
, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode
<< dendl
;
1465 return set_cr_error(retcode
);
// nothing to notify
1467 if (topics
->empty()) {
1468 ldout(sc
->cct
, 20) << "no topics found for " << bucket
<< "/" << key
<< dendl
;
1469 return set_cr_done();
1471 // at this point we don't know whether we need the ceph event or S3 record
1472 // this is why both are created here, once we have information about the
1473 // subscription, we will store/push only the relevant ones
1474 make_event_ref(sc
->cct
,
1477 event_type
, &event
);
1478 make_s3_record_ref(sc
->cct
,
1481 event_type
, &record
);
1482 yield
call(new RGWPSHandleObjEventCR(sc
, env
, owner
, event
, record
, topics
));
1484 return set_cr_error(retcode
);
1486 return set_cr_done();
// RGWDataSyncModule implementation for pubsub. Each sync callback logs its
// arguments and returns a newly allocated coroutine that produces the matching
// notification: ObjectCreated handling for sync_object, ObjectRemovedDelete for
// remove_object, ObjectRemovedDeleteMarkerCreated for create_delete_marker.
// NOTE(review): member declarations are not visible in this listing.
1493 class RGWPSDataSyncModule
: public RGWDataSyncModule
{
// Creates a fresh PSEnv and initialises it from `config`.
// NOTE(review): conf is initialised from env->conf before env->init() runs in
// the constructor body - presumably PSConfigRef is a shared pointer so both
// refer to the same object; verify.
1498 RGWPSDataSyncModule(CephContext
*cct
, const JSONFormattable
& config
) : env(std::make_shared
<PSEnv
>()), conf(env
->conf
) {
1499 env
->init(cct
, config
);
1502 ~RGWPSDataSyncModule() override
{}
// Obtains a PSManagerRef via PSManager::get_shared and hands it, together with
// the zone's realm and instance_id, to env->init_instance.
1504 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
1505 auto sync_env
= sc
->env
;
1506 PSManagerRef mgr
= PSManager::get_shared(sc
, env
);
1507 env
->init_instance(sync_env
->svc
->zone
->get_realm(), instance_id
, mgr
);
// Returns a new RGWPSInitEnvCBCR for this sync context and env.
1510 RGWCoroutine
*start_sync(RGWDataSyncCtx
*sc
) override
{
1511 ldout(sc
->cct
, 5) << conf
->id
<< ": start" << dendl
;
1512 return new RGWPSInitEnvCBCR(sc
, env
);
// Returns a new RGWPSHandleObjCreateCR; zones_trace is not referenced in the
// visible code. A missing versioned_epoch is logged as 0.
1515 RGWCoroutine
*sync_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1516 rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1517 ldout(sc
->cct
, 10) << conf
->id
<< ": sync_object: b=" << sync_pipe
<<
1518 " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
1519 return new RGWPSHandleObjCreateCR(sc
, sync_pipe
, key
, env
, versioned_epoch
);
// Returns a new RGWPSGenericObjEventCBCR with ObjectRemovedDelete; versioned and
// versioned_epoch are only logged, zones_trace is not referenced in the visible code.
1522 RGWCoroutine
*remove_object(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1523 rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1524 ldout(sc
->cct
, 10) << conf
->id
<< ": rm_object: b=" << sync_pipe
<<
1525 " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1526 return new RGWPSGenericObjEventCBCR(sc
, env
, sync_pipe
, key
, mtime
, rgw::notify::ObjectRemovedDelete
);
// Returns a new RGWPSGenericObjEventCBCR with ObjectRemovedDeleteMarkerCreated;
// the `owner` and `zones_trace` parameters are not referenced in the visible code.
1529 RGWCoroutine
*create_delete_marker(RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1530 rgw_obj_key
& key
, real_time
& mtime
, rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1531 ldout(sc
->cct
, 10) << conf
->id
<< ": create_delete_marker: b=" << sync_pipe
<<
1532 " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1533 return new RGWPSGenericObjEventCBCR(sc
, env
, sync_pipe
, key
, mtime
, rgw::notify::ObjectRemovedDeleteMarkerCreated
);
// Accessor returning a mutable reference to the module's configuration handle.
1536 PSConfigRef
& get_conf() { return conf
; }
// Builds the data handler from `config`, then derives effective_conf from the
// handler's configuration: the conf is serialised with json_str("conf", ...) and
// parsed back with `p`; if parsing fails an error is logged and effective_conf is
// set to the caller-supplied `config`.
// NOTE(review): the declaration of `p` and the control flow between the fallback
// assignment and decode_json are not visible in this listing - confirm whether
// decode_json runs only on successful parse.
1539 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext
*cct
, const JSONFormattable
& config
)
1541 data_handler
= std::unique_ptr
<RGWPSDataSyncModule
>(new RGWPSDataSyncModule(cct
, config
));
1542 const std::string jconf
= json_str("conf", *data_handler
->get_conf());
1544 if (!p
.parse(jconf
.c_str(), jconf
.size())) {
1545 ldout(cct
, 1) << "ERROR: failed to parse sync module effective conf: " << jconf
<< dendl
;
1546 effective_conf
= config
;
1548 effective_conf
.decode_json(&p
);
// Returns the raw pointer held by data_handler; data_handler keeps ownership
// (the pointer comes from get(), not release()).
1552 RGWDataSyncModule
*RGWPSSyncModuleInstance::get_data_handler()
1554 return data_handler
.get();
// Returns a newly allocated RGWRESTMgr_PubSub as the REST manager.
// NOTE(review): the body of the `dialect != RGW_REST_S3` branch is not visible in
// this listing; presumably it returns `orig` unchanged for other dialects - confirm.
1557 RGWRESTMgr
*RGWPSSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
1558 if (dialect
!= RGW_REST_S3
) {
1561 return new RGWRESTMgr_PubSub();
// Reports the start_with_full_sync flag from the data handler's configuration.
1564 bool RGWPSSyncModuleInstance::should_full_sync() const {
1565 return data_handler
->get_conf()->start_with_full_sync
;
1568 int RGWPSSyncModule::create_instance(CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) {
1569 instance
->reset(new RGWPSSyncModuleInstance(cct
, config
));