1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
5 #include "rgw_common.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_sync_module.h"
8 #include "rgw_data_sync.h"
9 #include "rgw_sync_module_pubsub.h"
10 #include "rgw_sync_module_pubsub_rest.h"
11 #include "rgw_rest_conn.h"
12 #include "rgw_cr_rados.h"
13 #include "rgw_cr_rest.h"
14 #include "rgw_cr_tools.h"
16 #include "rgw_pubsub.h"
17 #include "rgw_pubsub_push.h"
18 #include "rgw_notify_event_type.h"
19 #include "rgw_perf_counters.h"
21 #include <boost/algorithm/hex.hpp>
22 #include <boost/asio/yield.hpp>
24 #define dout_subsys ceph_subsys_rgw
27 #define PUBSUB_EVENTS_RETENTION_DEFAULT 7
36 "tenant": <tenant>, # default: <empty>
37 "uid": <uid>, # default: "pubsub"
38 "data_bucket_prefix": <prefix>, # default: "pubsub-"
39 "data_oid_prefix": <prefix>, # default: <empty>
40 "events_retention_days": <int>, # default: 7
41 "start_with_full_sync": <bool> # default: false
46 // utility function to convert the args list from string format
47 // (ampersand separated with equal sign) to parsed structure
48 RGWHTTPArgs
string_to_args(const std::string
& str_args
, const DoutPrefixProvider
*dpp
) {
58 std::string push_endpoint_name
;
59 std::string push_endpoint_args
;
60 std::string data_bucket_name
;
61 std::string data_oid_prefix
;
63 std::string arn_topic
;
64 RGWPubSubEndpoint::Ptr push_endpoint
;
// Populate this subscription config from the user-level subscription
// config `uc`. Copies the destination's push endpoint name, data bucket
// name, oid prefix and topic ARN. When a push endpoint name is set, the
// endpoint arguments are copied as well and the endpoint object is created
// through RGWPubSubEndpoint::create(); a configuration_error raised while
// doing so is caught and logged at level 1.
66 void from_user_conf(CephContext
*cct
, const rgw_pubsub_sub_config
& uc
, const DoutPrefixProvider
*dpp
) {
69 push_endpoint_name
= uc
.dest
.push_endpoint
;
70 data_bucket_name
= uc
.dest
.bucket_name
;
71 data_oid_prefix
= uc
.dest
.oid_prefix
;
73 arn_topic
= uc
.dest
.arn_topic
;
// an endpoint object is only built when an endpoint name was configured
74 if (!push_endpoint_name
.empty()) {
75 push_endpoint_args
= uc
.dest
.push_endpoint_args
;
// the argument string is parsed into RGWHTTPArgs by string_to_args()
77 push_endpoint
= RGWPubSubEndpoint::create(push_endpoint_name
, arn_topic
, string_to_args(push_endpoint_args
, dpp
), cct
);
78 ldpp_dout(dpp
, 20) << "push endpoint created: " << push_endpoint
->to_str() << dendl
;
79 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
80 ldpp_dout(dpp
, 1) << "ERROR: failed to create push endpoint: "
81 << push_endpoint_name
<< " due to: " << e
.what() << dendl
;
86 void dump(Formatter
*f
) const {
87 encode_json("name", name
, f
);
88 encode_json("topic", topic
, f
);
89 encode_json("push_endpoint", push_endpoint_name
, f
);
90 encode_json("push_endpoint_args", push_endpoint_args
, f
);
91 encode_json("data_bucket_name", data_bucket_name
, f
);
92 encode_json("data_oid_prefix", data_oid_prefix
, f
);
93 encode_json("s3_id", s3_id
, f
);
98 using PSSubConfigRef
= std::shared_ptr
<PSSubConfig
>;
100 struct PSTopicConfig
{
102 std::set
<std::string
> subs
;
103 std::string opaque_data
;
105 void dump(Formatter
*f
) const {
106 encode_json("name", name
, f
);
107 encode_json("subs", subs
, f
);
108 encode_json("opaque", opaque_data
, f
);
112 struct PSNotificationConfig
{
114 string path
; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
116 bool is_prefix
{false};
119 void dump(Formatter
*f
) const {
120 encode_json("id", id
, f
);
121 encode_json("path", path
, f
);
122 encode_json("topic", topic
, f
);
123 encode_json("is_prefix", is_prefix
, f
);
// Initialize this notification config from JSON: reads the "path" and
// "topic" entries. A trailing '*' on the path is removed from the stored
// value. `cct` is not referenced in the lines visible here.
126 void init(CephContext
*cct
, const JSONFormattable
& config
) {
127 path
= config
["path"];
// a path ending with the wildcard character is kept without the wildcard
128 if (!path
.empty() && path
[path
.size() - 1] == '*') {
129 path
= path
.substr(0, path
.size() - 1);
132 topic
= config
["topic"];
137 static string
json_str(const char *name
, const T
& obj
, bool pretty
= false)
140 JSONFormatter
f(pretty
);
142 encode_json(name
, obj
, &f
);
148 using PSTopicConfigRef
= std::shared_ptr
<PSTopicConfig
>;
149 using TopicsRef
= std::shared_ptr
<std::vector
<PSTopicConfigRef
>>;
151 // global pubsub configuration
153 const std::string id
{"pubsub"};
155 std::string data_bucket_prefix
;
156 std::string data_oid_prefix
;
157 int events_retention_days
{0};
158 uint64_t sync_instance
{0};
159 bool start_with_full_sync
{false};
161 void dump(Formatter
*f
) const {
162 encode_json("id", id
, f
);
163 encode_json("user", user
, f
);
164 encode_json("data_bucket_prefix", data_bucket_prefix
, f
);
165 encode_json("data_oid_prefix", data_oid_prefix
, f
);
166 encode_json("events_retention_days", events_retention_days
, f
);
167 encode_json("sync_instance", sync_instance
, f
);
168 encode_json("start_with_full_sync", start_with_full_sync
, f
);
// Load the module configuration from JSON, applying these defaults:
//   uid                   -> "pubsub"
//   data_bucket_prefix    -> "pubsub-"
//   events_retention_days -> PUBSUB_EVENTS_RETENTION_DEFAULT
//   start_with_full_sync  -> false
// "tenant" and "data_oid_prefix" are read without an explicit default.
// The parsed configuration is then dumped to the log at level 20.
171 void init(CephContext
*cct
, const JSONFormattable
& config
) {
172 string uid
= config
["uid"]("pubsub");
// the module user is identified by the (tenant, uid) pair
173 user
= rgw_user(config
["tenant"], uid
);
174 data_bucket_prefix
= config
["data_bucket_prefix"]("pubsub-");
175 data_oid_prefix
= config
["data_oid_prefix"];
176 events_retention_days
= config
["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT
);
177 start_with_full_sync
= config
["start_with_full_sync"](false);
179 ldout(cct
, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl
;
182 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
) {
183 sync_instance
= instance_id
;
187 using PSConfigRef
= std::shared_ptr
<PSConfig
>;
188 template<typename EventType
>
189 using EventRef
= std::shared_ptr
<EventType
>;
191 struct objstore_event
{
193 const rgw_bucket
& bucket
;
194 const rgw_obj_key
& key
;
195 const ceph::real_time
& mtime
;
196 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
;
198 objstore_event(const rgw_bucket
& _bucket
,
199 const rgw_obj_key
& _key
,
200 const ceph::real_time
& _mtime
,
201 const std::vector
<std::pair
<std::string
, std::string
> > *_attrs
) : bucket(_bucket
),
209 hash
.update(bucket
.bucket_id
);
210 hash
.update(key
.name
);
211 hash
.update(key
.instance
);
214 assert(etag
.size() > 8);
216 return etag
.substr(0, 8);
219 void dump(Formatter
*f
) const {
221 Formatter::ObjectSection
s(*f
, "bucket");
222 encode_json("name", bucket
.name
, f
);
223 encode_json("tenant", bucket
.tenant
, f
);
224 encode_json("bucket_id", bucket
.bucket_id
, f
);
227 Formatter::ObjectSection
s(*f
, "key");
228 encode_json("name", key
.name
, f
);
229 encode_json("instance", key
.instance
, f
);
232 encode_json("mtime", mt
, f
);
233 Formatter::ObjectSection
s(*f
, "attrs");
235 for (auto& attr
: *attrs
) {
236 encode_json(attr
.first
.c_str(), attr
.second
.c_str(), f
);
// Allocate a new rgw_pubsub_event into *event and populate it:
//  - event_name: Ceph-style name of `event_type`
//  - source:     "<bucket name>/<key name>"
//  - timestamp:  the current time
//  - id:         set by set_event_id() from the objstore_event hash and the
//                timestamp
//  - info:       JSON encoding of an objstore_event built from
//                bucket/key/mtime/attrs
// `cct` is not referenced in the lines visible here.
242 static void make_event_ref(CephContext
*cct
, const rgw_bucket
& bucket
,
243 const rgw_obj_key
& key
,
244 const ceph::real_time
& mtime
,
245 const std::vector
<std::pair
<std::string
, std::string
> > *attrs
,
246 rgw::notify::EventType event_type
,
247 EventRef
<rgw_pubsub_event
> *event
) {
248 *event
= std::make_shared
<rgw_pubsub_event
>();
// shorthand reference to the freshly created event
250 EventRef
<rgw_pubsub_event
>& e
= *event
;
251 e
->event_name
= rgw::notify::to_ceph_string(event_type
);
252 e
->source
= bucket
.name
+ "/" + key
.name
;
253 e
->timestamp
= real_clock::now();
255 objstore_event
oevent(bucket
, key
, mtime
, attrs
);
// the same timestamp stored on the event is used when deriving its id
257 const utime_t
ts(e
->timestamp
);
258 set_event_id(e
->id
, oevent
.get_hash(), ts
);
260 encode_json("info", oevent
, &e
->info
);
263 static void make_s3_event_ref(CephContext
*cct
, const rgw_bucket
& bucket
,
264 const rgw_user
& owner
,
265 const rgw_obj_key
& key
,
266 const ceph::real_time
& mtime
,
267 const std::vector
<std::pair
<std::string
, std::string
>>* attrs
,
268 rgw::notify::EventType event_type
,
269 EventRef
<rgw_pubsub_s3_event
>* event
) {
270 *event
= std::make_shared
<rgw_pubsub_s3_event
>();
272 EventRef
<rgw_pubsub_s3_event
>& e
= *event
;
273 e
->eventTime
= mtime
;
274 e
->eventName
= rgw::notify::to_event_string(event_type
);
275 // userIdentity: not supported in sync module
276 // x_amz_request_id: not supported in sync module
277 // x_amz_id_2: not supported in sync module
278 // configurationId is filled from subscription configuration
279 e
->bucket_name
= bucket
.name
;
280 e
->bucket_ownerIdentity
= owner
.to_str();
281 e
->bucket_arn
= to_string(rgw::ARN(bucket
));
282 e
->bucket_id
= bucket
.bucket_id
; // rgw extension
283 e
->object_key
= key
.name
;
284 // object_size not supported in sync module
285 objstore_event
oevent(bucket
, key
, mtime
, attrs
);
286 e
->object_etag
= oevent
.get_hash();
287 e
->object_versionId
= key
.instance
;
289 // use timestamp as per key sequence id (hex encoded)
290 const utime_t
ts(real_clock::now());
291 boost::algorithm::hex((const char*)&ts
, (const char*)&ts
+ sizeof(utime_t
),
292 std::back_inserter(e
->object_sequencer
));
294 set_event_id(e
->id
, e
->object_etag
, ts
);
298 using PSManagerRef
= std::shared_ptr
<PSManager
>;
302 shared_ptr
<RGWUserInfo
> data_user_info
;
303 PSManagerRef manager
;
305 PSEnv() : conf(make_shared
<PSConfig
>()),
306 data_user_info(make_shared
<RGWUserInfo
>()) {}
308 void init(CephContext
*cct
, const JSONFormattable
& config
) {
309 conf
->init(cct
, config
);
312 void init_instance(const RGWRealm
& realm
, uint64_t instance_id
, PSManagerRef
& mgr
);
315 using PSEnvRef
= std::shared_ptr
<PSEnv
>;
317 template<typename EventType
>
319 const EventRef
<EventType
> event
;
322 PSEvent(const EventRef
<EventType
>& _event
) : event(_event
) {}
324 void format(bufferlist
*bl
) const {
325 bl
->append(json_str("", *event
));
328 void encode_event(bufferlist
& bl
) const {
332 const string
& id() const {
338 class RGWSingletonCR
: public RGWCoroutine
{
339 friend class WrapperCR
;
341 boost::asio::coroutine wrapper_state
;
346 RGWCoroutine
*cr
{nullptr};
349 using WaiterInfoRef
= std::shared_ptr
<WaiterInfo
>;
351 deque
<WaiterInfoRef
> waiters
;
353 void add_waiter(RGWCoroutine
*cr
, T
*result
) {
354 auto waiter
= std::make_shared
<WaiterInfo
>();
356 waiter
->result
= result
;
357 waiters
.push_back(waiter
);
360 bool get_next_waiter(WaiterInfoRef
*waiter
) {
361 if (waiters
.empty()) {
366 *waiter
= waiters
.front();
371 int operate_wrapper(const DoutPrefixProvider
*dpp
) override
{
372 reenter(&wrapper_state
) {
374 ldpp_dout(dpp
, 20) << __func__
<< "(): operate_wrapper() -> operate()" << dendl
;
375 operate_ret
= operate(dpp
);
376 if (operate_ret
< 0) {
377 ldpp_dout(dpp
, 20) << *this << ": operate() returned r=" << operate_ret
<< dendl
;
384 ldpp_dout(dpp
, 20) << __func__
<< "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters
.size() << " waiters" << dendl
;
385 /* we're done, can't yield anymore */
387 WaiterInfoRef waiter
;
388 while (get_next_waiter(&waiter
)) {
389 ldpp_dout(dpp
, 20) << __func__
<< "(): RGWSingletonCR: waking up waiter" << dendl
;
390 waiter
->cr
->set_retcode(retcode
);
391 waiter
->cr
->set_sleeping(false);
392 return_result(dpp
, waiter
->result
);
// Hook invoked with a caller's/waiter's result pointer when the singleton
// hands back its outcome; subclasses override it to fill in *result.
// The default implementation does nothing.
401 virtual void return_result(const DoutPrefixProvider
*dpp
, T
*result
) {}
404 RGWSingletonCR(CephContext
*_cct
)
405 : RGWCoroutine(_cct
) {}
407 int execute(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, T
*result
= nullptr) {
409 ldpp_dout(dpp
, 20) << __func__
<< "(): singleton not started, starting" << dendl
;
413 } else if (!is_done()) {
414 ldpp_dout(dpp
, 20) << __func__
<< "(): singleton not done yet, registering as waiter" << dendl
;
416 add_waiter(caller
, result
);
417 caller
->set_sleeping(true);
421 ldpp_dout(dpp
, 20) << __func__
<< "(): singleton done, returning retcode=" << retcode
<< dendl
;
422 caller
->set_retcode(retcode
);
423 return_result(dpp
, result
);
429 class PSSubscription
;
430 using PSSubscriptionRef
= std::shared_ptr
<PSSubscription
>;
432 class PSSubscription
{
435 friend class RGWPSHandleObjEventCR
;
438 RGWDataSyncEnv
*sync_env
;
440 PSSubConfigRef sub_conf
;
441 std::shared_ptr
<rgw_get_bucket_info_result
> get_bucket_info_result
;
442 RGWBucketInfo
*bucket_info
{nullptr};
443 RGWDataAccessRef data_access
;
444 RGWDataAccess::BucketRef bucket
;
446 InitCR
*init_cr
{nullptr};
448 class InitBucketLifecycleCR
: public RGWCoroutine
{
450 RGWDataSyncEnv
*sync_env
;
456 rgw_bucket_lifecycle_config_params lc_config
;
459 InitBucketLifecycleCR(RGWDataSyncCtx
*_sc
,
461 rgw::sal::Bucket
* _bucket
) : RGWCoroutine(_sc
->cct
),
462 sc(_sc
), sync_env(_sc
->env
),
464 lc_config
.bucket
= _bucket
;
465 lc_config
.bucket_attrs
= _bucket
->get_attrs();
466 retention_days
= conf
->events_retention_days
;
469 int operate(const DoutPrefixProvider
*dpp
) override
{
472 rule
.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days
);
475 /* maybe we already have it configured? */
476 RGWLifecycleConfiguration old_config
;
477 auto aiter
= lc_config
.bucket_attrs
.find(RGW_ATTR_LC
);
478 if (aiter
!= lc_config
.bucket_attrs
.end()) {
479 bufferlist::const_iterator iter
{&aiter
->second
};
481 old_config
.decode(iter
);
482 } catch (const buffer::error
& e
) {
483 ldpp_dout(dpp
, 0) << __func__
<< "(): decode life cycle config failed" << dendl
;
487 auto old_rules
= old_config
.get_rule_map();
488 for (auto ori
: old_rules
) {
489 auto& old_rule
= ori
.second
;
491 if (old_rule
.get_prefix().empty() &&
492 old_rule
.get_expiration().get_days() == retention_days
&&
493 old_rule
.is_enabled()) {
494 ldpp_dout(dpp
, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl
;
495 return set_cr_done();
500 lc_config
.config
.add_rule(rule
);
501 yield
call(new RGWBucketLifecycleConfigCR(sync_env
->async_rados
,
506 ldpp_dout(dpp
, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode
<< dendl
;
507 return set_cr_error(retcode
);
510 return set_cr_done();
516 class InitCR
: public RGWSingletonCR
<bool> {
518 RGWDataSyncEnv
*sync_env
;
519 PSSubscriptionRef sub
;
520 rgw_get_bucket_info_params get_bucket_info
;
521 rgw_bucket_create_local_params create_bucket
;
523 PSSubConfigRef
& sub_conf
;
527 InitCR(RGWDataSyncCtx
*_sc
,
528 PSSubscriptionRef
& _sub
) : RGWSingletonCR
<bool>(_sc
->cct
),
529 sc(_sc
), sync_env(_sc
->env
),
530 sub(_sub
), conf(sub
->env
->conf
),
531 sub_conf(sub
->sub_conf
) {
534 int operate(const DoutPrefixProvider
*dpp
) override
{
536 get_bucket_info
.tenant
= conf
->user
.tenant
;
537 get_bucket_info
.bucket_name
= sub_conf
->data_bucket_name
;
538 sub
->get_bucket_info_result
= make_shared
<rgw_get_bucket_info_result
>();
540 for (i
= 0; i
< 2; ++i
) {
541 yield
call(new RGWGetBucketInfoCR(sync_env
->async_rados
,
544 sub
->get_bucket_info_result
,
546 if (retcode
< 0 && retcode
!= -ENOENT
) {
547 ldpp_dout(dpp
, 1) << "ERROR: failed to geting bucket info: " << "tenant="
548 << get_bucket_info
.tenant
<< " name=" << get_bucket_info
.bucket_name
<< ": ret=" << retcode
<< dendl
;
552 auto& result
= sub
->get_bucket_info_result
;
553 sub
->bucket_info
= &result
->bucket
->get_info();
555 int ret
= sub
->data_access
->get_bucket(result
->bucket
->get_info(), result
->bucket
->get_attrs(), &sub
->bucket
);
557 ldpp_dout(dpp
, 1) << "ERROR: data_access.get_bucket() bucket=" << result
->bucket
<< " failed, ret=" << ret
<< dendl
;
558 return set_cr_error(ret
);
562 yield
call(new InitBucketLifecycleCR(sc
, conf
,
563 sub
->get_bucket_info_result
->bucket
.get()));
565 ldpp_dout(dpp
, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf
->data_bucket_name
<< ") ret=" << retcode
<< dendl
;
566 return set_cr_error(retcode
);
569 return set_cr_done();
572 create_bucket
.user_info
= sub
->env
->data_user_info
;
573 create_bucket
.bucket_name
= sub_conf
->data_bucket_name
;
574 ldpp_dout(dpp
, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub
->env
->data_user_info
, true) << dendl
;
575 yield
call(new RGWBucketCreateLocalCR(sync_env
->async_rados
,
580 ldpp_dout(dpp
, 1) << "ERROR: failed to create bucket: " << "tenant="
581 << get_bucket_info
.tenant
<< " name=" << get_bucket_info
.bucket_name
<< ": ret=" << retcode
<< dendl
;
582 return set_cr_error(retcode
);
585 /* second iteration: we got -ENOENT and created a bucket */
588 /* failed twice on -ENOENT, unexpected */
589 ldpp_dout(dpp
, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info
.tenant
590 << " name=" << get_bucket_info
.bucket_name
<< dendl
;
591 return set_cr_error(-EIO
);
597 template<typename EventType
>
598 class StoreEventCR
: public RGWCoroutine
{
599 RGWDataSyncCtx
* const sc
;
600 RGWDataSyncEnv
* const sync_env
;
601 const PSSubscriptionRef sub
;
602 const PSEvent
<EventType
> pse
;
603 const string oid_prefix
;
606 StoreEventCR(RGWDataSyncCtx
* const _sc
,
607 const PSSubscriptionRef
& _sub
,
608 const EventRef
<EventType
>& _event
) : RGWCoroutine(_sc
->cct
),
609 sc(_sc
), sync_env(_sc
->env
),
612 oid_prefix(sub
->sub_conf
->data_oid_prefix
) {
615 int operate(const DoutPrefixProvider
*dpp
) override
{
616 rgw_object_simple_put_params put_obj
;
619 put_obj
.bucket
= sub
->bucket
;
620 put_obj
.key
= rgw_obj_key(oid_prefix
+ pse
.id());
622 pse
.format(&put_obj
.data
);
626 pse
.encode_event(bl
);
628 bl
.encode_base64(bl64
);
629 put_obj
.user_data
= bl64
.to_str();
632 yield
call(new RGWObjectSimplePutCR(sync_env
->async_rados
,
637 ldpp_dout(dpp
, 10) << "failed to store event: " << put_obj
.bucket
<< "/" << put_obj
.key
<< " ret=" << retcode
<< dendl
;
638 return set_cr_error(retcode
);
640 ldpp_dout(dpp
, 20) << "event stored: " << put_obj
.bucket
<< "/" << put_obj
.key
<< dendl
;
643 return set_cr_done();
649 template<typename EventType
>
650 class PushEventCR
: public RGWCoroutine
{
651 RGWDataSyncCtx
* const sc
;
652 RGWDataSyncEnv
* const sync_env
;
653 const EventRef
<EventType
> event
;
654 const PSSubConfigRef
& sub_conf
;
657 PushEventCR(RGWDataSyncCtx
* const _sc
,
658 const PSSubscriptionRef
& _sub
,
659 const EventRef
<EventType
>& _event
) : RGWCoroutine(_sc
->cct
),
660 sc(_sc
), sync_env(_sc
->env
),
662 sub_conf(_sub
->sub_conf
) {
665 int operate(const DoutPrefixProvider
*dpp
) override
{
667 ceph_assert(sub_conf
->push_endpoint
);
668 yield
call(sub_conf
->push_endpoint
->send_to_completion_async(*event
.get(), sync_env
));
671 ldpp_dout(dpp
, 10) << "failed to push event: " << event
->id
<<
672 " to endpoint: " << sub_conf
->push_endpoint_name
<< " ret=" << retcode
<< dendl
;
673 return set_cr_error(retcode
);
676 ldpp_dout(dpp
, 20) << "event: " << event
->id
<<
677 " pushed to endpoint: " << sub_conf
->push_endpoint_name
<< dendl
;
678 return set_cr_done();
685 PSSubscription(RGWDataSyncCtx
*_sc
,
687 PSSubConfigRef
& _sub_conf
) : sc(_sc
), sync_env(_sc
->env
),
690 data_access(std::make_shared
<RGWDataAccess
>(sync_env
->store
)) {}
692 PSSubscription(RGWDataSyncCtx
*_sc
,
694 rgw_pubsub_sub_config
& user_sub_conf
) : sc(_sc
), sync_env(_sc
->env
),
696 sub_conf(std::make_shared
<PSSubConfig
>()),
697 data_access(std::make_shared
<RGWDataAccess
>(sync_env
->store
)) {
698 sub_conf
->from_user_conf(sync_env
->cct
, user_sub_conf
, sync_env
->dpp
);
700 virtual ~PSSubscription() {
707 static PSSubscriptionRef
get_shared(RGWDataSyncCtx
*_sc
,
710 auto sub
= std::make_shared
<PSSubscription
>(_sc
, _env
, _sub_conf
);
711 sub
->init_cr
= new InitCR(_sc
, sub
);
// Run the subscription's init coroutine on behalf of `caller` by forwarding
// to init_cr->execute(); returns whatever execute() returns. No result
// pointer is passed, so execute() uses its default (nullptr).
716 int call_init_cr(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
) {
717 return init_cr
->execute(dpp
, caller
);
// Factory: allocate a StoreEventCR<EventType> for `event` on subscription
// `sub`. Returns a raw pointer to the newly allocated coroutine.
720 template<typename EventType
>
721 static RGWCoroutine
*store_event_cr(RGWDataSyncCtx
* const sc
, const PSSubscriptionRef
& sub
, const EventRef
<EventType
>& event
) {
722 return new StoreEventCR
<EventType
>(sc
, sub
, event
);
// Factory: allocate a PushEventCR<EventType> for `event` on subscription
// `sub`. Returns a raw pointer to the newly allocated coroutine.
725 template<typename EventType
>
726 static RGWCoroutine
*push_event_cr(RGWDataSyncCtx
* const sc
, const PSSubscriptionRef
& sub
, const EventRef
<EventType
>& event
) {
727 return new PushEventCR
<EventType
>(sc
, sub
, event
);
735 RGWDataSyncEnv
*sync_env
;
738 std::map
<string
, PSSubscriptionRef
> subs
;
740 class GetSubCR
: public RGWSingletonCR
<PSSubscriptionRef
> {
742 RGWDataSyncEnv
*sync_env
;
747 PSSubscriptionRef
*ref
;
751 PSSubConfigRef sub_conf
;
752 rgw_pubsub_sub_config user_sub_conf
;
755 GetSubCR(RGWDataSyncCtx
*_sc
,
757 const rgw_user
& _owner
,
758 const string
& _sub_name
,
759 PSSubscriptionRef
*_ref
) : RGWSingletonCR
<PSSubscriptionRef
>(_sc
->cct
),
760 sc(_sc
), sync_env(_sc
->env
),
765 conf(mgr
->env
->conf
) {
769 int operate(const DoutPrefixProvider
*dpp
) override
{
772 ldpp_dout(dpp
, 1) << "ERROR: missing user info when getting subscription: " << sub_name
<< dendl
;
773 mgr
->remove_get_sub(owner
, sub_name
);
774 return set_cr_error(-EINVAL
);
776 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_sub_config
>;
778 RGWPubSub
ps(sync_env
->store
, owner
.tenant
);
780 ps
.get_sub_meta_obj(sub_name
, &obj
);
781 bool empty_on_enoent
= false;
782 call(new ReadInfoCR(dpp
, sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
784 &user_sub_conf
, empty_on_enoent
));
787 mgr
->remove_get_sub(owner
, sub_name
);
788 return set_cr_error(retcode
);
791 *ref
= PSSubscription::get_shared(sc
, mgr
->env
, user_sub_conf
);
794 yield (*ref
)->call_init_cr(dpp
, this);
796 ldpp_dout(dpp
, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name
<< dendl
;
797 mgr
->remove_get_sub(owner
, sub_name
);
798 return set_cr_error(retcode
);
801 mgr
->remove_get_sub(owner
, sub_name
);
803 return set_cr_done();
808 void return_result(const DoutPrefixProvider
*dpp
, PSSubscriptionRef
*result
) override
{
809 ldpp_dout(dpp
, 20) << __func__
<< "(): returning result: retcode=" << retcode
<< " resultp=" << (void *)result
<< dendl
;
// Build the string key used to index subscriptions: `sub_name`, prefixed
// with "<owner>/" when `owner` is non-empty.
816 string
sub_id(const rgw_user
& owner
, const string
& sub_name
) {
818 if (!owner
.empty()) {
819 owner_prefix
= owner
.to_str() + "/";
822 return owner_prefix
+ sub_name
;
825 std::map
<std::string
, GetSubCR
*> get_subs
;
// Return a reference to the GetSubCR* slot stored in get_subs under
// sub_id(owner, name). Because std::map::operator[] is used, a missing key
// is inserted with a null pointer, which the caller can then assign to.
827 GetSubCR
*& get_get_subs(const rgw_user
& owner
, const string
& name
) {
828 return get_subs
[sub_id(owner
, name
)];
// Drop the get_subs entry keyed by sub_id(owner, name). Only the map entry
// is erased here; the GetSubCR object it pointed to is not deleted by this
// function.
831 void remove_get_sub(const rgw_user
& owner
, const string
& name
) {
832 get_subs
.erase(sub_id(owner
, name
));
835 bool find_sub_instance(const rgw_user
& owner
, const string
& sub_name
, PSSubscriptionRef
*sub
) {
836 auto iter
= subs
.find(sub_id(owner
, sub_name
));
837 if (iter
!= subs
.end()) {
844 PSManager(RGWDataSyncCtx
*_sc
,
845 PSEnvRef _env
) : sc(_sc
), sync_env(_sc
->env
),
849 static PSManagerRef
get_shared(RGWDataSyncCtx
*_sc
,
851 return std::shared_ptr
<PSManager
>(new PSManager(_sc
, _env
));
854 static int call_get_subscription_cr(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, PSManagerRef
& mgr
,
855 RGWCoroutine
*caller
, const rgw_user
& owner
, const string
& sub_name
, PSSubscriptionRef
*ref
) {
856 if (mgr
->find_sub_instance(owner
, sub_name
, ref
)) {
857 /* found it! nothing to execute */
858 ldpp_dout(dpp
, 20) << __func__
<< "(): found sub instance" << dendl
;
860 auto& gs
= mgr
->get_get_subs(owner
, sub_name
);
862 ldpp_dout(dpp
, 20) << __func__
<< "(): first get subs" << dendl
;
863 gs
= new GetSubCR(sc
, mgr
, owner
, sub_name
, ref
);
865 ldpp_dout(dpp
, 20) << __func__
<< "(): executing get subs" << dendl
;
866 return gs
->execute(dpp
, caller
, ref
);
869 friend class GetSubCR
;
872 void PSEnv::init_instance(const RGWRealm
& realm
, uint64_t instance_id
, PSManagerRef
& mgr
) {
874 conf
->init_instance(realm
, instance_id
);
877 class RGWPSInitEnvCBCR
: public RGWCoroutine
{
879 RGWDataSyncEnv
*sync_env
;
883 rgw_user_create_params create_user
;
884 rgw_get_user_info_params get_user_info
;
886 RGWPSInitEnvCBCR(RGWDataSyncCtx
*_sc
,
887 PSEnvRef
& _env
) : RGWCoroutine(_sc
->cct
),
888 sc(_sc
), sync_env(_sc
->env
),
889 env(_env
), conf(env
->conf
) {}
890 int operate(const DoutPrefixProvider
*dpp
) override
{
892 ldpp_dout(dpp
, 1) << ": init pubsub config zone=" << sc
->source_zone
<< dendl
;
894 /* create the pubsub data user; an already-existing user is tolerated below */
895 create_user
.user
= conf
->user
;
896 create_user
.max_buckets
= 0; /* unlimited */
897 create_user
.display_name
= "pubsub";
898 create_user
.generate_key
= false;
899 yield
call(new RGWUserCreateCR(sync_env
->async_rados
, sync_env
->store
, create_user
, dpp
));
900 if (retcode
< 0 && retcode
!= -ERR_USER_EXIST
) {
901 ldpp_dout(dpp
, 1) << "ERROR: failed to create rgw user: ret=" << retcode
<< dendl
;
902 return set_cr_error(retcode
);
905 get_user_info
.user
= conf
->user
;
906 yield
call(new RGWGetUserInfoCR(sync_env
->async_rados
, sync_env
->store
, get_user_info
, env
->data_user_info
, dpp
));
908 ldpp_dout(dpp
, 1) << "ERROR: failed to create rgw user: ret=" << retcode
<< dendl
;
909 return set_cr_error(retcode
);
912 ldpp_dout(dpp
, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env
->data_user_info
, true) << dendl
;
915 return set_cr_done();
// Test an event against a topic filter. Two checks are made through other
// match() overloads: the event type against filter.events, and the object
// key name against filter.s3_filter.key_filter.
// NOTE(review): the return statements are not visible here; presumably a
// failed check yields false and passing both yields true — confirm.
921 bool match(const rgw_pubsub_topic_filter
& filter
, const std::string
& key_name
, rgw::notify::EventType event_type
) {
922 if (!match(filter
.events
, event_type
)) {
925 if (!match(filter
.s3_filter
.key_filter
, key_name
)) {
931 class RGWPSFindBucketTopicsCR
: public RGWCoroutine
{
933 RGWDataSyncEnv
*sync_env
;
938 rgw::notify::EventType event_type
;
942 rgw_raw_obj bucket_obj
;
943 rgw_raw_obj user_obj
;
944 rgw_pubsub_bucket_topics bucket_topics
;
945 rgw_pubsub_topics user_topics
;
948 RGWPSFindBucketTopicsCR(RGWDataSyncCtx
*_sc
,
950 const rgw_user
& _owner
,
951 const rgw_bucket
& _bucket
,
952 const rgw_obj_key
& _key
,
953 rgw::notify::EventType _event_type
,
954 TopicsRef
*_topics
) : RGWCoroutine(_sc
->cct
),
955 sc(_sc
), sync_env(_sc
->env
),
960 event_type(_event_type
),
961 ps(sync_env
->store
, owner
.tenant
),
963 *topics
= std::make_shared
<vector
<PSTopicConfigRef
> >();
965 int operate(const DoutPrefixProvider
*dpp
) override
{
967 ps
.get_bucket_meta_obj(bucket
, &bucket_obj
);
968 ps
.get_meta_obj(&user_obj
);
970 using ReadInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_bucket_topics
>;
972 bool empty_on_enoent
= true;
973 call(new ReadInfoCR(dpp
, sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
975 &bucket_topics
, empty_on_enoent
));
977 if (retcode
< 0 && retcode
!= -ENOENT
) {
978 return set_cr_error(retcode
);
981 ldpp_dout(dpp
, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics
.topics
.size() << " topics for bucket " << bucket
<< dendl
;
983 if (!bucket_topics
.topics
.empty()) {
984 using ReadUserTopicsInfoCR
= RGWSimpleRadosReadCR
<rgw_pubsub_topics
>;
986 bool empty_on_enoent
= true;
987 call(new ReadUserTopicsInfoCR(dpp
, sync_env
->async_rados
, sync_env
->store
->svc()->sysobj
,
989 &user_topics
, empty_on_enoent
));
991 if (retcode
< 0 && retcode
!= -ENOENT
) {
992 return set_cr_error(retcode
);
996 for (auto& titer
: bucket_topics
.topics
) {
997 auto& topic_filter
= titer
.second
;
998 auto& info
= topic_filter
.topic
;
999 if (!match(topic_filter
, key
.name
, event_type
)) {
1002 std::shared_ptr
<PSTopicConfig
> tc
= std::make_shared
<PSTopicConfig
>();
1003 tc
->name
= info
.name
;
1004 tc
->subs
= user_topics
.topics
[info
.name
].subs
;
1005 tc
->opaque_data
= info
.opaque_data
;
1006 (*topics
)->push_back(tc
);
1009 return set_cr_done();
1015 class RGWPSHandleObjEventCR
: public RGWCoroutine
{
1016 RGWDataSyncCtx
* const sc
;
1018 const rgw_user owner
;
1019 const EventRef
<rgw_pubsub_event
> event
;
1020 const EventRef
<rgw_pubsub_s3_event
> s3_event
;
1021 const TopicsRef topics
;
1022 bool has_subscriptions
;
1024 PSSubscriptionRef sub
;
1025 std::vector
<PSTopicConfigRef
>::const_iterator titer
;
1026 std::set
<std::string
>::const_iterator siter
;
1029 RGWPSHandleObjEventCR(RGWDataSyncCtx
* const _sc
,
1030 const PSEnvRef _env
,
1031 const rgw_user
& _owner
,
1032 const EventRef
<rgw_pubsub_event
>& _event
,
1033 const EventRef
<rgw_pubsub_s3_event
>& _s3_event
,
1034 const TopicsRef
& _topics
) : RGWCoroutine(_sc
->cct
),
1039 s3_event(_s3_event
),
1041 has_subscriptions(false),
1042 event_handled(false) {}
1044 int operate(const DoutPrefixProvider
*dpp
) override
{
1046 ldpp_dout(dpp
, 20) << ": handle event: obj: z=" << sc
->source_zone
1047 << " event=" << json_str("event", *event
, false)
1048 << " owner=" << owner
<< dendl
;
1050 ldpp_dout(dpp
, 20) << "pubsub: " << topics
->size() << " topics found for path" << dendl
;
1052 // outside caller should check that
1053 ceph_assert(!topics
->empty());
1055 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_triggered
);
1057 // loop over all topics related to the bucket/object
1058 for (titer
= topics
->begin(); titer
!= topics
->end(); ++titer
) {
1059 ldpp_dout(dpp
, 20) << ": notification for " << event
->source
<< ": topic=" <<
1060 (*titer
)->name
<< ", has " << (*titer
)->subs
.size() << " subscriptions" << dendl
;
1061 // loop over all subscriptions of the topic
1062 for (siter
= (*titer
)->subs
.begin(); siter
!= (*titer
)->subs
.end(); ++siter
) {
1063 ldpp_dout(dpp
, 20) << ": subscription: " << *siter
<< dendl
;
1064 has_subscriptions
= true;
1065 // try to read subscription configuration
1066 yield
PSManager::call_get_subscription_cr(dpp
, sc
, env
->manager
, this, owner
, *siter
, &sub
);
1068 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_missing_conf
);
1069 ldpp_dout(dpp
, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
1070 << " ret=" << retcode
<< dendl
;
1071 if (retcode
== -ENOENT
) {
1072 // missing subscription info should be reflected back as invalid argument
1073 // and not as missing object
1076 // try the next subscription
1079 if (sub
->sub_conf
->s3_id
.empty()) {
1080 // subscription was not made by S3 compatible API
1081 ldpp_dout(dpp
, 20) << "storing event for subscription=" << *siter
<< " owner=" << owner
<< " ret=" << retcode
<< dendl
;
1082 yield
call(PSSubscription::store_event_cr(sc
, sub
, event
));
1084 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_fail
);
1085 ldpp_dout(dpp
, 1) << "ERROR: failed to store event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1087 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_ok
);
1088 event_handled
= true;
1090 if (sub
->sub_conf
->push_endpoint
) {
1091 ldpp_dout(dpp
, 20) << "push event for subscription=" << *siter
<< " owner=" << owner
<< " ret=" << retcode
<< dendl
;
1092 yield
call(PSSubscription::push_event_cr(sc
, sub
, event
));
1094 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
1095 ldpp_dout(dpp
, 1) << "ERROR: failed to push event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1097 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
1098 event_handled
= true;
1102 // subscription was made by S3 compatible API
1103 ldpp_dout(dpp
, 20) << "storing s3 event for subscription=" << *siter
<< " owner=" << owner
<< " ret=" << retcode
<< dendl
;
1104 s3_event
->configurationId
= sub
->sub_conf
->s3_id
;
1105 s3_event
->opaque_data
= (*titer
)->opaque_data
;
1106 yield
call(PSSubscription::store_event_cr(sc
, sub
, s3_event
));
1108 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_fail
);
1109 ldpp_dout(dpp
, 1) << "ERROR: failed to store s3 event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1111 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_store_ok
);
1112 event_handled
= true;
1114 if (sub
->sub_conf
->push_endpoint
) {
1115 ldpp_dout(dpp
, 20) << "push s3 event for subscription=" << *siter
<< " owner=" << owner
<< " ret=" << retcode
<< dendl
;
1116 yield
call(PSSubscription::push_event_cr(sc
, sub
, s3_event
));
1118 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
1119 ldpp_dout(dpp
, 1) << "ERROR: failed to push s3 event for subscription=" << *siter
<< " ret=" << retcode
<< dendl
;
1121 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
1122 event_handled
= true;
1128 if (has_subscriptions
&& !event_handled
) {
1129 // event is considered "lost" if it has subscriptions on any of its topics
1130 // but it was not stored in, or pushed to, any of them
1131 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_lost
);
1134 return set_cr_error(retcode
);
1136 return set_cr_done();
1142 // coroutine invoked on remote object creation
// Derives from RGWStatRemoteObjCBCR. operate() logs the stat result, builds
// both a ceph-style and an S3-style ObjectCreated event, and passes them to
// RGWPSHandleObjEventCR. Instances are created by
// RGWPSHandleRemoteObjCR::allocate_callback().
1143 class RGWPSHandleRemoteObjCBCR
: public RGWStatRemoteObjCBCR
{
1145 rgw_bucket_sync_pipe sync_pipe
;
1147 std::optional
<uint64_t> versioned_epoch
;
// both event flavours are held; operate() explains why both are created
1148 EventRef
<rgw_pubsub_event
> event
;
1149 EventRef
<rgw_pubsub_s3_event
> s3_event
;
// The base class is constructed from the sync context, the source bucket of
// the pipe (_sync_pipe.info.source_bs.bucket) and the object key.
1152 RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx
*_sc
,
1153 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1154 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
,
1155 TopicsRef
& _topics
) : RGWStatRemoteObjCBCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1157 sync_pipe(_sync_pipe
),
1159 versioned_epoch(_versioned_epoch
),
// Coroutine body: build the events and run RGWPSHandleObjEventCR on them.
// Exits through set_cr_error(retcode) or set_cr_done().
1162 int operate(const DoutPrefixProvider
*dpp
) override
{
// trace the source zone, bucket, key, size, mtime and attrs of the object
1164 ldpp_dout(dpp
, 20) << ": stat of remote obj: z=" << sc
->source_zone
1165 << " b=" << sync_pipe
.info
.source_bs
.bucket
<< " k=" << key
<< " size=" << size
<< " mtime=" << mtime
1166 << " attrs=" << attrs
<< dendl
;
// NOTE(review): this local vector is also named `attrs`, so from here on it
// hides the `attrs` streamed in the log statement above. The loop below
// therefore walks the just-declared, still-empty local and never runs, which
// leaves the vector empty - confirm the loop was meant to read the outer
// `attrs` instead.
1168 std::vector
<std::pair
<std::string
, std::string
> > attrs
;
1169 for (auto& attr
: attrs
) {
1170 std::string k
= attr
.first
;
// drop the RGW_ATTR_PREFIX from the attribute name when it is present
// (sizeof - 1 presumably skips the terminating NUL of a string-literal macro)
1171 if (boost::algorithm::starts_with(k
, RGW_ATTR_PREFIX
)) {
1172 k
= k
.substr(sizeof(RGW_ATTR_PREFIX
) - 1);
1174 attrs
.push_back(std::make_pair(k
, attr
.second
));
1176 // at this point we don't know whether we need the ceph event or S3 event
1177 // this is why both are created here, once we have information about the
1178 // subscription, we will store/push only the relevant ones
1179 make_event_ref(sc
->cct
,
1180 sync_pipe
.info
.source_bs
.bucket
, key
,
1182 rgw::notify::ObjectCreated
, &event
);
// the S3 event additionally receives the destination bucket owner
1183 make_s3_event_ref(sc
->cct
,
1184 sync_pipe
.info
.source_bs
.bucket
, sync_pipe
.dest_bucket_info
.owner
, key
,
1186 rgw::notify::ObjectCreated
, &s3_event
);
// NOTE(review): the handler is given source_bucket_info.owner while the S3
// event above uses dest_bucket_info.owner - confirm this is intentional
1189 yield
call(new RGWPSHandleObjEventCR(sc
, env
, sync_pipe
.source_bucket_info
.owner
, event
, s3_event
, topics
));
// failure exit: report retcode as this coroutine's error
1191 return set_cr_error(retcode
);
1193 return set_cr_done();
// Derives from RGWCallStatRemoteObjCR and supplies RGWPSHandleRemoteObjCBCR
// as the callback coroutine through allocate_callback().
1199 class RGWPSHandleRemoteObjCR
: public RGWCallStatRemoteObjCR
{
1200 rgw_bucket_sync_pipe sync_pipe
;
1202 std::optional
<uint64_t> versioned_epoch
;
// The base class is constructed from the sync context, the source bucket of
// the pipe (_sync_pipe.info.source_bs.bucket) and the object key; the
// remaining arguments are stored for allocate_callback().
1205 RGWPSHandleRemoteObjCR(RGWDataSyncCtx
*_sc
,
1206 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1207 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
,
1208 TopicsRef
& _topics
) : RGWCallStatRemoteObjCR(_sc
, _sync_pipe
.info
.source_bs
.bucket
, _key
),
1209 sync_pipe(_sync_pipe
),
1210 env(_env
), versioned_epoch(_versioned_epoch
),
1214 ~RGWPSHandleRemoteObjCR() override
{}
// Returns a newly allocated RGWPSHandleRemoteObjCBCR built from this object's
// stored state. The pointer is raw; presumably the caller takes ownership -
// TODO confirm against the base class.
1216 RGWStatRemoteObjCBCR
*allocate_callback() override
{
1217 return new RGWPSHandleRemoteObjCBCR(sc
, sync_pipe
, key
, env
, versioned_epoch
, topics
);
// Coroutine returned by RGWPSDataSyncModule::sync_object(). It looks up the
// topics for an ObjectCreated event on the bucket and, when any are found,
// runs RGWPSHandleRemoteObjCR for the object.
1221 class RGWPSHandleObjCreateCR
: public RGWCoroutine
{
1223 rgw_bucket_sync_pipe sync_pipe
;
1226 std::optional
<uint64_t> versioned_epoch
;
// The base RGWCoroutine is constructed from _sc->cct.
1229 RGWPSHandleObjCreateCR(RGWDataSyncCtx
*_sc
,
1230 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
,
1231 PSEnvRef _env
, std::optional
<uint64_t> _versioned_epoch
) : RGWCoroutine(_sc
->cct
),
1233 sync_pipe(_sync_pipe
),
1236 versioned_epoch(_versioned_epoch
) {
1239 ~RGWPSHandleObjCreateCR() override
{}
// Coroutine body. Exits through set_cr_error(retcode) or set_cr_done().
1241 int operate(const DoutPrefixProvider
*dpp
) override
{
// step 1: find the topics for this bucket/key and the ObjectCreated event,
// using the destination bucket owner and the source bucket of the pipe
1243 yield
call(new RGWPSFindBucketTopicsCR(sc
, env
, sync_pipe
.dest_bucket_info
.owner
,
1244 sync_pipe
.info
.source_bs
.bucket
, key
,
1245 rgw::notify::ObjectCreated
,
// failure exit of the topic lookup: log and report retcode
1248 ldpp_dout(dpp
, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode
<< dendl
;
1249 return set_cr_error(retcode
);
// nothing to notify: finish successfully without handling the object
1251 if (topics
->empty()) {
1252 ldpp_dout(dpp
, 20) << "no topics found for " << sync_pipe
.info
.source_bs
.bucket
<< "/" << key
<< dendl
;
1253 return set_cr_done();
// step 2: run the per-object handler with the topics that were found
1255 yield
call(new RGWPSHandleRemoteObjCR(sc
, sync_pipe
, key
, env
, versioned_epoch
, topics
));
// failure exit of the handler: report retcode
1257 return set_cr_error(retcode
);
1259 return set_cr_done();
1265 // coroutine invoked on remote object deletion
// The event type is supplied by the caller, so the same coroutine is also
// used for delete-marker creation (see RGWPSDataSyncModule::remove_object()
// and RGWPSDataSyncModule::create_delete_marker()).
1266 class RGWPSGenericObjEventCBCR
: public RGWCoroutine
{
1272 ceph::real_time mtime
;
1273 rgw::notify::EventType event_type
;
// both event flavours are held; operate() explains why both are created
1274 EventRef
<rgw_pubsub_event
> event
;
1275 EventRef
<rgw_pubsub_s3_event
> s3_event
;
// The base RGWCoroutine is constructed from _sc->cct. owner and bucket are
// both taken from the pipe's destination bucket info.
1278 RGWPSGenericObjEventCBCR(RGWDataSyncCtx
*_sc
,
1280 rgw_bucket_sync_pipe
& _sync_pipe
, rgw_obj_key
& _key
, const ceph::real_time
& _mtime
,
1281 rgw::notify::EventType _event_type
) : RGWCoroutine(_sc
->cct
),
1284 owner(_sync_pipe
.dest_bucket_info
.owner
),
1285 bucket(_sync_pipe
.dest_bucket_info
.bucket
),
1287 mtime(_mtime
), event_type(_event_type
) {}
// Coroutine body. Exits through set_cr_error(retcode) or set_cr_done().
1288 int operate(const DoutPrefixProvider
*dpp
) override
{
// NOTE(review): the message says "remove" regardless of event_type
1290 ldpp_dout(dpp
, 20) << ": remove remote obj: z=" << sc
->source_zone
1291 << " b=" << bucket
<< " k=" << key
<< " mtime=" << mtime
<< dendl
;
// step 1: find the topics for this owner/bucket/key and event type
1292 yield
call(new RGWPSFindBucketTopicsCR(sc
, env
, owner
, bucket
, key
, event_type
, &topics
));
// failure exit of the topic lookup: log and report retcode
1294 ldpp_dout(dpp
, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode
<< dendl
;
1295 return set_cr_error(retcode
);
// nothing to notify: finish successfully without creating any event
1297 if (topics
->empty()) {
1298 ldpp_dout(dpp
, 20) << "no topics found for " << bucket
<< "/" << key
<< dendl
;
1299 return set_cr_done();
1301 // at this point we don't know whether we need the ceph event or S3 event
1302 // this is why both are created here, once we have information about the
1303 // subscription, we will store/push only the relevant ones
1304 make_event_ref(sc
->cct
,
1307 event_type
, &event
);
1308 make_s3_event_ref(sc
->cct
,
1311 event_type
, &s3_event
);
// step 2: hand both events and the topics to the event handler
1312 yield
call(new RGWPSHandleObjEventCR(sc
, env
, owner
, event
, s3_event
, topics
));
// failure exit of the handler: report retcode
1314 return set_cr_error(retcode
);
1316 return set_cr_done();
// RGWDataSyncModule implementation for pubsub. Each sync callback logs its
// arguments and returns a newly allocated coroutine that does the work.
1323 class RGWPSDataSyncModule
: public RGWDataSyncModule
{
// Creates the shared PSEnv, initializes conf from env->conf and then
// initializes the environment from the given JSON configuration.
1328 RGWPSDataSyncModule(CephContext
*cct
, const JSONFormattable
& config
) : env(std::make_shared
<PSEnv
>()), conf(env
->conf
) {
1329 env
->init(cct
, config
);
1332 ~RGWPSDataSyncModule() override
{}
// Obtains the PSManager via PSManager::get_shared() and initializes the
// environment instance with the zone's realm, instance_id and that manager.
1334 void init(RGWDataSyncCtx
*sc
, uint64_t instance_id
) override
{
1335 auto sync_env
= sc
->env
;
1336 PSManagerRef mgr
= PSManager::get_shared(sc
, env
);
1337 env
->init_instance(sync_env
->svc
->zone
->get_realm(), instance_id
, mgr
);
// Logs the start and returns a new RGWPSInitEnvCBCR for this environment.
1340 RGWCoroutine
*start_sync(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
) override
{
1341 ldpp_dout(dpp
, 5) << conf
->id
<< ": start" << dendl
;
1342 return new RGWPSInitEnvCBCR(sc
, env
);
// Object creation: returns a new RGWPSHandleObjCreateCR.
// zones_trace is not used in the visible body; versioned_epoch is logged
// (0 when unset) and forwarded.
1345 RGWCoroutine
*sync_object(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1346 rgw_obj_key
& key
, std::optional
<uint64_t> versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1347 ldpp_dout(dpp
, 10) << conf
->id
<< ": sync_object: b=" << sync_pipe
<<
1348 " k=" << key
<< " versioned_epoch=" << versioned_epoch
.value_or(0) << dendl
;
1349 return new RGWPSHandleObjCreateCR(sc
, sync_pipe
, key
, env
, versioned_epoch
);
// Object removal: returns a new RGWPSGenericObjEventCBCR with event type
// rgw::notify::ObjectRemovedDelete. versioned and versioned_epoch are only
// logged; zones_trace is not used in the visible body.
1352 RGWCoroutine
*remove_object(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1353 rgw_obj_key
& key
, real_time
& mtime
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1354 ldpp_dout(dpp
, 10) << conf
->id
<< ": rm_object: b=" << sync_pipe
<<
1355 " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1356 return new RGWPSGenericObjEventCBCR(sc
, env
, sync_pipe
, key
, mtime
, rgw::notify::ObjectRemovedDelete
);
// Delete-marker creation: returns a new RGWPSGenericObjEventCBCR with event
// type rgw::notify::ObjectRemovedDeleteMarkerCreated. The owner parameter and
// zones_trace are not used in the visible body; versioned and versioned_epoch
// are only logged.
1359 RGWCoroutine
*create_delete_marker(const DoutPrefixProvider
*dpp
, RGWDataSyncCtx
*sc
, rgw_bucket_sync_pipe
& sync_pipe
,
1360 rgw_obj_key
& key
, real_time
& mtime
, rgw_bucket_entry_owner
& owner
, bool versioned
, uint64_t versioned_epoch
, rgw_zone_set
*zones_trace
) override
{
1361 ldpp_dout(dpp
, 10) << conf
->id
<< ": create_delete_marker: b=" << sync_pipe
<<
1362 " k=" << key
<< " mtime=" << mtime
<< " versioned=" << versioned
<< " versioned_epoch=" << versioned_epoch
<< dendl
;
1363 return new RGWPSGenericObjEventCBCR(sc
, env
, sync_pipe
, key
, mtime
, rgw::notify::ObjectRemovedDeleteMarkerCreated
);
// Accessor for the module configuration; returns a reference to conf.
1366 PSConfigRef
& get_conf() { return conf
; }
// Creates the data handler from the given configuration, then derives
// effective_conf by serializing the handler's configuration to JSON and
// parsing it back.
1369 RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
)
1371 data_handler
= std::unique_ptr
<RGWPSDataSyncModule
>(new RGWPSDataSyncModule(cct
, config
));
// serialize the handler's configuration under the key "conf"
1372 const std::string jconf
= json_str("conf", *data_handler
->get_conf());
// parse failure: log it and use the caller-supplied config unchanged
1374 if (!p
.parse(jconf
.c_str(), jconf
.size())) {
1375 ldpp_dout(dpp
, 1) << "ERROR: failed to parse sync module effective conf: " << jconf
<< dendl
;
1376 effective_conf
= config
;
// presumably the success branch (the branch structure is not visible here):
// decode the parsed JSON into effective_conf
1378 effective_conf
.decode_json(&p
);
// Returns the raw pointer held by data_handler (via get()); data_handler
// itself is left untouched.
1382 RGWDataSyncModule
*RGWPSSyncModuleInstance::get_data_handler()
1384 return data_handler
.get();
// Returns a newly allocated RGWRESTMgr_PubSub. The dialect is first compared
// against RGW_REST_S3.
1387 RGWRESTMgr
*RGWPSSyncModuleInstance::get_rest_filter(int dialect
, RGWRESTMgr
*orig
) {
// NOTE(review): the body of this branch is not visible here; presumably
// non-S3 dialects get `orig` back unchanged - confirm
1388 if (dialect
!= RGW_REST_S3
) {
1391 return new RGWRESTMgr_PubSub();
// Returns the start_with_full_sync setting from the data handler's
// configuration.
1394 bool RGWPSSyncModuleInstance::should_full_sync() const {
1395 return data_handler
->get_conf()->start_with_full_sync
;
// Resets *instance to a newly allocated RGWPSSyncModuleInstance built from
// dpp, cct and config.
1398 int RGWPSSyncModule::create_instance(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const JSONFormattable
& config
, RGWSyncModuleInstanceRef
*instance
) {
1399 instance
->reset(new RGWPSSyncModuleInstance(dpp
, cct
, config
));