1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
7 #include "rgw_sal_rados.h"
8 #include "rgw_pubsub.h"
12 #include "rgw_pubsub_push.h"
16 #define dout_subsys ceph_subsys_rgw
// Formats an event id of the form "<sec>.<usec>.<hash>" with snprintf, where
// the seconds of `ts` are zero-padded to at least 10 digits and the
// microseconds to at least 6 digits.
// NOTE(review): the declaration of `buf` and the statement that stores the
// formatted text into `id` are not visible in this excerpt.
18 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
) {
20 const auto len
= snprintf(buf
, sizeof(buf
), "%010ld.%06ld.%s", (long)ts
.sec(), (long)ts
.usec(), hash
.c_str());
// Decodes the "FilterRule" children of `obj` into this key filter.
// Every rule must carry a "Name" and a "Value" (both are decoded with
// throw_if_missing set). The accepted names are "prefix", "suffix" and
// "regex"; the *_not_set flags allow each of them only once.
// The throw at the end of the chain reports an invalid or repeated rule name.
// NOTE(review): the branch line introducing that throw, and the function's
// return statement, are not visible in this excerpt.
26 bool rgw_s3_key_filter::decode_xml(XMLObj
* obj
) {
27 XMLObjIter iter
= obj
->find("FilterRule");
30 const auto throw_if_missing
= true;
// one flag per rule kind; cleared on first use so a second occurrence is rejected
31 auto prefix_not_set
= true;
32 auto suffix_not_set
= true;
33 auto regex_not_set
= true;
36 while ((o
= iter
.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name
, o
, throw_if_missing
);
38 if (name
== "prefix" && prefix_not_set
) {
39 prefix_not_set
= false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule
, o
, throw_if_missing
);
41 } else if (name
== "suffix" && suffix_not_set
) {
42 suffix_not_set
= false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule
, o
, throw_if_missing
);
44 } else if (name
== "regex" && regex_not_set
) {
45 regex_not_set
= false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule
, o
, throw_if_missing
);
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name
+ "'");
// Writes the key filter as XML: for each of prefix_rule, suffix_rule and
// regex_rule that is non-empty, a "FilterRule" section is opened and its
// "Name" ("prefix" / "suffix" / "regex") and "Value" are encoded.
// Empty rules produce no output.
// NOTE(review): the statements closing each "FilterRule" section are not
// visible in this excerpt.
54 void rgw_s3_key_filter::dump_xml(Formatter
*f
) const {
55 if (!prefix_rule
.empty()) {
56 f
->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f
);
58 ::encode_xml("Value", prefix_rule
, f
);
61 if (!suffix_rule
.empty()) {
62 f
->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f
);
64 ::encode_xml("Value", suffix_rule
, f
);
67 if (!regex_rule
.empty()) {
68 f
->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f
);
70 ::encode_xml("Value", regex_rule
, f
);
// Returns true when at least one of prefix_rule, suffix_rule or regex_rule
// is non-empty, i.e. false only when all three are empty.
75 bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule
.empty() && suffix_rule
.empty() && regex_rule
.empty());
// Decodes the "FilterRule" children of `obj` into the `kv` container.
// Each rule's "Name" and "Value" are decoded with throw_if_missing set and
// the resulting pair is emplaced into `kv`.
// NOTE(review): if `kv` is a unique-key map, emplace() keeps the first value
// when a name is repeated - confirm the container type and whether repeated
// names should be rejected instead.
// NOTE(review): the declarations of `o`, `key` and `value` and the return
// statement are not visible in this excerpt.
79 bool rgw_s3_key_value_filter::decode_xml(XMLObj
* obj
) {
81 XMLObjIter iter
= obj
->find("FilterRule");
84 const auto throw_if_missing
= true;
89 while ((o
= iter
.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key
, o
, throw_if_missing
);
91 RGWXMLDecoder::decode_xml("Value", value
, o
, throw_if_missing
);
92 kv
.emplace(key
, value
);
// Writes every pair of `kv` as a "FilterRule" section whose "Name" is the
// pair's key and whose "Value" is the pair's value.
// NOTE(review): the statement closing each section is not visible in this
// excerpt.
97 void rgw_s3_key_value_filter::dump_xml(Formatter
*f
) const {
98 for (const auto& key_value
: kv
) {
99 f
->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value
.first
, f
);
101 ::encode_xml("Value", key_value
.second
, f
);
// NOTE(review): only the signature is visible in this excerpt. Presumably
// reports whether `kv` holds any entry - confirm against the body.
106 bool rgw_s3_key_value_filter::has_content() const {
// Decodes the three sub-filters from `obj`: "S3Key" into key_filter,
// "S3Metadata" into metadata_filter and "S3Tags" into tag_filter.
// No throw_if_missing argument is passed, so the decoder's default handling
// of absent elements applies.
// NOTE(review): the return statement is not visible in this excerpt.
110 bool rgw_s3_filter::decode_xml(XMLObj
* obj
) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter
, obj
);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter
, obj
);
113 RGWXMLDecoder::decode_xml("S3Tags", tag_filter
, obj
);
// Writes the sub-filters as XML elements "S3Key", "S3Metadata" and "S3Tags".
// A sub-filter is emitted only if its has_content() is true, so empty
// sub-filters leave no element in the output.
117 void rgw_s3_filter::dump_xml(Formatter
*f
) const {
118 if (key_filter
.has_content()) {
119 ::encode_xml("S3Key", key_filter
, f
);
121 if (metadata_filter
.has_content()) {
122 ::encode_xml("S3Metadata", metadata_filter
, f
);
124 if (tag_filter
.has_content()) {
125 ::encode_xml("S3Tags", tag_filter
, f
);
// Returns true when any of key_filter, metadata_filter or tag_filter
// reports content.
129 bool rgw_s3_filter::has_content() const {
130 return key_filter
.has_content() ||
131 metadata_filter
.has_content() ||
132 tag_filter
.has_content();
// Checks object name `key` against the rules of `filter`.
// Only non-empty rules are checked:
//   - prefix_rule must equal the first prefix_size characters of `key`
//   - suffix_rule must equal the last suffix_size characters of `key`
//   - regex_rule must match the whole of `key` (std::regex_match)
// The size comparisons guard the std::equal calls so they never read past
// the bounds of `key`.
// NOTE(review): the return statements are not visible in this excerpt.
// NOTE(review): a std::regex is constructed from regex_rule on every call;
// std::regex construction throws std::regex_error on an invalid pattern -
// confirm the pattern is validated before it reaches this function.
135 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
) {
136 const auto key_size
= key
.size();
137 const auto prefix_size
= filter
.prefix_rule
.size();
138 if (prefix_size
!= 0) {
139 // prefix rule exists
140 if (prefix_size
> key_size
) {
141 // if prefix is longer than key, we fail
144 if (!std::equal(filter
.prefix_rule
.begin(), filter
.prefix_rule
.end(), key
.begin())) {
148 const auto suffix_size
= filter
.suffix_rule
.size();
149 if (suffix_size
!= 0) {
150 // suffix rule exists
151 if (suffix_size
> key_size
) {
152 // if suffix is longer than key, we fail
// compare the suffix against the tail of the key
155 if (!std::equal(filter
.suffix_rule
.begin(), filter
.suffix_rule
.end(), (key
.end() - suffix_size
))) {
159 if (!filter
.regex_rule
.empty()) {
160 // TODO add regex caching in the filter
161 const std::regex
base_regex(filter
.regex_rule
);
162 if (!std::regex_match(key
, base_regex
)) {
// Returns true when every (key, value) pair of filter.kv is also present in
// `kv`, using std::includes over the two ranges.
// NOTE(review): std::includes requires both ranges to be sorted with the
// same ordering; this holds if KeyValueMap and filter.kv are ordered maps of
// the same type - confirm.
169 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
) {
170 // all filter pairs must exist with the same value in the object's metadata/tags
171 // object metadata/tags may include items not in the filter
172 return std::includes(kv
.begin(), kv
.end(), filter
.kv
.begin(), filter
.kv
.end());
// Event-type check for a notification: the condition below is true when
// `events` is non-empty and does not contain `event` (linear std::find).
// An empty list never satisfies that condition.
// NOTE(review): the return statements are not visible in this excerpt.
175 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
) {
176 // if event list exists, and none of the events in the list matches the event type, filter the message
177 if (!events
.empty() && std::find(events
.begin(), events
.end(), event
) == events
.end()) {
// Decodes every child element of `obj` called `name` and appends the
// corresponding event type to `l`: each element is decoded into `val` and
// converted with rgw::notify::from_string.
// NOTE(review): the declarations of `o` and `val` are not visible in this
// excerpt; `l` is only appended to in the visible code, never cleared.
183 void do_decode_xml_obj(rgw::notify::EventTypeList
& l
, const string
& name
, XMLObj
*obj
) {
186 XMLObjIter iter
= obj
->find(name
);
189 while ((o
= iter
.get_next())) {
191 decode_xml_obj(val
, o
);
192 l
.push_back(rgw::notify::from_string(val
));
// Decodes one notification entry from `obj`.
// "Id" and "Topic" are decoded with throw_if_missing set; "Filter" is
// decoded without that flag. "Event" elements are collected into `events`;
// when none are present, ObjectCreated and ObjectRemoved are used.
// NOTE(review): the return statement is not visible in this excerpt.
196 bool rgw_pubsub_s3_notification::decode_xml(XMLObj
*obj
) {
197 const auto throw_if_missing
= true;
198 RGWXMLDecoder::decode_xml("Id", id
, obj
, throw_if_missing
);
200 RGWXMLDecoder::decode_xml("Topic", topic_arn
, obj
, throw_if_missing
);
202 RGWXMLDecoder::decode_xml("Filter", filter
, obj
);
204 do_decode_xml_obj(events
, "Event", obj
);
205 if (events
.empty()) {
206 // if no events are provided, we assume all events
207 events
.push_back(rgw::notify::ObjectCreated
);
208 events
.push_back(rgw::notify::ObjectRemoved
);
// Writes this notification as XML: "Id", "Topic" (the topic ARN), "Filter"
// only when the filter has content, and one "Event" element per entry of
// `events`, converted with rgw::notify::to_string.
213 void rgw_pubsub_s3_notification::dump_xml(Formatter
*f
) const {
214 ::encode_xml("Id", id
, f
);
215 ::encode_xml("Topic", topic_arn
.c_str(), f
);
216 if (filter
.has_content()) {
217 ::encode_xml("Filter", filter
, f
);
219 for (const auto& event
: events
) {
220 ::encode_xml("Event", rgw::notify::to_string(event
), f
);
// Decodes the "TopicConfiguration" elements of `obj` into `list`.
// The throw below reports that at least one "TopicConfiguration" is needed.
// NOTE(review): the condition guarding the throw and the return statement
// are not visible in this excerpt.
224 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj
*obj
) {
225 do_decode_xml_obj(list
, "TopicConfiguration", obj
);
227 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
// Builds a notification from a stored topic filter by copying its fields:
// id from s3_id, events from events, topic_arn from topic.arn and filter
// from s3_filter.
232 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
) :
233 id(topic_filter
.s3_id
), events(topic_filter
.events
), topic_arn(topic_filter
.topic
.arn
), filter(topic_filter
.s3_filter
) {}
// Writes `list` through do_encode_xml, passing "NotificationConfiguration"
// and "TopicConfiguration" as the two element names.
235 void rgw_pubsub_s3_notifications::dump_xml(Formatter
*f
) const {
236 do_encode_xml("NotificationConfiguration", list
, "TopicConfiguration", f
);
// Writes this event as JSON.
// Plain fields: eventVersion, eventSource, awsRegion, eventTime (converted
// to utime_t first), eventName, eventId and opaqueData.
// Object sections: "userIdentity", "requestParameters", "responseElements"
// and "s3"; "bucket", "ownerIdentity" and "object" are opened through the
// sub_s / sub_sub_s section objects.
// NOTE(review): the braces that scope each Formatter::ObjectSection are not
// visible in this excerpt, so the exact nesting is inferred from the section
// variable names (s, sub_s, sub_sub_s) - confirm against the full file.
239 void rgw_pubsub_s3_event::dump(Formatter
*f
) const {
240 encode_json("eventVersion", eventVersion
, f
);
241 encode_json("eventSource", eventSource
, f
);
242 encode_json("awsRegion", awsRegion
, f
);
243 utime_t
ut(eventTime
);
244 encode_json("eventTime", ut
, f
);
245 encode_json("eventName", eventName
, f
);
247 Formatter::ObjectSection
s(*f
, "userIdentity");
248 encode_json("principalId", userIdentity
, f
);
251 Formatter::ObjectSection
s(*f
, "requestParameters");
252 encode_json("sourceIPAddress", sourceIPAddress
, f
);
255 Formatter::ObjectSection
s(*f
, "responseElements");
256 encode_json("x-amz-request-id", x_amz_request_id
, f
);
257 encode_json("x-amz-id-2", x_amz_id_2
, f
);
260 Formatter::ObjectSection
s(*f
, "s3");
261 encode_json("s3SchemaVersion", s3SchemaVersion
, f
);
262 encode_json("configurationId", configurationId
, f
);
264 Formatter::ObjectSection
sub_s(*f
, "bucket");
265 encode_json("name", bucket_name
, f
);
267 Formatter::ObjectSection
sub_sub_s(*f
, "ownerIdentity");
268 encode_json("principalId", bucket_ownerIdentity
, f
);
270 encode_json("arn", bucket_arn
, f
);
271 encode_json("id", bucket_id
, f
);
274 Formatter::ObjectSection
sub_s(*f
, "object");
275 encode_json("key", object_key
, f
);
276 encode_json("size", object_size
, f
);
277 encode_json("etag", object_etag
, f
);
278 encode_json("versionId", object_versionId
, f
);
279 encode_json("sequencer", object_sequencer
, f
);
280 encode_json("metadata", x_meta_map
, f
);
281 encode_json("tags", tags
, f
);
284 encode_json("eventId", id
, f
);
285 encode_json("opaqueData", opaque_data
, f
);
// Writes this event as JSON fields "id", "event" (event_name), "timestamp"
// (converted to utime_t before encoding) and "info".
288 void rgw_pubsub_event::dump(Formatter
*f
) const
290 encode_json("id", id
, f
);
291 encode_json("event", event_name
, f
);
292 utime_t
ut(timestamp
);
293 encode_json("timestamp", ut
, f
);
294 encode_json("info", info
, f
);
// Writes this topic as JSON fields "user", "name", "dest", "arn" and
// "opaqueData".
297 void rgw_pubsub_topic::dump(Formatter
*f
) const
299 encode_json("user", user
, f
);
300 encode_json("name", name
, f
);
301 encode_json("dest", dest
, f
);
302 encode_json("arn", arn
, f
);
303 encode_json("opaqueData", opaque_data
, f
);
// Writes this topic as XML elements "User", "Name", "EndPoint" (the
// destination), "TopicArn" and "OpaqueData".
306 void rgw_pubsub_topic::dump_xml(Formatter
*f
) const
308 encode_xml("User", user
, f
);
309 encode_xml("Name", name
, f
);
310 encode_xml("EndPoint", dest
, f
);
311 encode_xml("TopicArn", arn
, f
);
312 encode_xml("OpaqueData", opaque_data
, f
);
// Writes one "entry" object section holding a "key" element and a "value"
// element, then closes the section.
315 void encode_xml_key_value_entry(const std::string
& key
, const std::string
& value
, Formatter
*f
) {
316 f
->open_object_section("entry");
317 encode_xml("key", key
, f
);
318 encode_xml("value", value
, f
);
319 f
->close_section(); // entry
// Writes this topic as an "Attributes" array of key/value entries:
// "User" (the user converted to a string with to_str), "Name", "EndPoint"
// (the destination serialized with to_json_str), "TopicArn" and
// "OpaqueData".
322 void rgw_pubsub_topic::dump_xml_as_attributes(Formatter
*f
) const
324 f
->open_array_section("Attributes");
325 std::string str_user
;
326 user
.to_str(str_user
);
327 encode_xml_key_value_entry("User", str_user
, f
);
328 encode_xml_key_value_entry("Name", name
, f
);
329 encode_xml_key_value_entry("EndPoint", dest
.to_json_str(), f
);
330 encode_xml_key_value_entry("TopicArn", arn
, f
);
331 encode_xml_key_value_entry("OpaqueData", opaque_data
, f
);
332 f
->close_section(); // Attributes
// Writes the event type list `l` as a JSON array called `name`. Each element
// is dumped as a string under the name "obj", converted with
// rgw::notify::to_ceph_string.
// NOTE(review): the statement closing the array section is not visible in
// this excerpt.
335 void encode_json(const char *name
, const rgw::notify::EventTypeList
& l
, Formatter
*f
)
337 f
->open_array_section(name
);
338 for (auto iter
= l
.cbegin(); iter
!= l
.cend(); ++iter
) {
339 f
->dump_string("obj", rgw::notify::to_ceph_string(*iter
));
// Writes this topic filter as JSON fields "topic" and "events".
// s3_id and s3_filter are not written by the visible code.
344 void rgw_pubsub_topic_filter::dump(Formatter
*f
) const
346 encode_json("topic", topic
, f
);
347 encode_json("events", events
, f
);
// Writes the topic and its subscriptions as JSON fields "topic" and "subs".
350 void rgw_pubsub_topic_subs::dump(Formatter
*f
) const
352 encode_json("topic", topic
, f
);
353 encode_json("subs", subs
, f
);
// Writes all bucket topics inside a JSON array section called "topics".
// Each entry of `topics` is encoded using its key as the element name and
// its mapped value as the content.
356 void rgw_pubsub_bucket_topics::dump(Formatter
*f
) const
358 Formatter::ArraySection
s(*f
, "topics");
359 for (auto& t
: topics
) {
360 encode_json(t
.first
.c_str(), t
.second
, f
);
// Writes all topics inside a JSON array section called "topics". Each entry
// of `topics` is encoded using its key as the element name and its mapped
// value as the content.
364 void rgw_pubsub_topics::dump(Formatter
*f
) const
366 Formatter::ArraySection
s(*f
, "topics");
367 for (auto& t
: topics
) {
368 encode_json(t
.first
.c_str(), t
.second
, f
);
// Writes every topic as an XML "member" element. Only the `topic` part of
// each mapped value is encoded; its subscriptions are not written.
372 void rgw_pubsub_topics::dump_xml(Formatter
*f
) const
374 for (auto& t
: topics
) {
375 encode_xml("member", t
.second
.topic
, f
);
// Writes the full destination as JSON fields "bucket_name", "oid_prefix",
// "push_endpoint", "push_endpoint_args", "push_endpoint_topic" (arn_topic),
// "stored_secret" and "persistent".
379 void rgw_pubsub_sub_dest::dump(Formatter
*f
) const
381 encode_json("bucket_name", bucket_name
, f
);
382 encode_json("oid_prefix", oid_prefix
, f
);
383 encode_json("push_endpoint", push_endpoint
, f
);
384 encode_json("push_endpoint_args", push_endpoint_args
, f
);
385 encode_json("push_endpoint_topic", arn_topic
, f
);
386 encode_json("stored_secret", stored_secret
, f
);
387 encode_json("persistent", persistent
, f
);
// Writes the destination as XML elements "EndpointAddress", "EndpointArgs",
// "EndpointTopic", "HasStoredSecret" and "Persistent". bucket_name and
// oid_prefix are left out, as the comment below explains.
390 void rgw_pubsub_sub_dest::dump_xml(Formatter
*f
) const
392 // the first 2 members (bucket_name, oid_prefix) are omitted here since they
393 // don't apply to AWS compliant topics
394 encode_xml("EndpointAddress", push_endpoint
, f
);
395 encode_xml("EndpointArgs", push_endpoint_args
, f
);
396 encode_xml("EndpointTopic", arn_topic
, f
);
397 encode_xml("HasStoredSecret", stored_secret
, f
);
398 encode_xml("Persistent", persistent
, f
);
// Serializes the destination into a JSON string. The fields use the same
// names as dump_xml ("EndpointAddress", "EndpointArgs", "EndpointTopic",
// "HasStoredSecret", "Persistent") and are encoded into the local formatter
// `f` inside an unnamed object section.
// NOTE(review): the declaration of `f`, the closing of the section and the
// statements that move the output through `ss` into the returned string are
// not visible in this excerpt.
401 std::string
rgw_pubsub_sub_dest::to_json_str() const
403 // the first 2 members (bucket_name, oid_prefix) are omitted here since they
404 // don't apply to AWS compliant topics
406 f
.open_object_section("");
407 encode_json("EndpointAddress", push_endpoint
, &f
);
408 encode_json("EndpointArgs", push_endpoint_args
, &f
);
409 encode_json("EndpointTopic", arn_topic
, &f
);
410 encode_json("HasStoredSecret", stored_secret
, &f
);
411 encode_json("Persistent", persistent
, &f
);
413 std::stringstream ss
;
// Writes the subscription configuration as JSON fields "user", "name",
// "topic", "dest" and "s3_id".
418 void rgw_pubsub_sub_config::dump(Formatter
*f
) const
420 encode_json("user", user
, f
);
421 encode_json("name", name
, f
);
422 encode_json("topic", topic
, f
);
423 encode_json("dest", dest
, f
);
424 encode_json("s3_id", s3_id
, f
);
// Constructs the pubsub handle for a store and tenant. obj_ctx is
// initialized from the store's sysobj service, and the body fills meta_obj
// through get_meta_obj().
// NOTE(review): the initializers between the signature and obj_ctx
// (presumably for `store` and `tenant`) are not visible in this excerpt.
427 RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore
* _store
, const std::string
& _tenant
) :
430 obj_ctx(store
->svc()->sysobj
->init_obj_ctx()) {
431 get_meta_obj(&meta_obj
);
// Deletes the system object `obj` (its pool and oid) through
// rgw_delete_system_obj, passing `objv_tracker` and `y` along.
// NOTE(review): the remaining parameter line (declaring `y`) and the
// handling of `ret` are not visible in this excerpt.
434 int RGWPubSub::remove(const DoutPrefixProvider
*dpp
,
435 const rgw_raw_obj
& obj
,
436 RGWObjVersionTracker
*objv_tracker
,
439 int ret
= rgw_delete_system_obj(dpp
, store
->svc()->sysobj
, obj
.pool
, obj
.oid
, objv_tracker
, y
);
// Reads the topics stored in meta_obj into `result`, tracking the object
// version in `objv_tracker`. A failed read is logged as a WARNING at debug
// level 10.
// NOTE(review): the condition guarding the log line and the return
// statements are not visible in this excerpt.
447 int RGWPubSub::read_topics(rgw_pubsub_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
449 int ret
= read(meta_obj
, result
, objv_tracker
);
451 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
// Writes `topics` to meta_obj, using `objv_tracker` for versioning.
// An ERROR is logged at level 1 for any negative result other than -ENOENT.
// NOTE(review): the return statements are not visible in this excerpt.
457 int RGWPubSub::write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_topics
& topics
,
458 RGWObjVersionTracker
*objv_tracker
, optional_yield y
)
460 int ret
= write(dpp
, meta_obj
, topics
, objv_tracker
, y
);
461 if (ret
< 0 && ret
!= -ENOENT
) {
462 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
// Convenience wrapper: reads all topics into `result` without an object
// version tracker.
468 int RGWPubSub::get_topics(rgw_pubsub_topics
*result
)
470 return read_topics(result
, nullptr);
// Reads the topics attached to this bucket from bucket_meta_obj into
// `result`. An ERROR is logged at level 1 for any negative result other
// than -ENOENT.
// NOTE(review): the return statements are not visible in this excerpt.
473 int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
475 int ret
= ps
->read(bucket_meta_obj
, result
, objv_tracker
);
476 if (ret
< 0 && ret
!= -ENOENT
) {
477 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
// Writes `topics` to this bucket's bucket_meta_obj through the parent
// RGWPubSub, and logs an ERROR at level 1 on failure.
// NOTE(review): this function logs with ldout(ps->store->ctx(), ...) even
// though `dpp` is available; the other write paths in this file use
// ldpp_dout(dpp, ...). Consider aligning.
// NOTE(review): the remaining parameter line (declaring `y`), the condition
// guarding the log line and the return statements are not visible in this
// excerpt.
483 int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider
*dpp
, const rgw_pubsub_bucket_topics
& topics
,
484 RGWObjVersionTracker
*objv_tracker
,
487 int ret
= ps
->write(dpp
, bucket_meta_obj
, topics
, objv_tracker
, y
);
489 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret
<< dendl
;
// Convenience wrapper: reads this bucket's topics into `result` without an
// object version tracker.
496 int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics
*result
)
498 return read_topics(result
, nullptr);
// Looks up topic `name` among all topics and copies the topic together with
// its subscriptions into `result`.
// Failure to read the topics and a missing topic are both logged as ERROR
// at level 1.
// NOTE(review): the conditions guarding the first log line and the return
// values are not visible in this excerpt.
501 int RGWPubSub::get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
)
503 rgw_pubsub_topics topics
;
504 int ret
= get_topics(&topics
);
506 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
510 auto iter
= topics
.topics
.find(name
);
511 if (iter
== topics
.topics
.end()) {
512 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
516 *result
= iter
->second
;
// Looks up topic `name` among all topics and copies only the topic itself
// (without its subscriptions) into `result`.
// Failure to read the topics and a missing topic are both logged as ERROR
// at level 1.
// NOTE(review): the conditions guarding the first log line and the return
// values are not visible in this excerpt.
520 int RGWPubSub::get_topic(const string
& name
, rgw_pubsub_topic
*result
)
522 rgw_pubsub_topics topics
;
523 int ret
= get_topics(&topics
);
525 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
529 auto iter
= topics
.topics
.find(name
);
530 if (iter
== topics
.topics
.end()) {
531 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
535 *result
= iter
->second
.topic
;
// Short overload: creates a notification with no S3 filter (std::nullopt)
// and an empty notification name by forwarding to the full overload.
539 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider
*dpp
, const string
& topic_name
, const rgw::notify::EventTypeList
& events
, optional_yield y
) {
540 return create_notification(dpp
, topic_name
, events
, std::nullopt
, "", y
);
// Attaches topic `topic_name` to this bucket as a notification.
// Steps visible here:
//   1. read the topic info through the parent RGWPubSub
//   2. read the bucket's current topics together with their object version
//   3. create or overwrite the entry for `topic_name` with the topic, the
//      event list, the notification name (s3_id) and the S3 filter
//   4. write the bucket topics back using the same version tracker
// NOTE(review): the error checks after each step and the return statements
// are not visible in this excerpt.
543 int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider
*dpp
, const string
& topic_name
,const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
) {
544 rgw_pubsub_topic_subs topic_info
;
546 int ret
= ps
->get_topic(topic_name
, &topic_info
);
548 ldpp_dout(dpp
, 1) << "ERROR: failed to read topic '" << topic_name
<< "' info: ret=" << ret
<< dendl
;
551 ldpp_dout(dpp
, 20) << "successfully read topic '" << topic_name
<< "' info" << dendl
;
553 RGWObjVersionTracker objv_tracker
;
554 rgw_pubsub_bucket_topics bucket_topics
;
556 ret
= read_topics(&bucket_topics
, &objv_tracker
);
558 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics from bucket '" <<
559 bucket
.name
<< "': ret=" << ret
<< dendl
;
562 ldpp_dout(dpp
, 20) << "successfully read " << bucket_topics
.topics
.size() << " topics from bucket '" <<
563 bucket
.name
<< "'" << dendl
;
// operator[] inserts a new entry or reuses the existing one for this topic
565 auto& topic_filter
= bucket_topics
.topics
[topic_name
];
566 topic_filter
.topic
= topic_info
.topic
;
567 topic_filter
.events
= events
;
568 topic_filter
.s3_id
= notif_name
;
// NOTE(review): s3_filter is dereferenced here; presumably guarded by a
// check on a line that is not visible in this excerpt - confirm.
570 topic_filter
.s3_filter
= *s3_filter
;
573 ret
= write_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
575 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics to bucket '" << bucket
.name
<< "': ret=" << ret
<< dendl
;
579 ldpp_dout(dpp
, 20) << "successfully wrote " << bucket_topics
.topics
.size() << " topics to bucket '" << bucket
.name
<< "'" << dendl
;
// Detaches topic `topic_name` from this bucket.
// The topic info is read first, then the bucket's topics are read with
// their object version, the entry for `topic_name` is erased and the
// result is written back using the same version tracker.
// In the visible code `topic_info` is only read, not used afterwards.
// NOTE(review): the error checks after each step and the return statements
// are not visible in this excerpt.
584 int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider
*dpp
, const string
& topic_name
, optional_yield y
)
586 rgw_pubsub_topic_subs topic_info
;
588 int ret
= ps
->get_topic(topic_name
, &topic_info
);
590 ldpp_dout(dpp
, 1) << "ERROR: failed to read topic info: ret=" << ret
<< dendl
;
594 RGWObjVersionTracker objv_tracker
;
595 rgw_pubsub_bucket_topics bucket_topics
;
597 ret
= read_topics(&bucket_topics
, &objv_tracker
);
599 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
603 bucket_topics
.topics
.erase(topic_name
);
605 ret
= write_topics(dpp
, bucket_topics
, &objv_tracker
, y
);
607 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
// Removes every notification of this bucket.
// The bucket's topics are listed, remove_topic() is called for each of
// them, and finally the bucket's topics object is deleted.
// A per-topic removal failure is only logged as a WARNING at level 5;
// -ENOENT results are not logged by any of the three checks.
// NOTE(review): the return statements are not visible in this excerpt, so
// whether the first and last errors abort the call cannot be confirmed here.
614 int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider
*dpp
, optional_yield y
)
616 // get all topics on a bucket
617 rgw_pubsub_bucket_topics bucket_topics
;
618 auto ret
= get_topics(&bucket_topics
);
619 if (ret
< 0 && ret
!= -ENOENT
) {
620 ldpp_dout(dpp
, 1) << "ERROR: failed to get list of topics from bucket '" << bucket
.name
<< "', ret=" << ret
<< dendl
;
624 // remove all auto-generated topics
625 for (const auto& topic
: bucket_topics
.topics
) {
626 const auto& topic_name
= topic
.first
;
627 ret
= ps
->remove_topic(dpp
, topic_name
, y
);
628 if (ret
< 0 && ret
!= -ENOENT
) {
629 ldpp_dout(dpp
, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name
<< "', ret=" << ret
<< dendl
;
633 // delete all notifications on the bucket
634 ret
= ps
->remove(dpp
, bucket_meta_obj
, nullptr, y
);
635 if (ret
< 0 && ret
!= -ENOENT
) {
636 ldpp_dout(dpp
, 1) << "ERROR: failed to remove bucket topics: ret=" << ret
<< dendl
;
// Short overload: creates topic `name` with a default-constructed
// destination and empty ARN and opaque data by forwarding to the full
// overload.
643 int RGWPubSub::create_topic(const DoutPrefixProvider
*dpp
, const string
& name
, optional_yield y
) {
644 return create_topic(dpp
, name
, rgw_pubsub_sub_dest(), "", "", y
);
// Creates topic `name`, or overwrites its fields if it already exists.
// The current topics are read with their object version (a -ENOENT result
// is tolerated), the entry for `name` is filled with the tenant user, name,
// destination, ARN and opaque data, and the topics are written back using
// the same version tracker.
// NOTE(review): topics.topics[name] reuses an existing entry, and only the
// `topic` fields are assigned in the visible code.
// NOTE(review): the check after the write and the return statements are not
// visible in this excerpt.
647 int RGWPubSub::create_topic(const DoutPrefixProvider
*dpp
, const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
) {
648 RGWObjVersionTracker objv_tracker
;
649 rgw_pubsub_topics topics
;
651 int ret
= read_topics(&topics
, &objv_tracker
);
652 if (ret
< 0 && ret
!= -ENOENT
) {
653 // it's not an error if no topics exist, we create one
654 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
658 rgw_pubsub_topic_subs
& new_topic
= topics
.topics
[name
];
659 new_topic
.topic
.user
= rgw_user("", tenant
);
660 new_topic
.topic
.name
= name
;
661 new_topic
.topic
.dest
= dest
;
662 new_topic
.topic
.arn
= arn
;
663 new_topic
.topic
.opaque_data
= opaque_data
;
665 ret
= write_topics(dpp
, topics
, &objv_tracker
, y
);
667 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
// Removes topic `name` from the stored topics.
// A read failure other than -ENOENT is logged as an ERROR; -ENOENT is
// logged as a WARNING at level 10 and treated as a no-op, as the comment
// below states. Otherwise the entry is erased and the topics are written
// back using the version tracker from the read.
// NOTE(review): the check after the write and the return statements are not
// visible in this excerpt.
674 int RGWPubSub::remove_topic(const DoutPrefixProvider
*dpp
, const string
& name
, optional_yield y
)
676 RGWObjVersionTracker objv_tracker
;
677 rgw_pubsub_topics topics
;
679 int ret
= read_topics(&topics
, &objv_tracker
);
680 if (ret
< 0 && ret
!= -ENOENT
) {
681 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
683 } else if (ret
== -ENOENT
) {
684 // it's not an error if no topics exist, just a no-op
685 ldpp_dout(dpp
, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret
<< dendl
;
689 topics
.topics
.erase(name
);
691 ret
= write_topics(dpp
, topics
, &objv_tracker
, y
);
693 ldpp_dout(dpp
, 1) << "ERROR: failed to remove topics info: ret=" << ret
<< dendl
;
// Reads this subscription's configuration from sub_meta_obj into `result`.
// An ERROR is logged at level 1 for any negative result other than -ENOENT.
// NOTE(review): the return statements are not visible in this excerpt.
700 int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
)
702 int ret
= ps
->read(sub_meta_obj
, result
, objv_tracker
);
703 if (ret
< 0 && ret
!= -ENOENT
) {
704 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
// Writes `sub_conf` to this subscription's sub_meta_obj and logs an ERROR
// at level 1 on failure.
// NOTE(review): the remaining parameter line (declaring `y`), the condition
// guarding the log line and the return statements are not visible in this
// excerpt.
710 int RGWPubSub::Sub::write_sub(const DoutPrefixProvider
*dpp
,
711 const rgw_pubsub_sub_config
& sub_conf
,
712 RGWObjVersionTracker
*objv_tracker
,
715 int ret
= ps
->write(dpp
, sub_meta_obj
, sub_conf
, objv_tracker
, y
);
717 ldpp_dout(dpp
, 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
// Deletes this subscription's sub_meta_obj and logs an ERROR at level 1 on
// failure.
// NOTE(review): the remaining parameter line (declaring `y`), the condition
// guarding the log line and the return statements are not visible in this
// excerpt.
724 int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider
*dpp
, RGWObjVersionTracker
*objv_tracker
,
727 int ret
= ps
->remove(dpp
, sub_meta_obj
, objv_tracker
, y
);
729 ldpp_dout(dpp
, 1) << "ERROR: failed to remove subscription info: ret=" << ret
<< dendl
;
// Convenience wrapper: reads the subscription configuration into `result`
// without an object version tracker.
736 int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config
*result
)
738 return read_sub(result
, nullptr);
// Creates this subscription on topic `topic`.
// The stored topics are read with their object version; a read error is
// returned as-is except -ENOENT, which is mapped to -EINVAL. The topic must
// exist. A subscription config is filled with the tenant user, topic,
// destination and s3_id; then the topics are written back and the
// subscription config is stored.
// NOTE(review): the statements that set the subscription name and register
// the subscription on the topic entry `t` are not visible in this excerpt,
// and neither are most error checks and return statements.
// NOTE(review): the topics object and the subscription object are written
// by two separate calls in the visible code; confirm how a failure of the
// second write is handled.
741 int RGWPubSub::Sub::subscribe(const DoutPrefixProvider
*dpp
, const string
& topic
, const rgw_pubsub_sub_dest
& dest
, optional_yield y
, const std::string
& s3_id
)
743 RGWObjVersionTracker objv_tracker
;
744 rgw_pubsub_topics topics
;
746 int ret
= ps
->read_topics(&topics
, &objv_tracker
);
748 ldpp_dout(dpp
, 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
// no topics object at all means the requested topic cannot exist
749 return ret
!= -ENOENT
? ret
: -EINVAL
;
752 auto iter
= topics
.topics
.find(topic
);
753 if (iter
== topics
.topics
.end()) {
754 ldpp_dout(dpp
, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl
;
758 auto& t
= iter
->second
;
760 rgw_pubsub_sub_config sub_conf
;
762 sub_conf
.user
= rgw_user("", ps
->tenant
);
764 sub_conf
.topic
= topic
;
765 sub_conf
.dest
= dest
;
766 sub_conf
.s3_id
= s3_id
;
770 ret
= ps
->write_topics(dpp
, topics
, &objv_tracker
, y
);
772 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
776 ret
= write_sub(dpp
, sub_conf
, nullptr, y
);
778 ldpp_dout(dpp
, 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
// Removes this subscription.
// The topic name starts as `_topic` and may be replaced by the topic stored
// in the subscription config. The stored topics are then read; a read
// failure is only logged as a WARNING at level 10, as the comment below
// explains. If the topic entry is found the topics are written back, and
// finally the subscription object is deleted using the version tracked when
// its config was read.
// NOTE(review): `ret` is declared twice in the visible code, so the two
// declarations must sit in different scopes; the enclosing conditions, the
// statement that unregisters the subscription from `t`, the error checks
// and the return statements are not visible in this excerpt.
784 int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider
*dpp
, const string
& _topic
, optional_yield y
)
786 string topic
= _topic
;
787 RGWObjVersionTracker sobjv_tracker
;
790 rgw_pubsub_sub_config sub_conf
;
791 int ret
= read_sub(&sub_conf
, &sobjv_tracker
);
793 ldpp_dout(dpp
, 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
796 topic
= sub_conf
.topic
;
799 RGWObjVersionTracker objv_tracker
;
800 rgw_pubsub_topics topics
;
802 int ret
= ps
->read_topics(&topics
, &objv_tracker
);
804 // not an error - could be that topic was already deleted
805 ldpp_dout(dpp
, 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
807 auto iter
= topics
.topics
.find(topic
);
808 if (iter
!= topics
.topics
.end()) {
809 auto& t
= iter
->second
;
813 ret
= ps
->write_topics(dpp
, topics
, &objv_tracker
, y
);
815 ldpp_dout(dpp
, 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
821 ret
= remove_sub(dpp
, &sobjv_tracker
, y
);
823 ldpp_dout(dpp
, 1) << "ERROR: failed to delete subscription info: ret=" << ret
<< dendl
;
// Writes a listing result as JSON: "next_marker", "is_truncated" and an
// array section named by EventType::json_type_plural that holds every event
// encoded under an empty name.
829 template<typename EventType
>
830 void RGWPubSub::SubWithEvents
<EventType
>::list_events_result::dump(Formatter
*f
) const
832 encode_json("next_marker", next_marker
, f
);
833 encode_json("is_truncated", is_truncated
, f
);
835 Formatter::ArraySection
s(*f
, EventType::json_type_plural
);
836 for (auto& event
: events
) {
837 encode_json("", event
, f
);
// Lists up to `max_events` stored events of this subscription, starting at
// `marker`, into the member `list`.
// The subscription config gives the destination bucket and object prefix.
// The bucket is listed with that prefix; each entry's meta.user_data is
// base64-decoded and then decoded into an event that is appended to
// list.events. When the listing is truncated, list.next_marker is taken
// from the list operation's next marker.
// A missing events bucket (-ENOENT) sets list.is_truncated to false.
// NOTE(review): the log text "failed to event (not a valid base64)" appears
// to be missing a verb such as "decode"; it is a runtime string, so it is
// left unchanged here.
// NOTE(review): the try blocks, the declarations of bl64 / bl / event, the
// error checks and the return statements are not visible in this excerpt,
// so what happens after a decode failure cannot be confirmed here.
841 template<typename EventType
>
842 int RGWPubSub::SubWithEvents
<EventType
>::list_events(const DoutPrefixProvider
*dpp
, const string
& marker
, int max_events
)
844 RGWRados
*store
= ps
->store
->getRados();
845 rgw_pubsub_sub_config sub_conf
;
846 int ret
= get_conf(&sub_conf
);
848 ldpp_dout(dpp
, 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
852 RGWBucketInfo bucket_info
;
854 ret
= store
->get_bucket_info(&store
->svc
, tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
855 if (ret
== -ENOENT
) {
856 list
.is_truncated
= false;
860 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
864 RGWRados::Bucket
target(store
, bucket_info
);
865 RGWRados::Bucket::List
list_op(&target
);
867 list_op
.params
.prefix
= sub_conf
.dest
.oid_prefix
;
868 list_op
.params
.marker
= marker
;
870 std::vector
<rgw_bucket_dir_entry
> objs
;
872 ret
= list_op
.list_objects(dpp
, max_events
, &objs
, nullptr, &list
.is_truncated
, null_yield
);
874 ldpp_dout(dpp
, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
877 if (list
.is_truncated
) {
878 list
.next_marker
= list_op
.get_next_marker().name
;
881 for (auto& obj
: objs
) {
884 bl64
.append(obj
.meta
.user_data
);
886 bl
.decode_base64(bl64
);
887 } catch (buffer::error
& err
) {
888 ldpp_dout(dpp
, 1) << "ERROR: failed to event (not a valid base64)" << dendl
;
893 auto iter
= bl
.cbegin();
896 } catch (buffer::error
& err
) {
897 ldpp_dout(dpp
, 1) << "ERROR: failed to decode event" << dendl
;
901 list
.events
.push_back(event
);
// Deletes the stored event `event_id` of this subscription.
// The subscription config gives the destination bucket and object prefix;
// the object named oid_prefix + event_id in that bucket is deleted with an
// RGWRados delete operation that carries the bucket's owner and versioning
// status.
// NOTE(review): the error checks after each step and the return statements
// are not visible in this excerpt.
906 template<typename EventType
>
907 int RGWPubSub::SubWithEvents
<EventType
>::remove_event(const DoutPrefixProvider
*dpp
, const string
& event_id
)
909 rgw::sal::RGWRadosStore
*store
= ps
->store
;
910 rgw_pubsub_sub_config sub_conf
;
911 int ret
= get_conf(&sub_conf
);
913 ldpp_dout(dpp
, 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
917 RGWBucketInfo bucket_info
;
919 ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
921 ldpp_dout(dpp
, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
925 rgw_bucket
& bucket
= bucket_info
.bucket
;
927 RGWObjectCtx
obj_ctx(store
);
// the event object name is the subscription's prefix followed by the event id
928 rgw_obj
obj(bucket
, sub_conf
.dest
.oid_prefix
+ event_id
);
930 obj_ctx
.set_atomic(obj
);
932 RGWRados::Object
del_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
933 RGWRados::Object::Delete
del_op(&del_target
);
935 del_op
.params
.bucket_owner
= bucket_info
.owner
;
936 del_op
.params
.versioning_status
= bucket_info
.versioning_status();
938 ret
= del_op
.delete_obj(null_yield
, dpp
);
940 ldpp_dout(dpp
, 1) << "ERROR: failed to remove event (obj=" << obj
<< "): ret=" << ret
<< dendl
;
// Fills `obj` with the raw object holding the topics metadata: the zone's
// log_pool combined with the oid returned by meta_oid().
945 void RGWPubSub::get_meta_obj(rgw_raw_obj
*obj
) const {
946 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, meta_oid());
// Fills `obj` with the raw object holding the metadata of `bucket`: the
// zone's log_pool combined with the oid returned by bucket_meta_oid(bucket).
949 void RGWPubSub::get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const {
950 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, bucket_meta_oid(bucket
));
// Fills `obj` with the raw object holding the metadata of subscription
// `name`: the zone's log_pool combined with the oid returned by
// sub_meta_oid(name).
953 void RGWPubSub::get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const {
954 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, sub_meta_oid(name
));
// NOTE(review): only the signature is visible in this excerpt. Presumably
// dumps the member `list` filled by list_events() - confirm against the
// body.
957 template<typename EventType
>
958 void RGWPubSub::SubWithEvents
<EventType
>::dump(Formatter
* f
) const {
962 // explicit instantiation for the only two possible event types
963 // (rgw_pubsub_event and rgw_pubsub_s3_event), so the template member
964 template class RGWPubSub::SubWithEvents
<rgw_pubsub_event
>;
// definitions can stay in this file instead of moving to the header
965 template class RGWPubSub::SubWithEvents
<rgw_pubsub_s3_event
>;