1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
7 #include "rgw_sal_rados.h"
8 #include "rgw_pubsub.h"
12 #include "rgw_pubsub_push.h"
16 #define dout_subsys ceph_subsys_rgw
19 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
) {
21 const auto len
= snprintf(buf
, sizeof(buf
), "%010ld.%06ld.%s", (long)ts
.sec(), (long)ts
.usec(), hash
.c_str());
27 bool rgw_s3_key_filter::decode_xml(XMLObj
* obj
) {
28 XMLObjIter iter
= obj
->find("FilterRule");
31 const auto throw_if_missing
= true;
32 auto prefix_not_set
= true;
33 auto suffix_not_set
= true;
34 auto regex_not_set
= true;
37 while ((o
= iter
.get_next())) {
38 RGWXMLDecoder::decode_xml("Name", name
, o
, throw_if_missing
);
39 if (name
== "prefix" && prefix_not_set
) {
40 prefix_not_set
= false;
41 RGWXMLDecoder::decode_xml("Value", prefix_rule
, o
, throw_if_missing
);
42 } else if (name
== "suffix" && suffix_not_set
) {
43 suffix_not_set
= false;
44 RGWXMLDecoder::decode_xml("Value", suffix_rule
, o
, throw_if_missing
);
45 } else if (name
== "regex" && regex_not_set
) {
46 regex_not_set
= false;
47 RGWXMLDecoder::decode_xml("Value", regex_rule
, o
, throw_if_missing
);
49 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name
+ "'");
55 void rgw_s3_key_filter::dump_xml(Formatter
*f
) const {
56 if (!prefix_rule
.empty()) {
57 f
->open_object_section("FilterRule");
58 ::encode_xml("Name", "prefix", f
);
59 ::encode_xml("Value", prefix_rule
, f
);
62 if (!suffix_rule
.empty()) {
63 f
->open_object_section("FilterRule");
64 ::encode_xml("Name", "suffix", f
);
65 ::encode_xml("Value", suffix_rule
, f
);
68 if (!regex_rule
.empty()) {
69 f
->open_object_section("FilterRule");
70 ::encode_xml("Name", "regex", f
);
71 ::encode_xml("Value", regex_rule
, f
);
76 bool rgw_s3_key_filter::has_content() const {
77 return !(prefix_rule
.empty() && suffix_rule
.empty() && regex_rule
.empty());
80 bool rgw_s3_key_value_filter::decode_xml(XMLObj
* obj
) {
82 XMLObjIter iter
= obj
->find("FilterRule");
85 const auto throw_if_missing
= true;
90 while ((o
= iter
.get_next())) {
91 RGWXMLDecoder::decode_xml("Name", key
, o
, throw_if_missing
);
92 RGWXMLDecoder::decode_xml("Value", value
, o
, throw_if_missing
);
93 kv
.emplace(key
, value
);
98 void rgw_s3_key_value_filter::dump_xml(Formatter
*f
) const {
99 for (const auto& key_value
: kv
) {
100 f
->open_object_section("FilterRule");
101 ::encode_xml("Name", key_value
.first
, f
);
102 ::encode_xml("Value", key_value
.second
, f
);
107 bool rgw_s3_key_value_filter::has_content() const {
111 bool rgw_s3_filter::decode_xml(XMLObj
* obj
) {
112 RGWXMLDecoder::decode_xml("S3Key", key_filter
, obj
);
113 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter
, obj
);
114 RGWXMLDecoder::decode_xml("S3Tags", tag_filter
, obj
);
118 void rgw_s3_filter::dump_xml(Formatter
*f
) const {
119 if (key_filter
.has_content()) {
120 ::encode_xml("S3Key", key_filter
, f
);
122 if (metadata_filter
.has_content()) {
123 ::encode_xml("S3Metadata", metadata_filter
, f
);
125 if (tag_filter
.has_content()) {
126 ::encode_xml("S3Tags", tag_filter
, f
);
130 bool rgw_s3_filter::has_content() const {
131 return key_filter
.has_content() ||
132 metadata_filter
.has_content() ||
133 tag_filter
.has_content();
136 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
) {
137 const auto key_size
= key
.size();
138 const auto prefix_size
= filter
.prefix_rule
.size();
139 if (prefix_size
!= 0) {
140 // prefix rule exists
141 if (prefix_size
> key_size
) {
142 // if prefix is longer than key, we fail
145 if (!std::equal(filter
.prefix_rule
.begin(), filter
.prefix_rule
.end(), key
.begin())) {
149 const auto suffix_size
= filter
.suffix_rule
.size();
150 if (suffix_size
!= 0) {
151 // suffix rule exists
152 if (suffix_size
> key_size
) {
153 // if suffix is longer than key, we fail
156 if (!std::equal(filter
.suffix_rule
.begin(), filter
.suffix_rule
.end(), (key
.end() - suffix_size
))) {
160 if (!filter
.regex_rule
.empty()) {
161 // TODO add regex chaching in the filter
162 const std::regex
base_regex(filter
.regex_rule
);
163 if (!std::regex_match(key
, base_regex
)) {
170 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
) {
171 // all filter pairs must exist with the same value in the object's metadata/tags
172 // object metadata/tags may include items not in the filter
173 return std::includes(kv
.begin(), kv
.end(), filter
.kv
.begin(), filter
.kv
.end());
176 bool match(const rgw_s3_key_value_filter
& filter
, const KeyMultiValueMap
& kv
) {
177 // all filter pairs must exist with the same value in the object's metadata/tags
178 // object metadata/tags may include items not in the filter
179 for (auto& filter
: filter
.kv
) {
180 auto result
= kv
.equal_range(filter
.first
);
181 if (std::any_of(result
.first
, result
.second
, [&filter
](const pair
<string
,string
>& p
) { return p
.second
== filter
.second
;}))
189 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
) {
190 // if event list exists, and none of the events in the list matches the event type, filter the message
191 if (!events
.empty() && std::find(events
.begin(), events
.end(), event
) == events
.end()) {
197 void do_decode_xml_obj(rgw::notify::EventTypeList
& l
, const string
& name
, XMLObj
*obj
) {
200 XMLObjIter iter
= obj
->find(name
);
203 while ((o
= iter
.get_next())) {
205 decode_xml_obj(val
, o
);
206 l
.push_back(rgw::notify::from_string(val
));
210 bool rgw_pubsub_s3_notification::decode_xml(XMLObj
*obj
) {
211 const auto throw_if_missing
= true;
212 RGWXMLDecoder::decode_xml("Id", id
, obj
, throw_if_missing
);
214 RGWXMLDecoder::decode_xml("Topic", topic_arn
, obj
, throw_if_missing
);
216 RGWXMLDecoder::decode_xml("Filter", filter
, obj
);
218 do_decode_xml_obj(events
, "Event", obj
);
219 if (events
.empty()) {
220 // if no events are provided, we assume all events
221 events
.push_back(rgw::notify::ObjectCreated
);
222 events
.push_back(rgw::notify::ObjectRemoved
);
227 void rgw_pubsub_s3_notification::dump_xml(Formatter
*f
) const {
228 ::encode_xml("Id", id
, f
);
229 ::encode_xml("Topic", topic_arn
.c_str(), f
);
230 if (filter
.has_content()) {
231 ::encode_xml("Filter", filter
, f
);
233 for (const auto& event
: events
) {
234 ::encode_xml("Event", rgw::notify::to_string(event
), f
);
238 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj
*obj
) {
239 do_decode_xml_obj(list
, "TopicConfiguration", obj
);
243 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
) :
244 id(topic_filter
.s3_id
), events(topic_filter
.events
), topic_arn(topic_filter
.topic
.arn
), filter(topic_filter
.s3_filter
) {}
246 void rgw_pubsub_s3_notifications::dump_xml(Formatter
*f
) const {
247 do_encode_xml("NotificationConfiguration", list
, "TopicConfiguration", f
);
250 void rgw_pubsub_s3_event::dump(Formatter
*f
) const {
251 encode_json("eventVersion", eventVersion
, f
);
252 encode_json("eventSource", eventSource
, f
);
253 encode_json("awsRegion", awsRegion
, f
);
254 utime_t
ut(eventTime
);
255 encode_json("eventTime", ut
, f
);
256 encode_json("eventName", eventName
, f
);
258 Formatter::ObjectSection
s(*f
, "userIdentity");
259 encode_json("principalId", userIdentity
, f
);
262 Formatter::ObjectSection
s(*f
, "requestParameters");
263 encode_json("sourceIPAddress", sourceIPAddress
, f
);
266 Formatter::ObjectSection
s(*f
, "responseElements");
267 encode_json("x-amz-request-id", x_amz_request_id
, f
);
268 encode_json("x-amz-id-2", x_amz_id_2
, f
);
271 Formatter::ObjectSection
s(*f
, "s3");
272 encode_json("s3SchemaVersion", s3SchemaVersion
, f
);
273 encode_json("configurationId", configurationId
, f
);
275 Formatter::ObjectSection
sub_s(*f
, "bucket");
276 encode_json("name", bucket_name
, f
);
278 Formatter::ObjectSection
sub_sub_s(*f
, "ownerIdentity");
279 encode_json("principalId", bucket_ownerIdentity
, f
);
281 encode_json("arn", bucket_arn
, f
);
282 encode_json("id", bucket_id
, f
);
285 Formatter::ObjectSection
sub_s(*f
, "object");
286 encode_json("key", object_key
, f
);
287 encode_json("size", object_size
, f
);
288 encode_json("eTag", object_etag
, f
);
289 encode_json("versionId", object_versionId
, f
);
290 encode_json("sequencer", object_sequencer
, f
);
291 encode_json("metadata", x_meta_map
, f
);
292 encode_json("tags", tags
, f
);
295 encode_json("eventId", id
, f
);
296 encode_json("opaqueData", opaque_data
, f
);
299 void rgw_pubsub_event::dump(Formatter
*f
) const
301 encode_json("id", id
, f
);
302 encode_json("event", event_name
, f
);
303 utime_t
ut(timestamp
);
304 encode_json("timestamp", ut
, f
);
305 encode_json("info", info
, f
);
308 void rgw_pubsub_topic::dump(Formatter
*f
) const
310 encode_json("user", user
, f
);
311 encode_json("name", name
, f
);
312 encode_json("dest", dest
, f
);
313 encode_json("arn", arn
, f
);
314 encode_json("opaqueData", opaque_data
, f
);
317 void rgw_pubsub_topic::dump_xml(Formatter
*f
) const
319 encode_xml("User", user
, f
);
320 encode_xml("Name", name
, f
);
321 encode_xml("EndPoint", dest
, f
);
322 encode_xml("TopicArn", arn
, f
);
323 encode_xml("OpaqueData", opaque_data
, f
);
326 void encode_xml_key_value_entry(const std::string
& key
, const std::string
& value
, Formatter
*f
) {
327 f
->open_object_section("entry");
328 encode_xml("key", key
, f
);
329 encode_xml("value", value
, f
);
330 f
->close_section(); // entry
333 void rgw_pubsub_topic::dump_xml_as_attributes(Formatter
*f
) const
335 f
->open_array_section("Attributes");
336 std::string str_user
;
337 user
.to_str(str_user
);
338 encode_xml_key_value_entry("User", str_user
, f
);
339 encode_xml_key_value_entry("Name", name
, f
);
340 encode_xml_key_value_entry("EndPoint", dest
.to_json_str(), f
);
341 encode_xml_key_value_entry("TopicArn", arn
, f
);
342 encode_xml_key_value_entry("OpaqueData", opaque_data
, f
);
343 f
->close_section(); // Attributes
346 void encode_json(const char *name
, const rgw::notify::EventTypeList
& l
, Formatter
*f
)
348 f
->open_array_section(name
);
349 for (auto iter
= l
.cbegin(); iter
!= l
.cend(); ++iter
) {
350 f
->dump_string("obj", rgw::notify::to_ceph_string(*iter
));
355 void rgw_pubsub_topic_filter::dump(Formatter
*f
) const
357 encode_json("topic", topic
, f
);
358 encode_json("events", events
, f
);
361 void rgw_pubsub_topic_subs::dump(Formatter
*f
) const
363 encode_json("topic", topic
, f
);
364 encode_json("subs", subs
, f
);
367 void rgw_pubsub_bucket_topics::dump(Formatter
*f
) const
369 Formatter::ArraySection
s(*f
, "topics");
370 for (auto& t
: topics
) {
371 encode_json(t
.first
.c_str(), t
.second
, f
);
375 void rgw_pubsub_topics::dump(Formatter
*f
) const
377 Formatter::ArraySection
s(*f
, "topics");
378 for (auto& t
: topics
) {
379 encode_json(t
.first
.c_str(), t
.second
, f
);
383 void rgw_pubsub_topics::dump_xml(Formatter
*f
) const
385 for (auto& t
: topics
) {
386 encode_xml("member", t
.second
.topic
, f
);
390 void rgw_pubsub_sub_dest::dump(Formatter
*f
) const
392 encode_json("bucket_name", bucket_name
, f
);
393 encode_json("oid_prefix", oid_prefix
, f
);
394 encode_json("push_endpoint", push_endpoint
, f
);
395 encode_json("push_endpoint_args", push_endpoint_args
, f
);
396 encode_json("push_endpoint_topic", arn_topic
, f
);
397 encode_json("stored_secret", stored_secret
, f
);
398 encode_json("persistent", persistent
, f
);
401 void rgw_pubsub_sub_dest::dump_xml(Formatter
*f
) const
403 // first 2 members are omitted here since they
404 // dont apply to AWS compliant topics
405 encode_xml("EndpointAddress", push_endpoint
, f
);
406 encode_xml("EndpointArgs", push_endpoint_args
, f
);
407 encode_xml("EndpointTopic", arn_topic
, f
);
408 encode_xml("HasStoredSecret", stored_secret
, f
);
409 encode_xml("Persistent", persistent
, f
);
412 std::string
rgw_pubsub_sub_dest::to_json_str() const
414 // first 2 members are omitted here since they
415 // dont apply to AWS compliant topics
417 f
.open_object_section("");
418 encode_json("EndpointAddress", push_endpoint
, &f
);
419 encode_json("EndpointArgs", push_endpoint_args
, &f
);
420 encode_json("EndpointTopic", arn_topic
, &f
);
421 encode_json("HasStoredSecret", stored_secret
, &f
);
422 encode_json("Persistent", persistent
, &f
);
424 std::stringstream ss
;
429 void rgw_pubsub_sub_config::dump(Formatter
*f
) const
431 encode_json("user", user
, f
);
432 encode_json("name", name
, f
);
433 encode_json("topic", topic
, f
);
434 encode_json("dest", dest
, f
);
435 encode_json("s3_id", s3_id
, f
);
438 RGWPubSub::RGWPubSub(rgw::sal::RadosStore
* _store
, const std::string
& _tenant
) :
441 obj_ctx(store
->svc()->sysobj
->init_obj_ctx()) {
442 get_meta_obj(&meta_obj
);
445 int RGWPubSub::remove(const DoutPrefixProvider
*dpp
,
446 const rgw_raw_obj
& obj
,
447 RGWObjVersionTracker
*objv_tracker
,
450 int ret
= rgw_delete_system_obj(dpp
, store
->svc()->sysobj
, obj
.pool
, obj
.oid
, objv_tracker
, y
);
458 int RGWPubSub::read_topics(rgw_pubsub_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
460 int ret
= read(meta_obj
, result
, objv_tracker
);
462 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
468 int RGWPubSub::write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_topics
& topics
,
469 RGWObjVersionTracker
*objv_tracker
, optional_yield y
)
471 int ret
= write(dpp
, meta_obj
, topics
, objv_tracker
, y
);
472 if (ret
< 0 && ret
!= -ENOENT
) {
473 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
479 int RGWPubSub::get_topics(rgw_pubsub_topics
*result
)
481 return read_topics(result
, nullptr);
484 int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
486 int ret
= ps
->read(bucket_meta_obj
, result
, objv_tracker
);
487 if (ret
< 0 && ret
!= -ENOENT
) {
488 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
494 int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& topics
,
495 RGWObjVersionTracker
*objv_tracker
,
498 int ret
= ps
->write(dpp
, bucket_meta_obj
, topics
, objv_tracker
, y
);
500 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret
<< dendl
;
507 int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics
*result
)
509 return read_topics(result
, nullptr);
512 int RGWPubSub::get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
)
514 rgw_pubsub_topics topics
;
515 int ret
= get_topics(&topics
);
517 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
521 auto iter
= topics
.topics
.find(name
);
522 if (iter
== topics
.topics
.end()) {
523 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
527 *result
= iter
->second
;
531 int RGWPubSub::get_topic(const string
& name
, rgw_pubsub_topic
*result
)
533 rgw_pubsub_topics topics
;
534 int ret
= get_topics(&topics
);
536 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
540 auto iter
= topics
.topics
.find(name
);
541 if (iter
== topics
.topics
.end()) {
542 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
546 *result
= iter
->second
.topic
;
550 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider
*dpp
, const string
& topic_name
, const rgw::notify::EventTypeList
& events
, optional_yield y
) {
551 return create_notification(dpp
, topic_name
, events
, std::nullopt
, "", y
);
554 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider
*dpp
, const string
& topic_name
,const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
) {
555 rgw_pubsub_topic_subs topic_info
;
557 int ret
= ps
->get_topic(topic_name
, &topic_info
);
559 ldpp_dout(dpp
, 1) << "ERROR: failed to read topic '" << topic_name
<< "' info: ret=" << ret
<< dendl
;
562 ldpp_dout(dpp
, 20) << "successfully read topic '" << topic_name
<< "' info" << dendl
;
564 RGWObjVersionTracker objv_tracker
;
565 rgw_pubsub_bucket_topics bucket_topics
;
567 ret
= read_topics(&bucket_topics
, &objv_tracker
);
569 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics from bucket '" <<
570 bucket
.name
<< "': ret=" << ret
<< dendl
;
573 ldpp_dout(dpp
, 20) << "successfully read " << bucket_topics
.topics
.size() << " topics from bucket '" <<
574 bucket
.name
<< "'" << dendl
;
576 auto& topic_filter
= bucket_topics
.topics
[topic_name
];
577 topic_filter
.topic
= topic_info
.topic
;
578 topic_filter
.events
= events
;
579 topic_filter
.s3_id
= notif_name
;
581 topic_filter
.s3_filter
= *s3_filter
;
584 ret
= write_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
586 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics to bucket '" << bucket
.name
<< "': ret=" << ret
<< dendl
;
590 ldpp_dout(dpp
, 20) << "successfully wrote " << bucket_topics
.topics
.size() << " topics to bucket '" << bucket
.name
<< "'" << dendl
;
595 int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider
*dpp
, const string
& topic_name
, optional_yield y
)
597 rgw_pubsub_topic_subs topic_info
;
599 int ret
= ps
->get_topic(topic_name
, &topic_info
);
601 ldpp_dout(dpp
, 1) << "ERROR: failed to read topic info: ret=" << ret
<< dendl
;
605 RGWObjVersionTracker objv_tracker
;
606 rgw_pubsub_bucket_topics bucket_topics
;
608 ret
= read_topics(&bucket_topics
, &objv_tracker
);
610 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
614 bucket_topics
.topics
.erase(topic_name
);
616 if (bucket_topics
.topics
.empty()) {
617 // no more topics - delete the notification object of the bucket
618 ret
= ps
->remove(dpp
, bucket_meta_obj
, &objv_tracker
, y
);
619 if (ret
< 0 && ret
!= -ENOENT
) {
620 ldpp_dout(dpp
, 1) << "ERROR: failed to remove bucket topics: ret=" << ret
<< dendl
;
626 // write back the notifications without the deleted one
627 ret
= write_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
629 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
636 int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider
*dpp
, optional_yield y
)
638 // get all topics on a bucket
639 rgw_pubsub_bucket_topics bucket_topics
;
640 auto ret
= get_topics(&bucket_topics
);
641 if (ret
< 0 && ret
!= -ENOENT
) {
642 ldpp_dout(dpp
, 1) << "ERROR: failed to get list of topics from bucket '" << bucket
.name
<< "', ret=" << ret
<< dendl
;
646 // remove all auto-genrated topics
647 for (const auto& topic
: bucket_topics
.topics
) {
648 const auto& topic_name
= topic
.first
;
649 ret
= ps
->remove_topic(dpp
, topic_name
, y
);
650 if (ret
< 0 && ret
!= -ENOENT
) {
651 ldpp_dout(dpp
, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name
<< "', ret=" << ret
<< dendl
;
655 // delete the notification object of the bucket
656 ret
= ps
->remove(dpp
, bucket_meta_obj
, nullptr, y
);
657 if (ret
< 0 && ret
!= -ENOENT
) {
658 ldpp_dout(dpp
, 1) << "ERROR: failed to remove bucket topics: ret=" << ret
<< dendl
;
665 int RGWPubSub::create_topic(const DoutPrefixProvider
*dpp
, const string
& name
, optional_yield y
) {
666 return create_topic(dpp
, name
, rgw_pubsub_sub_dest(), "", "", y
);
669 int RGWPubSub::create_topic(const DoutPrefixProvider
*dpp
, const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
) {
670 RGWObjVersionTracker objv_tracker
;
671 rgw_pubsub_topics topics
;
673 int ret
= read_topics(&topics
, &objv_tracker
);
674 if (ret
< 0 && ret
!= -ENOENT
) {
675 // its not an error if not topics exist, we create one
676 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
680 rgw_pubsub_topic_subs
& new_topic
= topics
.topics
[name
];
681 new_topic
.topic
.user
= rgw_user("", tenant
);
682 new_topic
.topic
.name
= name
;
683 new_topic
.topic
.dest
= dest
;
684 new_topic
.topic
.arn
= arn
;
685 new_topic
.topic
.opaque_data
= opaque_data
;
687 ret
= write_topics(dpp
, topics
, &objv_tracker
, y
);
689 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
696 int RGWPubSub::remove_topic(const DoutPrefixProvider
*dpp
, const string
& name
, optional_yield y
)
698 RGWObjVersionTracker objv_tracker
;
699 rgw_pubsub_topics topics
;
701 int ret
= read_topics(&topics
, &objv_tracker
);
702 if (ret
< 0 && ret
!= -ENOENT
) {
703 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
705 } else if (ret
== -ENOENT
) {
706 // its not an error if no topics exist, just a no-op
707 ldpp_dout(dpp
, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret
<< dendl
;
711 topics
.topics
.erase(name
);
713 ret
= write_topics(dpp
, topics
, &objv_tracker
, y
);
715 ldpp_dout(dpp
, 1) << "ERROR: failed to remove topics info: ret=" << ret
<< dendl
;
722 int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
)
724 int ret
= ps
->read(sub_meta_obj
, result
, objv_tracker
);
725 if (ret
< 0 && ret
!= -ENOENT
) {
726 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
732 int RGWPubSub::Sub::write_sub(const DoutPrefixProvider
*dpp
,
733 const rgw_pubsub_sub_config
& sub_conf
,
734 RGWObjVersionTracker
*objv_tracker
,
737 int ret
= ps
->write(dpp
, sub_meta_obj
, sub_conf
, objv_tracker
, y
);
739 ldpp_dout(dpp
, 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
746 int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider
*dpp
, RGWObjVersionTracker
*objv_tracker
,
749 int ret
= ps
->remove(dpp
, sub_meta_obj
, objv_tracker
, y
);
751 ldpp_dout(dpp
, 1) << "ERROR: failed to remove subscription info: ret=" << ret
<< dendl
;
758 int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config
*result
)
760 return read_sub(result
, nullptr);
763 int RGWPubSub::Sub::subscribe(const DoutPrefixProvider
*dpp
, const string
& topic
, const rgw_pubsub_sub_dest
& dest
, optional_yield y
, const std::string
& s3_id
)
765 RGWObjVersionTracker objv_tracker
;
766 rgw_pubsub_topics topics
;
768 int ret
= ps
->read_topics(&topics
, &objv_tracker
);
770 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
771 return ret
!= -ENOENT
? ret
: -EINVAL
;
774 auto iter
= topics
.topics
.find(topic
);
775 if (iter
== topics
.topics
.end()) {
776 ldpp_dout(dpp
, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl
;
780 auto& t
= iter
->second
;
782 rgw_pubsub_sub_config sub_conf
;
784 sub_conf
.user
= rgw_user("", ps
->tenant
);
786 sub_conf
.topic
= topic
;
787 sub_conf
.dest
= dest
;
788 sub_conf
.s3_id
= s3_id
;
792 ret
= ps
->write_topics(dpp
, topics
, &objv_tracker
, y
);
794 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
798 ret
= write_sub(dpp
, sub_conf
, nullptr, y
);
800 ldpp_dout(dpp
, 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
806 int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider
*dpp
, const string
& _topic
, optional_yield y
)
808 string topic
= _topic
;
809 RGWObjVersionTracker sobjv_tracker
;
812 rgw_pubsub_sub_config sub_conf
;
813 int ret
= read_sub(&sub_conf
, &sobjv_tracker
);
815 ldpp_dout(dpp
, 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
818 topic
= sub_conf
.topic
;
821 RGWObjVersionTracker objv_tracker
;
822 rgw_pubsub_topics topics
;
824 int ret
= ps
->read_topics(&topics
, &objv_tracker
);
826 // not an error - could be that topic was already deleted
827 ldpp_dout(dpp
, 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
829 auto iter
= topics
.topics
.find(topic
);
830 if (iter
!= topics
.topics
.end()) {
831 auto& t
= iter
->second
;
835 ret
= ps
->write_topics(dpp
, topics
, &objv_tracker
, y
);
837 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
843 ret
= remove_sub(dpp
, &sobjv_tracker
, y
);
845 ldpp_dout(dpp
, 1) << "ERROR: failed to delete subscription info: ret=" << ret
<< dendl
;
851 template<typename EventType
>
852 void RGWPubSub::SubWithEvents
<EventType
>::list_events_result::dump(Formatter
*f
) const
854 encode_json("next_marker", next_marker
, f
);
855 encode_json("is_truncated", is_truncated
, f
);
857 Formatter::ArraySection
s(*f
, EventType::json_type_plural
);
858 for (auto& event
: events
) {
859 encode_json("", event
, f
);
863 template<typename EventType
>
864 int RGWPubSub::SubWithEvents
<EventType
>::list_events(const DoutPrefixProvider
*dpp
, const string
& marker
, int max_events
)
866 RGWRados
*store
= ps
->store
->getRados();
867 rgw_pubsub_sub_config sub_conf
;
868 int ret
= get_conf(&sub_conf
);
870 ldpp_dout(dpp
, 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
874 RGWBucketInfo bucket_info
;
876 ret
= store
->get_bucket_info(&store
->svc
, tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
877 if (ret
== -ENOENT
) {
878 list
.is_truncated
= false;
882 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
886 RGWRados::Bucket
target(store
, bucket_info
);
887 RGWRados::Bucket::List
list_op(&target
);
889 list_op
.params
.prefix
= sub_conf
.dest
.oid_prefix
;
890 list_op
.params
.marker
= marker
;
892 std::vector
<rgw_bucket_dir_entry
> objs
;
894 ret
= list_op
.list_objects(dpp
, max_events
, &objs
, nullptr, &list
.is_truncated
, null_yield
);
896 ldpp_dout(dpp
, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
899 if (list
.is_truncated
) {
900 list
.next_marker
= list_op
.get_next_marker().name
;
903 for (auto& obj
: objs
) {
906 bl64
.append(obj
.meta
.user_data
);
908 bl
.decode_base64(bl64
);
909 } catch (buffer::error
& err
) {
910 ldpp_dout(dpp
, 1) << "ERROR: failed to event (not a valid base64)" << dendl
;
915 auto iter
= bl
.cbegin();
918 } catch (buffer::error
& err
) {
919 ldpp_dout(dpp
, 1) << "ERROR: failed to decode event" << dendl
;
923 list
.events
.push_back(event
);
928 template<typename EventType
>
929 int RGWPubSub::SubWithEvents
<EventType
>::remove_event(const DoutPrefixProvider
*dpp
, const string
& event_id
)
931 rgw::sal::RadosStore
* store
= ps
->store
;
932 rgw_pubsub_sub_config sub_conf
;
933 int ret
= get_conf(&sub_conf
);
935 ldpp_dout(dpp
, 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
939 RGWBucketInfo bucket_info
;
941 ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
943 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
947 rgw_bucket
& bucket
= bucket_info
.bucket
;
949 RGWObjectCtx
obj_ctx(store
);
950 rgw_obj
obj(bucket
, sub_conf
.dest
.oid_prefix
+ event_id
);
952 obj_ctx
.set_atomic(obj
);
954 RGWRados::Object
del_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
955 RGWRados::Object::Delete
del_op(&del_target
);
957 del_op
.params
.bucket_owner
= bucket_info
.owner
;
958 del_op
.params
.versioning_status
= bucket_info
.versioning_status();
960 ret
= del_op
.delete_obj(null_yield
, dpp
);
962 ldpp_dout(dpp
, 1) << "ERROR: failed to remove event (obj=" << obj
<< "): ret=" << ret
<< dendl
;
967 void RGWPubSub::get_meta_obj(rgw_raw_obj
*obj
) const {
968 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, meta_oid());
971 void RGWPubSub::get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const {
972 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, bucket_meta_oid(bucket
));
975 void RGWPubSub::get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const {
976 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, sub_meta_oid(name
));
979 template<typename EventType
>
980 void RGWPubSub::SubWithEvents
<EventType
>::dump(Formatter
* f
) const {
984 // explicit instantiation for the only two possible types
985 // no need to move implementation to header
986 template class RGWPubSub::SubWithEvents
<rgw_pubsub_event
>;
987 template class RGWPubSub::SubWithEvents
<rgw_pubsub_s3_event
>;