1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
7 #include "rgw_pubsub.h"
11 #include "rgw_pubsub_push.h"
12 #include "rgw_rados.h"
16 #define dout_subsys ceph_subsys_rgw
18 bool rgw_s3_key_filter::decode_xml(XMLObj
* obj
) {
19 XMLObjIter iter
= obj
->find("FilterRule");
22 const auto throw_if_missing
= true;
23 auto prefix_not_set
= true;
24 auto suffix_not_set
= true;
25 auto regex_not_set
= true;
28 while ((o
= iter
.get_next())) {
29 RGWXMLDecoder::decode_xml("Name", name
, o
, throw_if_missing
);
30 if (name
== "prefix" && prefix_not_set
) {
31 prefix_not_set
= false;
32 RGWXMLDecoder::decode_xml("Value", prefix_rule
, o
, throw_if_missing
);
33 } else if (name
== "suffix" && suffix_not_set
) {
34 suffix_not_set
= false;
35 RGWXMLDecoder::decode_xml("Value", suffix_rule
, o
, throw_if_missing
);
36 } else if (name
== "regex" && regex_not_set
) {
37 regex_not_set
= false;
38 RGWXMLDecoder::decode_xml("Value", regex_rule
, o
, throw_if_missing
);
40 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name
+ "'");
46 void rgw_s3_key_filter::dump_xml(Formatter
*f
) const {
47 if (!prefix_rule
.empty()) {
48 f
->open_object_section("FilterRule");
49 ::encode_xml("Name", "prefix", f
);
50 ::encode_xml("Value", prefix_rule
, f
);
53 if (!suffix_rule
.empty()) {
54 f
->open_object_section("FilterRule");
55 ::encode_xml("Name", "suffix", f
);
56 ::encode_xml("Value", suffix_rule
, f
);
59 if (!regex_rule
.empty()) {
60 f
->open_object_section("FilterRule");
61 ::encode_xml("Name", "regex", f
);
62 ::encode_xml("Value", regex_rule
, f
);
67 bool rgw_s3_key_filter::has_content() const {
68 return !(prefix_rule
.empty() && suffix_rule
.empty() && regex_rule
.empty());
71 bool rgw_s3_metadata_filter::decode_xml(XMLObj
* obj
) {
73 XMLObjIter iter
= obj
->find("FilterRule");
76 const auto throw_if_missing
= true;
81 while ((o
= iter
.get_next())) {
82 RGWXMLDecoder::decode_xml("Name", key
, o
, throw_if_missing
);
83 RGWXMLDecoder::decode_xml("Value", value
, o
, throw_if_missing
);
84 metadata
.emplace(key
, value
);
89 void rgw_s3_metadata_filter::dump_xml(Formatter
*f
) const {
90 for (const auto& key_value
: metadata
) {
91 f
->open_object_section("FilterRule");
92 ::encode_xml("Name", key_value
.first
, f
);
93 ::encode_xml("Value", key_value
.second
, f
);
98 bool rgw_s3_metadata_filter::has_content() const {
99 return !metadata
.empty();
102 bool rgw_s3_filter::decode_xml(XMLObj
* obj
) {
103 RGWXMLDecoder::decode_xml("S3Key", key_filter
, obj
);
104 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter
, obj
);
108 void rgw_s3_filter::dump_xml(Formatter
*f
) const {
109 if (key_filter
.has_content()) {
110 ::encode_xml("S3Key", key_filter
, f
);
112 if (metadata_filter
.has_content()) {
113 ::encode_xml("S3Metadata", metadata_filter
, f
);
117 bool rgw_s3_filter::has_content() const {
118 return key_filter
.has_content() ||
119 metadata_filter
.has_content();
122 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
) {
123 const auto key_size
= key
.size();
124 const auto prefix_size
= filter
.prefix_rule
.size();
125 if (prefix_size
!= 0) {
126 // prefix rule exists
127 if (prefix_size
> key_size
) {
128 // if prefix is longer than key, we fail
131 if (!std::equal(filter
.prefix_rule
.begin(), filter
.prefix_rule
.end(), key
.begin())) {
135 const auto suffix_size
= filter
.suffix_rule
.size();
136 if (suffix_size
!= 0) {
137 // suffix rule exists
138 if (suffix_size
> key_size
) {
139 // if suffix is longer than key, we fail
142 if (!std::equal(filter
.suffix_rule
.begin(), filter
.suffix_rule
.end(), (key
.end() - suffix_size
))) {
146 if (!filter
.regex_rule
.empty()) {
147 // TODO add regex chaching in the filter
148 const std::regex
base_regex(filter
.regex_rule
);
149 if (!std::regex_match(key
, base_regex
)) {
156 bool match(const rgw_s3_metadata_filter
& filter
, const Metadata
& metadata
) {
157 // all filter pairs must exist with the same value in the object's metadata
158 // object metadata may include items not in the filter
159 return std::includes(metadata
.begin(), metadata
.end(), filter
.metadata
.begin(), filter
.metadata
.end());
162 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
) {
163 // if event list exists, and none of the events in the list matches the event type, filter the message
164 if (!events
.empty() && std::find(events
.begin(), events
.end(), event
) == events
.end()) {
170 void do_decode_xml_obj(rgw::notify::EventTypeList
& l
, const string
& name
, XMLObj
*obj
) {
173 XMLObjIter iter
= obj
->find(name
);
176 while ((o
= iter
.get_next())) {
178 decode_xml_obj(val
, o
);
179 l
.push_back(rgw::notify::from_string(val
));
183 bool rgw_pubsub_s3_notification::decode_xml(XMLObj
*obj
) {
184 const auto throw_if_missing
= true;
185 RGWXMLDecoder::decode_xml("Id", id
, obj
, throw_if_missing
);
187 RGWXMLDecoder::decode_xml("Topic", topic_arn
, obj
, throw_if_missing
);
189 RGWXMLDecoder::decode_xml("Filter", filter
, obj
);
191 do_decode_xml_obj(events
, "Event", obj
);
192 if (events
.empty()) {
193 // if no events are provided, we assume all events
194 events
.push_back(rgw::notify::ObjectCreated
);
195 events
.push_back(rgw::notify::ObjectRemoved
);
200 void rgw_pubsub_s3_notification::dump_xml(Formatter
*f
) const {
201 ::encode_xml("Id", id
, f
);
202 ::encode_xml("Topic", topic_arn
.c_str(), f
);
203 if (filter
.has_content()) {
204 ::encode_xml("Filter", filter
, f
);
206 for (const auto& event
: events
) {
207 ::encode_xml("Event", rgw::notify::to_string(event
), f
);
211 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj
*obj
) {
212 do_decode_xml_obj(list
, "TopicConfiguration", obj
);
214 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
219 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
) :
220 id(topic_filter
.s3_id
), events(topic_filter
.events
), topic_arn(topic_filter
.topic
.arn
), filter(topic_filter
.s3_filter
) {}
222 void rgw_pubsub_s3_notifications::dump_xml(Formatter
*f
) const {
223 do_encode_xml("NotificationConfiguration", list
, "TopicConfiguration", f
);
226 void rgw_pubsub_s3_record::dump(Formatter
*f
) const {
227 encode_json("eventVersion", eventVersion
, f
);
228 encode_json("eventSource", eventSource
, f
);
229 encode_json("awsRegion", awsRegion
, f
);
230 utime_t
ut(eventTime
);
231 encode_json("eventTime", ut
, f
);
232 encode_json("eventName", eventName
, f
);
234 Formatter::ObjectSection
s(*f
, "userIdentity");
235 encode_json("principalId", userIdentity
, f
);
238 Formatter::ObjectSection
s(*f
, "requestParameters");
239 encode_json("sourceIPAddress", sourceIPAddress
, f
);
242 Formatter::ObjectSection
s(*f
, "responseElements");
243 encode_json("x-amz-request-id", x_amz_request_id
, f
);
244 encode_json("x-amz-id-2", x_amz_id_2
, f
);
247 Formatter::ObjectSection
s(*f
, "s3");
248 encode_json("s3SchemaVersion", s3SchemaVersion
, f
);
249 encode_json("configurationId", configurationId
, f
);
251 Formatter::ObjectSection
sub_s(*f
, "bucket");
252 encode_json("name", bucket_name
, f
);
254 Formatter::ObjectSection
sub_sub_s(*f
, "ownerIdentity");
255 encode_json("principalId", bucket_ownerIdentity
, f
);
257 encode_json("arn", bucket_arn
, f
);
258 encode_json("id", bucket_id
, f
);
261 Formatter::ObjectSection
sub_s(*f
, "object");
262 encode_json("key", object_key
, f
);
263 encode_json("size", object_size
, f
);
264 encode_json("etag", object_etag
, f
);
265 encode_json("versionId", object_versionId
, f
);
266 encode_json("sequencer", object_sequencer
, f
);
267 encode_json("metadata", x_meta_map
, f
);
270 encode_json("eventId", id
, f
);
273 void rgw_pubsub_event::dump(Formatter
*f
) const
275 encode_json("id", id
, f
);
276 encode_json("event", event_name
, f
);
277 utime_t
ut(timestamp
);
278 encode_json("timestamp", ut
, f
);
279 encode_json("info", info
, f
);
282 void rgw_pubsub_topic::dump(Formatter
*f
) const
284 encode_json("user", user
, f
);
285 encode_json("name", name
, f
);
286 encode_json("dest", dest
, f
);
287 encode_json("arn", arn
, f
);
290 void rgw_pubsub_topic::dump_xml(Formatter
*f
) const
292 encode_xml("User", user
, f
);
293 encode_xml("Name", name
, f
);
294 encode_xml("EndPoint", dest
, f
);
295 encode_xml("TopicArn", arn
, f
);
298 void encode_json(const char *name
, const rgw::notify::EventTypeList
& l
, Formatter
*f
)
300 f
->open_array_section(name
);
301 for (auto iter
= l
.cbegin(); iter
!= l
.cend(); ++iter
) {
302 f
->dump_string("obj", rgw::notify::to_ceph_string(*iter
));
307 void rgw_pubsub_topic_filter::dump(Formatter
*f
) const
309 encode_json("topic", topic
, f
);
310 encode_json("events", events
, f
);
313 void rgw_pubsub_topic_subs::dump(Formatter
*f
) const
315 encode_json("topic", topic
, f
);
316 encode_json("subs", subs
, f
);
319 void rgw_pubsub_bucket_topics::dump(Formatter
*f
) const
321 Formatter::ArraySection
s(*f
, "topics");
322 for (auto& t
: topics
) {
323 encode_json(t
.first
.c_str(), t
.second
, f
);
327 void rgw_pubsub_user_topics::dump(Formatter
*f
) const
329 Formatter::ArraySection
s(*f
, "topics");
330 for (auto& t
: topics
) {
331 encode_json(t
.first
.c_str(), t
.second
, f
);
335 void rgw_pubsub_user_topics::dump_xml(Formatter
*f
) const
337 for (auto& t
: topics
) {
338 encode_xml("member", t
.second
.topic
, f
);
342 void rgw_pubsub_sub_dest::dump(Formatter
*f
) const
344 encode_json("bucket_name", bucket_name
, f
);
345 encode_json("oid_prefix", oid_prefix
, f
);
346 encode_json("push_endpoint", push_endpoint
, f
);
347 encode_json("push_endpoint_args", push_endpoint_args
, f
);
348 encode_json("push_endpoint_topic", arn_topic
, f
);
351 void rgw_pubsub_sub_dest::dump_xml(Formatter
*f
) const
353 encode_xml("EndpointAddress", push_endpoint
, f
);
354 encode_xml("EndpointArgs", push_endpoint_args
, f
);
355 encode_xml("EndpointTopic", arn_topic
, f
);
358 void rgw_pubsub_sub_config::dump(Formatter
*f
) const
360 encode_json("user", user
, f
);
361 encode_json("name", name
, f
);
362 encode_json("topic", topic
, f
);
363 encode_json("dest", dest
, f
);
364 encode_json("s3_id", s3_id
, f
);
368 int RGWUserPubSub::remove(const rgw_raw_obj
& obj
, RGWObjVersionTracker
*objv_tracker
)
370 int ret
= rgw_delete_system_obj(store
, obj
.pool
, obj
.oid
, objv_tracker
);
378 int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
380 int ret
= read(user_meta_obj
, result
, objv_tracker
);
382 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
388 int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics
& topics
, RGWObjVersionTracker
*objv_tracker
)
390 int ret
= write(user_meta_obj
, topics
, objv_tracker
);
391 if (ret
< 0 && ret
!= -ENOENT
) {
392 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
398 int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics
*result
)
400 return read_user_topics(result
, nullptr);
403 int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
405 int ret
= ps
->read(bucket_meta_obj
, result
, objv_tracker
);
406 if (ret
< 0 && ret
!= -ENOENT
) {
407 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
413 int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics
& topics
, RGWObjVersionTracker
*objv_tracker
)
415 int ret
= ps
->write(bucket_meta_obj
, topics
, objv_tracker
);
417 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret
<< dendl
;
424 int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics
*result
)
426 return read_topics(result
, nullptr);
429 int RGWUserPubSub::get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
)
431 rgw_pubsub_user_topics topics
;
432 int ret
= get_user_topics(&topics
);
434 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
438 auto iter
= topics
.topics
.find(name
);
439 if (iter
== topics
.topics
.end()) {
440 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
444 *result
= iter
->second
;
448 int RGWUserPubSub::get_topic(const string
& name
, rgw_pubsub_topic
*result
)
450 rgw_pubsub_user_topics topics
;
451 int ret
= get_user_topics(&topics
);
453 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
457 auto iter
= topics
.topics
.find(name
);
458 if (iter
== topics
.topics
.end()) {
459 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
463 *result
= iter
->second
.topic
;
467 int RGWUserPubSub::Bucket::create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
) {
468 return create_notification(topic_name
, events
, std::nullopt
, "");
471 int RGWUserPubSub::Bucket::create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
) {
472 rgw_pubsub_topic_subs user_topic_info
;
473 RGWRados
*store
= ps
->store
;
475 int ret
= ps
->get_topic(topic_name
, &user_topic_info
);
477 ldout(store
->ctx(), 1) << "ERROR: failed to read topic '" << topic_name
<< "' info: ret=" << ret
<< dendl
;
480 ldout(store
->ctx(), 20) << "successfully read topic '" << topic_name
<< "' info" << dendl
;
482 RGWObjVersionTracker objv_tracker
;
483 rgw_pubsub_bucket_topics bucket_topics
;
485 ret
= read_topics(&bucket_topics
, &objv_tracker
);
487 ldout(store
->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
488 bucket
.name
<< "': ret=" << ret
<< dendl
;
491 ldout(store
->ctx(), 20) << "successfully read " << bucket_topics
.topics
.size() << " topics from bucket '" <<
492 bucket
.name
<< "'" << dendl
;
494 auto& topic_filter
= bucket_topics
.topics
[topic_name
];
495 topic_filter
.topic
= user_topic_info
.topic
;
496 topic_filter
.events
= events
;
497 topic_filter
.s3_id
= notif_name
;
499 topic_filter
.s3_filter
= *s3_filter
;
502 ret
= write_topics(bucket_topics
, &objv_tracker
);
504 ldout(store
->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket
.name
<< "': ret=" << ret
<< dendl
;
508 ldout(store
->ctx(), 20) << "successfully wrote " << bucket_topics
.topics
.size() << " topics to bucket '" << bucket
.name
<< "'" << dendl
;
513 int RGWUserPubSub::Bucket::remove_notification(const string
& topic_name
)
515 rgw_pubsub_topic_subs user_topic_info
;
516 RGWRados
*store
= ps
->store
;
518 int ret
= ps
->get_topic(topic_name
, &user_topic_info
);
520 ldout(store
->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret
<< dendl
;
524 RGWObjVersionTracker objv_tracker
;
525 rgw_pubsub_bucket_topics bucket_topics
;
527 ret
= read_topics(&bucket_topics
, &objv_tracker
);
529 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
533 bucket_topics
.topics
.erase(topic_name
);
535 ret
= write_topics(bucket_topics
, &objv_tracker
);
537 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
544 int RGWUserPubSub::create_topic(const string
& name
) {
545 return create_topic(name
, rgw_pubsub_sub_dest(), "");
548 int RGWUserPubSub::create_topic(const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
) {
549 RGWObjVersionTracker objv_tracker
;
550 rgw_pubsub_user_topics topics
;
552 int ret
= read_user_topics(&topics
, &objv_tracker
);
553 if (ret
< 0 && ret
!= -ENOENT
) {
554 // its not an error if not topics exist, we create one
555 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
559 rgw_pubsub_topic_subs
& new_topic
= topics
.topics
[name
];
560 new_topic
.topic
.user
= user
;
561 new_topic
.topic
.name
= name
;
562 new_topic
.topic
.dest
= dest
;
563 new_topic
.topic
.arn
= arn
;
565 ret
= write_user_topics(topics
, &objv_tracker
);
567 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
574 int RGWUserPubSub::remove_topic(const string
& name
)
576 RGWObjVersionTracker objv_tracker
;
577 rgw_pubsub_user_topics topics
;
579 int ret
= read_user_topics(&topics
, &objv_tracker
);
580 if (ret
< 0 && ret
!= -ENOENT
) {
581 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
583 } else if (ret
== -ENOENT
) {
584 // its not an error if no topics exist, just a no-op
585 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret
<< dendl
;
589 topics
.topics
.erase(name
);
591 ret
= write_user_topics(topics
, &objv_tracker
);
593 ldout(store
->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret
<< dendl
;
600 int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
)
602 int ret
= ps
->read(sub_meta_obj
, result
, objv_tracker
);
603 if (ret
< 0 && ret
!= -ENOENT
) {
604 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
610 int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config
& sub_conf
, RGWObjVersionTracker
*objv_tracker
)
612 int ret
= ps
->write(sub_meta_obj
, sub_conf
, objv_tracker
);
614 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
621 int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker
*objv_tracker
)
623 int ret
= ps
->remove(sub_meta_obj
, objv_tracker
);
625 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret
<< dendl
;
632 int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config
*result
)
634 return read_sub(result
, nullptr);
637 int RGWUserPubSub::Sub::subscribe(const string
& topic
, const rgw_pubsub_sub_dest
& dest
, const std::string
& s3_id
)
639 RGWObjVersionTracker user_objv_tracker
;
640 rgw_pubsub_user_topics topics
;
641 RGWRados
*store
= ps
->store
;
643 int ret
= ps
->read_user_topics(&topics
, &user_objv_tracker
);
645 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
646 return ret
!= -ENOENT
? ret
: -EINVAL
;
649 auto iter
= topics
.topics
.find(topic
);
650 if (iter
== topics
.topics
.end()) {
651 ldout(store
->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl
;
655 auto& t
= iter
->second
;
657 rgw_pubsub_sub_config sub_conf
;
659 sub_conf
.user
= ps
->user
;
661 sub_conf
.topic
= topic
;
662 sub_conf
.dest
= dest
;
663 sub_conf
.s3_id
= s3_id
;
667 ret
= ps
->write_user_topics(topics
, &user_objv_tracker
);
669 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
673 ret
= write_sub(sub_conf
, nullptr);
675 ldout(store
->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
681 int RGWUserPubSub::Sub::unsubscribe(const string
& _topic
)
683 string topic
= _topic
;
684 RGWObjVersionTracker sobjv_tracker
;
685 RGWRados
*store
= ps
->store
;
688 rgw_pubsub_sub_config sub_conf
;
689 int ret
= read_sub(&sub_conf
, &sobjv_tracker
);
691 ldout(store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
694 topic
= sub_conf
.topic
;
697 RGWObjVersionTracker objv_tracker
;
698 rgw_pubsub_user_topics topics
;
700 int ret
= ps
->read_user_topics(&topics
, &objv_tracker
);
702 // not an error - could be that topic was already deleted
703 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
705 auto iter
= topics
.topics
.find(topic
);
706 if (iter
!= topics
.topics
.end()) {
707 auto& t
= iter
->second
;
711 ret
= ps
->write_user_topics(topics
, &objv_tracker
);
713 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
719 ret
= remove_sub(&sobjv_tracker
);
721 ldout(store
->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret
<< dendl
;
727 template<typename EventType
>
728 void RGWUserPubSub::SubWithEvents
<EventType
>::list_events_result::dump(Formatter
*f
) const
730 encode_json("next_marker", next_marker
, f
);
731 encode_json("is_truncated", is_truncated
, f
);
733 Formatter::ArraySection
s(*f
, EventType::json_type_plural
);
734 for (auto& event
: events
) {
735 encode_json(EventType::json_type_single
, event
, f
);
739 template<typename EventType
>
740 int RGWUserPubSub::SubWithEvents
<EventType
>::list_events(const string
& marker
, int max_events
)
742 RGWRados
*store
= ps
->store
;
743 rgw_pubsub_sub_config sub_conf
;
744 int ret
= get_conf(&sub_conf
);
746 ldout(store
->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
750 RGWBucketInfo bucket_info
;
752 RGWSysObjectCtx
obj_ctx(store
->svc
.sysobj
->init_obj_ctx());
753 ret
= store
->get_bucket_info(obj_ctx
, tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, nullptr);
754 if (ret
== -ENOENT
) {
755 list
.is_truncated
= false;
759 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
763 RGWRados::Bucket
target(store
, bucket_info
);
764 RGWRados::Bucket::List
list_op(&target
);
766 list_op
.params
.prefix
= sub_conf
.dest
.oid_prefix
;
767 list_op
.params
.marker
= marker
;
769 std::vector
<rgw_bucket_dir_entry
> objs
;
771 ret
= list_op
.list_objects(max_events
, &objs
, nullptr, &list
.is_truncated
);
773 ldout(store
->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
776 if (list
.is_truncated
) {
777 list
.next_marker
= list_op
.get_next_marker().name
;
780 for (auto& obj
: objs
) {
783 bl64
.append(obj
.meta
.user_data
);
785 bl
.decode_base64(bl64
);
786 } catch (buffer::error
& err
) {
787 ldout(store
->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl
;
792 auto iter
= bl
.cbegin();
795 } catch (buffer::error
& err
) {
796 ldout(store
->ctx(), 1) << "ERROR: failed to decode event" << dendl
;
800 list
.events
.push_back(event
);
805 template<typename EventType
>
806 int RGWUserPubSub::SubWithEvents
<EventType
>::remove_event(const string
& event_id
)
808 RGWRados
*store
= ps
->store
;
809 rgw_pubsub_sub_config sub_conf
;
810 int ret
= get_conf(&sub_conf
);
812 ldout(store
->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
816 RGWBucketInfo bucket_info
;
818 RGWSysObjectCtx
sysobj_ctx(store
->svc
.sysobj
->init_obj_ctx());
819 ret
= store
->get_bucket_info(sysobj_ctx
, tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, nullptr);
821 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
825 rgw_bucket
& bucket
= bucket_info
.bucket
;
827 RGWObjectCtx
obj_ctx(store
);
828 rgw_obj
obj(bucket
, sub_conf
.dest
.oid_prefix
+ event_id
);
830 obj_ctx
.set_atomic(obj
);
832 RGWRados::Object
del_target(store
, bucket_info
, obj_ctx
, obj
);
833 RGWRados::Object::Delete
del_op(&del_target
);
835 del_op
.params
.bucket_owner
= bucket_info
.owner
;
836 del_op
.params
.versioning_status
= bucket_info
.versioning_status();
838 ret
= del_op
.delete_obj();
840 ldout(store
->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj
<< "): ret=" << ret
<< dendl
;
845 template<typename EventType
>
846 void RGWUserPubSub::SubWithEvents
<EventType
>::dump(Formatter
* f
) const {
850 // explicit instantiation for the only two possible types
851 // no need to move implementation to header
852 template class RGWUserPubSub::SubWithEvents
<rgw_pubsub_event
>;
853 template class RGWUserPubSub::SubWithEvents
<rgw_pubsub_s3_record
>;