1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
7 #include "rgw_pubsub.h"
11 #include "rgw_pubsub_push.h"
15 #define dout_subsys ceph_subsys_rgw
17 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
) {
19 const auto len
= snprintf(buf
, sizeof(buf
), "%010ld.%06ld.%s", (long)ts
.sec(), (long)ts
.usec(), hash
.c_str());
25 bool rgw_s3_key_filter::decode_xml(XMLObj
* obj
) {
26 XMLObjIter iter
= obj
->find("FilterRule");
29 const auto throw_if_missing
= true;
30 auto prefix_not_set
= true;
31 auto suffix_not_set
= true;
32 auto regex_not_set
= true;
35 while ((o
= iter
.get_next())) {
36 RGWXMLDecoder::decode_xml("Name", name
, o
, throw_if_missing
);
37 if (name
== "prefix" && prefix_not_set
) {
38 prefix_not_set
= false;
39 RGWXMLDecoder::decode_xml("Value", prefix_rule
, o
, throw_if_missing
);
40 } else if (name
== "suffix" && suffix_not_set
) {
41 suffix_not_set
= false;
42 RGWXMLDecoder::decode_xml("Value", suffix_rule
, o
, throw_if_missing
);
43 } else if (name
== "regex" && regex_not_set
) {
44 regex_not_set
= false;
45 RGWXMLDecoder::decode_xml("Value", regex_rule
, o
, throw_if_missing
);
47 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name
+ "'");
53 void rgw_s3_key_filter::dump_xml(Formatter
*f
) const {
54 if (!prefix_rule
.empty()) {
55 f
->open_object_section("FilterRule");
56 ::encode_xml("Name", "prefix", f
);
57 ::encode_xml("Value", prefix_rule
, f
);
60 if (!suffix_rule
.empty()) {
61 f
->open_object_section("FilterRule");
62 ::encode_xml("Name", "suffix", f
);
63 ::encode_xml("Value", suffix_rule
, f
);
66 if (!regex_rule
.empty()) {
67 f
->open_object_section("FilterRule");
68 ::encode_xml("Name", "regex", f
);
69 ::encode_xml("Value", regex_rule
, f
);
74 bool rgw_s3_key_filter::has_content() const {
75 return !(prefix_rule
.empty() && suffix_rule
.empty() && regex_rule
.empty());
78 bool rgw_s3_key_value_filter::decode_xml(XMLObj
* obj
) {
80 XMLObjIter iter
= obj
->find("FilterRule");
83 const auto throw_if_missing
= true;
88 while ((o
= iter
.get_next())) {
89 RGWXMLDecoder::decode_xml("Name", key
, o
, throw_if_missing
);
90 RGWXMLDecoder::decode_xml("Value", value
, o
, throw_if_missing
);
91 kv
.emplace(key
, value
);
96 void rgw_s3_key_value_filter::dump_xml(Formatter
*f
) const {
97 for (const auto& key_value
: kv
) {
98 f
->open_object_section("FilterRule");
99 ::encode_xml("Name", key_value
.first
, f
);
100 ::encode_xml("Value", key_value
.second
, f
);
105 bool rgw_s3_key_value_filter::has_content() const {
109 bool rgw_s3_filter::decode_xml(XMLObj
* obj
) {
110 RGWXMLDecoder::decode_xml("S3Key", key_filter
, obj
);
111 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter
, obj
);
112 RGWXMLDecoder::decode_xml("S3Tags", tag_filter
, obj
);
116 void rgw_s3_filter::dump_xml(Formatter
*f
) const {
117 if (key_filter
.has_content()) {
118 ::encode_xml("S3Key", key_filter
, f
);
120 if (metadata_filter
.has_content()) {
121 ::encode_xml("S3Metadata", metadata_filter
, f
);
123 if (tag_filter
.has_content()) {
124 ::encode_xml("S3Tags", tag_filter
, f
);
128 bool rgw_s3_filter::has_content() const {
129 return key_filter
.has_content() ||
130 metadata_filter
.has_content() ||
131 tag_filter
.has_content();
134 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
) {
135 const auto key_size
= key
.size();
136 const auto prefix_size
= filter
.prefix_rule
.size();
137 if (prefix_size
!= 0) {
138 // prefix rule exists
139 if (prefix_size
> key_size
) {
140 // if prefix is longer than key, we fail
143 if (!std::equal(filter
.prefix_rule
.begin(), filter
.prefix_rule
.end(), key
.begin())) {
147 const auto suffix_size
= filter
.suffix_rule
.size();
148 if (suffix_size
!= 0) {
149 // suffix rule exists
150 if (suffix_size
> key_size
) {
151 // if suffix is longer than key, we fail
154 if (!std::equal(filter
.suffix_rule
.begin(), filter
.suffix_rule
.end(), (key
.end() - suffix_size
))) {
158 if (!filter
.regex_rule
.empty()) {
159 // TODO add regex chaching in the filter
160 const std::regex
base_regex(filter
.regex_rule
);
161 if (!std::regex_match(key
, base_regex
)) {
168 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
) {
169 // all filter pairs must exist with the same value in the object's metadata/tags
170 // object metadata/tags may include items not in the filter
171 return std::includes(kv
.begin(), kv
.end(), filter
.kv
.begin(), filter
.kv
.end());
174 bool match(const rgw_s3_key_value_filter
& filter
, const KeyMultiValueMap
& kv
) {
175 // all filter pairs must exist with the same value in the object's metadata/tags
176 // object metadata/tags may include items not in the filter
177 for (auto& filter
: filter
.kv
) {
178 auto result
= kv
.equal_range(filter
.first
);
179 if (std::any_of(result
.first
, result
.second
, [&filter
](const std::pair
<std::string
, std::string
>& p
) { return p
.second
== filter
.second
;}))
187 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
) {
188 // if event list exists, and none of the events in the list matches the event type, filter the message
189 if (!events
.empty() && std::find(events
.begin(), events
.end(), event
) == events
.end()) {
195 void do_decode_xml_obj(rgw::notify::EventTypeList
& l
, const std::string
& name
, XMLObj
*obj
) {
198 XMLObjIter iter
= obj
->find(name
);
201 while ((o
= iter
.get_next())) {
203 decode_xml_obj(val
, o
);
204 l
.push_back(rgw::notify::from_string(val
));
208 bool rgw_pubsub_s3_notification::decode_xml(XMLObj
*obj
) {
209 const auto throw_if_missing
= true;
210 RGWXMLDecoder::decode_xml("Id", id
, obj
, throw_if_missing
);
212 RGWXMLDecoder::decode_xml("Topic", topic_arn
, obj
, throw_if_missing
);
214 RGWXMLDecoder::decode_xml("Filter", filter
, obj
);
216 do_decode_xml_obj(events
, "Event", obj
);
217 if (events
.empty()) {
218 // if no events are provided, we assume all events
219 events
.push_back(rgw::notify::ObjectCreated
);
220 events
.push_back(rgw::notify::ObjectRemoved
);
225 void rgw_pubsub_s3_notification::dump_xml(Formatter
*f
) const {
226 ::encode_xml("Id", id
, f
);
227 ::encode_xml("Topic", topic_arn
.c_str(), f
);
228 if (filter
.has_content()) {
229 ::encode_xml("Filter", filter
, f
);
231 for (const auto& event
: events
) {
232 ::encode_xml("Event", rgw::notify::to_string(event
), f
);
236 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj
*obj
) {
237 do_decode_xml_obj(list
, "TopicConfiguration", obj
);
241 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
) :
242 id(topic_filter
.s3_id
), events(topic_filter
.events
), topic_arn(topic_filter
.topic
.arn
), filter(topic_filter
.s3_filter
) {}
244 void rgw_pubsub_s3_notifications::dump_xml(Formatter
*f
) const {
245 do_encode_xml("NotificationConfiguration", list
, "TopicConfiguration", f
);
248 void rgw_pubsub_s3_event::dump(Formatter
*f
) const {
249 encode_json("eventVersion", eventVersion
, f
);
250 encode_json("eventSource", eventSource
, f
);
251 encode_json("awsRegion", awsRegion
, f
);
252 utime_t
ut(eventTime
);
253 encode_json("eventTime", ut
, f
);
254 encode_json("eventName", eventName
, f
);
256 Formatter::ObjectSection
s(*f
, "userIdentity");
257 encode_json("principalId", userIdentity
, f
);
260 Formatter::ObjectSection
s(*f
, "requestParameters");
261 encode_json("sourceIPAddress", sourceIPAddress
, f
);
264 Formatter::ObjectSection
s(*f
, "responseElements");
265 encode_json("x-amz-request-id", x_amz_request_id
, f
);
266 encode_json("x-amz-id-2", x_amz_id_2
, f
);
269 Formatter::ObjectSection
s(*f
, "s3");
270 encode_json("s3SchemaVersion", s3SchemaVersion
, f
);
271 encode_json("configurationId", configurationId
, f
);
273 Formatter::ObjectSection
sub_s(*f
, "bucket");
274 encode_json("name", bucket_name
, f
);
276 Formatter::ObjectSection
sub_sub_s(*f
, "ownerIdentity");
277 encode_json("principalId", bucket_ownerIdentity
, f
);
279 encode_json("arn", bucket_arn
, f
);
280 encode_json("id", bucket_id
, f
);
283 Formatter::ObjectSection
sub_s(*f
, "object");
284 encode_json("key", object_key
, f
);
285 encode_json("size", object_size
, f
);
286 encode_json("eTag", object_etag
, f
);
287 encode_json("versionId", object_versionId
, f
);
288 encode_json("sequencer", object_sequencer
, f
);
289 encode_json("metadata", x_meta_map
, f
);
290 encode_json("tags", tags
, f
);
293 encode_json("eventId", id
, f
);
294 encode_json("opaqueData", opaque_data
, f
);
297 void rgw_pubsub_topic::dump(Formatter
*f
) const
299 encode_json("user", user
, f
);
300 encode_json("name", name
, f
);
301 encode_json("dest", dest
, f
);
302 encode_json("arn", arn
, f
);
303 encode_json("opaqueData", opaque_data
, f
);
306 void rgw_pubsub_topic::dump_xml(Formatter
*f
) const
308 encode_xml("User", user
, f
);
309 encode_xml("Name", name
, f
);
310 encode_xml("EndPoint", dest
, f
);
311 encode_xml("TopicArn", arn
, f
);
312 encode_xml("OpaqueData", opaque_data
, f
);
315 void encode_xml_key_value_entry(const std::string
& key
, const std::string
& value
, Formatter
*f
) {
316 f
->open_object_section("entry");
317 encode_xml("key", key
, f
);
318 encode_xml("value", value
, f
);
319 f
->close_section(); // entry
322 void rgw_pubsub_topic::dump_xml_as_attributes(Formatter
*f
) const
324 f
->open_array_section("Attributes");
325 std::string str_user
;
326 user
.to_str(str_user
);
327 encode_xml_key_value_entry("User", str_user
, f
);
328 encode_xml_key_value_entry("Name", name
, f
);
329 encode_xml_key_value_entry("EndPoint", dest
.to_json_str(), f
);
330 encode_xml_key_value_entry("TopicArn", arn
, f
);
331 encode_xml_key_value_entry("OpaqueData", opaque_data
, f
);
332 f
->close_section(); // Attributes
335 void encode_json(const char *name
, const rgw::notify::EventTypeList
& l
, Formatter
*f
)
337 f
->open_array_section(name
);
338 for (auto iter
= l
.cbegin(); iter
!= l
.cend(); ++iter
) {
339 f
->dump_string("obj", rgw::notify::to_string(*iter
));
344 void rgw_pubsub_topic_filter::dump(Formatter
*f
) const
346 encode_json("topic", topic
, f
);
347 encode_json("events", events
, f
);
350 void rgw_pubsub_bucket_topics::dump(Formatter
*f
) const
352 Formatter::ArraySection
s(*f
, "topics");
353 for (auto& t
: topics
) {
354 encode_json(t
.first
.c_str(), t
.second
, f
);
358 void rgw_pubsub_topics::dump(Formatter
*f
) const
360 Formatter::ArraySection
s(*f
, "topics");
361 for (auto& t
: topics
) {
362 encode_json(t
.first
.c_str(), t
.second
, f
);
366 void rgw_pubsub_topics::dump_xml(Formatter
*f
) const
368 for (auto& t
: topics
) {
369 encode_xml("member", t
.second
, f
);
373 void rgw_pubsub_dest::dump(Formatter
*f
) const
375 encode_json("push_endpoint", push_endpoint
, f
);
376 encode_json("push_endpoint_args", push_endpoint_args
, f
);
377 encode_json("push_endpoint_topic", arn_topic
, f
);
378 encode_json("stored_secret", stored_secret
, f
);
379 encode_json("persistent", persistent
, f
);
382 void rgw_pubsub_dest::dump_xml(Formatter
*f
) const
384 encode_xml("EndpointAddress", push_endpoint
, f
);
385 encode_xml("EndpointArgs", push_endpoint_args
, f
);
386 encode_xml("EndpointTopic", arn_topic
, f
);
387 encode_xml("HasStoredSecret", stored_secret
, f
);
388 encode_xml("Persistent", persistent
, f
);
391 std::string
rgw_pubsub_dest::to_json_str() const
394 f
.open_object_section("");
395 encode_json("EndpointAddress", push_endpoint
, &f
);
396 encode_json("EndpointArgs", push_endpoint_args
, &f
);
397 encode_json("EndpointTopic", arn_topic
, &f
);
398 encode_json("HasStoredSecret", stored_secret
, &f
);
399 encode_json("Persistent", persistent
, &f
);
401 std::stringstream ss
;
406 RGWPubSub::RGWPubSub(rgw::sal::Driver
* _driver
, const std::string
& _tenant
)
407 : driver(_driver
), tenant(_tenant
)
410 int RGWPubSub::read_topics(const DoutPrefixProvider
*dpp
, rgw_pubsub_topics
& result
,
411 RGWObjVersionTracker
*objv_tracker
, optional_yield y
) const
413 const int ret
= driver
->read_topics(tenant
, result
, objv_tracker
, y
, dpp
);
415 ldpp_dout(dpp
, 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
421 int RGWPubSub::write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_topics
& topics
,
422 RGWObjVersionTracker
*objv_tracker
, optional_yield y
) const
424 const int ret
= driver
->write_topics(tenant
, topics
, objv_tracker
, y
, dpp
);
425 if (ret
< 0 && ret
!= -ENOENT
) {
426 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
432 int RGWPubSub::Bucket::read_topics(const DoutPrefixProvider
*dpp
, rgw_pubsub_bucket_topics
& result
,
433 RGWObjVersionTracker
*objv_tracker
, optional_yield y
) const
435 const int ret
= bucket
->read_topics(result
, objv_tracker
, y
, dpp
);
436 if (ret
< 0 && ret
!= -ENOENT
) {
437 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
443 int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& topics
,
444 RGWObjVersionTracker
*objv_tracker
,
445 optional_yield y
) const
447 const int ret
= bucket
->write_topics(topics
, objv_tracker
, y
, dpp
);
449 ldpp_dout(dpp
, 1) << "ERROR: failed to write bucket topics info: ret=" << ret
<< dendl
;
456 int RGWPubSub::get_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, rgw_pubsub_topic
& result
, optional_yield y
) const
458 rgw_pubsub_topics topics
;
459 const int ret
= read_topics(dpp
, topics
, nullptr, y
);
461 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
465 auto iter
= topics
.topics
.find(name
);
466 if (iter
== topics
.topics
.end()) {
467 ldpp_dout(dpp
, 1) << "ERROR: topic not found" << dendl
;
471 result
= iter
->second
;
475 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
,
476 const rgw::notify::EventTypeList
& events
, optional_yield y
) const {
477 return create_notification(dpp
, topic_name
, events
, std::nullopt
, "", y
);
480 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
,
481 const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
) const {
482 rgw_pubsub_topic topic_info
;
484 int ret
= ps
.get_topic(dpp
, topic_name
, topic_info
, y
);
486 ldpp_dout(dpp
, 1) << "ERROR: failed to read topic '" << topic_name
<< "' info: ret=" << ret
<< dendl
;
489 ldpp_dout(dpp
, 20) << "successfully read topic '" << topic_name
<< "' info" << dendl
;
491 RGWObjVersionTracker objv_tracker
;
492 rgw_pubsub_bucket_topics bucket_topics
;
494 ret
= read_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
496 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics from bucket '" <<
497 bucket
->get_name() << "': ret=" << ret
<< dendl
;
500 ldpp_dout(dpp
, 20) << "successfully read " << bucket_topics
.topics
.size() << " topics from bucket '" <<
501 bucket
->get_name() << "'" << dendl
;
503 auto& topic_filter
= bucket_topics
.topics
[topic_name
];
504 topic_filter
.topic
= topic_info
;
505 topic_filter
.events
= events
;
506 topic_filter
.s3_id
= notif_name
;
508 topic_filter
.s3_filter
= *s3_filter
;
511 ret
= write_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
513 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics to bucket '" << bucket
->get_name() << "': ret=" << ret
<< dendl
;
517 ldpp_dout(dpp
, 20) << "successfully wrote " << bucket_topics
.topics
.size() << " topics to bucket '" << bucket
->get_name() << "'" << dendl
;
522 int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider
*dpp
, const std::string
& topic_name
, optional_yield y
) const
524 RGWObjVersionTracker objv_tracker
;
525 rgw_pubsub_bucket_topics bucket_topics
;
527 auto ret
= read_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
529 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
533 if (bucket_topics
.topics
.erase(topic_name
) == 0) {
534 ldpp_dout(dpp
, 1) << "INFO: no need to remove, topic does not exist" << dendl
;
538 if (bucket_topics
.topics
.empty()) {
539 // no more topics - delete the notification object of the bucket
540 ret
= bucket
->remove_topics(&objv_tracker
, y
, dpp
);
541 if (ret
< 0 && ret
!= -ENOENT
) {
542 ldpp_dout(dpp
, 1) << "ERROR: failed to remove bucket topics: ret=" << ret
<< dendl
;
548 // write back the notifications without the deleted one
549 ret
= write_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
551 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
558 int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider
*dpp
, optional_yield y
) const
560 // get all topics on a bucket
561 rgw_pubsub_bucket_topics bucket_topics
;
562 auto ret
= get_topics(dpp
, bucket_topics
, y
);
563 if (ret
< 0 && ret
!= -ENOENT
) {
564 ldpp_dout(dpp
, 1) << "ERROR: failed to get list of topics from bucket '" << bucket
->get_name() << "', ret=" << ret
<< dendl
;
568 // remove all auto-genrated topics
569 for (const auto& topic
: bucket_topics
.topics
) {
570 const auto& topic_name
= topic
.first
;
571 ret
= ps
.remove_topic(dpp
, topic_name
, y
);
572 if (ret
< 0 && ret
!= -ENOENT
) {
573 ldpp_dout(dpp
, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name
<< "', ret=" << ret
<< dendl
;
577 // delete the notification object of the bucket
578 ret
= bucket
->remove_topics(nullptr, y
, dpp
);
579 if (ret
< 0 && ret
!= -ENOENT
) {
580 ldpp_dout(dpp
, 1) << "ERROR: failed to remove bucket topics: ret=" << ret
<< dendl
;
587 int RGWPubSub::create_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, optional_yield y
) const {
588 return create_topic(dpp
, name
, rgw_pubsub_dest
{}, "", "", y
);
591 int RGWPubSub::create_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, const rgw_pubsub_dest
& dest
,
592 const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
) const {
593 RGWObjVersionTracker objv_tracker
;
594 rgw_pubsub_topics topics
;
596 int ret
= read_topics(dpp
, topics
, &objv_tracker
, y
);
597 if (ret
< 0 && ret
!= -ENOENT
) {
598 // its not an error if not topics exist, we create one
599 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
603 rgw_pubsub_topic
& new_topic
= topics
.topics
[name
];
604 new_topic
.user
= rgw_user("", tenant
);
605 new_topic
.name
= name
;
606 new_topic
.dest
= dest
;
608 new_topic
.opaque_data
= opaque_data
;
610 ret
= write_topics(dpp
, topics
, &objv_tracker
, y
);
612 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
619 int RGWPubSub::remove_topic(const DoutPrefixProvider
*dpp
, const std::string
& name
, optional_yield y
) const
621 RGWObjVersionTracker objv_tracker
;
622 rgw_pubsub_topics topics
;
624 int ret
= read_topics(dpp
, topics
, &objv_tracker
, y
);
625 if (ret
< 0 && ret
!= -ENOENT
) {
626 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
628 } else if (ret
== -ENOENT
) {
629 // its not an error if no topics exist, just a no-op
630 ldpp_dout(dpp
, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret
<< dendl
;
634 topics
.topics
.erase(name
);
636 ret
= write_topics(dpp
, topics
, &objv_tracker
, y
);
638 ldpp_dout(dpp
, 1) << "ERROR: failed to remove topics info: ret=" << ret
<< dendl
;