1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
7 #include "rgw_sal_rados.h"
8 #include "rgw_pubsub.h"
12 #include "rgw_pubsub_push.h"
16 #define dout_subsys ceph_subsys_rgw
18 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
) {
20 const auto len
= snprintf(buf
, sizeof(buf
), "%010ld.%06ld.%s", (long)ts
.sec(), (long)ts
.usec(), hash
.c_str());
26 bool rgw_s3_key_filter::decode_xml(XMLObj
* obj
) {
27 XMLObjIter iter
= obj
->find("FilterRule");
30 const auto throw_if_missing
= true;
31 auto prefix_not_set
= true;
32 auto suffix_not_set
= true;
33 auto regex_not_set
= true;
36 while ((o
= iter
.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name
, o
, throw_if_missing
);
38 if (name
== "prefix" && prefix_not_set
) {
39 prefix_not_set
= false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule
, o
, throw_if_missing
);
41 } else if (name
== "suffix" && suffix_not_set
) {
42 suffix_not_set
= false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule
, o
, throw_if_missing
);
44 } else if (name
== "regex" && regex_not_set
) {
45 regex_not_set
= false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule
, o
, throw_if_missing
);
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name
+ "'");
54 void rgw_s3_key_filter::dump_xml(Formatter
*f
) const {
55 if (!prefix_rule
.empty()) {
56 f
->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f
);
58 ::encode_xml("Value", prefix_rule
, f
);
61 if (!suffix_rule
.empty()) {
62 f
->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f
);
64 ::encode_xml("Value", suffix_rule
, f
);
67 if (!regex_rule
.empty()) {
68 f
->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f
);
70 ::encode_xml("Value", regex_rule
, f
);
75 bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule
.empty() && suffix_rule
.empty() && regex_rule
.empty());
79 bool rgw_s3_key_value_filter::decode_xml(XMLObj
* obj
) {
81 XMLObjIter iter
= obj
->find("FilterRule");
84 const auto throw_if_missing
= true;
89 while ((o
= iter
.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key
, o
, throw_if_missing
);
91 RGWXMLDecoder::decode_xml("Value", value
, o
, throw_if_missing
);
92 kv
.emplace(key
, value
);
97 void rgw_s3_key_value_filter::dump_xml(Formatter
*f
) const {
98 for (const auto& key_value
: kv
) {
99 f
->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value
.first
, f
);
101 ::encode_xml("Value", key_value
.second
, f
);
106 bool rgw_s3_key_value_filter::has_content() const {
110 bool rgw_s3_filter::decode_xml(XMLObj
* obj
) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter
, obj
);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter
, obj
);
113 RGWXMLDecoder::decode_xml("S3Tags", tag_filter
, obj
);
117 void rgw_s3_filter::dump_xml(Formatter
*f
) const {
118 if (key_filter
.has_content()) {
119 ::encode_xml("S3Key", key_filter
, f
);
121 if (metadata_filter
.has_content()) {
122 ::encode_xml("S3Metadata", metadata_filter
, f
);
124 if (tag_filter
.has_content()) {
125 ::encode_xml("S3Tags", tag_filter
, f
);
129 bool rgw_s3_filter::has_content() const {
130 return key_filter
.has_content() ||
131 metadata_filter
.has_content() ||
132 tag_filter
.has_content();
135 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
) {
136 const auto key_size
= key
.size();
137 const auto prefix_size
= filter
.prefix_rule
.size();
138 if (prefix_size
!= 0) {
139 // prefix rule exists
140 if (prefix_size
> key_size
) {
141 // if prefix is longer than key, we fail
144 if (!std::equal(filter
.prefix_rule
.begin(), filter
.prefix_rule
.end(), key
.begin())) {
148 const auto suffix_size
= filter
.suffix_rule
.size();
149 if (suffix_size
!= 0) {
150 // suffix rule exists
151 if (suffix_size
> key_size
) {
152 // if suffix is longer than key, we fail
155 if (!std::equal(filter
.suffix_rule
.begin(), filter
.suffix_rule
.end(), (key
.end() - suffix_size
))) {
159 if (!filter
.regex_rule
.empty()) {
160 // TODO add regex caching in the filter
161 const std::regex
base_regex(filter
.regex_rule
);
162 if (!std::regex_match(key
, base_regex
)) {
169 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueMap
& kv
) {
170 // all filter pairs must exist with the same value in the object's metadata/tags
171 // object metadata/tags may include items not in the filter
172 return std::includes(kv
.begin(), kv
.end(), filter
.kv
.begin(), filter
.kv
.end());
175 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
) {
176 // if event list exists, and none of the events in the list matches the event type, filter the message
177 if (!events
.empty() && std::find(events
.begin(), events
.end(), event
) == events
.end()) {
183 void do_decode_xml_obj(rgw::notify::EventTypeList
& l
, const string
& name
, XMLObj
*obj
) {
186 XMLObjIter iter
= obj
->find(name
);
189 while ((o
= iter
.get_next())) {
191 decode_xml_obj(val
, o
);
192 l
.push_back(rgw::notify::from_string(val
));
196 bool rgw_pubsub_s3_notification::decode_xml(XMLObj
*obj
) {
197 const auto throw_if_missing
= true;
198 RGWXMLDecoder::decode_xml("Id", id
, obj
, throw_if_missing
);
200 RGWXMLDecoder::decode_xml("Topic", topic_arn
, obj
, throw_if_missing
);
202 RGWXMLDecoder::decode_xml("Filter", filter
, obj
);
204 do_decode_xml_obj(events
, "Event", obj
);
205 if (events
.empty()) {
206 // if no events are provided, we assume all events
207 events
.push_back(rgw::notify::ObjectCreated
);
208 events
.push_back(rgw::notify::ObjectRemoved
);
213 void rgw_pubsub_s3_notification::dump_xml(Formatter
*f
) const {
214 ::encode_xml("Id", id
, f
);
215 ::encode_xml("Topic", topic_arn
.c_str(), f
);
216 if (filter
.has_content()) {
217 ::encode_xml("Filter", filter
, f
);
219 for (const auto& event
: events
) {
220 ::encode_xml("Event", rgw::notify::to_string(event
), f
);
224 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj
*obj
) {
225 do_decode_xml_obj(list
, "TopicConfiguration", obj
);
227 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
232 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
) :
233 id(topic_filter
.s3_id
), events(topic_filter
.events
), topic_arn(topic_filter
.topic
.arn
), filter(topic_filter
.s3_filter
) {}
235 void rgw_pubsub_s3_notifications::dump_xml(Formatter
*f
) const {
236 do_encode_xml("NotificationConfiguration", list
, "TopicConfiguration", f
);
239 void rgw_pubsub_s3_event::dump(Formatter
*f
) const {
240 encode_json("eventVersion", eventVersion
, f
);
241 encode_json("eventSource", eventSource
, f
);
242 encode_json("awsRegion", awsRegion
, f
);
243 utime_t
ut(eventTime
);
244 encode_json("eventTime", ut
, f
);
245 encode_json("eventName", eventName
, f
);
247 Formatter::ObjectSection
s(*f
, "userIdentity");
248 encode_json("principalId", userIdentity
, f
);
251 Formatter::ObjectSection
s(*f
, "requestParameters");
252 encode_json("sourceIPAddress", sourceIPAddress
, f
);
255 Formatter::ObjectSection
s(*f
, "responseElements");
256 encode_json("x-amz-request-id", x_amz_request_id
, f
);
257 encode_json("x-amz-id-2", x_amz_id_2
, f
);
260 Formatter::ObjectSection
s(*f
, "s3");
261 encode_json("s3SchemaVersion", s3SchemaVersion
, f
);
262 encode_json("configurationId", configurationId
, f
);
264 Formatter::ObjectSection
sub_s(*f
, "bucket");
265 encode_json("name", bucket_name
, f
);
267 Formatter::ObjectSection
sub_sub_s(*f
, "ownerIdentity");
268 encode_json("principalId", bucket_ownerIdentity
, f
);
270 encode_json("arn", bucket_arn
, f
);
271 encode_json("id", bucket_id
, f
);
274 Formatter::ObjectSection
sub_s(*f
, "object");
275 encode_json("key", object_key
, f
);
276 encode_json("size", object_size
, f
);
277 encode_json("etag", object_etag
, f
);
278 encode_json("versionId", object_versionId
, f
);
279 encode_json("sequencer", object_sequencer
, f
);
280 encode_json("metadata", x_meta_map
, f
);
281 encode_json("tags", tags
, f
);
284 encode_json("eventId", id
, f
);
285 encode_json("opaqueData", opaque_data
, f
);
288 void rgw_pubsub_event::dump(Formatter
*f
) const
290 encode_json("id", id
, f
);
291 encode_json("event", event_name
, f
);
292 utime_t
ut(timestamp
);
293 encode_json("timestamp", ut
, f
);
294 encode_json("info", info
, f
);
297 void rgw_pubsub_topic::dump(Formatter
*f
) const
299 encode_json("user", user
, f
);
300 encode_json("name", name
, f
);
301 encode_json("dest", dest
, f
);
302 encode_json("arn", arn
, f
);
303 encode_json("opaqueData", opaque_data
, f
);
306 void rgw_pubsub_topic::dump_xml(Formatter
*f
) const
308 encode_xml("User", user
, f
);
309 encode_xml("Name", name
, f
);
310 encode_xml("EndPoint", dest
, f
);
311 encode_xml("TopicArn", arn
, f
);
312 encode_xml("OpaqueData", opaque_data
, f
);
315 void encode_xml_key_value_entry(const std::string
& key
, const std::string
& value
, Formatter
*f
) {
316 f
->open_object_section("entry");
317 encode_xml("key", key
, f
);
318 encode_xml("value", value
, f
);
319 f
->close_section(); // entry
322 void rgw_pubsub_topic::dump_xml_as_attributes(Formatter
*f
) const
324 f
->open_array_section("Attributes");
325 std::string str_user
;
326 user
.to_str(str_user
);
327 encode_xml_key_value_entry("User", str_user
, f
);
328 encode_xml_key_value_entry("Name", name
, f
);
329 encode_xml_key_value_entry("EndPoint", dest
.to_json_str(), f
);
330 encode_xml_key_value_entry("TopicArn", arn
, f
);
331 encode_xml_key_value_entry("OpaqueData", opaque_data
, f
);
332 f
->close_section(); // Attributes
335 void encode_json(const char *name
, const rgw::notify::EventTypeList
& l
, Formatter
*f
)
337 f
->open_array_section(name
);
338 for (auto iter
= l
.cbegin(); iter
!= l
.cend(); ++iter
) {
339 f
->dump_string("obj", rgw::notify::to_ceph_string(*iter
));
344 void rgw_pubsub_topic_filter::dump(Formatter
*f
) const
346 encode_json("topic", topic
, f
);
347 encode_json("events", events
, f
);
350 void rgw_pubsub_topic_subs::dump(Formatter
*f
) const
352 encode_json("topic", topic
, f
);
353 encode_json("subs", subs
, f
);
356 void rgw_pubsub_bucket_topics::dump(Formatter
*f
) const
358 Formatter::ArraySection
s(*f
, "topics");
359 for (auto& t
: topics
) {
360 encode_json(t
.first
.c_str(), t
.second
, f
);
364 void rgw_pubsub_topics::dump(Formatter
*f
) const
366 Formatter::ArraySection
s(*f
, "topics");
367 for (auto& t
: topics
) {
368 encode_json(t
.first
.c_str(), t
.second
, f
);
372 void rgw_pubsub_topics::dump_xml(Formatter
*f
) const
374 for (auto& t
: topics
) {
375 encode_xml("member", t
.second
.topic
, f
);
379 void rgw_pubsub_sub_dest::dump(Formatter
*f
) const
381 encode_json("bucket_name", bucket_name
, f
);
382 encode_json("oid_prefix", oid_prefix
, f
);
383 encode_json("push_endpoint", push_endpoint
, f
);
384 encode_json("push_endpoint_args", push_endpoint_args
, f
);
385 encode_json("push_endpoint_topic", arn_topic
, f
);
386 encode_json("stored_secret", stored_secret
, f
);
387 encode_json("persistent", persistent
, f
);
390 void rgw_pubsub_sub_dest::dump_xml(Formatter
*f
) const
392 // first 2 members are omitted here since they
393 // dont apply to AWS compliant topics
394 encode_xml("EndpointAddress", push_endpoint
, f
);
395 encode_xml("EndpointArgs", push_endpoint_args
, f
);
396 encode_xml("EndpointTopic", arn_topic
, f
);
397 encode_xml("HasStoredSecret", stored_secret
, f
);
398 encode_xml("Persistent", persistent
, f
);
401 std::string
rgw_pubsub_sub_dest::to_json_str() const
403 // first 2 members are omitted here since they
404 // don't apply to AWS compliant topics
406 f
.open_object_section("");
407 encode_json("EndpointAddress", push_endpoint
, &f
);
408 encode_json("EndpointArgs", push_endpoint_args
, &f
);
409 encode_json("EndpointTopic", arn_topic
, &f
);
410 encode_json("HasStoredSecret", stored_secret
, &f
);
411 encode_json("Persistent", persistent
, &f
);
413 std::stringstream ss
;
418 void rgw_pubsub_sub_config::dump(Formatter
*f
) const
420 encode_json("user", user
, f
);
421 encode_json("name", name
, f
);
422 encode_json("topic", topic
, f
);
423 encode_json("dest", dest
, f
);
424 encode_json("s3_id", s3_id
, f
);
427 RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore
* _store
, const std::string
& _tenant
) :
430 obj_ctx(store
->svc()->sysobj
->init_obj_ctx()) {
431 get_meta_obj(&meta_obj
);
434 int RGWPubSub::remove(const rgw_raw_obj
& obj
,
435 RGWObjVersionTracker
*objv_tracker
,
438 int ret
= rgw_delete_system_obj(store
->svc()->sysobj
, obj
.pool
, obj
.oid
, objv_tracker
, y
);
446 int RGWPubSub::read_topics(rgw_pubsub_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
448 int ret
= read(meta_obj
, result
, objv_tracker
);
450 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
456 int RGWPubSub::write_topics(const rgw_pubsub_topics
& topics
,
457 RGWObjVersionTracker
*objv_tracker
, optional_yield y
)
459 int ret
= write(meta_obj
, topics
, objv_tracker
, y
);
460 if (ret
< 0 && ret
!= -ENOENT
) {
461 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
467 int RGWPubSub::get_topics(rgw_pubsub_topics
*result
)
469 return read_topics(result
, nullptr);
472 int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
474 int ret
= ps
->read(bucket_meta_obj
, result
, objv_tracker
);
475 if (ret
< 0 && ret
!= -ENOENT
) {
476 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
482 int RGWPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics
& topics
,
483 RGWObjVersionTracker
*objv_tracker
,
486 int ret
= ps
->write(bucket_meta_obj
, topics
, objv_tracker
, y
);
488 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret
<< dendl
;
495 int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics
*result
)
497 return read_topics(result
, nullptr);
500 int RGWPubSub::get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
)
502 rgw_pubsub_topics topics
;
503 int ret
= get_topics(&topics
);
505 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
509 auto iter
= topics
.topics
.find(name
);
510 if (iter
== topics
.topics
.end()) {
511 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
515 *result
= iter
->second
;
519 int RGWPubSub::get_topic(const string
& name
, rgw_pubsub_topic
*result
)
521 rgw_pubsub_topics topics
;
522 int ret
= get_topics(&topics
);
524 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
528 auto iter
= topics
.topics
.find(name
);
529 if (iter
== topics
.topics
.end()) {
530 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
534 *result
= iter
->second
.topic
;
538 int RGWPubSub::Bucket::create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, optional_yield y
) {
539 return create_notification(topic_name
, events
, std::nullopt
, "", y
);
542 int RGWPubSub::Bucket::create_notification(const string
& topic_name
,const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
, optional_yield y
) {
543 rgw_pubsub_topic_subs topic_info
;
544 rgw::sal::RGWRadosStore
*store
= ps
->store
;
546 int ret
= ps
->get_topic(topic_name
, &topic_info
);
548 ldout(store
->ctx(), 1) << "ERROR: failed to read topic '" << topic_name
<< "' info: ret=" << ret
<< dendl
;
551 ldout(store
->ctx(), 20) << "successfully read topic '" << topic_name
<< "' info" << dendl
;
553 RGWObjVersionTracker objv_tracker
;
554 rgw_pubsub_bucket_topics bucket_topics
;
556 ret
= read_topics(&bucket_topics
, &objv_tracker
);
558 ldout(store
->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
559 bucket
.name
<< "': ret=" << ret
<< dendl
;
562 ldout(store
->ctx(), 20) << "successfully read " << bucket_topics
.topics
.size() << " topics from bucket '" <<
563 bucket
.name
<< "'" << dendl
;
565 auto& topic_filter
= bucket_topics
.topics
[topic_name
];
566 topic_filter
.topic
= topic_info
.topic
;
567 topic_filter
.events
= events
;
568 topic_filter
.s3_id
= notif_name
;
570 topic_filter
.s3_filter
= *s3_filter
;
573 ret
= write_topics(bucket_topics
, &objv_tracker
, y
);
575 ldout(store
->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket
.name
<< "': ret=" << ret
<< dendl
;
579 ldout(store
->ctx(), 20) << "successfully wrote " << bucket_topics
.topics
.size() << " topics to bucket '" << bucket
.name
<< "'" << dendl
;
584 int RGWPubSub::Bucket::remove_notification(const string
& topic_name
, optional_yield y
)
586 rgw_pubsub_topic_subs topic_info
;
587 rgw::sal::RGWRadosStore
*store
= ps
->store
;
589 int ret
= ps
->get_topic(topic_name
, &topic_info
);
591 ldout(store
->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret
<< dendl
;
595 RGWObjVersionTracker objv_tracker
;
596 rgw_pubsub_bucket_topics bucket_topics
;
598 ret
= read_topics(&bucket_topics
, &objv_tracker
);
600 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
604 bucket_topics
.topics
.erase(topic_name
);
606 ret
= write_topics(bucket_topics
, &objv_tracker
, y
);
608 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
615 int RGWPubSub::Bucket::remove_notifications(optional_yield y
)
617 // get all topics on a bucket
618 rgw_pubsub_bucket_topics bucket_topics
;
619 auto ret
= get_topics(&bucket_topics
);
620 if (ret
< 0 && ret
!= -ENOENT
) {
621 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to get list of topics from bucket '" << bucket
.name
<< "', ret=" << ret
<< dendl
;
625 // remove all auto-generated topics
626 for (const auto& topic
: bucket_topics
.topics
) {
627 const auto& topic_name
= topic
.first
;
628 ret
= ps
->remove_topic(topic_name
, y
);
629 if (ret
< 0 && ret
!= -ENOENT
) {
630 ldout(ps
->store
->ctx(), 5) << "WARNING: failed to remove auto-generated topic '" << topic_name
<< "', ret=" << ret
<< dendl
;
634 // delete all notifications on a bucket
635 ret
= ps
->remove(bucket_meta_obj
, nullptr, y
);
636 if (ret
< 0 && ret
!= -ENOENT
) {
637 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to remove bucket topics: ret=" << ret
<< dendl
;
644 int RGWPubSub::create_topic(const string
& name
, optional_yield y
) {
645 return create_topic(name
, rgw_pubsub_sub_dest(), "", "", y
);
648 int RGWPubSub::create_topic(const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
, optional_yield y
) {
649 RGWObjVersionTracker objv_tracker
;
650 rgw_pubsub_topics topics
;
652 int ret
= read_topics(&topics
, &objv_tracker
);
653 if (ret
< 0 && ret
!= -ENOENT
) {
654 // it's not an error if no topics exist; we create one
655 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
659 rgw_pubsub_topic_subs
& new_topic
= topics
.topics
[name
];
660 new_topic
.topic
.user
= rgw_user("", tenant
);
661 new_topic
.topic
.name
= name
;
662 new_topic
.topic
.dest
= dest
;
663 new_topic
.topic
.arn
= arn
;
664 new_topic
.topic
.opaque_data
= opaque_data
;
666 ret
= write_topics(topics
, &objv_tracker
, y
);
668 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
675 int RGWPubSub::remove_topic(const string
& name
, optional_yield y
)
677 RGWObjVersionTracker objv_tracker
;
678 rgw_pubsub_topics topics
;
680 int ret
= read_topics(&topics
, &objv_tracker
);
681 if (ret
< 0 && ret
!= -ENOENT
) {
682 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
684 } else if (ret
== -ENOENT
) {
685 // it's not an error if no topics exist, just a no-op
686 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret
<< dendl
;
690 topics
.topics
.erase(name
);
692 ret
= write_topics(topics
, &objv_tracker
, y
);
694 ldout(store
->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret
<< dendl
;
701 int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
)
703 int ret
= ps
->read(sub_meta_obj
, result
, objv_tracker
);
704 if (ret
< 0 && ret
!= -ENOENT
) {
705 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
711 int RGWPubSub::Sub::write_sub(const rgw_pubsub_sub_config
& sub_conf
,
712 RGWObjVersionTracker
*objv_tracker
,
715 int ret
= ps
->write(sub_meta_obj
, sub_conf
, objv_tracker
, y
);
717 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
724 int RGWPubSub::Sub::remove_sub(RGWObjVersionTracker
*objv_tracker
,
727 int ret
= ps
->remove(sub_meta_obj
, objv_tracker
, y
);
729 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret
<< dendl
;
736 int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config
*result
)
738 return read_sub(result
, nullptr);
741 int RGWPubSub::Sub::subscribe(const string
& topic
, const rgw_pubsub_sub_dest
& dest
, optional_yield y
, const std::string
& s3_id
)
743 RGWObjVersionTracker objv_tracker
;
744 rgw_pubsub_topics topics
;
745 rgw::sal::RGWRadosStore
*store
= ps
->store
;
747 int ret
= ps
->read_topics(&topics
, &objv_tracker
);
749 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
750 return ret
!= -ENOENT
? ret
: -EINVAL
;
753 auto iter
= topics
.topics
.find(topic
);
754 if (iter
== topics
.topics
.end()) {
755 ldout(store
->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl
;
759 auto& t
= iter
->second
;
761 rgw_pubsub_sub_config sub_conf
;
763 sub_conf
.user
= rgw_user("", ps
->tenant
);
765 sub_conf
.topic
= topic
;
766 sub_conf
.dest
= dest
;
767 sub_conf
.s3_id
= s3_id
;
771 ret
= ps
->write_topics(topics
, &objv_tracker
, y
);
773 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
777 ret
= write_sub(sub_conf
, nullptr, y
);
779 ldout(store
->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
785 int RGWPubSub::Sub::unsubscribe(const string
& _topic
, optional_yield y
)
787 string topic
= _topic
;
788 RGWObjVersionTracker sobjv_tracker
;
789 rgw::sal::RGWRadosStore
*store
= ps
->store
;
792 rgw_pubsub_sub_config sub_conf
;
793 int ret
= read_sub(&sub_conf
, &sobjv_tracker
);
795 ldout(store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
798 topic
= sub_conf
.topic
;
801 RGWObjVersionTracker objv_tracker
;
802 rgw_pubsub_topics topics
;
804 int ret
= ps
->read_topics(&topics
, &objv_tracker
);
806 // not an error - could be that topic was already deleted
807 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
809 auto iter
= topics
.topics
.find(topic
);
810 if (iter
!= topics
.topics
.end()) {
811 auto& t
= iter
->second
;
815 ret
= ps
->write_topics(topics
, &objv_tracker
, y
);
817 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
823 ret
= remove_sub(&sobjv_tracker
, y
);
825 ldout(store
->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret
<< dendl
;
831 template<typename EventType
>
832 void RGWPubSub::SubWithEvents
<EventType
>::list_events_result::dump(Formatter
*f
) const
834 encode_json("next_marker", next_marker
, f
);
835 encode_json("is_truncated", is_truncated
, f
);
837 Formatter::ArraySection
s(*f
, EventType::json_type_plural
);
838 for (auto& event
: events
) {
839 encode_json("", event
, f
);
843 template<typename EventType
>
844 int RGWPubSub::SubWithEvents
<EventType
>::list_events(const string
& marker
, int max_events
)
846 RGWRados
*store
= ps
->store
->getRados();
847 rgw_pubsub_sub_config sub_conf
;
848 int ret
= get_conf(&sub_conf
);
850 ldout(store
->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
854 RGWBucketInfo bucket_info
;
856 ret
= store
->get_bucket_info(&store
->svc
, tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
857 if (ret
== -ENOENT
) {
858 list
.is_truncated
= false;
862 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
866 RGWRados::Bucket
target(store
, bucket_info
);
867 RGWRados::Bucket::List
list_op(&target
);
869 list_op
.params
.prefix
= sub_conf
.dest
.oid_prefix
;
870 list_op
.params
.marker
= marker
;
872 std::vector
<rgw_bucket_dir_entry
> objs
;
874 ret
= list_op
.list_objects(max_events
, &objs
, nullptr, &list
.is_truncated
, null_yield
);
876 ldout(store
->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
879 if (list
.is_truncated
) {
880 list
.next_marker
= list_op
.get_next_marker().name
;
883 for (auto& obj
: objs
) {
886 bl64
.append(obj
.meta
.user_data
);
888 bl
.decode_base64(bl64
);
889 } catch (buffer::error
& err
) {
890 ldout(store
->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl
;
895 auto iter
= bl
.cbegin();
898 } catch (buffer::error
& err
) {
899 ldout(store
->ctx(), 1) << "ERROR: failed to decode event" << dendl
;
903 list
.events
.push_back(event
);
908 template<typename EventType
>
909 int RGWPubSub::SubWithEvents
<EventType
>::remove_event(const string
& event_id
)
911 rgw::sal::RGWRadosStore
*store
= ps
->store
;
912 rgw_pubsub_sub_config sub_conf
;
913 int ret
= get_conf(&sub_conf
);
915 ldout(store
->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
919 RGWBucketInfo bucket_info
;
921 ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
923 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
927 rgw_bucket
& bucket
= bucket_info
.bucket
;
929 RGWObjectCtx
obj_ctx(store
);
930 rgw_obj
obj(bucket
, sub_conf
.dest
.oid_prefix
+ event_id
);
932 obj_ctx
.set_atomic(obj
);
934 RGWRados::Object
del_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
935 RGWRados::Object::Delete
del_op(&del_target
);
937 del_op
.params
.bucket_owner
= bucket_info
.owner
;
938 del_op
.params
.versioning_status
= bucket_info
.versioning_status();
940 ret
= del_op
.delete_obj(null_yield
);
942 ldout(store
->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj
<< "): ret=" << ret
<< dendl
;
947 void RGWPubSub::get_meta_obj(rgw_raw_obj
*obj
) const {
948 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, meta_oid());
951 void RGWPubSub::get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const {
952 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, bucket_meta_oid(bucket
));
955 void RGWPubSub::get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const {
956 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, sub_meta_oid(name
));
959 template<typename EventType
>
960 void RGWPubSub::SubWithEvents
<EventType
>::dump(Formatter
* f
) const {
964 // explicit instantiation for the only two possible types
965 // no need to move implementation to header
966 template class RGWPubSub::SubWithEvents
<rgw_pubsub_event
>;
967 template class RGWPubSub::SubWithEvents
<rgw_pubsub_s3_event
>;