1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_PUBSUB_H
5 #define CEPH_RGW_PUBSUB_H
7 #include "services/svc_sys_obj.h"
10 #include "rgw_notify_event_type.h"
11 #include <boost/container/flat_map.hpp>
13 namespace rgw::sal
{ class RadosStore
; }
17 struct rgw_s3_key_filter
{
18 std::string prefix_rule
;
19 std::string suffix_rule
;
20 std::string regex_rule
;
22 bool has_content() const;
24 bool decode_xml(XMLObj
*obj
);
25 void dump_xml(Formatter
*f
) const;
27 void encode(bufferlist
& bl
) const {
28 ENCODE_START(1, 1, bl
);
29 encode(prefix_rule
, bl
);
30 encode(suffix_rule
, bl
);
31 encode(regex_rule
, bl
);
35 void decode(bufferlist::const_iterator
& bl
) {
37 decode(prefix_rule
, bl
);
38 decode(suffix_rule
, bl
);
39 decode(regex_rule
, bl
);
43 WRITE_CLASS_ENCODER(rgw_s3_key_filter
)
45 using KeyValueMap
= boost::container::flat_map
<std::string
, std::string
>;
46 using KeyMultiValueMap
= std::multimap
<std::string
, std::string
>;
48 struct rgw_s3_key_value_filter
{
51 bool has_content() const;
53 bool decode_xml(XMLObj
*obj
);
54 void dump_xml(Formatter
*f
) const;
56 void encode(bufferlist
& bl
) const {
57 ENCODE_START(1, 1, bl
);
61 void decode(bufferlist::const_iterator
& bl
) {
67 WRITE_CLASS_ENCODER(rgw_s3_key_value_filter
)
69 struct rgw_s3_filter
{
70 rgw_s3_key_filter key_filter
;
71 rgw_s3_key_value_filter metadata_filter
;
72 rgw_s3_key_value_filter tag_filter
;
74 bool has_content() const;
76 bool decode_xml(XMLObj
*obj
);
77 void dump_xml(Formatter
*f
) const;
79 void encode(bufferlist
& bl
) const {
80 ENCODE_START(2, 1, bl
);
81 encode(key_filter
, bl
);
82 encode(metadata_filter
, bl
);
83 encode(tag_filter
, bl
);
87 void decode(bufferlist::const_iterator
& bl
) {
89 decode(key_filter
, bl
);
90 decode(metadata_filter
, bl
);
92 decode(tag_filter
, bl
);
97 WRITE_CLASS_ENCODER(rgw_s3_filter
)
99 using OptionalFilter
= std::optional
<rgw_s3_filter
>;
101 struct rgw_pubsub_topic_filter
;
102 /* S3 notification configuration
103 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
104 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
126 <Id>notification1</Id>
127 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
128 <Event>s3:ObjectCreated:*</Event>
129 <Event>s3:ObjectRemoved:*</Event>
130 </TopicConfiguration>
131 </NotificationConfiguration>
133 struct rgw_pubsub_s3_notification
{
137 rgw::notify::EventTypeList events
;
139 std::string topic_arn
;
141 rgw_s3_filter filter
;
143 bool decode_xml(XMLObj
*obj
);
144 void dump_xml(Formatter
*f
) const;
146 rgw_pubsub_s3_notification() = default;
147 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
148 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
);
151 // return true if the key matches the prefix/suffix/regex rules of the key filter
152 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
);
154 // return true if the key matches the metadata rules of the metadata filter
155 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
);
157 // return true if the key matches the tag rules of the tag filter
158 bool match(const rgw_s3_key_value_filter
& filter
, const KeyMultiValueMap
& kv
);
160 // return true if the event type matches (equal or contained in) one of the events in the list
161 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
);
163 struct rgw_pubsub_s3_notifications
{
164 std::list
<rgw_pubsub_s3_notification
> list
;
165 bool decode_xml(XMLObj
*obj
);
166 void dump_xml(Formatter
*f
) const;
169 /* S3 event records structure
170 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
182 "requestParameters":{
186 "x-amz-request-id":"",
190 "s3SchemaVersion":"1.0",
191 "configurationId":"",
215 struct rgw_pubsub_s3_event
{
216 constexpr static const char* const json_type_plural
= "Records";
217 std::string eventVersion
= "2.2";
219 std::string eventSource
= "ceph:s3";
221 std::string awsRegion
;
222 // time of the request
223 ceph::real_time eventTime
;
225 std::string eventName
;
226 // user that sent the request
227 std::string userIdentity
;
228 // IP address of source of the request (not implemented)
229 std::string sourceIPAddress
;
230 // request ID (not implemented)
231 std::string x_amz_request_id
;
232 // radosgw that received the request
233 std::string x_amz_id_2
;
234 std::string s3SchemaVersion
= "1.0";
235 // ID received in the notification request
236 std::string configurationId
;
238 std::string bucket_name
;
240 std::string bucket_ownerIdentity
;
242 std::string bucket_arn
;
244 std::string object_key
;
246 uint64_t object_size
= 0;
248 std::string object_etag
;
249 // object version id bucket is versioned
250 std::string object_versionId
;
251 // hexadecimal value used to determine event order for specific key
252 std::string object_sequencer
;
253 // this is an rgw extension (not S3 standard)
254 // used to store a globally unique identifier of the event
255 // that could be used for acking or any other identification of the event
257 // this is an rgw extension holding the internal bucket id
258 std::string bucket_id
;
260 KeyValueMap x_meta_map
;
262 KeyMultiValueMap tags
;
263 // opaque data received from the topic
264 // could be used to identify the gateway
265 std::string opaque_data
;
267 void encode(bufferlist
& bl
) const {
268 ENCODE_START(4, 1, bl
);
269 encode(eventVersion
, bl
);
270 encode(eventSource
, bl
);
271 encode(awsRegion
, bl
);
272 encode(eventTime
, bl
);
273 encode(eventName
, bl
);
274 encode(userIdentity
, bl
);
275 encode(sourceIPAddress
, bl
);
276 encode(x_amz_request_id
, bl
);
277 encode(x_amz_id_2
, bl
);
278 encode(s3SchemaVersion
, bl
);
279 encode(configurationId
, bl
);
280 encode(bucket_name
, bl
);
281 encode(bucket_ownerIdentity
, bl
);
282 encode(bucket_arn
, bl
);
283 encode(object_key
, bl
);
284 encode(object_size
, bl
);
285 encode(object_etag
, bl
);
286 encode(object_versionId
, bl
);
287 encode(object_sequencer
, bl
);
289 encode(bucket_id
, bl
);
290 encode(x_meta_map
, bl
);
292 encode(opaque_data
, bl
);
296 void decode(bufferlist::const_iterator
& bl
) {
298 decode(eventVersion
, bl
);
299 decode(eventSource
, bl
);
300 decode(awsRegion
, bl
);
301 decode(eventTime
, bl
);
302 decode(eventName
, bl
);
303 decode(userIdentity
, bl
);
304 decode(sourceIPAddress
, bl
);
305 decode(x_amz_request_id
, bl
);
306 decode(x_amz_id_2
, bl
);
307 decode(s3SchemaVersion
, bl
);
308 decode(configurationId
, bl
);
309 decode(bucket_name
, bl
);
310 decode(bucket_ownerIdentity
, bl
);
311 decode(bucket_arn
, bl
);
312 decode(object_key
, bl
);
313 decode(object_size
, bl
);
314 decode(object_etag
, bl
);
315 decode(object_versionId
, bl
);
316 decode(object_sequencer
, bl
);
319 decode(bucket_id
, bl
);
320 decode(x_meta_map
, bl
);
326 decode(opaque_data
, bl
);
331 void dump(Formatter
*f
) const;
333 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event
)
335 struct rgw_pubsub_event
{
336 constexpr static const char* const json_type_plural
= "events";
338 std::string event_name
;
340 ceph::real_time timestamp
;
341 JSONFormattable info
;
343 void encode(bufferlist
& bl
) const {
344 ENCODE_START(1, 1, bl
);
346 encode(event_name
, bl
);
348 encode(timestamp
, bl
);
353 void decode(bufferlist::const_iterator
& bl
) {
356 decode(event_name
, bl
);
358 decode(timestamp
, bl
);
363 void dump(Formatter
*f
) const;
365 WRITE_CLASS_ENCODER(rgw_pubsub_event
)
367 // setting a unique ID for an event based on object hash and timestamp
368 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
);
370 struct rgw_pubsub_sub_dest
{
371 std::string bucket_name
;
372 std::string oid_prefix
;
373 std::string push_endpoint
;
374 std::string push_endpoint_args
;
375 std::string arn_topic
;
376 bool stored_secret
= false;
377 bool persistent
= false;
379 void encode(bufferlist
& bl
) const {
380 ENCODE_START(5, 1, bl
);
381 encode(bucket_name
, bl
);
382 encode(oid_prefix
, bl
);
383 encode(push_endpoint
, bl
);
384 encode(push_endpoint_args
, bl
);
385 encode(arn_topic
, bl
);
386 encode(stored_secret
, bl
);
387 encode(persistent
, bl
);
391 void decode(bufferlist::const_iterator
& bl
) {
393 decode(bucket_name
, bl
);
394 decode(oid_prefix
, bl
);
395 decode(push_endpoint
, bl
);
397 decode(push_endpoint_args
, bl
);
400 decode(arn_topic
, bl
);
403 decode(stored_secret
, bl
);
406 decode(persistent
, bl
);
411 void dump(Formatter
*f
) const;
412 void dump_xml(Formatter
*f
) const;
413 std::string
to_json_str() const;
415 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest
)
417 struct rgw_pubsub_sub_config
{
421 rgw_pubsub_sub_dest dest
;
424 void encode(bufferlist
& bl
) const {
425 ENCODE_START(2, 1, bl
);
434 void decode(bufferlist::const_iterator
& bl
) {
446 void dump(Formatter
*f
) const;
448 WRITE_CLASS_ENCODER(rgw_pubsub_sub_config
)
450 struct rgw_pubsub_topic
{
453 rgw_pubsub_sub_dest dest
;
455 std::string opaque_data
;
457 void encode(bufferlist
& bl
) const {
458 ENCODE_START(3, 1, bl
);
463 encode(opaque_data
, bl
);
467 void decode(bufferlist::const_iterator
& bl
) {
476 decode(opaque_data
, bl
);
481 std::string
to_str() const {
482 return user
.tenant
+ "/" + name
;
485 void dump(Formatter
*f
) const;
486 void dump_xml(Formatter
*f
) const;
487 void dump_xml_as_attributes(Formatter
*f
) const;
489 bool operator<(const rgw_pubsub_topic
& t
) const {
490 return to_str().compare(t
.to_str());
493 WRITE_CLASS_ENCODER(rgw_pubsub_topic
)
495 struct rgw_pubsub_topic_subs
{
496 rgw_pubsub_topic topic
;
497 std::set
<std::string
> subs
;
499 void encode(bufferlist
& bl
) const {
500 ENCODE_START(1, 1, bl
);
506 void decode(bufferlist::const_iterator
& bl
) {
513 void dump(Formatter
*f
) const;
515 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs
)
517 struct rgw_pubsub_topic_filter
{
518 rgw_pubsub_topic topic
;
519 rgw::notify::EventTypeList events
;
521 rgw_s3_filter s3_filter
;
523 void encode(bufferlist
& bl
) const {
524 ENCODE_START(3, 1, bl
);
526 // events are stored as a vector of std::strings
527 std::vector
<std::string
> tmp_events
;
528 const auto converter
= s3_id
.empty() ? rgw::notify::to_ceph_string
: rgw::notify::to_string
;
529 std::transform(events
.begin(), events
.end(), std::back_inserter(tmp_events
), converter
);
530 encode(tmp_events
, bl
);
532 encode(s3_filter
, bl
);
536 void decode(bufferlist::const_iterator
& bl
) {
539 // events are stored as a vector of std::strings
541 std::vector
<std::string
> tmp_events
;
542 decode(tmp_events
, bl
);
543 std::transform(tmp_events
.begin(), tmp_events
.end(), std::back_inserter(events
), rgw::notify::from_string
);
548 decode(s3_filter
, bl
);
553 void dump(Formatter
*f
) const;
555 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter
)
557 struct rgw_pubsub_bucket_topics
{
558 std::map
<std::string
, rgw_pubsub_topic_filter
> topics
;
560 void encode(bufferlist
& bl
) const {
561 ENCODE_START(1, 1, bl
);
566 void decode(bufferlist::const_iterator
& bl
) {
572 void dump(Formatter
*f
) const;
574 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics
)
576 struct rgw_pubsub_topics
{
577 std::map
<std::string
, rgw_pubsub_topic_subs
> topics
;
579 void encode(bufferlist
& bl
) const {
580 ENCODE_START(1, 1, bl
);
585 void decode(bufferlist::const_iterator
& bl
) {
591 void dump(Formatter
*f
) const;
592 void dump_xml(Formatter
*f
) const;
594 WRITE_CLASS_ENCODER(rgw_pubsub_topics
)
596 static std::string pubsub_oid_prefix
= "pubsub.";
602 rgw::sal::RadosStore
* store
;
603 const std::string tenant
;
604 RGWSysObjectCtx obj_ctx
;
606 rgw_raw_obj meta_obj
;
608 std::string
meta_oid() const {
609 return pubsub_oid_prefix
+ tenant
;
612 std::string
bucket_meta_oid(const rgw_bucket
& bucket
) const {
613 return pubsub_oid_prefix
+ tenant
+ ".bucket." + bucket
.name
+ "/" + bucket
.marker
;
616 std::string
sub_meta_oid(const std::string
& name
) const {
617 return pubsub_oid_prefix
+ tenant
+ ".sub." + name
;
621 int read(const rgw_raw_obj
& obj
, T
* data
, RGWObjVersionTracker
* objv_tracker
);
624 int write(const DoutPrefixProvider
*dpp
, const rgw_raw_obj
& obj
, const T
& info
,
625 RGWObjVersionTracker
* obj_tracker
, optional_yield y
);
627 int remove(const DoutPrefixProvider
*dpp
, const rgw_raw_obj
& obj
, RGWObjVersionTracker
* objv_tracker
,
630 int read_topics(rgw_pubsub_topics
*result
, RGWObjVersionTracker
* objv_tracker
);
631 int write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_topics
& topics
,
632 RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
635 RGWPubSub(rgw::sal::RadosStore
* _store
, const std::string
& tenant
);
638 friend class RGWPubSub
;
641 rgw_raw_obj bucket_meta_obj
;
643 // read the list of topics associated with a bucket and populate into result
644 // use version tacker to enforce atomicity between read/write
645 // return 0 on success or if no topic was associated with the bucket, error code otherwise
646 int read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
* objv_tracker
);
647 // set the list of topics associated with a bucket
648 // use version tacker to enforce atomicity between read/write
649 // return 0 on success, error code otherwise
650 int write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& topics
,
651 RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
653 Bucket(RGWPubSub
*_ps
, const rgw_bucket
& _bucket
) : ps(_ps
), bucket(_bucket
) {
654 ps
->get_bucket_meta_obj(bucket
, &bucket_meta_obj
);
657 // read the list of topics associated with a bucket and populate into result
658 // return 0 on success or if no topic was associated with the bucket, error code otherwise
659 int get_topics(rgw_pubsub_bucket_topics
*result
);
660 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
661 // assigning a notification name is optional (needed for S3 compatible notifications)
662 // if the topic already exist on the bucket, the filter event list may be updated
663 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
664 // return -ENOENT if the topic does not exists
665 // return 0 on success, error code otherwise
666 int create_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, const rgw::notify::EventTypeList
& events
, optional_yield y
);
667 int create_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
);
668 // remove a topic and filter from bucket
669 // if the topic does not exists on the bucket it is a no-op (considered success)
670 // return -ENOENT if the topic does not exists
671 // return 0 on success, error code otherwise
672 int remove_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, optional_yield y
);
673 // remove all notifications (and autogenerated topics) associated with the bucket
674 // return 0 on success or if no topic was associated with the bucket, error code otherwise
675 int remove_notifications(const DoutPrefixProvider
*dpp
, optional_yield y
);
678 // base class for subscription
680 friend class RGWPubSub
;
683 const std::string sub
;
684 rgw_raw_obj sub_meta_obj
;
686 int read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
* objv_tracker
);
687 int write_sub(const DoutPrefixProvider
*dpp
, const rgw_pubsub_sub_config
& sub_conf
,
688 RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
689 int remove_sub(const DoutPrefixProvider
*dpp
, RGWObjVersionTracker
* objv_tracker
, optional_yield y
);
691 Sub(RGWPubSub
*_ps
, const std::string
& _sub
) : ps(_ps
), sub(_sub
) {
692 ps
->get_sub_meta_obj(sub
, &sub_meta_obj
);
695 virtual ~Sub() = default;
697 int subscribe(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, const rgw_pubsub_sub_dest
& dest
, optional_yield y
,
698 const std::string
& s3_id
="");
699 int unsubscribe(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, optional_yield y
);
700 int get_conf(rgw_pubsub_sub_config
* result
);
702 static const int DEFAULT_MAX_EVENTS
= 100;
703 // followint virtual methods should only be called in derived
704 virtual int list_events(const DoutPrefixProvider
*dpp
, const std::string
& marker
, int max_events
) {ceph_assert(false);}
705 virtual int remove_event(const DoutPrefixProvider
*dpp
, const std::string
& event_id
) {ceph_assert(false);}
706 virtual void dump(Formatter
* f
) const {ceph_assert(false);}
709 // subscription with templated list of events to support both S3 compliant and Ceph specific events
710 template<typename EventType
>
711 class SubWithEvents
: public Sub
{
713 struct list_events_result
{
714 std::string next_marker
;
715 bool is_truncated
{false};
716 void dump(Formatter
*f
) const;
717 std::vector
<EventType
> events
;
721 SubWithEvents(RGWPubSub
*_ps
, const std::string
& _sub
) : Sub(_ps
, _sub
) {}
723 virtual ~SubWithEvents() = default;
725 int list_events(const DoutPrefixProvider
*dpp
, const std::string
& marker
, int max_events
) override
;
726 int remove_event(const DoutPrefixProvider
*dpp
, const std::string
& event_id
) override
;
727 void dump(Formatter
* f
) const override
;
730 using BucketRef
= std::shared_ptr
<Bucket
>;
731 using SubRef
= std::shared_ptr
<Sub
>;
733 BucketRef
get_bucket(const rgw_bucket
& bucket
) {
734 return std::make_shared
<Bucket
>(this, bucket
);
737 SubRef
get_sub(const std::string
& sub
) {
738 return std::make_shared
<Sub
>(this, sub
);
741 SubRef
get_sub_with_events(const std::string
& sub
) {
742 auto tmpsub
= Sub(this, sub
);
743 rgw_pubsub_sub_config conf
;
744 if (tmpsub
.get_conf(&conf
) < 0) {
747 if (conf
.s3_id
.empty()) {
748 return std::make_shared
<SubWithEvents
<rgw_pubsub_event
>>(this, sub
);
750 return std::make_shared
<SubWithEvents
<rgw_pubsub_s3_event
>>(this, sub
);
753 void get_meta_obj(rgw_raw_obj
*obj
) const;
754 void get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const;
756 void get_sub_meta_obj(const std::string
& name
, rgw_raw_obj
*obj
) const;
758 // get all topics (per tenant, if used)) and populate them into "result"
759 // return 0 on success or if no topics exist, error code otherwise
760 int get_topics(rgw_pubsub_topics
*result
);
761 // get a topic with its subscriptions by its name and populate it into "result"
762 // return -ENOENT if the topic does not exists
763 // return 0 on success, error code otherwise
764 int get_topic(const std::string
& name
, rgw_pubsub_topic_subs
*result
);
765 // get a topic with by its name and populate it into "result"
766 // return -ENOENT if the topic does not exists
767 // return 0 on success, error code otherwise
768 int get_topic(const std::string
& name
, rgw_pubsub_topic
*result
);
769 // create a topic with a name only
770 // if the topic already exists it is a no-op (considered success)
771 // return 0 on success, error code otherwise
772 int create_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, optional_yield y
);
773 // create a topic with push destination information and ARN
774 // if the topic already exists the destination and ARN values may be updated (considered succsess)
775 // return 0 on success, error code otherwise
776 int create_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
);
777 // remove a topic according to its name
778 // if the topic does not exists it is a no-op (considered success)
779 // return 0 on success, error code otherwise
780 int remove_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, optional_yield y
);
785 int RGWPubSub::read(const rgw_raw_obj
& obj
, T
* result
, RGWObjVersionTracker
* objv_tracker
)
788 int ret
= rgw_get_system_obj(obj_ctx
,
792 nullptr, null_yield
, nullptr, nullptr);
797 auto iter
= bl
.cbegin();
799 decode(*result
, iter
);
800 } catch (buffer::error
& err
) {
808 int RGWPubSub::write(const DoutPrefixProvider
*dpp
, const rgw_raw_obj
& obj
, const T
& info
,
809 RGWObjVersionTracker
* objv_tracker
, optional_yield y
)
814 int ret
= rgw_put_system_obj(dpp
, obj_ctx
, obj
.pool
, obj
.oid
,
815 bl
, false, objv_tracker
,
821 obj_ctx
.invalidate(obj
);