1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
9 #include "rgw_notify_event_type.h"
10 #include <boost/container/flat_map.hpp>
// Filter on the object key of an S3 notification.
// Holds three independent rules (prefix, suffix, regex), each as a plain string.
// The rules are evaluated by the free function match(rgw_s3_key_filter, key).
14 struct rgw_s3_key_filter
{
// rule applied to the beginning of the key
15 std::string prefix_rule
;
// rule applied to the end of the key
16 std::string suffix_rule
;
// regular-expression rule applied to the key
17 std::string regex_rule
;
// defined elsewhere; presumably true when at least one rule is set - TODO confirm
19 bool has_content() const;
// formatter/XML helpers, defined elsewhere
21 void dump(Formatter
*f
) const;
22 bool decode_xml(XMLObj
*obj
);
23 void dump_xml(Formatter
*f
) const;
// serialize the rules into bl in the fixed order: prefix, suffix, regex
// NOTE(review): the (1, 1) arguments are presumably struct version / compat version - confirm against the macro
25 void encode(bufferlist
& bl
) const {
26 ENCODE_START(1, 1, bl
);
27 encode(prefix_rule
, bl
);
28 encode(suffix_rule
, bl
);
29 encode(regex_rule
, bl
);
// read the rules back from bl in the same order encode() writes them
33 void decode(bufferlist::const_iterator
& bl
) {
35 decode(prefix_rule
, bl
);
36 decode(suffix_rule
, bl
);
37 decode(regex_rule
, bl
);
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
41 WRITE_CLASS_ENCODER(rgw_s3_key_filter
)
// string -> string map with unique keys (boost flat_map);
// used below for rgw_pubsub_s3_event::x_meta_map and by the metadata match() overload
43 using KeyValueMap
= boost::container::flat_map
<std::string
, std::string
>;
// string -> string map that allows several values per key (std::multimap);
// used below for rgw_pubsub_s3_event::tags and by the tag match() overload
44 using KeyMultiValueMap
= std::multimap
<std::string
, std::string
>;
// Key/value filter of an S3 notification.
// rgw_s3_filter uses this type twice: once for metadata and once for tags.
// NOTE(review): no data member and no encode/decode body is visible in this listing - confirm against the full header
46 struct rgw_s3_key_value_filter
{
// defined elsewhere; presumably true when the filter holds at least one rule - TODO confirm
49 bool has_content() const;
// formatter/XML helpers, defined elsewhere
51 void dump(Formatter
*f
) const;
52 bool decode_xml(XMLObj
*obj
);
53 void dump_xml(Formatter
*f
) const;
// serialize the filter into bl
55 void encode(bufferlist
& bl
) const {
56 ENCODE_START(1, 1, bl
);
// deserialize the filter from bl
60 void decode(bufferlist::const_iterator
& bl
) {
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
66 WRITE_CLASS_ENCODER(rgw_s3_key_value_filter
)
// Complete filter of an S3 notification: one key filter plus two
// key/value filters (object metadata and object tags).
68 struct rgw_s3_filter
{
// prefix/suffix/regex rules on the object key
69 rgw_s3_key_filter key_filter
;
// key/value rules on the object metadata
70 rgw_s3_key_value_filter metadata_filter
;
// key/value rules on the object tags
71 rgw_s3_key_value_filter tag_filter
;
// defined elsewhere; presumably true when any of the three sub-filters has content - TODO confirm
73 bool has_content() const;
// formatter/XML helpers, defined elsewhere
75 void dump(Formatter
*f
) const;
76 bool decode_xml(XMLObj
*obj
);
77 void dump_xml(Formatter
*f
) const;
// serialize the sub-filters into bl in the fixed order: key, metadata, tag
79 void encode(bufferlist
& bl
) const {
80 ENCODE_START(2, 1, bl
);
81 encode(key_filter
, bl
);
82 encode(metadata_filter
, bl
);
83 encode(tag_filter
, bl
);
// read the sub-filters back in the same order encode() writes them
// NOTE(review): the listing skips a line before tag_filter is decoded; presumably a version check - confirm
87 void decode(bufferlist::const_iterator
& bl
) {
89 decode(key_filter
, bl
);
90 decode(metadata_filter
, bl
);
92 decode(tag_filter
, bl
);
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
97 WRITE_CLASS_ENCODER(rgw_s3_filter
)
// an rgw_s3_filter that may be absent; taken by value by create_notification()
99 using OptionalFilter
= std::optional
<rgw_s3_filter
>;
// forward declaration: the type is needed by the rgw_pubsub_s3_notification
// constructor before its full definition further down
101 struct rgw_pubsub_topic_filter
;
102 /* S3 notification configuration
103 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
104 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
126 <Id>notification1</Id>
127 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
128 <Event>s3:ObjectCreated:*</Event>
129 <Event>s3:ObjectRemoved:*</Event>
130 </TopicConfiguration>
131 </NotificationConfiguration>
// A single S3 notification entry: the event types it covers, the ARN of the
// topic it targets and the filter applied to it.
// NOTE(review): other members (e.g. an id) may exist on lines missing from this listing - confirm
133 struct rgw_pubsub_s3_notification
{
// event types covered by this notification
137 rgw::notify::EventTypeList events
;
// ARN of the target topic
139 std::string topic_arn
;
// key/metadata/tag filter of this notification
141 rgw_s3_filter filter
;
// XML helpers, defined elsewhere
143 bool decode_xml(XMLObj
*obj
);
144 void dump_xml(Formatter
*f
) const;
146 rgw_pubsub_s3_notification() = default;
147 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
// explicit: no implicit conversion from rgw_pubsub_topic_filter
148 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
);
151 // return true if the key matches the prefix/suffix/regex rules of the key filter
152 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
);
154 // return true if the key/value map (kv) matches the rules of the metadata filter
155 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
);
157 // return true if the key/value multimap (kv) matches the rules of the tag filter
158 bool match(const rgw_s3_key_value_filter
& filter
, const KeyMultiValueMap
& kv
);
160 // return true if the event type matches (equal or contained in) one of the events in the list
161 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
);
// Container of S3 notification entries, with XML decode/dump helpers
// (both defined elsewhere).
163 struct rgw_pubsub_s3_notifications
{
// the notification entries
164 std::list
<rgw_pubsub_s3_notification
> list
;
165 bool decode_xml(XMLObj
*obj
);
166 void dump_xml(Formatter
*f
) const;
169 /* S3 event records structure
170 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
182 "requestParameters":{
186 "x-amz-request-id":"",
190 "s3SchemaVersion":"1.0",
191 "configurationId":"",
// One S3 event record. Fields mirror the S3 record layout, plus rgw-specific
// extensions (bucket_id, x_meta_map, tags, opaque_data).
// encode()/decode() must keep the same field order as each other.
215 struct rgw_pubsub_s3_event
{
// name of the JSON array that holds records of this type
216 constexpr static const char* const json_type_plural
= "Records";
// defaults to "2.2"
217 std::string eventVersion
= "2.2";
// defaults to "ceph:s3"
219 std::string eventSource
= "ceph:s3";
221 std::string awsRegion
;
222 // time of the request
223 ceph::real_time eventTime
;
225 std::string eventName
;
226 // user that sent the request
227 std::string userIdentity
;
228 // IP address of source of the request (not implemented)
229 std::string sourceIPAddress
;
230 // request ID (not implemented)
231 std::string x_amz_request_id
;
232 // radosgw that received the request
233 std::string x_amz_id_2
;
// defaults to "1.0"
234 std::string s3SchemaVersion
= "1.0";
235 // ID received in the notification request
236 std::string configurationId
;
238 std::string bucket_name
;
240 std::string bucket_ownerIdentity
;
242 std::string bucket_arn
;
244 std::string object_key
;
// defaults to 0
246 uint64_t object_size
= 0;
248 std::string object_etag
;
249 // object version id if bucket is versioned
250 std::string object_versionId
;
251 // hexadecimal value used to determine event order for specific key
252 std::string object_sequencer
;
253 // this is an rgw extension (not S3 standard)
254 // used to store a globally unique identifier of the event
255 // that could be used for acking or any other identification of the event
// NOTE(review): the member described by the three lines above is not visible in this listing - confirm
257 // this is an rgw extension holding the internal bucket id
258 std::string bucket_id
;
// key/value metadata attached to the event
260 KeyValueMap x_meta_map
;
// key/value tags attached to the event (several values per key allowed)
262 KeyMultiValueMap tags
;
263 // opaque data received from the topic
264 // could be used to identify the gateway
265 std::string opaque_data
;
// serialize the record into bl; fields are written in declaration order
// NOTE(review): no encode of `tags` (or of the event id) is visible in this listing;
// the listing skips lines around bucket_id/x_meta_map - confirm they are serialized
267 void encode(bufferlist
& bl
) const {
268 ENCODE_START(4, 1, bl
);
269 encode(eventVersion
, bl
);
270 encode(eventSource
, bl
);
271 encode(awsRegion
, bl
);
272 encode(eventTime
, bl
);
273 encode(eventName
, bl
);
274 encode(userIdentity
, bl
);
275 encode(sourceIPAddress
, bl
);
276 encode(x_amz_request_id
, bl
);
277 encode(x_amz_id_2
, bl
);
278 encode(s3SchemaVersion
, bl
);
279 encode(configurationId
, bl
);
280 encode(bucket_name
, bl
);
281 encode(bucket_ownerIdentity
, bl
);
282 encode(bucket_arn
, bl
);
283 encode(object_key
, bl
);
284 encode(object_size
, bl
);
285 encode(object_etag
, bl
);
286 encode(object_versionId
, bl
);
287 encode(object_sequencer
, bl
);
289 encode(bucket_id
, bl
);
290 encode(x_meta_map
, bl
);
292 encode(opaque_data
, bl
);
// deserialize the record from bl, reading fields in the order encode() writes them
// NOTE(review): the listing skips lines before bucket_id, after x_meta_map and before
// opaque_data; presumably version checks for later-added fields - confirm
296 void decode(bufferlist::const_iterator
& bl
) {
298 decode(eventVersion
, bl
);
299 decode(eventSource
, bl
);
300 decode(awsRegion
, bl
);
301 decode(eventTime
, bl
);
302 decode(eventName
, bl
);
303 decode(userIdentity
, bl
);
304 decode(sourceIPAddress
, bl
);
305 decode(x_amz_request_id
, bl
);
306 decode(x_amz_id_2
, bl
);
307 decode(s3SchemaVersion
, bl
);
308 decode(configurationId
, bl
);
309 decode(bucket_name
, bl
);
310 decode(bucket_ownerIdentity
, bl
);
311 decode(bucket_arn
, bl
);
312 decode(object_key
, bl
);
313 decode(object_size
, bl
);
314 decode(object_etag
, bl
);
315 decode(object_versionId
, bl
);
316 decode(object_sequencer
, bl
);
319 decode(bucket_id
, bl
);
320 decode(x_meta_map
, bl
);
326 decode(opaque_data
, bl
);
// formatter dump, defined elsewhere
331 void dump(Formatter
*f
) const;
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
333 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event
)
335 // setting a unique ID for an event based on object hash and timestamp
// id is the output (non-const reference); hash and ts are read-only inputs
336 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
);
// Push destination of a topic: endpoint, endpoint arguments, topic ARN and
// two boolean flags (both default to false).
338 struct rgw_pubsub_dest
{
// endpoint that events are pushed to
339 std::string push_endpoint
;
// arguments of the push endpoint, kept as a single string
340 std::string push_endpoint_args
;
// ARN of the topic
341 std::string arn_topic
;
// defaults to false; presumably marks that the endpoint holds a secret - TODO confirm
342 bool stored_secret
= false;
// defaults to false; presumably selects persistent delivery - TODO confirm
343 bool persistent
= false;
// serialize the destination into bl in the order:
// push_endpoint, push_endpoint_args, arn_topic, stored_secret, persistent
// NOTE(review): the listing skips lines after ENCODE_START - confirm nothing else is written first
345 void encode(bufferlist
& bl
) const {
346 ENCODE_START(5, 1, bl
);
349 encode(push_endpoint
, bl
);
350 encode(push_endpoint_args
, bl
);
351 encode(arn_topic
, bl
);
352 encode(stored_secret
, bl
);
353 encode(persistent
, bl
);
// deserialize the destination from bl in the order encode() writes it
// NOTE(review): the listing skips lines between the decode calls; presumably version checks - confirm
357 void decode(bufferlist::const_iterator
& bl
) {
362 decode(push_endpoint
, bl
);
364 decode(push_endpoint_args
, bl
);
367 decode(arn_topic
, bl
);
370 decode(stored_secret
, bl
);
373 decode(persistent
, bl
);
// formatter/XML helpers and JSON string conversion, defined elsewhere
378 void dump(Formatter
*f
) const;
379 void dump_xml(Formatter
*f
) const;
380 std::string
to_json_str() const;
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
382 WRITE_CLASS_ENCODER(rgw_pubsub_dest
)
// A pubsub topic: its push destination plus opaque data.
// NOTE(review): to_str() reads `user` and `name`, whose declarations are not visible in this listing - confirm
384 struct rgw_pubsub_topic
{
// where events of this topic are pushed
387 rgw_pubsub_dest dest
;
// opaque data stored with the topic
389 std::string opaque_data
;
// serialize the topic into bl
// NOTE(review): only the opaque_data write is visible; the listing skips the preceding lines
391 void encode(bufferlist
& bl
) const {
392 ENCODE_START(3, 1, bl
);
397 encode(opaque_data
, bl
);
// deserialize the topic from bl
// NOTE(review): only the opaque_data read is visible; the listing skips the preceding lines
401 void decode(bufferlist::const_iterator
& bl
) {
410 decode(opaque_data
, bl
);
// string form of the topic: "<user.tenant>/<name>"; also used by operator< for ordering
415 std::string
to_str() const {
416 return user
.tenant
+ "/" + name
;
// formatter/XML helpers, defined elsewhere
419 void dump(Formatter
*f
) const;
420 void dump_xml(Formatter
*f
) const;
421 void dump_xml_as_attributes(Formatter
*f
) const;
423 bool operator<(const rgw_pubsub_topic
& t
) const {
424 return to_str().compare(t
.to_str());
427 WRITE_CLASS_ENCODER(rgw_pubsub_topic
)
429 // this struct is deprecated and remains only for backward compatibility
// It pairs a topic with a set of strings (subs); rgw_pubsub_topics::decode()
// reads this type and keeps only its `topic` member.
430 struct rgw_pubsub_topic_subs
{
431 rgw_pubsub_topic topic
;
// set of strings kept with the topic; presumably subscription names - TODO confirm
432 std::set
<std::string
> subs
;
// serialize into bl (body not visible in this listing)
434 void encode(bufferlist
& bl
) const {
435 ENCODE_START(1, 1, bl
);
// deserialize from bl (body not visible in this listing)
441 void decode(bufferlist::const_iterator
& bl
) {
// formatter dump, defined elsewhere
448 void dump(Formatter
*f
) const;
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
450 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs
)
// Association of a topic with the event types and S3 filter that select
// which events are sent to it. Stored per bucket in rgw_pubsub_bucket_topics.
452 struct rgw_pubsub_topic_filter
{
// the target topic
453 rgw_pubsub_topic topic
;
// event types this entry applies to
454 rgw::notify::EventTypeList events
;
// key/metadata/tag filter of this entry
456 rgw_s3_filter s3_filter
;
// serialize into bl; the event list is converted to strings first
// NOTE(review): the listing skips lines around the events and s3_filter writes - confirm what else is written
458 void encode(bufferlist
& bl
) const {
459 ENCODE_START(3, 1, bl
);
461 // events are stored as a vector of std::strings
462 std::vector
<std::string
> tmp_events
;
// convert each event type to its string form with rgw::notify::to_string
463 std::transform(events
.begin(), events
.end(), std::back_inserter(tmp_events
), rgw::notify::to_string
);
464 encode(tmp_events
, bl
);
466 encode(s3_filter
, bl
);
// deserialize from bl; event strings are converted back to event types
// NOTE(review): converted events are appended to `events`; whether the list is cleared first
// is not visible in this listing - confirm
470 void decode(bufferlist::const_iterator
& bl
) {
473 // events are stored as a vector of std::strings
475 std::vector
<std::string
> tmp_events
;
476 decode(tmp_events
, bl
);
// convert each string back to an event type with rgw::notify::from_string
477 std::transform(tmp_events
.begin(), tmp_events
.end(), std::back_inserter(events
), rgw::notify::from_string
);
482 decode(s3_filter
, bl
);
// formatter dump, defined elsewhere
487 void dump(Formatter
*f
) const;
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
489 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter
)
// Topic filters associated with one bucket, held in an ordered map keyed by string.
// This is the result type of Bucket::read_topics()/get_topics().
491 struct rgw_pubsub_bucket_topics
{
// string key -> topic filter; presumably keyed by notification/topic name - TODO confirm
492 std::map
<std::string
, rgw_pubsub_topic_filter
> topics
;
// serialize into bl (body not visible in this listing)
494 void encode(bufferlist
& bl
) const {
495 ENCODE_START(1, 1, bl
);
// deserialize from bl (body not visible in this listing)
500 void decode(bufferlist::const_iterator
& bl
) {
// formatter dump, defined elsewhere
506 void dump(Formatter
*f
) const;
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
508 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics
)
// Set of topics, held in an ordered map keyed by string.
// This is the result type of RGWPubSub::read_topics()/get_topics().
510 struct rgw_pubsub_topics
{
// string key -> topic; presumably keyed by topic name - TODO confirm
511 std::map
<std::string
, rgw_pubsub_topic
> topics
;
// serialize into bl (body not visible in this listing)
513 void encode(bufferlist
& bl
) const {
514 ENCODE_START(2, 2, bl
);
// deserialize from bl
// NOTE(review): the listing skips the lines before v1topics; presumably a version branch
// that selects this older layout - confirm
519 void decode(bufferlist::const_iterator
& bl
) {
// older layout: each entry is an rgw_pubsub_topic_subs
524 std::map
<std::string
, rgw_pubsub_topic_subs
> v1topics
;
525 decode(v1topics
, bl
);
// keep each entry's key and its embedded `topic`; the rest of rgw_pubsub_topic_subs is dropped
526 std::transform(v1topics
.begin(), v1topics
.end(), std::inserter(topics
, topics
.end()),
527 [](const auto& entry
) {
528 return std::pair
<std::string
, rgw_pubsub_topic
>(entry
.first
, entry
.second
.topic
);
// formatter/XML helpers, defined elsewhere
534 void dump(Formatter
*f
) const;
535 void dump_xml(Formatter
*f
) const;
// macro defined elsewhere; presumably generates the free encode()/decode() wrappers for this type
537 WRITE_CLASS_ENCODER(rgw_pubsub_topics
)
543 rgw::sal::Driver
* const driver
;
544 const std::string tenant
;
546 int read_topics(const DoutPrefixProvider
*dpp
, rgw_pubsub_topics
& result
,
547 RGWObjVersionTracker
* objv_tracker
, optional_yield y
) const;
548 int write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_topics
& topics
,
549 RGWObjVersionTracker
* objv_tracker
, optional_yield y
) const;
552 RGWPubSub(rgw::sal::Driver
* _driver
, const std::string
& tenant
);
555 friend class RGWPubSub
;
557 rgw::sal::Bucket
* const bucket
;
559 // read the list of topics associated with a bucket and populate into result
560 // use version tracker to enforce atomicity between read/write
561 // return 0 on success or if no topic was associated with the bucket, error code otherwise
562 int read_topics(const DoutPrefixProvider
*dpp
, rgw_pubsub_bucket_topics
& result
,
563 RGWObjVersionTracker
* objv_tracker
, optional_yield y
) const;
564 // set the list of topics associated with a bucket
565 // use version tracker to enforce atomicity between read/write
566 // return 0 on success, error code otherwise
567 int write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& topics
,
568 RGWObjVersionTracker
* objv_tracker
, optional_yield y
) const;
569 int remove_notification_inner(const DoutPrefixProvider
*dpp
, const std::string
& notification_id
,
570 bool notif_id_or_topic
, optional_yield y
) const;
572 Bucket(const RGWPubSub
& _ps
, rgw::sal::Bucket
* _bucket
) :
573 ps(_ps
), bucket(_bucket
)
576 // get the list of topics associated with a bucket and populate into result
577 // return 0 on success or if no topic was associated with the bucket, error code otherwise
// thin wrapper: forwards to read_topics() with a null objv_tracker
578 int get_topics(const DoutPrefixProvider
*dpp
, rgw_pubsub_bucket_topics
& result
, optional_yield y
) const {
579 return read_topics(dpp
, result
, nullptr, y
);
581 // get a bucket_topic by its notification id and populate it into "result"
582 // return -ENOENT if the topic does not exist
583 // return 0 on success, error code otherwise
584 int get_notification_by_id(const DoutPrefixProvider
*dpp
, const std::string
& notification_id
, rgw_pubsub_topic_filter
& result
, optional_yield y
) const;
585 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
586 // assigning a notification name is optional (needed for S3 compatible notifications)
587 // if the topic already exists on the bucket, the filter event list may be updated
588 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
589 // return -ENOENT if the topic does not exist
590 // return 0 on success, error code otherwise
591 int create_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
,
592 const rgw::notify::EventTypeList
& events
, optional_yield y
) const;
593 int create_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
,
594 const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
) const;
595 // remove a topic and filter from bucket
596 // if the topic does not exist on the bucket it is a no-op (considered success)
597 // return -ENOENT if the notification-id/topic does not exist
598 // return 0 on success, error code otherwise
599 int remove_notification_by_id(const DoutPrefixProvider
*dpp
, const std::string
& notif_id
, optional_yield y
) const;
600 int remove_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, optional_yield y
) const;
601 // remove all notifications (and autogenerated topics) associated with the bucket
602 // return 0 on success or if no topic was associated with the bucket, error code otherwise
603 int remove_notifications(const DoutPrefixProvider
*dpp
, optional_yield y
) const;
606 // get the list of topics
607 // return 0 on success or if no topic was associated with the bucket, error code otherwise
// thin wrapper: forwards to read_topics() with a null objv_tracker
608 int get_topics(const DoutPrefixProvider
*dpp
, rgw_pubsub_topics
& result
, optional_yield y
) const {
609 return read_topics(dpp
, result
, nullptr, y
);
611 // get a topic by its name and populate it into "result"
612 // return -ENOENT if the topic does not exist
613 // return 0 on success, error code otherwise
614 int get_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, rgw_pubsub_topic
& result
, optional_yield y
) const;
615 // create a topic with a name only
616 // if the topic already exists it is a no-op (considered success)
617 // return 0 on success, error code otherwise
618 int create_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, optional_yield y
) const;
619 // create a topic with push destination information and ARN
620 // if the topic already exists the destination and ARN values may be updated (considered success)
621 // return 0 on success, error code otherwise
622 int create_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, const rgw_pubsub_dest
& dest
,
623 const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
) const;
624 // remove a topic according to its name
625 // if the topic does not exist it is a no-op (considered success)
626 // return 0 on success, error code otherwise
627 int remove_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, optional_yield y
) const;