1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_PUBSUB_H
5 #define CEPH_RGW_PUBSUB_H
7 #include "rgw_common.h"
10 #include "rgw_rados.h"
11 #include "rgw_notify_event_type.h"
12 #include "services/svc_sys_obj.h"
16 struct rgw_s3_key_filter
{
17 std::string prefix_rule
;
18 std::string suffix_rule
;
19 std::string regex_rule
;
21 bool has_content() const;
23 bool decode_xml(XMLObj
*obj
);
24 void dump_xml(Formatter
*f
) const;
26 void encode(bufferlist
& bl
) const {
27 ENCODE_START(1, 1, bl
);
28 encode(prefix_rule
, bl
);
29 encode(suffix_rule
, bl
);
30 encode(regex_rule
, bl
);
34 void decode(bufferlist::const_iterator
& bl
) {
36 decode(prefix_rule
, bl
);
37 decode(suffix_rule
, bl
);
38 decode(regex_rule
, bl
);
42 WRITE_CLASS_ENCODER(rgw_s3_key_filter
)
44 using Metadata
= std::map
<std::string
, std::string
>;
46 struct rgw_s3_metadata_filter
{
49 bool has_content() const;
51 bool decode_xml(XMLObj
*obj
);
52 void dump_xml(Formatter
*f
) const;
54 void encode(bufferlist
& bl
) const {
55 ENCODE_START(1, 1, bl
);
59 void decode(bufferlist::const_iterator
& bl
) {
65 WRITE_CLASS_ENCODER(rgw_s3_metadata_filter
)
67 struct rgw_s3_filter
{
68 rgw_s3_key_filter key_filter
;
69 rgw_s3_metadata_filter metadata_filter
;
71 bool has_content() const;
73 bool decode_xml(XMLObj
*obj
);
74 void dump_xml(Formatter
*f
) const;
76 void encode(bufferlist
& bl
) const {
77 ENCODE_START(1, 1, bl
);
78 encode(key_filter
, bl
);
79 encode(metadata_filter
, bl
);
83 void decode(bufferlist::const_iterator
& bl
) {
85 decode(key_filter
, bl
);
86 decode(metadata_filter
, bl
);
90 WRITE_CLASS_ENCODER(rgw_s3_filter
)
92 using OptionalFilter
= std::optional
<rgw_s3_filter
>;
94 class rgw_pubsub_topic_filter
;
95 /* S3 notification configuration
96 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
97 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
113 <Id>notification1</Id>
114 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
115 <Event>s3:ObjectCreated:*</Event>
116 <Event>s3:ObjectRemoved:*</Event>
117 </TopicConfiguration>
118 </NotificationConfiguration>
120 struct rgw_pubsub_s3_notification
{
124 rgw::notify::EventTypeList events
;
126 std::string topic_arn
;
128 rgw_s3_filter filter
;
130 bool decode_xml(XMLObj
*obj
);
131 void dump_xml(Formatter
*f
) const;
133 rgw_pubsub_s3_notification() = default;
134 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
135 rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
);
138 // return true if the key matches the prefix/suffix/regex rules of the key filter
139 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
);
140 // return true if the key matches the metadata rules of the metadata filter
141 bool match(const rgw_s3_metadata_filter
& filter
, const Metadata
& metadata
);
142 // return true if the event type matches (equal or contained in) one of the events in the list
143 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
);
145 struct rgw_pubsub_s3_notifications
{
146 std::list
<rgw_pubsub_s3_notification
> list
;
147 bool decode_xml(XMLObj
*obj
);
148 void dump_xml(Formatter
*f
) const;
151 /* S3 event records structure
152 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
164 "requestParameters":{
168 "x-amz-request-id":"",
172 "s3SchemaVersion":"1.0",
173 "configurationId":"",
196 struct rgw_pubsub_s3_record
{
197 constexpr static const char* const json_type_single
= "Record";
198 constexpr static const char* const json_type_plural
= "Records";
200 std::string eventVersion
;
202 std::string eventSource
;
204 std::string awsRegion
;
205 // time of the request
206 ceph::real_time eventTime
;
208 std::string eventName
;
209 // user that sent the requet (not implemented)
210 std::string userIdentity
;
211 // IP address of source of the request (not implemented)
212 std::string sourceIPAddress
;
213 // request ID (not implemented)
214 std::string x_amz_request_id
;
215 // radosgw that received the request
216 std::string x_amz_id_2
;
218 std::string s3SchemaVersion
;
219 // ID received in the notification request
220 std::string configurationId
;
222 std::string bucket_name
;
223 // bucket owner (not implemented)
224 std::string bucket_ownerIdentity
;
226 std::string bucket_arn
;
228 std::string object_key
;
229 // object size (not implemented)
230 uint64_t object_size
;
232 std::string object_etag
;
233 // object version id bucket is versioned
234 std::string object_versionId
;
235 // hexadecimal value used to determine event order for specific key
236 std::string object_sequencer
;
237 // this is an rgw extension (not S3 standard)
238 // used to store a globally unique identifier of the event
239 // that could be used for acking
241 // this is an rgw extension holding the internal bucket id
242 std::string bucket_id
;
244 std::map
<std::string
, std::string
> x_meta_map
;
246 void encode(bufferlist
& bl
) const {
247 ENCODE_START(2, 1, bl
);
248 encode(eventVersion
, bl
);
249 encode(eventSource
, bl
);
250 encode(awsRegion
, bl
);
251 encode(eventTime
, bl
);
252 encode(eventName
, bl
);
253 encode(userIdentity
, bl
);
254 encode(sourceIPAddress
, bl
);
255 encode(x_amz_request_id
, bl
);
256 encode(x_amz_id_2
, bl
);
257 encode(s3SchemaVersion
, bl
);
258 encode(configurationId
, bl
);
259 encode(bucket_name
, bl
);
260 encode(bucket_ownerIdentity
, bl
);
261 encode(bucket_arn
, bl
);
262 encode(object_key
, bl
);
263 encode(object_size
, bl
);
264 encode(object_etag
, bl
);
265 encode(object_versionId
, bl
);
266 encode(object_sequencer
, bl
);
268 encode(bucket_id
, bl
);
269 encode(x_meta_map
, bl
);
273 void decode(bufferlist::const_iterator
& bl
) {
275 decode(eventVersion
, bl
);
276 decode(eventSource
, bl
);
277 decode(awsRegion
, bl
);
278 decode(eventTime
, bl
);
279 decode(eventName
, bl
);
280 decode(userIdentity
, bl
);
281 decode(sourceIPAddress
, bl
);
282 decode(x_amz_request_id
, bl
);
283 decode(x_amz_id_2
, bl
);
284 decode(s3SchemaVersion
, bl
);
285 decode(configurationId
, bl
);
286 decode(bucket_name
, bl
);
287 decode(bucket_ownerIdentity
, bl
);
288 decode(bucket_arn
, bl
);
289 decode(object_key
, bl
);
290 decode(object_size
, bl
);
291 decode(object_etag
, bl
);
292 decode(object_versionId
, bl
);
293 decode(object_sequencer
, bl
);
296 decode(bucket_id
, bl
);
297 decode(x_meta_map
, bl
);
302 void dump(Formatter
*f
) const;
304 WRITE_CLASS_ENCODER(rgw_pubsub_s3_record
)
306 struct rgw_pubsub_event
{
307 constexpr static const char* const json_type_single
= "event";
308 constexpr static const char* const json_type_plural
= "events";
310 std::string event_name
;
312 ceph::real_time timestamp
;
313 JSONFormattable info
;
315 void encode(bufferlist
& bl
) const {
316 ENCODE_START(1, 1, bl
);
318 encode(event_name
, bl
);
320 encode(timestamp
, bl
);
325 void decode(bufferlist::const_iterator
& bl
) {
328 decode(event_name
, bl
);
330 decode(timestamp
, bl
);
335 void dump(Formatter
*f
) const;
337 WRITE_CLASS_ENCODER(rgw_pubsub_event
)
339 struct rgw_pubsub_sub_dest
{
340 std::string bucket_name
;
341 std::string oid_prefix
;
342 std::string push_endpoint
;
343 std::string push_endpoint_args
;
344 std::string arn_topic
;
346 void encode(bufferlist
& bl
) const {
347 ENCODE_START(3, 1, bl
);
348 encode(bucket_name
, bl
);
349 encode(oid_prefix
, bl
);
350 encode(push_endpoint
, bl
);
351 encode(push_endpoint_args
, bl
);
352 encode(arn_topic
, bl
);
356 void decode(bufferlist::const_iterator
& bl
) {
358 decode(bucket_name
, bl
);
359 decode(oid_prefix
, bl
);
360 decode(push_endpoint
, bl
);
362 decode(push_endpoint_args
, bl
);
365 decode(arn_topic
, bl
);
370 void dump(Formatter
*f
) const;
371 void dump_xml(Formatter
*f
) const;
373 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest
)
375 struct rgw_pubsub_sub_config
{
379 rgw_pubsub_sub_dest dest
;
382 void encode(bufferlist
& bl
) const {
383 ENCODE_START(2, 1, bl
);
392 void decode(bufferlist::const_iterator
& bl
) {
404 void dump(Formatter
*f
) const;
406 WRITE_CLASS_ENCODER(rgw_pubsub_sub_config
)
408 struct rgw_pubsub_topic
{
411 rgw_pubsub_sub_dest dest
;
414 void encode(bufferlist
& bl
) const {
415 ENCODE_START(2, 1, bl
);
423 void decode(bufferlist::const_iterator
& bl
) {
434 string
to_str() const {
435 return user
.to_str() + "/" + name
;
438 void dump(Formatter
*f
) const;
439 void dump_xml(Formatter
*f
) const;
441 bool operator<(const rgw_pubsub_topic
& t
) const {
442 return to_str().compare(t
.to_str());
445 WRITE_CLASS_ENCODER(rgw_pubsub_topic
)
447 struct rgw_pubsub_topic_subs
{
448 rgw_pubsub_topic topic
;
449 std::set
<std::string
> subs
;
451 void encode(bufferlist
& bl
) const {
452 ENCODE_START(1, 1, bl
);
458 void decode(bufferlist::const_iterator
& bl
) {
465 void dump(Formatter
*f
) const;
467 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs
)
469 struct rgw_pubsub_topic_filter
{
470 rgw_pubsub_topic topic
;
471 rgw::notify::EventTypeList events
;
473 rgw_s3_filter s3_filter
;
475 void encode(bufferlist
& bl
) const {
476 ENCODE_START(3, 1, bl
);
478 // events are stored as a vector of strings
479 std::vector
<std::string
> tmp_events
;
480 const auto converter
= s3_id
.empty() ? rgw::notify::to_ceph_string
: rgw::notify::to_string
;
481 std::transform(events
.begin(), events
.end(), std::back_inserter(tmp_events
), converter
);
482 encode(tmp_events
, bl
);
484 encode(s3_filter
, bl
);
488 void decode(bufferlist::const_iterator
& bl
) {
491 // events are stored as a vector of strings
493 std::vector
<std::string
> tmp_events
;
494 decode(tmp_events
, bl
);
495 std::transform(tmp_events
.begin(), tmp_events
.end(), std::back_inserter(events
), rgw::notify::from_string
);
500 decode(s3_filter
, bl
);
505 void dump(Formatter
*f
) const;
507 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter
)
509 struct rgw_pubsub_bucket_topics
{
510 std::map
<std::string
, rgw_pubsub_topic_filter
> topics
;
512 void encode(bufferlist
& bl
) const {
513 ENCODE_START(1, 1, bl
);
518 void decode(bufferlist::const_iterator
& bl
) {
524 void dump(Formatter
*f
) const;
526 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics
)
528 struct rgw_pubsub_user_topics
{
529 std::map
<std::string
, rgw_pubsub_topic_subs
> topics
;
531 void encode(bufferlist
& bl
) const {
532 ENCODE_START(1, 1, bl
);
537 void decode(bufferlist::const_iterator
& bl
) {
543 void dump(Formatter
*f
) const;
544 void dump_xml(Formatter
*f
) const;
546 WRITE_CLASS_ENCODER(rgw_pubsub_user_topics
)
548 static std::string pubsub_user_oid_prefix
= "pubsub.user.";
556 RGWSysObjectCtx obj_ctx
;
558 rgw_raw_obj user_meta_obj
;
560 std::string
user_meta_oid() const {
561 return pubsub_user_oid_prefix
+ user
.to_str();
564 std::string
bucket_meta_oid(const rgw_bucket
& bucket
) const {
565 return pubsub_user_oid_prefix
+ user
.to_str() + ".bucket." + bucket
.name
+ "/" + bucket
.bucket_id
;
568 std::string
sub_meta_oid(const string
& name
) const {
569 return pubsub_user_oid_prefix
+ user
.to_str() + ".sub." + name
;
573 int read(const rgw_raw_obj
& obj
, T
*data
, RGWObjVersionTracker
*objv_tracker
);
576 int write(const rgw_raw_obj
& obj
, const T
& info
, RGWObjVersionTracker
*obj_tracker
);
578 int remove(const rgw_raw_obj
& obj
, RGWObjVersionTracker
*objv_tracker
);
580 int read_user_topics(rgw_pubsub_user_topics
*result
, RGWObjVersionTracker
*objv_tracker
);
581 int write_user_topics(const rgw_pubsub_user_topics
& topics
, RGWObjVersionTracker
*objv_tracker
);
584 RGWUserPubSub(RGWRados
*_store
, const rgw_user
& _user
) : store(_store
),
586 obj_ctx(store
->svc
.sysobj
->init_obj_ctx()) {
587 get_user_meta_obj(&user_meta_obj
);
591 friend class RGWUserPubSub
;
594 rgw_raw_obj bucket_meta_obj
;
596 // read the list of topics associated with a bucket and populate into result
597 // use version tacker to enforce atomicity between read/write
598 // return 0 on success or if no topic was associated with the bucket, error code otherwise
599 int read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
);
600 // set the list of topics associated with a bucket
601 // use version tacker to enforce atomicity between read/write
602 // return 0 on success, error code otherwise
603 int write_topics(const rgw_pubsub_bucket_topics
& topics
, RGWObjVersionTracker
*objv_tracker
);
605 Bucket(RGWUserPubSub
*_ps
, const rgw_bucket
& _bucket
) : ps(_ps
), bucket(_bucket
) {
606 ps
->get_bucket_meta_obj(bucket
, &bucket_meta_obj
);
609 // read the list of topics associated with a bucket and populate into result
610 // return 0 on success or if no topic was associated with the bucket, error code otherwise
611 int get_topics(rgw_pubsub_bucket_topics
*result
);
612 // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
613 // assigning a notification name is optional (needed for S3 compatible notifications)
614 // if the topic already exist on the bucket, the filter event list may be updated
615 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
616 // return -ENOENT if the topic does not exists
617 // return 0 on success, error code otherwise
618 int create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
);
619 int create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
);
620 // remove a topic and filter from bucket
621 // if the topic does not exists on the bucket it is a no-op (considered success)
622 // return -ENOENT if the topic does not exists
623 // return 0 on success, error code otherwise
624 int remove_notification(const string
& topic_name
);
627 // base class for subscription
629 friend class RGWUserPubSub
;
631 RGWUserPubSub
* const ps
;
632 const std::string sub
;
633 rgw_raw_obj sub_meta_obj
;
635 int read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
);
636 int write_sub(const rgw_pubsub_sub_config
& sub_conf
, RGWObjVersionTracker
*objv_tracker
);
637 int remove_sub(RGWObjVersionTracker
*objv_tracker
);
639 Sub(RGWUserPubSub
*_ps
, const std::string
& _sub
) : ps(_ps
), sub(_sub
) {
640 ps
->get_sub_meta_obj(sub
, &sub_meta_obj
);
643 virtual ~Sub() = default;
645 int subscribe(const string
& topic_name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& s3_id
="");
646 int unsubscribe(const string
& topic_name
);
647 int get_conf(rgw_pubsub_sub_config
* result
);
649 static const int DEFAULT_MAX_EVENTS
= 100;
650 // followint virtual methods should only be called in derived
651 virtual int list_events(const string
& marker
, int max_events
) {ceph_assert(false);}
652 virtual int remove_event(const string
& event_id
) {ceph_assert(false);}
653 virtual void dump(Formatter
* f
) const {ceph_assert(false);}
656 // subscription with templated list of events to support both S3 compliant and Ceph specific events
657 template<typename EventType
>
658 class SubWithEvents
: public Sub
{
660 struct list_events_result
{
661 std::string next_marker
;
662 bool is_truncated
{false};
663 void dump(Formatter
*f
) const;
664 std::vector
<EventType
> events
;
668 SubWithEvents(RGWUserPubSub
*_ps
, const string
& _sub
) : Sub(_ps
, _sub
) {}
670 virtual ~SubWithEvents() = default;
672 int list_events(const string
& marker
, int max_events
) override
;
673 int remove_event(const string
& event_id
) override
;
674 void dump(Formatter
* f
) const override
;
677 using BucketRef
= std::shared_ptr
<Bucket
>;
678 using SubRef
= std::shared_ptr
<Sub
>;
680 BucketRef
get_bucket(const rgw_bucket
& bucket
) {
681 return std::make_shared
<Bucket
>(this, bucket
);
684 SubRef
get_sub(const string
& sub
) {
685 return std::make_shared
<Sub
>(this, sub
);
688 SubRef
get_sub_with_events(const string
& sub
) {
689 auto tmpsub
= Sub(this, sub
);
690 rgw_pubsub_sub_config conf
;
691 if (tmpsub
.get_conf(&conf
) < 0) {
694 if (conf
.s3_id
.empty()) {
695 return std::make_shared
<SubWithEvents
<rgw_pubsub_event
>>(this, sub
);
697 return std::make_shared
<SubWithEvents
<rgw_pubsub_s3_record
>>(this, sub
);
700 void get_user_meta_obj(rgw_raw_obj
*obj
) const {
701 *obj
= rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, user_meta_oid());
704 void get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const {
705 *obj
= rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, bucket_meta_oid(bucket
));
708 void get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const {
709 *obj
= rgw_raw_obj(store
->svc
.zone
->get_zone_params().log_pool
, sub_meta_oid(name
));
712 // get all topics defined for the user and populate them into "result"
713 // return 0 on success or if no topics exist, error code otherwise
714 int get_user_topics(rgw_pubsub_user_topics
*result
);
715 // get a topic with its subscriptions by its name and populate it into "result"
716 // return -ENOENT if the topic does not exists
717 // return 0 on success, error code otherwise
718 int get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
);
719 // get a topic with by its name and populate it into "result"
720 // return -ENOENT if the topic does not exists
721 // return 0 on success, error code otherwise
722 int get_topic(const string
& name
, rgw_pubsub_topic
*result
);
723 // create a topic with a name only
724 // if the topic already exists it is a no-op (considered success)
725 // return 0 on success, error code otherwise
726 int create_topic(const string
& name
);
727 // create a topic with push destination information and ARN
728 // if the topic already exists the destination and ARN values may be updated (considered succsess)
729 // return 0 on success, error code otherwise
730 int create_topic(const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
);
731 // remove a topic according to its name
732 // if the topic does not exists it is a no-op (considered success)
733 // return 0 on success, error code otherwise
734 int remove_topic(const string
& name
);
738 int RGWUserPubSub::read(const rgw_raw_obj
& obj
, T
*result
, RGWObjVersionTracker
*objv_tracker
)
741 int ret
= rgw_get_system_obj(store
, obj_ctx
,
745 nullptr, nullptr, nullptr);
750 auto iter
= bl
.cbegin();
752 decode(*result
, iter
);
753 } catch (buffer::error
& err
) {
761 int RGWUserPubSub::write(const rgw_raw_obj
& obj
, const T
& info
, RGWObjVersionTracker
*objv_tracker
)
766 int ret
= rgw_put_system_obj(store
, obj
.pool
, obj
.oid
,
767 bl
, false, objv_tracker
,
773 obj_ctx
.invalidate(const_cast<rgw_raw_obj
&>(obj
));