]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "services/svc_zone.h"
11fdf7f2 5#include "rgw_b64.h"
9f95a23c 6#include "rgw_sal.h"
11fdf7f2
TL
7#include "rgw_pubsub.h"
8#include "rgw_tools.h"
eafe8130
TL
9#include "rgw_xml.h"
10#include "rgw_arn.h"
11#include "rgw_pubsub_push.h"
eafe8130
TL
12#include <regex>
13#include <algorithm>
11fdf7f2
TL
14
15#define dout_subsys ceph_subsys_rgw
16
92f5a8d4
TL
17void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
18 char buf[64];
19 const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
20 if (len > 0) {
21 id.assign(buf, len);
22 }
23}
24
aee94f69
TL
25void rgw_s3_key_filter::dump(Formatter *f) const {
26 if (!prefix_rule.empty()) {
27 f->open_object_section("FilterRule");
28 ::encode_json("Name", "prefix", f);
29 ::encode_json("Value", prefix_rule, f);
30 f->close_section();
31 }
32 if (!suffix_rule.empty()) {
33 f->open_object_section("FilterRule");
34 ::encode_json("Name", "suffix", f);
35 ::encode_json("Value", suffix_rule, f);
36 f->close_section();
37 }
38 if (!regex_rule.empty()) {
39 f->open_object_section("FilterRule");
40 ::encode_json("Name", "regex", f);
41 ::encode_json("Value", regex_rule, f);
42 f->close_section();
43 }
44}
45
eafe8130
TL
46bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
47 XMLObjIter iter = obj->find("FilterRule");
48 XMLObj *o;
49
50 const auto throw_if_missing = true;
51 auto prefix_not_set = true;
52 auto suffix_not_set = true;
53 auto regex_not_set = true;
54 std::string name;
55
56 while ((o = iter.get_next())) {
57 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
58 if (name == "prefix" && prefix_not_set) {
59 prefix_not_set = false;
60 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
61 } else if (name == "suffix" && suffix_not_set) {
62 suffix_not_set = false;
63 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
64 } else if (name == "regex" && regex_not_set) {
65 regex_not_set = false;
66 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
67 } else {
68 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
69 }
70 }
71 return true;
72}
73
74void rgw_s3_key_filter::dump_xml(Formatter *f) const {
75 if (!prefix_rule.empty()) {
76 f->open_object_section("FilterRule");
77 ::encode_xml("Name", "prefix", f);
78 ::encode_xml("Value", prefix_rule, f);
79 f->close_section();
80 }
81 if (!suffix_rule.empty()) {
82 f->open_object_section("FilterRule");
83 ::encode_xml("Name", "suffix", f);
84 ::encode_xml("Value", suffix_rule, f);
85 f->close_section();
86 }
87 if (!regex_rule.empty()) {
88 f->open_object_section("FilterRule");
89 ::encode_xml("Name", "regex", f);
90 ::encode_xml("Value", regex_rule, f);
91 f->close_section();
92 }
93}
94
95bool rgw_s3_key_filter::has_content() const {
96 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
97}
98
aee94f69
TL
99void rgw_s3_key_value_filter::dump(Formatter *f) const {
100 for (const auto& key_value : kv) {
101 f->open_object_section("FilterRule");
102 ::encode_json("Name", key_value.first, f);
103 ::encode_json("Value", key_value.second, f);
104 f->close_section();
105 }
106}
107
9f95a23c 108bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
f67539c2 109 kv.clear();
eafe8130
TL
110 XMLObjIter iter = obj->find("FilterRule");
111 XMLObj *o;
112
113 const auto throw_if_missing = true;
114
115 std::string key;
116 std::string value;
117
118 while ((o = iter.get_next())) {
119 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
120 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
f67539c2 121 kv.emplace(key, value);
eafe8130
TL
122 }
123 return true;
124}
125
9f95a23c 126void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
f67539c2 127 for (const auto& key_value : kv) {
eafe8130
TL
128 f->open_object_section("FilterRule");
129 ::encode_xml("Name", key_value.first, f);
130 ::encode_xml("Value", key_value.second, f);
131 f->close_section();
132 }
133}
134
9f95a23c 135bool rgw_s3_key_value_filter::has_content() const {
f67539c2 136 return !kv.empty();
eafe8130
TL
137}
138
aee94f69
TL
139void rgw_s3_filter::dump(Formatter *f) const {
140 encode_json("S3Key", key_filter, f);
141 encode_json("S3Metadata", metadata_filter, f);
142 encode_json("S3Tags", tag_filter, f);
143}
144
eafe8130
TL
145bool rgw_s3_filter::decode_xml(XMLObj* obj) {
146 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
147 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
9f95a23c 148 RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
eafe8130
TL
149 return true;
150}
151
152void rgw_s3_filter::dump_xml(Formatter *f) const {
153 if (key_filter.has_content()) {
154 ::encode_xml("S3Key", key_filter, f);
155 }
156 if (metadata_filter.has_content()) {
157 ::encode_xml("S3Metadata", metadata_filter, f);
158 }
9f95a23c
TL
159 if (tag_filter.has_content()) {
160 ::encode_xml("S3Tags", tag_filter, f);
161 }
eafe8130
TL
162}
163
164bool rgw_s3_filter::has_content() const {
165 return key_filter.has_content() ||
9f95a23c
TL
166 metadata_filter.has_content() ||
167 tag_filter.has_content();
eafe8130
TL
168}
169
170bool match(const rgw_s3_key_filter& filter, const std::string& key) {
171 const auto key_size = key.size();
172 const auto prefix_size = filter.prefix_rule.size();
173 if (prefix_size != 0) {
174 // prefix rule exists
175 if (prefix_size > key_size) {
176 // if prefix is longer than key, we fail
177 return false;
178 }
179 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
180 return false;
181 }
182 }
183 const auto suffix_size = filter.suffix_rule.size();
184 if (suffix_size != 0) {
185 // suffix rule exists
186 if (suffix_size > key_size) {
187 // if suffix is longer than key, we fail
188 return false;
189 }
190 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
191 return false;
192 }
193 }
194 if (!filter.regex_rule.empty()) {
195 // TODO add regex chaching in the filter
196 const std::regex base_regex(filter.regex_rule);
197 if (!std::regex_match(key, base_regex)) {
198 return false;
199 }
200 }
201 return true;
202}
203
f67539c2 204bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) {
9f95a23c
TL
205 // all filter pairs must exist with the same value in the object's metadata/tags
206 // object metadata/tags may include items not in the filter
f67539c2 207 return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end());
eafe8130
TL
208}
209
20effc67
TL
210bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv) {
211 // all filter pairs must exist with the same value in the object's metadata/tags
212 // object metadata/tags may include items not in the filter
213 for (auto& filter : filter.kv) {
214 auto result = kv.equal_range(filter.first);
1e59de90 215 if (std::any_of(result.first, result.second, [&filter](const std::pair<std::string, std::string>& p) { return p.second == filter.second;}))
20effc67
TL
216 continue;
217 else
218 return false;
219 }
220 return true;
221}
222
eafe8130
TL
223bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
224 // if event list exists, and none of the events in the list matches the event type, filter the message
225 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
226 return false;
227 }
228 return true;
229}
230
1e59de90 231void do_decode_xml_obj(rgw::notify::EventTypeList& l, const std::string& name, XMLObj *obj) {
eafe8130
TL
232 l.clear();
233
234 XMLObjIter iter = obj->find(name);
235 XMLObj *o;
236
237 while ((o = iter.get_next())) {
238 std::string val;
239 decode_xml_obj(val, o);
240 l.push_back(rgw::notify::from_string(val));
241 }
242}
243
244bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
245 const auto throw_if_missing = true;
246 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
247
248 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
249
250 RGWXMLDecoder::decode_xml("Filter", filter, obj);
251
252 do_decode_xml_obj(events, "Event", obj);
253 if (events.empty()) {
254 // if no events are provided, we assume all events
255 events.push_back(rgw::notify::ObjectCreated);
256 events.push_back(rgw::notify::ObjectRemoved);
257 }
258 return true;
259}
260
261void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
262 ::encode_xml("Id", id, f);
263 ::encode_xml("Topic", topic_arn.c_str(), f);
264 if (filter.has_content()) {
265 ::encode_xml("Filter", filter, f);
266 }
267 for (const auto& event : events) {
268 ::encode_xml("Event", rgw::notify::to_string(event), f);
269 }
270}
271
272bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
273 do_decode_xml_obj(list, "TopicConfiguration", obj);
eafe8130
TL
274 return true;
275}
276
277rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
278 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
279
280void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
281 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
282}
283
f67539c2 284void rgw_pubsub_s3_event::dump(Formatter *f) const {
eafe8130
TL
285 encode_json("eventVersion", eventVersion, f);
286 encode_json("eventSource", eventSource, f);
287 encode_json("awsRegion", awsRegion, f);
288 utime_t ut(eventTime);
289 encode_json("eventTime", ut, f);
290 encode_json("eventName", eventName, f);
291 {
292 Formatter::ObjectSection s(*f, "userIdentity");
293 encode_json("principalId", userIdentity, f);
294 }
295 {
296 Formatter::ObjectSection s(*f, "requestParameters");
297 encode_json("sourceIPAddress", sourceIPAddress, f);
298 }
299 {
300 Formatter::ObjectSection s(*f, "responseElements");
301 encode_json("x-amz-request-id", x_amz_request_id, f);
302 encode_json("x-amz-id-2", x_amz_id_2, f);
303 }
304 {
305 Formatter::ObjectSection s(*f, "s3");
306 encode_json("s3SchemaVersion", s3SchemaVersion, f);
307 encode_json("configurationId", configurationId, f);
308 {
309 Formatter::ObjectSection sub_s(*f, "bucket");
310 encode_json("name", bucket_name, f);
311 {
312 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
313 encode_json("principalId", bucket_ownerIdentity, f);
314 }
315 encode_json("arn", bucket_arn, f);
316 encode_json("id", bucket_id, f);
317 }
318 {
319 Formatter::ObjectSection sub_s(*f, "object");
320 encode_json("key", object_key, f);
321 encode_json("size", object_size, f);
a4b75251 322 encode_json("eTag", object_etag, f);
eafe8130
TL
323 encode_json("versionId", object_versionId, f);
324 encode_json("sequencer", object_sequencer, f);
325 encode_json("metadata", x_meta_map, f);
9f95a23c 326 encode_json("tags", tags, f);
eafe8130
TL
327 }
328 }
329 encode_json("eventId", id, f);
9f95a23c 330 encode_json("opaqueData", opaque_data, f);
eafe8130 331}
11fdf7f2 332
11fdf7f2
TL
333void rgw_pubsub_topic::dump(Formatter *f) const
334{
335 encode_json("user", user, f);
336 encode_json("name", name, f);
eafe8130
TL
337 encode_json("dest", dest, f);
338 encode_json("arn", arn, f);
9f95a23c 339 encode_json("opaqueData", opaque_data, f);
eafe8130
TL
340}
341
342void rgw_pubsub_topic::dump_xml(Formatter *f) const
343{
344 encode_xml("User", user, f);
345 encode_xml("Name", name, f);
346 encode_xml("EndPoint", dest, f);
347 encode_xml("TopicArn", arn, f);
9f95a23c 348 encode_xml("OpaqueData", opaque_data, f);
eafe8130
TL
349}
350
f67539c2
TL
351void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) {
352 f->open_object_section("entry");
353 encode_xml("key", key, f);
354 encode_xml("value", value, f);
355 f->close_section(); // entry
356}
357
358void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
359{
360 f->open_array_section("Attributes");
361 std::string str_user;
362 user.to_str(str_user);
363 encode_xml_key_value_entry("User", str_user, f);
364 encode_xml_key_value_entry("Name", name, f);
365 encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f);
366 encode_xml_key_value_entry("TopicArn", arn, f);
367 encode_xml_key_value_entry("OpaqueData", opaque_data, f);
368 f->close_section(); // Attributes
369}
370
eafe8130
TL
371void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
372{
373 f->open_array_section(name);
374 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
1e59de90 375 f->dump_string("obj", rgw::notify::to_string(*iter));
eafe8130
TL
376 }
377 f->close_section();
11fdf7f2
TL
378}
379
380void rgw_pubsub_topic_filter::dump(Formatter *f) const
381{
aee94f69
TL
382 encode_json("TopicArn", topic.arn, f);
383 encode_json("Id", s3_id, f);
384 encode_json("Events", events, f);
385 encode_json("Filter", s3_filter, f);
11fdf7f2
TL
386}
387
11fdf7f2
TL
388void rgw_pubsub_bucket_topics::dump(Formatter *f) const
389{
aee94f69 390 Formatter::ArraySection s(*f, "notifications");
11fdf7f2
TL
391 for (auto& t : topics) {
392 encode_json(t.first.c_str(), t.second, f);
393 }
394}
395
f67539c2 396void rgw_pubsub_topics::dump(Formatter *f) const
11fdf7f2
TL
397{
398 Formatter::ArraySection s(*f, "topics");
399 for (auto& t : topics) {
aee94f69
TL
400 auto& topic = t.second;
401 if (topic.name == topic.dest.arn_topic) {
402 encode_json(t.first.c_str(), topic, f);
403 }
11fdf7f2
TL
404 }
405}
406
f67539c2 407void rgw_pubsub_topics::dump_xml(Formatter *f) const
eafe8130
TL
408{
409 for (auto& t : topics) {
1e59de90 410 encode_xml("member", t.second, f);
eafe8130
TL
411 }
412}
413
1e59de90 414void rgw_pubsub_dest::dump(Formatter *f) const
11fdf7f2 415{
11fdf7f2 416 encode_json("push_endpoint", push_endpoint, f);
eafe8130
TL
417 encode_json("push_endpoint_args", push_endpoint_args, f);
418 encode_json("push_endpoint_topic", arn_topic, f);
f67539c2
TL
419 encode_json("stored_secret", stored_secret, f);
420 encode_json("persistent", persistent, f);
eafe8130
TL
421}
422
1e59de90 423void rgw_pubsub_dest::dump_xml(Formatter *f) const
eafe8130
TL
424{
425 encode_xml("EndpointAddress", push_endpoint, f);
426 encode_xml("EndpointArgs", push_endpoint_args, f);
427 encode_xml("EndpointTopic", arn_topic, f);
f67539c2
TL
428 encode_xml("HasStoredSecret", stored_secret, f);
429 encode_xml("Persistent", persistent, f);
430}
431
1e59de90 432std::string rgw_pubsub_dest::to_json_str() const
f67539c2 433{
f67539c2
TL
434 JSONFormatter f;
435 f.open_object_section("");
436 encode_json("EndpointAddress", push_endpoint, &f);
437 encode_json("EndpointArgs", push_endpoint_args, &f);
438 encode_json("EndpointTopic", arn_topic, &f);
439 encode_json("HasStoredSecret", stored_secret, &f);
440 encode_json("Persistent", persistent, &f);
441 f.close_section();
442 std::stringstream ss;
443 f.flush(ss);
444 return ss.str();
11fdf7f2
TL
445}
446
1e59de90
TL
447RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant)
448 : driver(_driver), tenant(_tenant)
449{}
11fdf7f2 450
1e59de90
TL
451int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
452 RGWObjVersionTracker *objv_tracker, optional_yield y) const
11fdf7f2 453{
1e59de90 454 const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp);
eafe8130 455 if (ret < 0) {
1e59de90 456 ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
457 return ret;
458 }
459 return 0;
460}
461
b3b6e05e 462int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
1e59de90 463 RGWObjVersionTracker *objv_tracker, optional_yield y) const
11fdf7f2 464{
1e59de90 465 const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp);
11fdf7f2 466 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 467 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
468 return ret;
469 }
470 return 0;
471}
472
1e59de90
TL
473int RGWPubSub::Bucket::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result,
474 RGWObjVersionTracker *objv_tracker, optional_yield y) const
11fdf7f2 475{
1e59de90 476 const int ret = bucket->read_topics(result, objv_tracker, y, dpp);
11fdf7f2 477 if (ret < 0 && ret != -ENOENT) {
1e59de90 478 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
479 return ret;
480 }
481 return 0;
482}
483
b3b6e05e 484int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
f67539c2 485 RGWObjVersionTracker *objv_tracker,
1e59de90 486 optional_yield y) const
11fdf7f2 487{
1e59de90 488 const int ret = bucket->write_topics(topics, objv_tracker, y, dpp);
11fdf7f2 489 if (ret < 0) {
1e59de90 490 ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
491 return ret;
492 }
493
494 return 0;
495}
496
1e59de90 497int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const
11fdf7f2 498{
f67539c2 499 rgw_pubsub_topics topics;
1e59de90 500 const int ret = read_topics(dpp, topics, nullptr, y);
11fdf7f2 501 if (ret < 0) {
1e59de90 502 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
eafe8130
TL
503 return ret;
504 }
505
506 auto iter = topics.topics.find(name);
507 if (iter == topics.topics.end()) {
1e59de90 508 ldpp_dout(dpp, 1) << "ERROR: topic not found" << dendl;
eafe8130
TL
509 return -ENOENT;
510 }
511
1e59de90 512 result = iter->second;
eafe8130
TL
513 return 0;
514}
515
aee94f69
TL
516// from list of bucket topics, find the one that was auto-generated by a notification
517auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string &notification_id) {
518 auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
519 [&](const auto& val) { return notification_id == val.second.s3_id; });
520 return it != bucket_topics.topics.end() ?
521 std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
522 std::nullopt;
523}
524
525int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id,
526 rgw_pubsub_topic_filter& result, optional_yield y) const {
527 rgw_pubsub_bucket_topics bucket_topics;
528 const int ret = read_topics(dpp, bucket_topics, nullptr, y);
529 if (ret < 0) {
530 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl;
531 return ret;
532 }
533
534 auto iter = find_unique_topic(bucket_topics, notification_id);
535 if (!iter) {
536 ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
537 return -ENOENT;
538 }
539
540 result = iter->get();
541 return 0;
542}
543
544
1e59de90
TL
545int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
546 const rgw::notify::EventTypeList& events, optional_yield y) const {
b3b6e05e 547 return create_notification(dpp, topic_name, events, std::nullopt, "", y);
eafe8130
TL
548}
549
1e59de90
TL
550int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
551 const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
552 rgw_pubsub_topic topic_info;
11fdf7f2 553
1e59de90 554 int ret = ps.get_topic(dpp, topic_name, topic_info, y);
11fdf7f2 555 if (ret < 0) {
b3b6e05e 556 ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
11fdf7f2
TL
557 return ret;
558 }
b3b6e05e 559 ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl;
11fdf7f2
TL
560
561 RGWObjVersionTracker objv_tracker;
562 rgw_pubsub_bucket_topics bucket_topics;
563
1e59de90 564 ret = read_topics(dpp, bucket_topics, &objv_tracker, y);
eafe8130 565 if (ret < 0) {
b3b6e05e 566 ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" <<
1e59de90 567 bucket->get_name() << "': ret=" << ret << dendl;
11fdf7f2
TL
568 return ret;
569 }
b3b6e05e 570 ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
1e59de90 571 bucket->get_name() << "'" << dendl;
11fdf7f2
TL
572
573 auto& topic_filter = bucket_topics.topics[topic_name];
1e59de90 574 topic_filter.topic = topic_info;
11fdf7f2 575 topic_filter.events = events;
eafe8130
TL
576 topic_filter.s3_id = notif_name;
577 if (s3_filter) {
578 topic_filter.s3_filter = *s3_filter;
579 }
11fdf7f2 580
b3b6e05e 581 ret = write_topics(dpp, bucket_topics, &objv_tracker, y);
11fdf7f2 582 if (ret < 0) {
1e59de90 583 ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket->get_name() << "': ret=" << ret << dendl;
11fdf7f2
TL
584 return ret;
585 }
eafe8130 586
1e59de90 587 ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket->get_name() << "'" << dendl;
11fdf7f2
TL
588
589 return 0;
590}
591
1e59de90 592int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const
aee94f69
TL
593{
594 return remove_notification_inner(dpp, topic_name, false, y);
595}
596
597int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id,
598 bool is_notification_id, optional_yield y) const
11fdf7f2 599{
11fdf7f2
TL
600 RGWObjVersionTracker objv_tracker;
601 rgw_pubsub_bucket_topics bucket_topics;
602
1e59de90 603 auto ret = read_topics(dpp, bucket_topics, &objv_tracker, y);
eafe8130 604 if (ret < 0) {
b3b6e05e 605 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
606 return ret;
607 }
608
aee94f69
TL
609
610 std::unique_ptr<std::string> topic_name = std::make_unique<std::string>(notification_id);
611 if(is_notification_id) {
612 auto iter = find_unique_topic(bucket_topics, notification_id);
613 if (!iter) {
614 ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
615 return -ENOENT;
616 }
617 topic_name = std::make_unique<std::string>(iter->get().topic.name);
618 }
619
620 if (bucket_topics.topics.erase(*topic_name) == 0) {
1e59de90
TL
621 ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl;
622 return 0;
623 }
11fdf7f2 624
522d829b
TL
625 if (bucket_topics.topics.empty()) {
626 // no more topics - delete the notification object of the bucket
1e59de90 627 ret = bucket->remove_topics(&objv_tracker, y, dpp);
522d829b 628 if (ret < 0 && ret != -ENOENT) {
20effc67 629 ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
522d829b
TL
630 return ret;
631 }
632 return 0;
633 }
634
635 // write back the notifications without the deleted one
b3b6e05e 636 ret = write_topics(dpp, bucket_topics, &objv_tracker, y);
11fdf7f2 637 if (ret < 0) {
b3b6e05e 638 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
639 return ret;
640 }
641
642 return 0;
643}
644
aee94f69
TL
645int RGWPubSub::Bucket::remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const
646{
647 return remove_notification_inner(dpp, notif_id, true, y);
648}
649
1e59de90 650int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const
f67539c2
TL
651{
652 // get all topics on a bucket
653 rgw_pubsub_bucket_topics bucket_topics;
1e59de90 654 auto ret = get_topics(dpp, bucket_topics, y);
f67539c2 655 if (ret < 0 && ret != -ENOENT) {
1e59de90 656 ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket->get_name() << "', ret=" << ret << dendl;
f67539c2
TL
657 return ret ;
658 }
659
660 // remove all auto-genrated topics
661 for (const auto& topic : bucket_topics.topics) {
662 const auto& topic_name = topic.first;
1e59de90 663 ret = ps.remove_topic(dpp, topic_name, y);
f67539c2 664 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 665 ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl;
f67539c2
TL
666 }
667 }
668
522d829b 669 // delete the notification object of the bucket
1e59de90 670 ret = bucket->remove_topics(nullptr, y, dpp);
f67539c2 671 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 672 ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
f67539c2
TL
673 return ret;
674 }
675
676 return 0;
eafe8130
TL
677}
678
1e59de90
TL
679int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const {
680 return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y);
f67539c2
TL
681}
682
1e59de90
TL
683int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest,
684 const std::string& arn, const std::string& opaque_data, optional_yield y) const {
11fdf7f2 685 RGWObjVersionTracker objv_tracker;
f67539c2 686 rgw_pubsub_topics topics;
11fdf7f2 687
1e59de90 688 int ret = read_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 689 if (ret < 0 && ret != -ENOENT) {
eafe8130 690 // its not an error if not topics exist, we create one
b3b6e05e 691 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
692 return ret;
693 }
eafe8130 694
1e59de90
TL
695 rgw_pubsub_topic& new_topic = topics.topics[name];
696 new_topic.user = rgw_user("", tenant);
697 new_topic.name = name;
698 new_topic.dest = dest;
699 new_topic.arn = arn;
700 new_topic.opaque_data = opaque_data;
11fdf7f2 701
b3b6e05e 702 ret = write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 703 if (ret < 0) {
b3b6e05e 704 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
705 return ret;
706 }
707
708 return 0;
709}
710
1e59de90 711int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
11fdf7f2
TL
712{
713 RGWObjVersionTracker objv_tracker;
f67539c2 714 rgw_pubsub_topics topics;
11fdf7f2 715
1e59de90 716 int ret = read_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 717 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 718 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2 719 return ret;
eafe8130
TL
720 } else if (ret == -ENOENT) {
721 // its not an error if no topics exist, just a no-op
b3b6e05e 722 ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
eafe8130 723 return 0;
11fdf7f2
TL
724 }
725
726 topics.topics.erase(name);
727
b3b6e05e 728 ret = write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 729 if (ret < 0) {
b3b6e05e 730 ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
11fdf7f2
TL
731 return ret;
732 }
733
734 return 0;
735}
736