1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_PUBSUB_H
5 #define CEPH_RGW_PUBSUB_H
8 #include "services/svc_sys_obj.h"
11 #include "rgw_notify_event_type.h"
12 #include <boost/container/flat_map.hpp>
16 struct rgw_s3_key_filter
{
17 std::string prefix_rule
;
18 std::string suffix_rule
;
19 std::string regex_rule
;
21 bool has_content() const;
23 bool decode_xml(XMLObj
*obj
);
24 void dump_xml(Formatter
*f
) const;
26 void encode(bufferlist
& bl
) const {
27 ENCODE_START(1, 1, bl
);
28 encode(prefix_rule
, bl
);
29 encode(suffix_rule
, bl
);
30 encode(regex_rule
, bl
);
34 void decode(bufferlist::const_iterator
& bl
) {
36 decode(prefix_rule
, bl
);
37 decode(suffix_rule
, bl
);
38 decode(regex_rule
, bl
);
42 WRITE_CLASS_ENCODER(rgw_s3_key_filter
)
44 using KeyValueMap
= boost::container::flat_map
<std::string
, std::string
>;
46 struct rgw_s3_key_value_filter
{
49 bool has_content() const;
51 bool decode_xml(XMLObj
*obj
);
52 void dump_xml(Formatter
*f
) const;
54 void encode(bufferlist
& bl
) const {
55 ENCODE_START(1, 1, bl
);
59 void decode(bufferlist::const_iterator
& bl
) {
65 WRITE_CLASS_ENCODER(rgw_s3_key_value_filter
)
67 struct rgw_s3_filter
{
68 rgw_s3_key_filter key_filter
;
69 rgw_s3_key_value_filter metadata_filter
;
70 rgw_s3_key_value_filter tag_filter
;
72 bool has_content() const;
74 bool decode_xml(XMLObj
*obj
);
75 void dump_xml(Formatter
*f
) const;
77 void encode(bufferlist
& bl
) const {
78 ENCODE_START(2, 1, bl
);
79 encode(key_filter
, bl
);
80 encode(metadata_filter
, bl
);
81 encode(tag_filter
, bl
);
85 void decode(bufferlist::const_iterator
& bl
) {
87 decode(key_filter
, bl
);
88 decode(metadata_filter
, bl
);
90 decode(tag_filter
, bl
);
95 WRITE_CLASS_ENCODER(rgw_s3_filter
)
97 using OptionalFilter
= std::optional
<rgw_s3_filter
>;
99 struct rgw_pubsub_topic_filter
;
100 /* S3 notification configuration
101 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
102 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
124 <Id>notification1</Id>
125 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
126 <Event>s3:ObjectCreated:*</Event>
127 <Event>s3:ObjectRemoved:*</Event>
128 </TopicConfiguration>
129 </NotificationConfiguration>
131 struct rgw_pubsub_s3_notification
{
135 rgw::notify::EventTypeList events
;
137 std::string topic_arn
;
139 rgw_s3_filter filter
;
141 bool decode_xml(XMLObj
*obj
);
142 void dump_xml(Formatter
*f
) const;
144 rgw_pubsub_s3_notification() = default;
145 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
146 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
);
149 // return true if the key matches the prefix/suffix/regex rules of the key filter
150 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
);
151 // return true if the key matches the metadata/tags rules of the metadata/tags filter
152 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
);
153 // return true if the event type matches (equal or contained in) one of the events in the list
154 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
);
156 struct rgw_pubsub_s3_notifications
{
157 std::list
<rgw_pubsub_s3_notification
> list
;
158 bool decode_xml(XMLObj
*obj
);
159 void dump_xml(Formatter
*f
) const;
162 /* S3 event records structure
163 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
175 "requestParameters":{
179 "x-amz-request-id":"",
183 "s3SchemaVersion":"1.0",
184 "configurationId":"",
208 struct rgw_pubsub_s3_event
{
209 constexpr static const char* const json_type_plural
= "Records";
210 std::string eventVersion
= "2.2";
212 std::string eventSource
= "ceph:s3";
214 std::string awsRegion
;
215 // time of the request
216 ceph::real_time eventTime
;
218 std::string eventName
;
219 // user that sent the request
220 std::string userIdentity
;
221 // IP address of source of the request (not implemented)
222 std::string sourceIPAddress
;
223 // request ID (not implemented)
224 std::string x_amz_request_id
;
225 // radosgw that received the request
226 std::string x_amz_id_2
;
227 std::string s3SchemaVersion
= "1.0";
228 // ID received in the notification request
229 std::string configurationId
;
231 std::string bucket_name
;
233 std::string bucket_ownerIdentity
;
235 std::string bucket_arn
;
237 std::string object_key
;
239 uint64_t object_size
= 0;
241 std::string object_etag
;
242 // object version id, if the bucket is versioned
243 std::string object_versionId
;
244 // hexadecimal value used to determine event order for specific key
245 std::string object_sequencer
;
246 // this is an rgw extension (not S3 standard)
247 // used to store a globally unique identifier of the event
248 // that could be used for acking or any other identification of the event
250 // this is an rgw extension holding the internal bucket id
251 std::string bucket_id
;
253 KeyValueMap x_meta_map
;
256 // opaque data received from the topic
257 // could be used to identify the gateway
258 std::string opaque_data
;
260 void encode(bufferlist
& bl
) const {
261 ENCODE_START(4, 1, bl
);
262 encode(eventVersion
, bl
);
263 encode(eventSource
, bl
);
264 encode(awsRegion
, bl
);
265 encode(eventTime
, bl
);
266 encode(eventName
, bl
);
267 encode(userIdentity
, bl
);
268 encode(sourceIPAddress
, bl
);
269 encode(x_amz_request_id
, bl
);
270 encode(x_amz_id_2
, bl
);
271 encode(s3SchemaVersion
, bl
);
272 encode(configurationId
, bl
);
273 encode(bucket_name
, bl
);
274 encode(bucket_ownerIdentity
, bl
);
275 encode(bucket_arn
, bl
);
276 encode(object_key
, bl
);
277 encode(object_size
, bl
);
278 encode(object_etag
, bl
);
279 encode(object_versionId
, bl
);
280 encode(object_sequencer
, bl
);
282 encode(bucket_id
, bl
);
283 encode(x_meta_map
, bl
);
285 encode(opaque_data
, bl
);
289 void decode(bufferlist::const_iterator
& bl
) {
291 decode(eventVersion
, bl
);
292 decode(eventSource
, bl
);
293 decode(awsRegion
, bl
);
294 decode(eventTime
, bl
);
295 decode(eventName
, bl
);
296 decode(userIdentity
, bl
);
297 decode(sourceIPAddress
, bl
);
298 decode(x_amz_request_id
, bl
);
299 decode(x_amz_id_2
, bl
);
300 decode(s3SchemaVersion
, bl
);
301 decode(configurationId
, bl
);
302 decode(bucket_name
, bl
);
303 decode(bucket_ownerIdentity
, bl
);
304 decode(bucket_arn
, bl
);
305 decode(object_key
, bl
);
306 decode(object_size
, bl
);
307 decode(object_etag
, bl
);
308 decode(object_versionId
, bl
);
309 decode(object_sequencer
, bl
);
312 decode(bucket_id
, bl
);
313 decode(x_meta_map
, bl
);
319 decode(opaque_data
, bl
);
324 void dump(Formatter
*f
) const;
326 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event
)
328 struct rgw_pubsub_event
{
329 constexpr static const char* const json_type_plural
= "events";
331 std::string event_name
;
333 ceph::real_time timestamp
;
334 JSONFormattable info
;
336 void encode(bufferlist
& bl
) const {
337 ENCODE_START(1, 1, bl
);
339 encode(event_name
, bl
);
341 encode(timestamp
, bl
);
346 void decode(bufferlist::const_iterator
& bl
) {
349 decode(event_name
, bl
);
351 decode(timestamp
, bl
);
356 void dump(Formatter
*f
) const;
358 WRITE_CLASS_ENCODER(rgw_pubsub_event
)
360 // setting a unique ID for an event based on object hash and timestamp
361 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
);
363 struct rgw_pubsub_sub_dest
{
364 std::string bucket_name
;
365 std::string oid_prefix
;
366 std::string push_endpoint
;
367 std::string push_endpoint_args
;
368 std::string arn_topic
;
369 bool stored_secret
= false;
370 bool persistent
= false;
372 void encode(bufferlist
& bl
) const {
373 ENCODE_START(5, 1, bl
);
374 encode(bucket_name
, bl
);
375 encode(oid_prefix
, bl
);
376 encode(push_endpoint
, bl
);
377 encode(push_endpoint_args
, bl
);
378 encode(arn_topic
, bl
);
379 encode(stored_secret
, bl
);
380 encode(persistent
, bl
);
384 void decode(bufferlist::const_iterator
& bl
) {
386 decode(bucket_name
, bl
);
387 decode(oid_prefix
, bl
);
388 decode(push_endpoint
, bl
);
390 decode(push_endpoint_args
, bl
);
393 decode(arn_topic
, bl
);
396 decode(stored_secret
, bl
);
399 decode(persistent
, bl
);
404 void dump(Formatter
*f
) const;
405 void dump_xml(Formatter
*f
) const;
406 std::string
to_json_str() const;
408 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest
)
410 struct rgw_pubsub_sub_config
{
414 rgw_pubsub_sub_dest dest
;
417 void encode(bufferlist
& bl
) const {
418 ENCODE_START(2, 1, bl
);
427 void decode(bufferlist::const_iterator
& bl
) {
439 void dump(Formatter
*f
) const;
441 WRITE_CLASS_ENCODER(rgw_pubsub_sub_config
)
443 struct rgw_pubsub_topic
{
446 rgw_pubsub_sub_dest dest
;
448 std::string opaque_data
;
450 void encode(bufferlist
& bl
) const {
451 ENCODE_START(3, 1, bl
);
456 encode(opaque_data
, bl
);
460 void decode(bufferlist::const_iterator
& bl
) {
469 decode(opaque_data
, bl
);
474 string
to_str() const {
475 return user
.tenant
+ "/" + name
;
478 void dump(Formatter
*f
) const;
479 void dump_xml(Formatter
*f
) const;
480 void dump_xml_as_attributes(Formatter
*f
) const;
482 bool operator<(const rgw_pubsub_topic
& t
) const {
483 return to_str().compare(t
.to_str());
486 WRITE_CLASS_ENCODER(rgw_pubsub_topic
)
488 struct rgw_pubsub_topic_subs
{
489 rgw_pubsub_topic topic
;
490 std::set
<std::string
> subs
;
492 void encode(bufferlist
& bl
) const {
493 ENCODE_START(1, 1, bl
);
499 void decode(bufferlist::const_iterator
& bl
) {
506 void dump(Formatter
*f
) const;
508 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs
)
510 struct rgw_pubsub_topic_filter
{
511 rgw_pubsub_topic topic
;
512 rgw::notify::EventTypeList events
;
514 rgw_s3_filter s3_filter
;
516 void encode(bufferlist
& bl
) const {
517 ENCODE_START(3, 1, bl
);
519 // events are stored as a vector of strings
520 std::vector
<std::string
> tmp_events
;
521 const auto converter
= s3_id
.empty() ? rgw::notify::to_ceph_string
: rgw::notify::to_string
;
522 std::transform(events
.begin(), events
.end(), std::back_inserter(tmp_events
), converter
);
523 encode(tmp_events
, bl
);
525 encode(s3_filter
, bl
);
529 void decode(bufferlist::const_iterator
& bl
) {
532 // events are stored as a vector of strings
534 std::vector
<std::string
> tmp_events
;
535 decode(tmp_events
, bl
);
536 std::transform(tmp_events
.begin(), tmp_events
.end(), std::back_inserter(events
), rgw::notify::from_string
);
541 decode(s3_filter
, bl
);
546 void dump(Formatter
*f
) const;
548 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter
)
550 struct rgw_pubsub_bucket_topics
{
551 std::map
<std::string
, rgw_pubsub_topic_filter
> topics
;
553 void encode(bufferlist
& bl
) const {
554 ENCODE_START(1, 1, bl
);
559 void decode(bufferlist::const_iterator
& bl
) {
565 void dump(Formatter
*f
) const;
567 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics
)
569 struct rgw_pubsub_topics
{
570 std::map
<std::string
, rgw_pubsub_topic_subs
> topics
;
572 void encode(bufferlist
& bl
) const {
573 ENCODE_START(1, 1, bl
);
578 void decode(bufferlist::const_iterator
& bl
) {
584 void dump(Formatter
*f
) const;
585 void dump_xml(Formatter
*f
) const;
587 WRITE_CLASS_ENCODER(rgw_pubsub_topics
)
// Prefix shared by the object ids built in meta_oid(), bucket_meta_oid() and
// sub_meta_oid(). Declared const: being a static in a header, every
// translation unit gets its own copy, and a writable copy could silently
// diverge from the others.
static const std::string pubsub_oid_prefix = "pubsub.";
595 rgw::sal::RGWRadosStore
*store
;
596 const std::string tenant
;
597 RGWSysObjectCtx obj_ctx
;
599 rgw_raw_obj meta_obj
;
601 std::string
meta_oid() const {
602 return pubsub_oid_prefix
+ tenant
;
605 std::string
bucket_meta_oid(const rgw_bucket
& bucket
) const {
606 return pubsub_oid_prefix
+ tenant
+ ".bucket." + bucket
.name
+ "/" + bucket
.bucket_id
;
609 std::string
sub_meta_oid(const string
& name
) const {
610 return pubsub_oid_prefix
+ tenant
+ ".sub." + name
;
614 int read(const rgw_raw_obj
& obj
, T
* data
, RGWObjVersionTracker
* objv_tracker
);
617 int write(const rgw_raw_obj
& obj
, const T
& info
,
618 RGWObjVersionTracker
* obj_tracker
, optional_yield y
);
620 int remove(const rgw_raw_obj
& obj
, RGWObjVersionTracker
* objv_tracker
,
623 int read_topics(rgw_pubsub_topics
*result
, RGWObjVersionTracker
* objv_tracker
);
624 int write_topics(const rgw_pubsub_topics
& topics
,
625 RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
628 RGWPubSub(rgw::sal::RGWRadosStore
*_store
, const std::string
& tenant
);
631 friend class RGWPubSub
;
634 rgw_raw_obj bucket_meta_obj
;
636 // read the list of topics associated with a bucket and populate into result
637 // use version tracker to enforce atomicity between read/write
638 // return 0 on success or if no topic was associated with the bucket, error code otherwise
639 int read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
* objv_tracker
);
640 // set the list of topics associated with a bucket
641 // use version tracker to enforce atomicity between read/write
642 // return 0 on success, error code otherwise
643 int write_topics(const rgw_pubsub_bucket_topics
& topics
,
644 RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
646 Bucket(RGWPubSub
*_ps
, const rgw_bucket
& _bucket
) : ps(_ps
), bucket(_bucket
) {
647 ps
->get_bucket_meta_obj(bucket
, &bucket_meta_obj
);
650 // read the list of topics associated with a bucket and populate into result
651 // return 0 on success or if no topic was associated with the bucket, error code otherwise
652 int get_topics(rgw_pubsub_bucket_topics
*result
);
653 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
654 // assigning a notification name is optional (needed for S3 compatible notifications)
655 // if the topic already exists on the bucket, the filter event list may be updated
656 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
657 // return -ENOENT if the topic does not exist
658 // return 0 on success, error code otherwise
659 int create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, optional_yield y
);
660 int create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
);
661 // remove a topic and filter from bucket
662 // if the topic does not exist on the bucket it is a no-op (considered success)
663 // return -ENOENT if the topic does not exist
664 // return 0 on success, error code otherwise
665 int remove_notification(const string
& topic_name
, optional_yield y
);
666 // remove all notifications (and autogenerated topics) associated with the bucket
667 // return 0 on success or if no topic was associated with the bucket, error code otherwise
668 int remove_notifications(optional_yield y
);
671 // base class for subscription
673 friend class RGWPubSub
;
676 const std::string sub
;
677 rgw_raw_obj sub_meta_obj
;
679 int read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
* objv_tracker
);
680 int write_sub(const rgw_pubsub_sub_config
& sub_conf
,
681 RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
682 int remove_sub(RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
684 Sub(RGWPubSub
*_ps
, const std::string
& _sub
) : ps(_ps
), sub(_sub
) {
685 ps
->get_sub_meta_obj(sub
, &sub_meta_obj
);
688 virtual ~Sub() = default;
690 int subscribe(const string
& topic_name
, const rgw_pubsub_sub_dest
& dest
, optional_yield y
,
691 const std::string
& s3_id
="");
692 int unsubscribe(const string
& topic_name
, optional_yield y
);
693 int get_conf(rgw_pubsub_sub_config
* result
);
695 static const int DEFAULT_MAX_EVENTS
= 100;
696 // the following virtual methods should only be called in derived classes
697 virtual int list_events(const string
& marker
, int max_events
) {ceph_assert(false);}
698 virtual int remove_event(const string
& event_id
) {ceph_assert(false);}
699 virtual void dump(Formatter
* f
) const {ceph_assert(false);}
702 // subscription with templated list of events to support both S3 compliant and Ceph specific events
703 template<typename EventType
>
704 class SubWithEvents
: public Sub
{
706 struct list_events_result
{
707 std::string next_marker
;
708 bool is_truncated
{false};
709 void dump(Formatter
*f
) const;
710 std::vector
<EventType
> events
;
714 SubWithEvents(RGWPubSub
*_ps
, const string
& _sub
) : Sub(_ps
, _sub
) {}
716 virtual ~SubWithEvents() = default;
718 int list_events(const string
& marker
, int max_events
) override
;
719 int remove_event(const string
& event_id
) override
;
720 void dump(Formatter
* f
) const override
;
723 using BucketRef
= std::shared_ptr
<Bucket
>;
724 using SubRef
= std::shared_ptr
<Sub
>;
726 BucketRef
get_bucket(const rgw_bucket
& bucket
) {
727 return std::make_shared
<Bucket
>(this, bucket
);
730 SubRef
get_sub(const string
& sub
) {
731 return std::make_shared
<Sub
>(this, sub
);
734 SubRef
get_sub_with_events(const string
& sub
) {
735 auto tmpsub
= Sub(this, sub
);
736 rgw_pubsub_sub_config conf
;
737 if (tmpsub
.get_conf(&conf
) < 0) {
740 if (conf
.s3_id
.empty()) {
741 return std::make_shared
<SubWithEvents
<rgw_pubsub_event
>>(this, sub
);
743 return std::make_shared
<SubWithEvents
<rgw_pubsub_s3_event
>>(this, sub
);
746 void get_meta_obj(rgw_raw_obj
*obj
) const;
747 void get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const;
749 void get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const;
751 // get all topics (per tenant, if used) and populate them into "result"
752 // return 0 on success or if no topics exist, error code otherwise
753 int get_topics(rgw_pubsub_topics
*result
);
754 // get a topic with its subscriptions by its name and populate it into "result"
755 // return -ENOENT if the topic does not exist
756 // return 0 on success, error code otherwise
757 int get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
);
758 // get a topic by its name and populate it into "result"
759 // return -ENOENT if the topic does not exist
760 // return 0 on success, error code otherwise
761 int get_topic(const string
& name
, rgw_pubsub_topic
*result
);
762 // create a topic with a name only
763 // if the topic already exists it is a no-op (considered success)
764 // return 0 on success, error code otherwise
765 int create_topic(const string
& name
, optional_yield y
);
766 // create a topic with push destination information and ARN
767 // if the topic already exists the destination and ARN values may be updated (considered success)
768 // return 0 on success, error code otherwise
769 int create_topic(const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
);
770 // remove a topic according to its name
771 // if the topic does not exist it is a no-op (considered success)
772 // return 0 on success, error code otherwise
773 int remove_topic(const string
& name
, optional_yield y
);
778 int RGWPubSub::read(const rgw_raw_obj
& obj
, T
* result
, RGWObjVersionTracker
* objv_tracker
)
781 int ret
= rgw_get_system_obj(obj_ctx
,
785 nullptr, null_yield
, nullptr, nullptr);
790 auto iter
= bl
.cbegin();
792 decode(*result
, iter
);
793 } catch (buffer::error
& err
) {
801 int RGWPubSub::write(const rgw_raw_obj
& obj
, const T
& info
,
802 RGWObjVersionTracker
* objv_tracker
, optional_yield y
)
807 int ret
= rgw_put_system_obj(obj_ctx
, obj
.pool
, obj
.oid
,
808 bl
, false, objv_tracker
,
814 obj_ctx
.invalidate(obj
);