1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "services/svc_zone.h"
7 #include "rgw_pubsub.h"
11 #include "rgw_pubsub_push.h"
12 #include "rgw_rados.h"
16 #define dout_subsys ceph_subsys_rgw
18 void set_event_id(std::string
& id
, const std::string
& hash
, const utime_t
& ts
) {
20 const auto len
= snprintf(buf
, sizeof(buf
), "%010ld.%06ld.%s", (long)ts
.sec(), (long)ts
.usec(), hash
.c_str());
26 bool rgw_s3_key_filter::decode_xml(XMLObj
* obj
) {
27 XMLObjIter iter
= obj
->find("FilterRule");
30 const auto throw_if_missing
= true;
31 auto prefix_not_set
= true;
32 auto suffix_not_set
= true;
33 auto regex_not_set
= true;
36 while ((o
= iter
.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name
, o
, throw_if_missing
);
38 if (name
== "prefix" && prefix_not_set
) {
39 prefix_not_set
= false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule
, o
, throw_if_missing
);
41 } else if (name
== "suffix" && suffix_not_set
) {
42 suffix_not_set
= false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule
, o
, throw_if_missing
);
44 } else if (name
== "regex" && regex_not_set
) {
45 regex_not_set
= false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule
, o
, throw_if_missing
);
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name
+ "'");
54 void rgw_s3_key_filter::dump_xml(Formatter
*f
) const {
55 if (!prefix_rule
.empty()) {
56 f
->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f
);
58 ::encode_xml("Value", prefix_rule
, f
);
61 if (!suffix_rule
.empty()) {
62 f
->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f
);
64 ::encode_xml("Value", suffix_rule
, f
);
67 if (!regex_rule
.empty()) {
68 f
->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f
);
70 ::encode_xml("Value", regex_rule
, f
);
75 bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule
.empty() && suffix_rule
.empty() && regex_rule
.empty());
79 bool rgw_s3_key_value_filter::decode_xml(XMLObj
* obj
) {
81 XMLObjIter iter
= obj
->find("FilterRule");
84 const auto throw_if_missing
= true;
89 while ((o
= iter
.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key
, o
, throw_if_missing
);
91 RGWXMLDecoder::decode_xml("Value", value
, o
, throw_if_missing
);
92 kvl
.emplace(key
, value
);
97 void rgw_s3_key_value_filter::dump_xml(Formatter
*f
) const {
98 for (const auto& key_value
: kvl
) {
99 f
->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value
.first
, f
);
101 ::encode_xml("Value", key_value
.second
, f
);
106 bool rgw_s3_key_value_filter::has_content() const {
110 bool rgw_s3_filter::decode_xml(XMLObj
* obj
) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter
, obj
);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter
, obj
);
113 RGWXMLDecoder::decode_xml("S3Tags", tag_filter
, obj
);
117 void rgw_s3_filter::dump_xml(Formatter
*f
) const {
118 if (key_filter
.has_content()) {
119 ::encode_xml("S3Key", key_filter
, f
);
121 if (metadata_filter
.has_content()) {
122 ::encode_xml("S3Metadata", metadata_filter
, f
);
124 if (tag_filter
.has_content()) {
125 ::encode_xml("S3Tags", tag_filter
, f
);
129 bool rgw_s3_filter::has_content() const {
130 return key_filter
.has_content() ||
131 metadata_filter
.has_content() ||
132 tag_filter
.has_content();
135 bool match(const rgw_s3_key_filter
& filter
, const std::string
& key
) {
136 const auto key_size
= key
.size();
137 const auto prefix_size
= filter
.prefix_rule
.size();
138 if (prefix_size
!= 0) {
139 // prefix rule exists
140 if (prefix_size
> key_size
) {
141 // if prefix is longer than key, we fail
144 if (!std::equal(filter
.prefix_rule
.begin(), filter
.prefix_rule
.end(), key
.begin())) {
148 const auto suffix_size
= filter
.suffix_rule
.size();
149 if (suffix_size
!= 0) {
150 // suffix rule exists
151 if (suffix_size
> key_size
) {
152 // if suffix is longer than key, we fail
155 if (!std::equal(filter
.suffix_rule
.begin(), filter
.suffix_rule
.end(), (key
.end() - suffix_size
))) {
159 if (!filter
.regex_rule
.empty()) {
160 // TODO add regex chaching in the filter
161 const std::regex
base_regex(filter
.regex_rule
);
162 if (!std::regex_match(key
, base_regex
)) {
169 bool match(const rgw_s3_key_value_filter
& filter
, const KeyValueList
& kvl
) {
170 // all filter pairs must exist with the same value in the object's metadata/tags
171 // object metadata/tags may include items not in the filter
172 return std::includes(kvl
.begin(), kvl
.end(), filter
.kvl
.begin(), filter
.kvl
.end());
175 bool match(const rgw::notify::EventTypeList
& events
, rgw::notify::EventType event
) {
176 // if event list exists, and none of the events in the list matches the event type, filter the message
177 if (!events
.empty() && std::find(events
.begin(), events
.end(), event
) == events
.end()) {
183 void do_decode_xml_obj(rgw::notify::EventTypeList
& l
, const string
& name
, XMLObj
*obj
) {
186 XMLObjIter iter
= obj
->find(name
);
189 while ((o
= iter
.get_next())) {
191 decode_xml_obj(val
, o
);
192 l
.push_back(rgw::notify::from_string(val
));
196 bool rgw_pubsub_s3_notification::decode_xml(XMLObj
*obj
) {
197 const auto throw_if_missing
= true;
198 RGWXMLDecoder::decode_xml("Id", id
, obj
, throw_if_missing
);
200 RGWXMLDecoder::decode_xml("Topic", topic_arn
, obj
, throw_if_missing
);
202 RGWXMLDecoder::decode_xml("Filter", filter
, obj
);
204 do_decode_xml_obj(events
, "Event", obj
);
205 if (events
.empty()) {
206 // if no events are provided, we assume all events
207 events
.push_back(rgw::notify::ObjectCreated
);
208 events
.push_back(rgw::notify::ObjectRemoved
);
213 void rgw_pubsub_s3_notification::dump_xml(Formatter
*f
) const {
214 ::encode_xml("Id", id
, f
);
215 ::encode_xml("Topic", topic_arn
.c_str(), f
);
216 if (filter
.has_content()) {
217 ::encode_xml("Filter", filter
, f
);
219 for (const auto& event
: events
) {
220 ::encode_xml("Event", rgw::notify::to_string(event
), f
);
224 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj
*obj
) {
225 do_decode_xml_obj(list
, "TopicConfiguration", obj
);
227 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
232 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter
& topic_filter
) :
233 id(topic_filter
.s3_id
), events(topic_filter
.events
), topic_arn(topic_filter
.topic
.arn
), filter(topic_filter
.s3_filter
) {}
235 void rgw_pubsub_s3_notifications::dump_xml(Formatter
*f
) const {
236 do_encode_xml("NotificationConfiguration", list
, "TopicConfiguration", f
);
239 void rgw_pubsub_s3_record::dump(Formatter
*f
) const {
240 encode_json("eventVersion", eventVersion
, f
);
241 encode_json("eventSource", eventSource
, f
);
242 encode_json("awsRegion", awsRegion
, f
);
243 utime_t
ut(eventTime
);
244 encode_json("eventTime", ut
, f
);
245 encode_json("eventName", eventName
, f
);
247 Formatter::ObjectSection
s(*f
, "userIdentity");
248 encode_json("principalId", userIdentity
, f
);
251 Formatter::ObjectSection
s(*f
, "requestParameters");
252 encode_json("sourceIPAddress", sourceIPAddress
, f
);
255 Formatter::ObjectSection
s(*f
, "responseElements");
256 encode_json("x-amz-request-id", x_amz_request_id
, f
);
257 encode_json("x-amz-id-2", x_amz_id_2
, f
);
260 Formatter::ObjectSection
s(*f
, "s3");
261 encode_json("s3SchemaVersion", s3SchemaVersion
, f
);
262 encode_json("configurationId", configurationId
, f
);
264 Formatter::ObjectSection
sub_s(*f
, "bucket");
265 encode_json("name", bucket_name
, f
);
267 Formatter::ObjectSection
sub_sub_s(*f
, "ownerIdentity");
268 encode_json("principalId", bucket_ownerIdentity
, f
);
270 encode_json("arn", bucket_arn
, f
);
271 encode_json("id", bucket_id
, f
);
274 Formatter::ObjectSection
sub_s(*f
, "object");
275 encode_json("key", object_key
, f
);
276 encode_json("size", object_size
, f
);
277 encode_json("etag", object_etag
, f
);
278 encode_json("versionId", object_versionId
, f
);
279 encode_json("sequencer", object_sequencer
, f
);
280 encode_json("metadata", x_meta_map
, f
);
281 encode_json("tags", tags
, f
);
284 encode_json("eventId", id
, f
);
285 encode_json("opaqueData", opaque_data
, f
);
288 void rgw_pubsub_event::dump(Formatter
*f
) const
290 encode_json("id", id
, f
);
291 encode_json("event", event_name
, f
);
292 utime_t
ut(timestamp
);
293 encode_json("timestamp", ut
, f
);
294 encode_json("info", info
, f
);
297 void rgw_pubsub_topic::dump(Formatter
*f
) const
299 encode_json("user", user
, f
);
300 encode_json("name", name
, f
);
301 encode_json("dest", dest
, f
);
302 encode_json("arn", arn
, f
);
303 encode_json("opaqueData", opaque_data
, f
);
306 void rgw_pubsub_topic::dump_xml(Formatter
*f
) const
308 encode_xml("User", user
, f
);
309 encode_xml("Name", name
, f
);
310 encode_xml("EndPoint", dest
, f
);
311 encode_xml("TopicArn", arn
, f
);
312 encode_xml("OpaqueData", opaque_data
, f
);
315 void encode_json(const char *name
, const rgw::notify::EventTypeList
& l
, Formatter
*f
)
317 f
->open_array_section(name
);
318 for (auto iter
= l
.cbegin(); iter
!= l
.cend(); ++iter
) {
319 f
->dump_string("obj", rgw::notify::to_ceph_string(*iter
));
324 void rgw_pubsub_topic_filter::dump(Formatter
*f
) const
326 encode_json("topic", topic
, f
);
327 encode_json("events", events
, f
);
330 void rgw_pubsub_topic_subs::dump(Formatter
*f
) const
332 encode_json("topic", topic
, f
);
333 encode_json("subs", subs
, f
);
336 void rgw_pubsub_bucket_topics::dump(Formatter
*f
) const
338 Formatter::ArraySection
s(*f
, "topics");
339 for (auto& t
: topics
) {
340 encode_json(t
.first
.c_str(), t
.second
, f
);
344 void rgw_pubsub_user_topics::dump(Formatter
*f
) const
346 Formatter::ArraySection
s(*f
, "topics");
347 for (auto& t
: topics
) {
348 encode_json(t
.first
.c_str(), t
.second
, f
);
352 void rgw_pubsub_user_topics::dump_xml(Formatter
*f
) const
354 for (auto& t
: topics
) {
355 encode_xml("member", t
.second
.topic
, f
);
359 void rgw_pubsub_sub_dest::dump(Formatter
*f
) const
361 encode_json("bucket_name", bucket_name
, f
);
362 encode_json("oid_prefix", oid_prefix
, f
);
363 encode_json("push_endpoint", push_endpoint
, f
);
364 encode_json("push_endpoint_args", push_endpoint_args
, f
);
365 encode_json("push_endpoint_topic", arn_topic
, f
);
368 void rgw_pubsub_sub_dest::dump_xml(Formatter
*f
) const
370 encode_xml("EndpointAddress", push_endpoint
, f
);
371 encode_xml("EndpointArgs", push_endpoint_args
, f
);
372 encode_xml("EndpointTopic", arn_topic
, f
);
375 void rgw_pubsub_sub_config::dump(Formatter
*f
) const
377 encode_json("user", user
, f
);
378 encode_json("name", name
, f
);
379 encode_json("topic", topic
, f
);
380 encode_json("dest", dest
, f
);
381 encode_json("s3_id", s3_id
, f
);
384 RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore
* _store
, const rgw_user
& _user
) :
387 obj_ctx(store
->svc()->sysobj
->init_obj_ctx()) {
388 get_user_meta_obj(&user_meta_obj
);
391 int RGWUserPubSub::remove(const rgw_raw_obj
& obj
, RGWObjVersionTracker
*objv_tracker
)
393 int ret
= rgw_delete_system_obj(store
->svc()->sysobj
, obj
.pool
, obj
.oid
, objv_tracker
);
401 int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
403 int ret
= read(user_meta_obj
, result
, objv_tracker
);
405 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
411 int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics
& topics
, RGWObjVersionTracker
*objv_tracker
)
413 int ret
= write(user_meta_obj
, topics
, objv_tracker
);
414 if (ret
< 0 && ret
!= -ENOENT
) {
415 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
421 int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics
*result
)
423 return read_user_topics(result
, nullptr);
426 int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics
*result
, RGWObjVersionTracker
*objv_tracker
)
428 int ret
= ps
->read(bucket_meta_obj
, result
, objv_tracker
);
429 if (ret
< 0 && ret
!= -ENOENT
) {
430 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
436 int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics
& topics
, RGWObjVersionTracker
*objv_tracker
)
438 int ret
= ps
->write(bucket_meta_obj
, topics
, objv_tracker
);
440 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret
<< dendl
;
447 int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics
*result
)
449 return read_topics(result
, nullptr);
452 int RGWUserPubSub::get_topic(const string
& name
, rgw_pubsub_topic_subs
*result
)
454 rgw_pubsub_user_topics topics
;
455 int ret
= get_user_topics(&topics
);
457 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
461 auto iter
= topics
.topics
.find(name
);
462 if (iter
== topics
.topics
.end()) {
463 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
467 *result
= iter
->second
;
471 int RGWUserPubSub::get_topic(const string
& name
, rgw_pubsub_topic
*result
)
473 rgw_pubsub_user_topics topics
;
474 int ret
= get_user_topics(&topics
);
476 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
480 auto iter
= topics
.topics
.find(name
);
481 if (iter
== topics
.topics
.end()) {
482 ldout(store
->ctx(), 1) << "ERROR: topic not found" << dendl
;
486 *result
= iter
->second
.topic
;
490 int RGWUserPubSub::Bucket::create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
) {
491 return create_notification(topic_name
, events
, std::nullopt
, "");
494 int RGWUserPubSub::Bucket::create_notification(const string
& topic_name
, const rgw::notify::EventTypeList
& events
, OptionalFilter s3_filter
, const std::string
& notif_name
) {
495 rgw_pubsub_topic_subs user_topic_info
;
496 rgw::sal::RGWRadosStore
*store
= ps
->store
;
498 int ret
= ps
->get_topic(topic_name
, &user_topic_info
);
500 ldout(store
->ctx(), 1) << "ERROR: failed to read topic '" << topic_name
<< "' info: ret=" << ret
<< dendl
;
503 ldout(store
->ctx(), 20) << "successfully read topic '" << topic_name
<< "' info" << dendl
;
505 RGWObjVersionTracker objv_tracker
;
506 rgw_pubsub_bucket_topics bucket_topics
;
508 ret
= read_topics(&bucket_topics
, &objv_tracker
);
510 ldout(store
->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
511 bucket
.name
<< "': ret=" << ret
<< dendl
;
514 ldout(store
->ctx(), 20) << "successfully read " << bucket_topics
.topics
.size() << " topics from bucket '" <<
515 bucket
.name
<< "'" << dendl
;
517 auto& topic_filter
= bucket_topics
.topics
[topic_name
];
518 topic_filter
.topic
= user_topic_info
.topic
;
519 topic_filter
.events
= events
;
520 topic_filter
.s3_id
= notif_name
;
522 topic_filter
.s3_filter
= *s3_filter
;
525 ret
= write_topics(bucket_topics
, &objv_tracker
);
527 ldout(store
->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket
.name
<< "': ret=" << ret
<< dendl
;
531 ldout(store
->ctx(), 20) << "successfully wrote " << bucket_topics
.topics
.size() << " topics to bucket '" << bucket
.name
<< "'" << dendl
;
536 int RGWUserPubSub::Bucket::remove_notification(const string
& topic_name
)
538 rgw_pubsub_topic_subs user_topic_info
;
539 rgw::sal::RGWRadosStore
*store
= ps
->store
;
541 int ret
= ps
->get_topic(topic_name
, &user_topic_info
);
543 ldout(store
->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret
<< dendl
;
547 RGWObjVersionTracker objv_tracker
;
548 rgw_pubsub_bucket_topics bucket_topics
;
550 ret
= read_topics(&bucket_topics
, &objv_tracker
);
552 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret
<< dendl
;
556 bucket_topics
.topics
.erase(topic_name
);
558 ret
= write_topics(bucket_topics
, &objv_tracker
);
560 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
567 int RGWUserPubSub::create_topic(const string
& name
) {
568 return create_topic(name
, rgw_pubsub_sub_dest(), "", "");
571 int RGWUserPubSub::create_topic(const string
& name
, const rgw_pubsub_sub_dest
& dest
, const std::string
& arn
, const std::string
& opaque_data
) {
572 RGWObjVersionTracker objv_tracker
;
573 rgw_pubsub_user_topics topics
;
575 int ret
= read_user_topics(&topics
, &objv_tracker
);
576 if (ret
< 0 && ret
!= -ENOENT
) {
577 // its not an error if not topics exist, we create one
578 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
582 rgw_pubsub_topic_subs
& new_topic
= topics
.topics
[name
];
583 new_topic
.topic
.user
= user
;
584 new_topic
.topic
.name
= name
;
585 new_topic
.topic
.dest
= dest
;
586 new_topic
.topic
.arn
= arn
;
587 new_topic
.topic
.opaque_data
= opaque_data
;
589 ret
= write_user_topics(topics
, &objv_tracker
);
591 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
598 int RGWUserPubSub::remove_topic(const string
& name
)
600 RGWObjVersionTracker objv_tracker
;
601 rgw_pubsub_user_topics topics
;
603 int ret
= read_user_topics(&topics
, &objv_tracker
);
604 if (ret
< 0 && ret
!= -ENOENT
) {
605 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
607 } else if (ret
== -ENOENT
) {
608 // its not an error if no topics exist, just a no-op
609 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret
<< dendl
;
613 topics
.topics
.erase(name
);
615 ret
= write_user_topics(topics
, &objv_tracker
);
617 ldout(store
->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret
<< dendl
;
624 int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config
*result
, RGWObjVersionTracker
*objv_tracker
)
626 int ret
= ps
->read(sub_meta_obj
, result
, objv_tracker
);
627 if (ret
< 0 && ret
!= -ENOENT
) {
628 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
634 int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config
& sub_conf
, RGWObjVersionTracker
*objv_tracker
)
636 int ret
= ps
->write(sub_meta_obj
, sub_conf
, objv_tracker
);
638 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
645 int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker
*objv_tracker
)
647 int ret
= ps
->remove(sub_meta_obj
, objv_tracker
);
649 ldout(ps
->store
->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret
<< dendl
;
656 int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config
*result
)
658 return read_sub(result
, nullptr);
661 int RGWUserPubSub::Sub::subscribe(const string
& topic
, const rgw_pubsub_sub_dest
& dest
, const std::string
& s3_id
)
663 RGWObjVersionTracker user_objv_tracker
;
664 rgw_pubsub_user_topics topics
;
665 rgw::sal::RGWRadosStore
*store
= ps
->store
;
667 int ret
= ps
->read_user_topics(&topics
, &user_objv_tracker
);
669 ldout(store
->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret
<< dendl
;
670 return ret
!= -ENOENT
? ret
: -EINVAL
;
673 auto iter
= topics
.topics
.find(topic
);
674 if (iter
== topics
.topics
.end()) {
675 ldout(store
->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl
;
679 auto& t
= iter
->second
;
681 rgw_pubsub_sub_config sub_conf
;
683 sub_conf
.user
= ps
->user
;
685 sub_conf
.topic
= topic
;
686 sub_conf
.dest
= dest
;
687 sub_conf
.s3_id
= s3_id
;
691 ret
= ps
->write_user_topics(topics
, &user_objv_tracker
);
693 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
697 ret
= write_sub(sub_conf
, nullptr);
699 ldout(store
->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret
<< dendl
;
705 int RGWUserPubSub::Sub::unsubscribe(const string
& _topic
)
707 string topic
= _topic
;
708 RGWObjVersionTracker sobjv_tracker
;
709 rgw::sal::RGWRadosStore
*store
= ps
->store
;
712 rgw_pubsub_sub_config sub_conf
;
713 int ret
= read_sub(&sub_conf
, &sobjv_tracker
);
715 ldout(store
->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret
<< dendl
;
718 topic
= sub_conf
.topic
;
721 RGWObjVersionTracker objv_tracker
;
722 rgw_pubsub_user_topics topics
;
724 int ret
= ps
->read_user_topics(&topics
, &objv_tracker
);
726 // not an error - could be that topic was already deleted
727 ldout(store
->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret
<< dendl
;
729 auto iter
= topics
.topics
.find(topic
);
730 if (iter
!= topics
.topics
.end()) {
731 auto& t
= iter
->second
;
735 ret
= ps
->write_user_topics(topics
, &objv_tracker
);
737 ldout(store
->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret
<< dendl
;
743 ret
= remove_sub(&sobjv_tracker
);
745 ldout(store
->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret
<< dendl
;
751 template<typename EventType
>
752 void RGWUserPubSub::SubWithEvents
<EventType
>::list_events_result::dump(Formatter
*f
) const
754 encode_json("next_marker", next_marker
, f
);
755 encode_json("is_truncated", is_truncated
, f
);
757 Formatter::ArraySection
s(*f
, EventType::json_type_plural
);
758 for (auto& event
: events
) {
759 encode_json("", event
, f
);
763 template<typename EventType
>
764 int RGWUserPubSub::SubWithEvents
<EventType
>::list_events(const string
& marker
, int max_events
)
766 RGWRados
*store
= ps
->store
->getRados();
767 rgw_pubsub_sub_config sub_conf
;
768 int ret
= get_conf(&sub_conf
);
770 ldout(store
->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
774 RGWBucketInfo bucket_info
;
776 ret
= store
->get_bucket_info(&store
->svc
, tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
777 if (ret
== -ENOENT
) {
778 list
.is_truncated
= false;
782 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
786 RGWRados::Bucket
target(store
, bucket_info
);
787 RGWRados::Bucket::List
list_op(&target
);
789 list_op
.params
.prefix
= sub_conf
.dest
.oid_prefix
;
790 list_op
.params
.marker
= marker
;
792 std::vector
<rgw_bucket_dir_entry
> objs
;
794 ret
= list_op
.list_objects(max_events
, &objs
, nullptr, &list
.is_truncated
, null_yield
);
796 ldout(store
->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
799 if (list
.is_truncated
) {
800 list
.next_marker
= list_op
.get_next_marker().name
;
803 for (auto& obj
: objs
) {
806 bl64
.append(obj
.meta
.user_data
);
808 bl
.decode_base64(bl64
);
809 } catch (buffer::error
& err
) {
810 ldout(store
->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl
;
815 auto iter
= bl
.cbegin();
818 } catch (buffer::error
& err
) {
819 ldout(store
->ctx(), 1) << "ERROR: failed to decode event" << dendl
;
823 list
.events
.push_back(event
);
828 template<typename EventType
>
829 int RGWUserPubSub::SubWithEvents
<EventType
>::remove_event(const string
& event_id
)
831 rgw::sal::RGWRadosStore
*store
= ps
->store
;
832 rgw_pubsub_sub_config sub_conf
;
833 int ret
= get_conf(&sub_conf
);
835 ldout(store
->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret
<< dendl
;
839 RGWBucketInfo bucket_info
;
841 ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant
, sub_conf
.dest
.bucket_name
, bucket_info
, nullptr, null_yield
, nullptr);
843 ldout(store
->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf
.dest
.bucket_name
<< " ret=" << ret
<< dendl
;
847 rgw_bucket
& bucket
= bucket_info
.bucket
;
849 RGWObjectCtx
obj_ctx(store
);
850 rgw_obj
obj(bucket
, sub_conf
.dest
.oid_prefix
+ event_id
);
852 obj_ctx
.set_atomic(obj
);
854 RGWRados::Object
del_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
855 RGWRados::Object::Delete
del_op(&del_target
);
857 del_op
.params
.bucket_owner
= bucket_info
.owner
;
858 del_op
.params
.versioning_status
= bucket_info
.versioning_status();
860 ret
= del_op
.delete_obj(null_yield
);
862 ldout(store
->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj
<< "): ret=" << ret
<< dendl
;
867 void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj
*obj
) const {
868 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, user_meta_oid());
871 void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket
& bucket
, rgw_raw_obj
*obj
) const {
872 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, bucket_meta_oid(bucket
));
875 void RGWUserPubSub::get_sub_meta_obj(const string
& name
, rgw_raw_obj
*obj
) const {
876 *obj
= rgw_raw_obj(store
->svc()->zone
->get_zone_params().log_pool
, sub_meta_oid(name
));
879 template<typename EventType
>
880 void RGWUserPubSub::SubWithEvents
<EventType
>::dump(Formatter
* f
) const {
884 // explicit instantiation for the only two possible types
885 // no need to move implementation to header
886 template class RGWUserPubSub::SubWithEvents
<rgw_pubsub_event
>;
887 template class RGWUserPubSub::SubWithEvents
<rgw_pubsub_s3_record
>;