1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_PUBSUB_H
5 #define CEPH_RGW_PUBSUB_H
8 #include "services/svc_sys_obj.h"
11 #include "rgw_rados.h"
12 #include "rgw_notify_event_type.h"
13 #include <boost/container/flat_map.hpp>
17 struct rgw_s3_key_filter
{
18 std::string prefix_rule
;
19 std::string suffix_rule
;
20 std::string regex_rule
;
22 bool has_content() const;
24 bool decode_xml(XMLObj
*obj
);
25 void dump_xml(Formatter
*f
) const;
27 void encode(bufferlist
& bl
) const {
28 ENCODE_START(1, 1, bl
);
29 encode(prefix_rule
, bl
);
30 encode(suffix_rule
, bl
);
31 encode(regex_rule
, bl
);
35 void decode(bufferlist::const_iterator
& bl
) {
37 decode(prefix_rule
, bl
);
38 decode(suffix_rule
, bl
);
39 decode(regex_rule
, bl
);
43 WRITE_CLASS_ENCODER(rgw_s3_key_filter
)
45 using KeyValueList
= boost::container::flat_map
<std::string
, std::string
>;
47 struct rgw_s3_key_value_filter
{
50 bool has_content() const;
52 bool decode_xml(XMLObj
*obj
);
53 void dump_xml(Formatter
*f
) const;
55 void encode(bufferlist
& bl
) const {
56 ENCODE_START(1, 1, bl
);
60 void decode(bufferlist::const_iterator
& bl
) {
66 WRITE_CLASS_ENCODER(rgw_s3_key_value_filter
)
68 struct rgw_s3_filter
{
69 rgw_s3_key_filter key_filter
;
70 rgw_s3_key_value_filter metadata_filter
;
71 rgw_s3_key_value_filter tag_filter
;
73 bool has_content() const;
75 bool decode_xml(XMLObj
*obj
);
76 void dump_xml(Formatter
*f
) const;
78 void encode(bufferlist
& bl
) const {
79 ENCODE_START(2, 1, bl
);
80 encode(key_filter
, bl
);
81 encode(metadata_filter
, bl
);
82 encode(tag_filter
, bl
);
86 void decode(bufferlist::const_iterator
& bl
) {
88 decode(key_filter
, bl
);
89 decode(metadata_filter
, bl
);
91 decode(tag_filter
, bl
);
96 WRITE_CLASS_ENCODER(rgw_s3_filter
)
98 using OptionalFilter
= std::optional
<rgw_s3_filter
>;
100 struct rgw_pubsub_topic_filter
;
101 /* S3 notification configuration
102 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
103 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
125 <Id>notification1</Id>
126 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
127 <Event>s3:ObjectCreated:*</Event>
128 <Event>s3:ObjectRemoved:*</Event>
129 </TopicConfiguration>
130 </NotificationConfiguration>
132 struct rgw_pubsub_s3_notification
{
136 rgw::notify::EventTypeList events
;
138 std::string topic_arn
;
140 rgw_s3_filter filter
;
142 bool decode_xml(XMLObj
*obj
);
143 void dump_xml(Formatter
*f
) const;
145 rgw_pubsub_s3_notification() = default;
146 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
147 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
);
150 // return true if the key matches the prefix/suffix/regex rules of the key filter
151 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
);
152 // return true if the key matches the metadata/tags rules of the metadata/tags filter
153 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueList
& kvl
);
154 // return true if the event type matches (equal or contained in) one of the events in the list
155 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
);
157 struct rgw_pubsub_s3_notifications
{
158 std::list
<rgw_pubsub_s3_notification
> list
;
159 bool decode_xml(XMLObj
*obj
);
160 void dump_xml(Formatter
*f
) const;
163 /* S3 event records structure
164 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
176 "requestParameters":{
180 "x-amz-request-id":"",
184 "s3SchemaVersion":"1.0",
185 "configurationId":"",
209 struct rgw_pubsub_s3_record
{
210 constexpr static const char* const json_type_plural
= "Records";
211 std::string eventVersion
= "2.2";
213 std::string eventSource
= "ceph:s3";
215 std::string awsRegion
;
216 // time of the request
217 ceph::real_time eventTime
;
219 std::string eventName
;
220 // user that sent the request
221 std::string userIdentity
;
222 // IP address of source of the request (not implemented)
223 std::string sourceIPAddress
;
224 // request ID (not implemented)
225 std::string x_amz_request_id
;
226 // radosgw that received the request
227 std::string x_amz_id_2
;
228 std::string s3SchemaVersion
= "1.0";
229 // ID received in the notification request
230 std::string configurationId
;
232 std::string bucket_name
;
234 std::string bucket_ownerIdentity
;
236 std::string bucket_arn
;
238 std::string object_key
;
240 uint64_t object_size
= 0;
242 std::string object_etag
;
243 // object version id (if the bucket is versioned)
244 std::string object_versionId
;
245 // hexadecimal value used to determine event order for specific key
246 std::string object_sequencer
;
247 // this is an rgw extension (not S3 standard)
248 // used to store a globally unique identifier of the event
249 // that could be used for acking or any other identification of the event
251 // this is an rgw extension holding the internal bucket id
252 std::string bucket_id
;
254 KeyValueList x_meta_map
;
257 // opaque data received from the topic
258 // could be used to identify the gateway
259 std::string opaque_data
;
261 void encode(bufferlist
& bl
) const {
262 ENCODE_START(4, 1, bl
);
263 encode(eventVersion
, bl
);
264 encode(eventSource
, bl
);
265 encode(awsRegion
, bl
);
266 encode(eventTime
, bl
);
267 encode(eventName
, bl
);
268 encode(userIdentity
, bl
);
269 encode(sourceIPAddress
, bl
);
270 encode(x_amz_request_id
, bl
);
271 encode(x_amz_id_2
, bl
);
272 encode(s3SchemaVersion
, bl
);
273 encode(configurationId
, bl
);
274 encode(bucket_name
, bl
);
275 encode(bucket_ownerIdentity
, bl
);
276 encode(bucket_arn
, bl
);
277 encode(object_key
, bl
);
278 encode(object_size
, bl
);
279 encode(object_etag
, bl
);
280 encode(object_versionId
, bl
);
281 encode(object_sequencer
, bl
);
283 encode(bucket_id
, bl
);
284 encode(x_meta_map
, bl
);
286 encode(opaque_data
, bl
);
290 void decode(bufferlist::const_iterator
& bl
) {
292 decode(eventVersion
, bl
);
293 decode(eventSource
, bl
);
294 decode(awsRegion
, bl
);
295 decode(eventTime
, bl
);
296 decode(eventName
, bl
);
297 decode(userIdentity
, bl
);
298 decode(sourceIPAddress
, bl
);
299 decode(x_amz_request_id
, bl
);
300 decode(x_amz_id_2
, bl
);
301 decode(s3SchemaVersion
, bl
);
302 decode(configurationId
, bl
);
303 decode(bucket_name
, bl
);
304 decode(bucket_ownerIdentity
, bl
);
305 decode(bucket_arn
, bl
);
306 decode(object_key
, bl
);
307 decode(object_size
, bl
);
308 decode(object_etag
, bl
);
309 decode(object_versionId
, bl
);
310 decode(object_sequencer
, bl
);
313 decode(bucket_id
, bl
);
314 decode(x_meta_map
, bl
);
320 decode(opaque_data
, bl
);
325 void dump(Formatter
*f
) const;
327 WRITE_CLASS_ENCODER(rgw_pubsub_s3_record
)
329 struct rgw_pubsub_event
{
330 constexpr static const char* const json_type_plural
= "events";
332 std::string event_name
;
334 ceph::real_time timestamp
;
335 JSONFormattable info
;
337 void encode(bufferlist
& bl
) const {
338 ENCODE_START(1, 1, bl
);
340 encode(event_name
, bl
);
342 encode(timestamp
, bl
);
347 void decode(bufferlist::const_iterator
& bl
) {
350 decode(event_name
, bl
);
352 decode(timestamp
, bl
);
357 void dump(Formatter
*f
) const;
359 WRITE_CLASS_ENCODER(rgw_pubsub_event
)
361 // setting a unique ID for an event/record based on object hash and timestamp
362 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
);
364 struct rgw_pubsub_sub_dest
{
365 std::string bucket_name
;
366 std::string oid_prefix
;
367 std::string push_endpoint
;
368 std::string push_endpoint_args
;
369 std::string arn_topic
;
370 bool stored_secret
= false;
372 void encode(bufferlist
& bl
) const {
373 ENCODE_START(4, 1, bl
);
374 encode(bucket_name
, bl
);
375 encode(oid_prefix
, bl
);
376 encode(push_endpoint
, bl
);
377 encode(push_endpoint_args
, bl
);
378 encode(arn_topic
, bl
);
379 encode(stored_secret
, bl
);
383 void decode(bufferlist::const_iterator
& bl
) {
385 decode(bucket_name
, bl
);
386 decode(oid_prefix
, bl
);
387 decode(push_endpoint
, bl
);
389 decode(push_endpoint_args
, bl
);
392 decode(arn_topic
, bl
);
395 decode(stored_secret
, bl
);
400 void dump(Formatter
*f
) const;
401 void dump_xml(Formatter
*f
) const;
403 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest
)
405 struct rgw_pubsub_sub_config
{
409 rgw_pubsub_sub_dest dest
;
412 void encode(bufferlist
& bl
) const {
413 ENCODE_START(2, 1, bl
);
422 void decode(bufferlist::const_iterator
& bl
) {
434 void dump(Formatter
*f
) const;
436 WRITE_CLASS_ENCODER(rgw_pubsub_sub_config
)
438 struct rgw_pubsub_topic
{
441 rgw_pubsub_sub_dest dest
;
443 std::string opaque_data
;
445 void encode(bufferlist
& bl
) const {
446 ENCODE_START(3, 1, bl
);
451 encode(opaque_data
, bl
);
455 void decode(bufferlist::const_iterator
& bl
) {
464 decode(opaque_data
, bl
);
469 string
to_str() const {
470 return user
.to_str() + "/" + name
;
473 void dump(Formatter
*f
) const;
474 void dump_xml(Formatter
*f
) const;
// order topics by their "<user>/<name>" string form (see to_str())
// note: compare() returns an int (<0, 0, >0), so it must be tested against zero;
// returning it directly as bool would make any two different topics each
// compare "less than" the other, which is not a valid strict weak ordering
476 bool operator<(const rgw_pubsub_topic
& t
) const {
477 return to_str().compare(t
.to_str()) < 0;
480 WRITE_CLASS_ENCODER(rgw_pubsub_topic
)
482 struct rgw_pubsub_topic_subs
{
483 rgw_pubsub_topic topic
;
484 std::set
<std::string
> subs
;
486 void encode(bufferlist
& bl
) const {
487 ENCODE_START(1, 1, bl
);
493 void decode(bufferlist::const_iterator
& bl
) {
500 void dump(Formatter
*f
) const;
502 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs
)
504 struct rgw_pubsub_topic_filter
{
505 rgw_pubsub_topic topic
;
506 rgw::notify::EventTypeList events
;
508 rgw_s3_filter s3_filter
;
510 void encode(bufferlist
& bl
) const {
511 ENCODE_START(3, 1, bl
);
513 // events are stored as a vector of strings
514 std::vector
<std::string
> tmp_events
;
515 const auto converter
= s3_id
.empty() ? rgw::notify::to_ceph_string
: rgw::notify::to_string
;
516 std::transform(events
.begin(), events
.end(), std::back_inserter(tmp_events
), converter
);
517 encode(tmp_events
, bl
);
519 encode(s3_filter
, bl
);
523 void decode(bufferlist::const_iterator
& bl
) {
526 // events are stored as a vector of strings
528 std::vector
<std::string
> tmp_events
;
529 decode(tmp_events
, bl
);
530 std::transform(tmp_events
.begin(), tmp_events
.end(), std::back_inserter(events
), rgw::notify::from_string
);
535 decode(s3_filter
, bl
);
540 void dump(Formatter
*f
) const;
542 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter
)
544 struct rgw_pubsub_bucket_topics
{
545 std::map
<std::string
, rgw_pubsub_topic_filter
> topics
;
547 void encode(bufferlist
& bl
) const {
548 ENCODE_START(1, 1, bl
);
553 void decode(bufferlist::const_iterator
& bl
) {
559 void dump(Formatter
*f
) const;
561 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics
)
563 struct rgw_pubsub_user_topics
{
564 std::map
<std::string
, rgw_pubsub_topic_subs
> topics
;
566 void encode(bufferlist
& bl
) const {
567 ENCODE_START(1, 1, bl
);
572 void decode(bufferlist::const_iterator
& bl
) {
578 void dump(Formatter
*f
) const;
579 void dump_xml(Formatter
*f
) const;
581 WRITE_CLASS_ENCODER(rgw_pubsub_user_topics
)
583 static std::string pubsub_user_oid_prefix
= "pubsub.user.";
589 rgw::sal::RGWRadosStore
*store
;
591 RGWSysObjectCtx obj_ctx
;
593 rgw_raw_obj user_meta_obj
;
595 std::string
user_meta_oid() const {
596 return pubsub_user_oid_prefix
+ user
.to_str();
599 std::string
bucket_meta_oid(const rgw_bucket
& bucket
) const {
600 return pubsub_user_oid_prefix
+ user
.to_str() + ".bucket." + bucket
.name
+ "/" + bucket
.bucket_id
;
603 std::string
sub_meta_oid(const string
& name
) const {
604 return pubsub_user_oid_prefix
+ user
.to_str() + ".sub." + name
;
608 int read(const rgw_raw_obj
& obj
, T
*data
, RGWObjVersionTracker
*objv_tracker
);
611 int write(const rgw_raw_obj
& obj
, const T
& info
, RGWObjVersionTracker
*obj_tracker
);
613 int remove(const rgw_raw_obj
& obj
, RGWObjVersionTracker
*objv_tracker
);
615 int read_user_topics(rgw_pubsub_user_topics
*result
, RGWObjVersionTracker
*objv_tracker
);
616 int write_user_topics(const rgw_pubsub_user_topics
& topics
, RGWObjVersionTracker
*objv_tracker
);
619 RGWUserPubSub(rgw::sal::RGWRadosStore
*_store
, const rgw_user
& _user
);
622 friend class RGWUserPubSub
;
625 rgw_raw_obj bucket_meta_obj
;
627 // read the list of topics associated with a bucket and populate into result
628 // use version tracker to enforce atomicity between read/write
629 // return 0 on success or if no topic was associated with the bucket, error code otherwise
630 int read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
);
631 // set the list of topics associated with a bucket
632 // use version tracker to enforce atomicity between read/write
633 // return 0 on success, error code otherwise
634 int write_topics(const rgw_pubsub_bucket_topics
& topics
, RGWObjVersionTracker
*objv_tracker
);
636 Bucket(RGWUserPubSub
*_ps
, const rgw_bucket
& _bucket
) : ps(_ps
), bucket(_bucket
) {
637 ps
->get_bucket_meta_obj(bucket
, &bucket_meta_obj
);
640 // read the list of topics associated with a bucket and populate into result
641 // return 0 on success or if no topic was associated with the bucket, error code otherwise
642 int get_topics(rgw_pubsub_bucket_topics
*result
);
643 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
644 // assigning a notification name is optional (needed for S3 compatible notifications)
645 // if the topic already exists on the bucket, the filter event list may be updated
646 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
647 // return -ENOENT if the topic does not exist
648 // return 0 on success, error code otherwise
649 int create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
);
650 int create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
);
651 // remove a topic and filter from bucket
652 // if the topic does not exist on the bucket it is a no-op (considered success)
653 // return -ENOENT if the topic does not exist
654 // return 0 on success, error code otherwise
655 int remove_notification(const string
& topic_name
);
658 // base class for subscription
660 friend class RGWUserPubSub
;
662 RGWUserPubSub
* const ps
;
663 const std::string sub
;
664 rgw_raw_obj sub_meta_obj
;
666 int read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
);
667 int write_sub(const rgw_pubsub_sub_config
& sub_conf
, RGWObjVersionTracker
*objv_tracker
);
668 int remove_sub(RGWObjVersionTracker
*objv_tracker
);
670 Sub(RGWUserPubSub
*_ps
, const std::string
& _sub
) : ps(_ps
), sub(_sub
) {
671 ps
->get_sub_meta_obj(sub
, &sub_meta_obj
);
674 virtual ~Sub() = default;
676 int subscribe(const string
& topic_name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& s3_id
="");
677 int unsubscribe(const string
& topic_name
);
678 int get_conf(rgw_pubsub_sub_config
* result
);
680 static const int DEFAULT_MAX_EVENTS
= 100;
681 // the following virtual methods should only be called on derived classes
// the default implementations below do nothing except trigger ceph_assert(false);
// SubWithEvents<EventType> declares the overrides for all three
// list events starting at "marker", up to "max_events" entries
682 virtual int list_events(const string
& marker
, int max_events
) {ceph_assert(false);}
// remove a single event identified by "event_id"
683 virtual int remove_event(const string
& event_id
) {ceph_assert(false);}
// dump into the given formatter
684 virtual void dump(Formatter
* f
) const {ceph_assert(false);}
687 // subscription with templated list of events to support both S3 compliant and Ceph specific events
688 template<typename EventType
>
689 class SubWithEvents
: public Sub
{
691 struct list_events_result
{
692 std::string next_marker
;
693 bool is_truncated
{false};
694 void dump(Formatter
*f
) const;
695 std::vector
<EventType
> events
;
699 SubWithEvents(RGWUserPubSub
*_ps
, const string
& _sub
) : Sub(_ps
, _sub
) {}
701 virtual ~SubWithEvents() = default;
703 int list_events(const string
& marker
, int max_events
) override
;
704 int remove_event(const string
& event_id
) override
;
705 void dump(Formatter
* f
) const override
;
708 using BucketRef
= std::shared_ptr
<Bucket
>;
709 using SubRef
= std::shared_ptr
<Sub
>;
711 BucketRef
get_bucket(const rgw_bucket
& bucket
) {
712 return std::make_shared
<Bucket
>(this, bucket
);
715 SubRef
get_sub(const string
& sub
) {
716 return std::make_shared
<Sub
>(this, sub
);
719 SubRef
get_sub_with_events(const string
& sub
) {
720 auto tmpsub
= Sub(this, sub
);
721 rgw_pubsub_sub_config conf
;
722 if (tmpsub
.get_conf(&conf
) < 0) {
725 if (conf
.s3_id
.empty()) {
726 return std::make_shared
<SubWithEvents
<rgw_pubsub_event
>>(this, sub
);
728 return std::make_shared
<SubWithEvents
<rgw_pubsub_s3_record
>>(this, sub
);
731 void get_user_meta_obj(rgw_raw_obj
*obj
) const;
732 void get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const;
734 void get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const;
736 // get all topics defined for the user and populate them into "result"
737 // return 0 on success or if no topics exist, error code otherwise
738 int get_user_topics(rgw_pubsub_user_topics
*result
);
739 // get a topic with its subscriptions by its name and populate it into "result"
740 // return -ENOENT if the topic does not exist
741 // return 0 on success, error code otherwise
742 int get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
);
743 // get a topic by its name and populate it into "result"
744 // return -ENOENT if the topic does not exist
745 // return 0 on success, error code otherwise
746 int get_topic(const string
& name
, rgw_pubsub_topic
*result
);
747 // create a topic with a name only
748 // if the topic already exists it is a no-op (considered success)
749 // return 0 on success, error code otherwise
750 int create_topic(const string
& name
);
751 // create a topic with push destination information and ARN
752 // if the topic already exists the destination and ARN values may be updated (considered success)
753 // return 0 on success, error code otherwise
754 int create_topic(const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
);
755 // remove a topic according to its name
756 // if the topic does not exist it is a no-op (considered success)
757 // return 0 on success, error code otherwise
758 int remove_topic(const string
& name
);
763 int RGWUserPubSub::read(const rgw_raw_obj
& obj
, T
*result
, RGWObjVersionTracker
*objv_tracker
)
766 int ret
= rgw_get_system_obj(obj_ctx
,
770 nullptr, null_yield
, nullptr, nullptr);
775 auto iter
= bl
.cbegin();
777 decode(*result
, iter
);
778 } catch (buffer::error
& err
) {
786 int RGWUserPubSub::write(const rgw_raw_obj
& obj
, const T
& info
, RGWObjVersionTracker
*objv_tracker
)
791 int ret
= rgw_put_system_obj(obj_ctx
, obj
.pool
, obj
.oid
,
792 bl
, false, objv_tracker
,
798 obj_ctx
.invalidate(obj
);