]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_pubsub.cc
17a75be0674a47a943d7b3ede821875d2de9cc10
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <algorithm>
5 #include <boost/tokenizer.hpp>
6 #include <optional>
7 #include "rgw_rest_pubsub_common.h"
8 #include "rgw_rest_pubsub.h"
9 #include "rgw_pubsub_push.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_sync_module_pubsub.h"
12 #include "rgw_op.h"
13 #include "rgw_rest.h"
14 #include "rgw_rest_s3.h"
15 #include "rgw_arn.h"
16 #include "rgw_auth_s3.h"
17 #include "rgw_notify.h"
18 #include "rgw_sal_rados.h"
19 #include "services/svc_zone.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
23
24 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
25
26 // command (AWS compliant):
27 // POST
28 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
29 class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
30 public:
31 int get_params() override {
32 topic_name = s->info.args.get("Name");
33 if (topic_name.empty()) {
34 ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
35 return -EINVAL;
36 }
37
38 opaque_data = s->info.args.get("OpaqueData");
39
40 dest.push_endpoint = s->info.args.get("push-endpoint");
41 dest.persistent = s->info.args.exists("persistent");
42
43 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
44 return -EINVAL;
45 }
46 for (const auto& param : s->info.args.get_params()) {
47 if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
48 continue;
49 }
50 dest.push_endpoint_args.append(param.first+"="+param.second+"&");
51 }
52
53 if (!dest.push_endpoint_args.empty()) {
54 // remove last separator
55 dest.push_endpoint_args.pop_back();
56 }
57 if (!dest.push_endpoint.empty() && dest.persistent) {
58 const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
59 if (ret < 0) {
60 ldout(s->cct, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
61 return ret;
62 }
63 }
64
65 // dest object only stores endpoint info
66 // bucket to store events/records will be set only when subscription is created
67 dest.bucket_name = "";
68 dest.oid_prefix = "";
69 dest.arn_topic = topic_name;
70 // the topic ARN will be sent in the reply
71 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
72 store->svc()->zone->get_zonegroup().get_name(),
73 s->user->get_tenant(), topic_name);
74 topic_arn = arn.to_string();
75 return 0;
76 }
77
78 void send_response() override {
79 if (op_ret) {
80 set_req_state_err(s, op_ret);
81 }
82 dump_errno(s);
83 end_header(s, this, "application/xml");
84
85 if (op_ret < 0) {
86 return;
87 }
88
89 const auto f = s->formatter;
90 f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS);
91 f->open_object_section("CreateTopicResult");
92 encode_xml("TopicArn", topic_arn, f);
93 f->close_section(); // CreateTopicResult
94 f->open_object_section("ResponseMetadata");
95 encode_xml("RequestId", s->req_id, f);
96 f->close_section(); // ResponseMetadata
97 f->close_section(); // CreateTopicResponse
98 rgw_flush_formatter_and_reset(s, f);
99 }
100 };
101
102 // command (AWS compliant):
103 // POST
104 // Action=ListTopics
105 class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
106 public:
107 void send_response() override {
108 if (op_ret) {
109 set_req_state_err(s, op_ret);
110 }
111 dump_errno(s);
112 end_header(s, this, "application/xml");
113
114 if (op_ret < 0) {
115 return;
116 }
117
118 const auto f = s->formatter;
119 f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS);
120 f->open_object_section("ListTopicsResult");
121 encode_xml("Topics", result, f);
122 f->close_section(); // ListTopicsResult
123 f->open_object_section("ResponseMetadata");
124 encode_xml("RequestId", s->req_id, f);
125 f->close_section(); // ResponseMetadat
126 f->close_section(); // ListTopicsResponse
127 rgw_flush_formatter_and_reset(s, f);
128 }
129 };
130
131 // command (extension to AWS):
132 // POST
133 // Action=GetTopic&TopicArn=<topic-arn>
134 class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
135 public:
136 int get_params() override {
137 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
138
139 if (!topic_arn || topic_arn->resource.empty()) {
140 ldout(s->cct, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
141 return -EINVAL;
142 }
143
144 topic_name = topic_arn->resource;
145 return 0;
146 }
147
148 void send_response() override {
149 if (op_ret) {
150 set_req_state_err(s, op_ret);
151 }
152 dump_errno(s);
153 end_header(s, this, "application/xml");
154
155 if (op_ret < 0) {
156 return;
157 }
158
159 const auto f = s->formatter;
160 f->open_object_section("GetTopicResponse");
161 f->open_object_section("GetTopicResult");
162 encode_xml("Topic", result.topic, f);
163 f->close_section();
164 f->open_object_section("ResponseMetadata");
165 encode_xml("RequestId", s->req_id, f);
166 f->close_section();
167 f->close_section();
168 rgw_flush_formatter_and_reset(s, f);
169 }
170 };
171
172 // command (AWS compliant):
173 // POST
174 // Action=GetTopicAttributes&TopicArn=<topic-arn>
175 class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
176 public:
177 int get_params() override {
178 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
179
180 if (!topic_arn || topic_arn->resource.empty()) {
181 ldout(s->cct, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
182 return -EINVAL;
183 }
184
185 topic_name = topic_arn->resource;
186 return 0;
187 }
188
189 void send_response() override {
190 if (op_ret) {
191 set_req_state_err(s, op_ret);
192 }
193 dump_errno(s);
194 end_header(s, this, "application/xml");
195
196 if (op_ret < 0) {
197 return;
198 }
199
200 const auto f = s->formatter;
201 f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
202 f->open_object_section("GetTopicAttributesResult");
203 result.topic.dump_xml_as_attributes(f);
204 f->close_section(); // GetTopicAttributesResult
205 f->open_object_section("ResponseMetadata");
206 encode_xml("RequestId", s->req_id, f);
207 f->close_section(); // ResponseMetadata
208 f->close_section(); // GetTopicAttributesResponse
209 rgw_flush_formatter_and_reset(s, f);
210 }
211 };
212
213 // command (AWS compliant):
214 // POST
215 // Action=DeleteTopic&TopicArn=<topic-arn>
216 class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
217 public:
218 int get_params() override {
219 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
220
221 if (!topic_arn || topic_arn->resource.empty()) {
222 ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
223 return -EINVAL;
224 }
225
226 topic_name = topic_arn->resource;
227
228 // upon deletion it is not known if topic is persistent or not
229 // will try to delete the persistent topic anyway
230 const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
231 if (ret == -ENOENT) {
232 // topic was not persistent, or already deleted
233 return 0;
234 }
235 if (ret < 0) {
236 ldout(s->cct, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
237 return ret;
238 }
239
240 return 0;
241 }
242
243 void send_response() override {
244 if (op_ret) {
245 set_req_state_err(s, op_ret);
246 }
247 dump_errno(s);
248 end_header(s, this, "application/xml");
249
250 if (op_ret < 0) {
251 return;
252 }
253
254 const auto f = s->formatter;
255 f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS);
256 f->open_object_section("ResponseMetadata");
257 encode_xml("RequestId", s->req_id, f);
258 f->close_section(); // ResponseMetadata
259 f->close_section(); // DeleteTopicResponse
260 rgw_flush_formatter_and_reset(s, f);
261 }
262 };
263
264 namespace {
265 // utility classes and functions for handling parameters with the following format:
266 // Attributes.entry.{N}.{key|value}={VALUE}
267 // N - any unsigned number
268 // VALUE - url encoded string
269
270 // and Attribute is holding key and value
271 // ctor and set are done according to the "type" argument
272 // if type is not "key" or "value" its a no-op
273 class Attribute {
274 std::string key;
275 std::string value;
276 public:
277 Attribute(const std::string& type, const std::string& key_or_value) {
278 set(type, key_or_value);
279 }
280 void set(const std::string& type, const std::string& key_or_value) {
281 if (type == "key") {
282 key = key_or_value;
283 } else if (type == "value") {
284 value = key_or_value;
285 }
286 }
287 const std::string& get_key() const { return key; }
288 const std::string& get_value() const { return value; }
289 };
290
291 using AttributeMap = std::map<unsigned, Attribute>;
292
293 // aggregate the attributes into a map
294 // the key and value are associated by the index (N)
295 // no assumptions are made on the order in which these parameters are added
296 void update_attribute_map(const std::string& input, AttributeMap& map) {
297 const boost::char_separator<char> sep(".");
298 const boost::tokenizer tokens(input, sep);
299 auto token = tokens.begin();
300 if (*token != "Attributes") {
301 return;
302 }
303 ++token;
304
305 if (*token != "entry") {
306 return;
307 }
308 ++token;
309
310 unsigned idx;
311 try {
312 idx = std::stoul(*token);
313 } catch (const std::invalid_argument&) {
314 return;
315 }
316 ++token;
317
318 std::string key_or_value = "";
319 // get the rest of the string regardless of dots
320 // this is to allow dots in the value
321 while (token != tokens.end()) {
322 key_or_value.append(*token+".");
323 ++token;
324 }
325 // remove last separator
326 key_or_value.pop_back();
327
328 auto pos = key_or_value.find("=");
329 if (pos != string::npos) {
330 const auto key_or_value_lhs = key_or_value.substr(0, pos);
331 const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
332 const auto map_it = map.find(idx);
333 if (map_it == map.end()) {
334 // new entry
335 map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
336 } else {
337 // existing entry
338 map_it->second.set(key_or_value_lhs, key_or_value_rhs);
339 }
340 }
341 }
342 }
343
344 void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
345 if (post_body.size() > 0) {
346 ldout(s->cct, 10) << "Content of POST: " << post_body << dendl;
347
348 if (post_body.find("Action") != string::npos) {
349 const boost::char_separator<char> sep("&");
350 const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
351 AttributeMap map;
352 for (const auto& t : tokens) {
353 auto pos = t.find("=");
354 if (pos != string::npos) {
355 const auto key = t.substr(0, pos);
356 if (key == "Action") {
357 s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
358 } else if (key == "Name" || key == "TopicArn") {
359 const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
360 s->info.args.append(key, value);
361 } else {
362 update_attribute_map(t, map);
363 }
364 }
365 }
366 // update the regular args with the content of the attribute map
367 for (const auto& attr : map) {
368 s->info.args.append(attr.second.get_key(), attr.second.get_value());
369 }
370 }
371 const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
372 s->info.args.append("PayloadHash", payload_hash);
373 }
374 }
375
376 RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
377 rgw_topic_parse_input();
378
379 if (s->info.args.exists("Action")) {
380 const auto action = s->info.args.get("Action");
381 if (action.compare("CreateTopic") == 0)
382 return new RGWPSCreateTopic_ObjStore_AWS();
383 if (action.compare("DeleteTopic") == 0)
384 return new RGWPSDeleteTopic_ObjStore_AWS;
385 if (action.compare("ListTopics") == 0)
386 return new RGWPSListTopics_ObjStore_AWS();
387 if (action.compare("GetTopic") == 0)
388 return new RGWPSGetTopic_ObjStore_AWS();
389 if (action.compare("GetTopicAttributes") == 0)
390 return new RGWPSGetTopicAttributes_ObjStore_AWS();
391 }
392
393 return nullptr;
394 }
395
396 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
397 return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
398 }
399
400
401 namespace {
402 // return a unique topic by prefexing with the notification name: <notification>_<topic>
403 std::string topic_to_unique(const std::string& topic, const std::string& notification) {
404 return notification + "_" + topic;
405 }
406
407 // extract the topic from a unique topic of the form: <notification>_<topic>
408 [[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
409 if (unique_topic.find(notification + "_") == string::npos) {
410 return "";
411 }
412 return unique_topic.substr(notification.length() + 1);
413 }
414
415 // from list of bucket topics, find the one that was auto-generated by a notification
416 auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
417 auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
418 return it != bucket_topics.topics.end() ?
419 std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
420 std::nullopt;
421 }
422 }
423
424 // command (S3 compliant): PUT /<bucket name>?notification
425 // a "notification" and a subscription will be auto-generated
426 // actual configuration is XML encoded in the body of the message
427 class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
428 rgw_pubsub_s3_notifications configurations;
429
430 int get_params_from_body() {
431 const auto max_size = s->cct->_conf->rgw_max_put_param_size;
432 int r;
433 bufferlist data;
434 std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
435
436 if (r < 0) {
437 ldout(s->cct, 1) << "failed to read XML payload" << dendl;
438 return r;
439 }
440 if (data.length() == 0) {
441 ldout(s->cct, 1) << "XML payload missing" << dendl;
442 return -EINVAL;
443 }
444
445 RGWXMLDecoder::XMLParser parser;
446
447 if (!parser.init()){
448 ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
449 return -EINVAL;
450 }
451 if (!parser.parse(data.c_str(), data.length(), 1)) {
452 ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
453 return -ERR_MALFORMED_XML;
454 }
455 try {
456 // NotificationConfigurations is mandatory
457 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
458 } catch (RGWXMLDecoder::err& err) {
459 ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
460 return -ERR_MALFORMED_XML;
461 }
462 return 0;
463 }
464
465 int get_params() override {
466 bool exists;
467 const auto no_value = s->info.args.get("notification", &exists);
468 if (!exists) {
469 ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
470 return -EINVAL;
471 }
472 if (no_value.length() > 0) {
473 ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
474 return -EINVAL;
475 }
476 if (s->bucket_name.empty()) {
477 ldout(s->cct, 1) << "request must be on a bucket" << dendl;
478 return -EINVAL;
479 }
480 bucket_name = s->bucket_name;
481 return 0;
482 }
483
484 public:
485 const char* name() const override { return "pubsub_notification_create_s3"; }
486 void execute(optional_yield) override;
487 };
488
489 void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
490 op_ret = get_params_from_body();
491 if (op_ret < 0) {
492 return;
493 }
494
495 ps.emplace(store, s->owner.get_id().tenant);
496 auto b = ps->get_bucket(bucket_info.bucket);
497 ceph_assert(b);
498 std::string data_bucket_prefix = "";
499 std::string data_oid_prefix = "";
500 bool push_only = true;
501 if (store->getRados()->get_sync_module()) {
502 const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
503 if (psmodule) {
504 const auto& conf = psmodule->get_effective_conf();
505 data_bucket_prefix = conf["data_bucket_prefix"];
506 data_oid_prefix = conf["data_oid_prefix"];
507 // TODO: allow "push-only" on PS zone as well
508 push_only = false;
509 }
510 }
511
512 for (const auto& c : configurations.list) {
513 const auto& notif_name = c.id;
514 if (notif_name.empty()) {
515 ldout(s->cct, 1) << "missing notification id" << dendl;
516 op_ret = -EINVAL;
517 return;
518 }
519 if (c.topic_arn.empty()) {
520 ldout(s->cct, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
521 op_ret = -EINVAL;
522 return;
523 }
524
525 const auto arn = rgw::ARN::parse(c.topic_arn);
526 if (!arn || arn->resource.empty()) {
527 ldout(s->cct, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
528 op_ret = -EINVAL;
529 return;
530 }
531
532 if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
533 ldout(s->cct, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
534 op_ret = -EINVAL;
535 return;
536 }
537
538 const auto topic_name = arn->resource;
539
540 // get topic information. destination information is stored in the topic
541 rgw_pubsub_topic topic_info;
542 op_ret = ps->get_topic(topic_name, &topic_info);
543 if (op_ret < 0) {
544 ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
545 return;
546 }
547 // make sure that full topic configuration match
548 // TODO: use ARN match function
549
550 // create unique topic name. this has 2 reasons:
551 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
552 // (2) make topic clneaup easier, when notification is removed
553 const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
554 // generate the internal topic. destination is stored here for the "push-only" case
555 // when no subscription exists
556 // ARN is cached to make the "GET" method faster
557 op_ret = ps->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
558 if (op_ret < 0) {
559 ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
560 "', ret=" << op_ret << dendl;
561 return;
562 }
563 ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
564 // generate the notification
565 rgw::notify::EventTypeList events;
566 op_ret = b->create_notification(unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
567 if (op_ret < 0) {
568 ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
569 "', ret=" << op_ret << dendl;
570 // rollback generated topic (ignore return value)
571 ps->remove_topic(unique_topic_name, y);
572 return;
573 }
574 ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
575
576 if (!push_only) {
577 // generate the subscription with destination information from the original topic
578 rgw_pubsub_sub_dest dest = topic_info.dest;
579 dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
580 dest.oid_prefix = data_oid_prefix + notif_name + "/";
581 auto sub = ps->get_sub(notif_name);
582 op_ret = sub->subscribe(unique_topic_name, dest, y, notif_name);
583 if (op_ret < 0) {
584 ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
585 // rollback generated notification (ignore return value)
586 b->remove_notification(unique_topic_name, y);
587 // rollback generated topic (ignore return value)
588 ps->remove_topic(unique_topic_name, y);
589 return;
590 }
591 ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
592 }
593 }
594 }
595
596 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
597 class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
598 private:
599 std::string notif_name;
600
601 int get_params() override {
602 bool exists;
603 notif_name = s->info.args.get("notification", &exists);
604 if (!exists) {
605 ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
606 return -EINVAL;
607 }
608 if (s->bucket_name.empty()) {
609 ldout(s->cct, 1) << "request must be on a bucket" << dendl;
610 return -EINVAL;
611 }
612 bucket_name = s->bucket_name;
613 return 0;
614 }
615
616 void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y) {
617 op_ret = b->remove_notification(topic_name, y);
618 if (op_ret < 0) {
619 ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
620 }
621 op_ret = ps->remove_topic(topic_name, y);
622 if (op_ret < 0) {
623 ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
624 }
625 }
626
627 public:
628 void execute(optional_yield y) override;
629 const char* name() const override { return "pubsub_notification_delete_s3"; }
630 };
631
632 void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
633 op_ret = get_params();
634 if (op_ret < 0) {
635 return;
636 }
637
638 ps.emplace(store, s->owner.get_id().tenant);
639 auto b = ps->get_bucket(bucket_info.bucket);
640 ceph_assert(b);
641
642 // get all topics on a bucket
643 rgw_pubsub_bucket_topics bucket_topics;
644 op_ret = b->get_topics(&bucket_topics);
645 if (op_ret < 0) {
646 ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
647 return;
648 }
649
650 if (!notif_name.empty()) {
651 // delete a specific notification
652 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
653 if (unique_topic) {
654 // remove the auto generated subscription according to notification name (if exist)
655 const auto unique_topic_name = unique_topic->get().topic.name;
656 auto sub = ps->get_sub(notif_name);
657 op_ret = sub->unsubscribe(unique_topic_name, y);
658 if (op_ret < 0 && op_ret != -ENOENT) {
659 ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
660 return;
661 }
662 remove_notification_by_topic(unique_topic_name, b, y);
663 return;
664 }
665 // notification to be removed is not found - considered success
666 ldout(s->cct, 20) << "notification '" << notif_name << "' already removed" << dendl;
667 return;
668 }
669
670 // delete all notification of on a bucket
671 for (const auto& topic : bucket_topics.topics) {
672 // remove the auto generated subscription of the topic (if exist)
673 rgw_pubsub_topic_subs topic_subs;
674 op_ret = ps->get_topic(topic.first, &topic_subs);
675 for (const auto& topic_sub_name : topic_subs.subs) {
676 auto sub = ps->get_sub(topic_sub_name);
677 rgw_pubsub_sub_config sub_conf;
678 op_ret = sub->get_conf(&sub_conf);
679 if (op_ret < 0) {
680 ldout(s->cct, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
681 return;
682 }
683 if (!sub_conf.s3_id.empty()) {
684 // S3 notification, has autogenerated subscription
685 const auto& sub_topic_name = sub_conf.topic;
686 op_ret = sub->unsubscribe(sub_topic_name, y);
687 if (op_ret < 0) {
688 ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
689 return;
690 }
691 }
692 }
693 remove_notification_by_topic(topic.first, b, y);
694 }
695 }
696
697 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
698 class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
699 private:
700 std::string notif_name;
701 rgw_pubsub_s3_notifications notifications;
702
703 int get_params() override {
704 bool exists;
705 notif_name = s->info.args.get("notification", &exists);
706 if (!exists) {
707 ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
708 return -EINVAL;
709 }
710 if (s->bucket_name.empty()) {
711 ldout(s->cct, 1) << "request must be on a bucket" << dendl;
712 return -EINVAL;
713 }
714 bucket_name = s->bucket_name;
715 return 0;
716 }
717
718 public:
719 void execute(optional_yield y) override;
720 void send_response() override {
721 if (op_ret) {
722 set_req_state_err(s, op_ret);
723 }
724 dump_errno(s);
725 end_header(s, this, "application/xml");
726
727 if (op_ret < 0) {
728 return;
729 }
730 notifications.dump_xml(s->formatter);
731 rgw_flush_formatter_and_reset(s, s->formatter);
732 }
733 const char* name() const override { return "pubsub_notifications_get_s3"; }
734 };
735
736 void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
737 ps.emplace(store, s->owner.get_id().tenant);
738 auto b = ps->get_bucket(bucket_info.bucket);
739 ceph_assert(b);
740
741 // get all topics on a bucket
742 rgw_pubsub_bucket_topics bucket_topics;
743 op_ret = b->get_topics(&bucket_topics);
744 if (op_ret < 0) {
745 ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
746 return;
747 }
748 if (!notif_name.empty()) {
749 // get info of a specific notification
750 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
751 if (unique_topic) {
752 notifications.list.emplace_back(unique_topic->get());
753 return;
754 }
755 op_ret = -ENOENT;
756 ldout(s->cct, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
757 return;
758 }
759 // loop through all topics of the bucket
760 for (const auto& topic : bucket_topics.topics) {
761 if (topic.second.s3_id.empty()) {
762 // not an s3 notification
763 continue;
764 }
765 notifications.list.emplace_back(topic.second);
766 }
767 }
768
769 RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
770 return new RGWPSListNotifs_ObjStore_S3();
771 }
772
773 RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
774 return new RGWPSCreateNotif_ObjStore_S3();
775 }
776
777 RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
778 return new RGWPSDeleteNotif_ObjStore_S3();
779 }
780
781 RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
782 return new RGWPSListNotifs_ObjStore_S3();
783 }
784
785 RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
786 return new RGWPSCreateNotif_ObjStore_S3();
787 }
788
789 RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
790 return new RGWPSDeleteNotif_ObjStore_S3();
791 }
792