]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_pubsub.cc
import quincy 17.2.0
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <algorithm>
5 #include <boost/tokenizer.hpp>
6 #include <optional>
7 #include "rgw_rest_pubsub_common.h"
8 #include "rgw_rest_pubsub.h"
9 #include "rgw_pubsub_push.h"
10 #include "rgw_pubsub.h"
11 #include "rgw_sync_module_pubsub.h"
12 #include "rgw_op.h"
13 #include "rgw_rest.h"
14 #include "rgw_rest_s3.h"
15 #include "rgw_arn.h"
16 #include "rgw_auth_s3.h"
17 #include "rgw_notify.h"
18 #include "rgw_sal_rados.h"
19 #include "services/svc_zone.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
23
24 using namespace std;
25
// XML namespace placed on the root element of the SNS-style responses in this file
static const char* AWS_SNS_NS = "https://sns.amazonaws.com/doc/2010-03-31/";
27
28 // command (AWS compliant):
29 // POST
30 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
31 class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
32 public:
33 int get_params() override {
34 topic_name = s->info.args.get("Name");
35 if (topic_name.empty()) {
36 ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
37 return -EINVAL;
38 }
39
40 opaque_data = s->info.args.get("OpaqueData");
41
42 dest.push_endpoint = s->info.args.get("push-endpoint");
43 s->info.args.get_bool("persistent", &dest.persistent, false);
44
45 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
46 return -EINVAL;
47 }
48 for (const auto& param : s->info.args.get_params()) {
49 if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
50 continue;
51 }
52 dest.push_endpoint_args.append(param.first+"="+param.second+"&");
53 }
54
55 if (!dest.push_endpoint_args.empty()) {
56 // remove last separator
57 dest.push_endpoint_args.pop_back();
58 }
59 if (!dest.push_endpoint.empty() && dest.persistent) {
60 const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
61 if (ret < 0) {
62 ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
63 return ret;
64 }
65 }
66
67 // dest object only stores endpoint info
68 // bucket to store events/records will be set only when subscription is created
69 dest.bucket_name = "";
70 dest.oid_prefix = "";
71 dest.arn_topic = topic_name;
72 // the topic ARN will be sent in the reply
73 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
74 store->get_zone()->get_zonegroup().get_name(),
75 s->user->get_tenant(), topic_name);
76 topic_arn = arn.to_string();
77 return 0;
78 }
79
80 void send_response() override {
81 if (op_ret) {
82 set_req_state_err(s, op_ret);
83 }
84 dump_errno(s);
85 end_header(s, this, "application/xml");
86
87 if (op_ret < 0) {
88 return;
89 }
90
91 const auto f = s->formatter;
92 f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS);
93 f->open_object_section("CreateTopicResult");
94 encode_xml("TopicArn", topic_arn, f);
95 f->close_section(); // CreateTopicResult
96 f->open_object_section("ResponseMetadata");
97 encode_xml("RequestId", s->req_id, f);
98 f->close_section(); // ResponseMetadata
99 f->close_section(); // CreateTopicResponse
100 rgw_flush_formatter_and_reset(s, f);
101 }
102 };
103
104 // command (AWS compliant):
105 // POST
106 // Action=ListTopics
107 class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
108 public:
109 void send_response() override {
110 if (op_ret) {
111 set_req_state_err(s, op_ret);
112 }
113 dump_errno(s);
114 end_header(s, this, "application/xml");
115
116 if (op_ret < 0) {
117 return;
118 }
119
120 const auto f = s->formatter;
121 f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS);
122 f->open_object_section("ListTopicsResult");
123 encode_xml("Topics", result, f);
124 f->close_section(); // ListTopicsResult
125 f->open_object_section("ResponseMetadata");
126 encode_xml("RequestId", s->req_id, f);
127 f->close_section(); // ResponseMetadat
128 f->close_section(); // ListTopicsResponse
129 rgw_flush_formatter_and_reset(s, f);
130 }
131 };
132
133 // command (extension to AWS):
134 // POST
135 // Action=GetTopic&TopicArn=<topic-arn>
136 class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
137 public:
138 int get_params() override {
139 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
140
141 if (!topic_arn || topic_arn->resource.empty()) {
142 ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
143 return -EINVAL;
144 }
145
146 topic_name = topic_arn->resource;
147 return 0;
148 }
149
150 void send_response() override {
151 if (op_ret) {
152 set_req_state_err(s, op_ret);
153 }
154 dump_errno(s);
155 end_header(s, this, "application/xml");
156
157 if (op_ret < 0) {
158 return;
159 }
160
161 const auto f = s->formatter;
162 f->open_object_section("GetTopicResponse");
163 f->open_object_section("GetTopicResult");
164 encode_xml("Topic", result.topic, f);
165 f->close_section();
166 f->open_object_section("ResponseMetadata");
167 encode_xml("RequestId", s->req_id, f);
168 f->close_section();
169 f->close_section();
170 rgw_flush_formatter_and_reset(s, f);
171 }
172 };
173
174 // command (AWS compliant):
175 // POST
176 // Action=GetTopicAttributes&TopicArn=<topic-arn>
177 class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
178 public:
179 int get_params() override {
180 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
181
182 if (!topic_arn || topic_arn->resource.empty()) {
183 ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
184 return -EINVAL;
185 }
186
187 topic_name = topic_arn->resource;
188 return 0;
189 }
190
191 void send_response() override {
192 if (op_ret) {
193 set_req_state_err(s, op_ret);
194 }
195 dump_errno(s);
196 end_header(s, this, "application/xml");
197
198 if (op_ret < 0) {
199 return;
200 }
201
202 const auto f = s->formatter;
203 f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
204 f->open_object_section("GetTopicAttributesResult");
205 result.topic.dump_xml_as_attributes(f);
206 f->close_section(); // GetTopicAttributesResult
207 f->open_object_section("ResponseMetadata");
208 encode_xml("RequestId", s->req_id, f);
209 f->close_section(); // ResponseMetadata
210 f->close_section(); // GetTopicAttributesResponse
211 rgw_flush_formatter_and_reset(s, f);
212 }
213 };
214
215 // command (AWS compliant):
216 // POST
217 // Action=DeleteTopic&TopicArn=<topic-arn>
218 class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
219 public:
220 int get_params() override {
221 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
222
223 if (!topic_arn || topic_arn->resource.empty()) {
224 ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
225 return -EINVAL;
226 }
227
228 topic_name = topic_arn->resource;
229
230 // upon deletion it is not known if topic is persistent or not
231 // will try to delete the persistent topic anyway
232 const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
233 if (ret == -ENOENT) {
234 // topic was not persistent, or already deleted
235 return 0;
236 }
237 if (ret < 0) {
238 ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
239 return ret;
240 }
241
242 return 0;
243 }
244
245 void send_response() override {
246 if (op_ret) {
247 set_req_state_err(s, op_ret);
248 }
249 dump_errno(s);
250 end_header(s, this, "application/xml");
251
252 if (op_ret < 0) {
253 return;
254 }
255
256 const auto f = s->formatter;
257 f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS);
258 f->open_object_section("ResponseMetadata");
259 encode_xml("RequestId", s->req_id, f);
260 f->close_section(); // ResponseMetadata
261 f->close_section(); // DeleteTopicResponse
262 rgw_flush_formatter_and_reset(s, f);
263 }
264 };
265
266 namespace {
267 // utility classes and functions for handling parameters with the following format:
268 // Attributes.entry.{N}.{key|value}={VALUE}
269 // N - any unsigned number
270 // VALUE - url encoded string
271
// an Attribute holds one key and one value
// both the ctor and set() pick the field to assign according to "type":
// "key" assigns the key, "value" assigns the value, anything else is a no-op
class Attribute {
  std::string key;
  std::string value;
public:
  Attribute(const std::string& type, const std::string& key_or_value) {
    set(type, key_or_value);
  }
  void set(const std::string& type, const std::string& key_or_value) {
    std::string* target = nullptr;
    if (type == "key") {
      target = &key;
    } else if (type == "value") {
      target = &value;
    }
    if (target != nullptr) {
      *target = key_or_value;
    }
  }
  const std::string& get_key() const { return key; }
  const std::string& get_value() const { return value; }
};

// attributes indexed by their entry number (N)
using AttributeMap = std::map<unsigned, Attribute>;
294
295 // aggregate the attributes into a map
296 // the key and value are associated by the index (N)
297 // no assumptions are made on the order in which these parameters are added
298 void update_attribute_map(const std::string& input, AttributeMap& map) {
299 const boost::char_separator<char> sep(".");
300 const boost::tokenizer tokens(input, sep);
301 auto token = tokens.begin();
302 if (*token != "Attributes") {
303 return;
304 }
305 ++token;
306
307 if (*token != "entry") {
308 return;
309 }
310 ++token;
311
312 unsigned idx;
313 try {
314 idx = std::stoul(*token);
315 } catch (const std::invalid_argument&) {
316 return;
317 }
318 ++token;
319
320 std::string key_or_value = "";
321 // get the rest of the string regardless of dots
322 // this is to allow dots in the value
323 while (token != tokens.end()) {
324 key_or_value.append(*token+".");
325 ++token;
326 }
327 // remove last separator
328 key_or_value.pop_back();
329
330 auto pos = key_or_value.find("=");
331 if (pos != string::npos) {
332 const auto key_or_value_lhs = key_or_value.substr(0, pos);
333 const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
334 const auto map_it = map.find(idx);
335 if (map_it == map.end()) {
336 // new entry
337 map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
338 } else {
339 // existing entry
340 map_it->second.set(key_or_value_lhs, key_or_value_rhs);
341 }
342 }
343 }
344 }
345
346 void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
347 if (post_body.size() > 0) {
348 ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
349
350 if (post_body.find("Action") != string::npos) {
351 const boost::char_separator<char> sep("&");
352 const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
353 AttributeMap map;
354 for (const auto& t : tokens) {
355 auto pos = t.find("=");
356 if (pos != string::npos) {
357 const auto key = t.substr(0, pos);
358 if (key == "Action") {
359 s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
360 } else if (key == "Name" || key == "TopicArn") {
361 const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
362 s->info.args.append(key, value);
363 } else {
364 update_attribute_map(t, map);
365 }
366 }
367 }
368 // update the regular args with the content of the attribute map
369 for (const auto& attr : map) {
370 s->info.args.append(attr.second.get_key(), attr.second.get_value());
371 }
372 }
373 const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
374 s->info.args.append("PayloadHash", payload_hash);
375 }
376 }
377
378 RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
379 rgw_topic_parse_input();
380
381 if (s->info.args.exists("Action")) {
382 const auto action = s->info.args.get("Action");
383 if (action.compare("CreateTopic") == 0)
384 return new RGWPSCreateTopic_ObjStore_AWS();
385 if (action.compare("DeleteTopic") == 0)
386 return new RGWPSDeleteTopic_ObjStore_AWS;
387 if (action.compare("ListTopics") == 0)
388 return new RGWPSListTopics_ObjStore_AWS();
389 if (action.compare("GetTopic") == 0)
390 return new RGWPSGetTopic_ObjStore_AWS();
391 if (action.compare("GetTopicAttributes") == 0)
392 return new RGWPSGetTopicAttributes_ObjStore_AWS();
393 }
394
395 return nullptr;
396 }
397
398 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
399 return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
400 }
401
402
403 namespace {
// return a unique topic by prefixing with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
  std::string unique_name(notification);
  unique_name += "_";
  unique_name += topic;
  return unique_name;
}
408
// extract the topic from a unique topic of the form: <notification>_<topic>
// returns an empty string when unique_topic does not START with
// "<notification>_" (a match elsewhere in the string does not count,
// since the result is taken from a fixed offset after the prefix)
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
  const std::string prefix = notification + "_";
  if (unique_topic.compare(0, prefix.size(), prefix) != 0) {
    return "";
  }
  return unique_topic.substr(prefix.size());
}
416
417 // from list of bucket topics, find the one that was auto-generated by a notification
418 auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
419 auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
420 return it != bucket_topics.topics.end() ?
421 std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
422 std::nullopt;
423 }
424 }
425
426 int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
427 int op_ret = b->remove_notification(dpp, topic_name, y);
428 if (op_ret < 0) {
429 ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
430 }
431 op_ret = ps.remove_topic(dpp, topic_name, y);
432 if (op_ret < 0) {
433 ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
434 }
435 return op_ret;
436 }
437
438 int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
439 // delete all notifications of on a bucket
440 for (const auto& topic : bucket_topics.topics) {
441 // remove the auto generated subscription of the topic (if exist)
442 rgw_pubsub_topic_subs topic_subs;
443 int op_ret = ps.get_topic(topic.first, &topic_subs);
444 for (const auto& topic_sub_name : topic_subs.subs) {
445 auto sub = ps.get_sub(topic_sub_name);
446 rgw_pubsub_sub_config sub_conf;
447 op_ret = sub->get_conf(&sub_conf);
448 if (op_ret < 0) {
449 ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
450 return op_ret;
451 }
452 if (!sub_conf.s3_id.empty()) {
453 // S3 notification, has autogenerated subscription
454 const auto& sub_topic_name = sub_conf.topic;
455 op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
456 if (op_ret < 0) {
457 ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
458 return op_ret;
459 }
460 }
461 }
462 op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
463 if (op_ret < 0) {
464 return op_ret;
465 }
466 }
467 return 0;
468 }
469
470 // command (S3 compliant): PUT /<bucket name>?notification
471 // a "notification" and a subscription will be auto-generated
472 // actual configuration is XML encoded in the body of the message
473 class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
474 rgw_pubsub_s3_notifications configurations;
475
476 int get_params_from_body() {
477 const auto max_size = s->cct->_conf->rgw_max_put_param_size;
478 int r;
479 bufferlist data;
480 std::tie(r, data) = read_all_input(s, max_size, false);
481
482 if (r < 0) {
483 ldpp_dout(this, 1) << "failed to read XML payload" << dendl;
484 return r;
485 }
486 if (data.length() == 0) {
487 ldpp_dout(this, 1) << "XML payload missing" << dendl;
488 return -EINVAL;
489 }
490
491 RGWXMLDecoder::XMLParser parser;
492
493 if (!parser.init()){
494 ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl;
495 return -EINVAL;
496 }
497 if (!parser.parse(data.c_str(), data.length(), 1)) {
498 ldpp_dout(this, 1) << "failed to parse XML payload" << dendl;
499 return -ERR_MALFORMED_XML;
500 }
501 try {
502 // NotificationConfigurations is mandatory
503 // It can be empty which means we delete all the notifications
504 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
505 } catch (RGWXMLDecoder::err& err) {
506 ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl;
507 return -ERR_MALFORMED_XML;
508 }
509 return 0;
510 }
511
512 int get_params() override {
513 bool exists;
514 const auto no_value = s->info.args.get("notification", &exists);
515 if (!exists) {
516 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
517 return -EINVAL;
518 }
519 if (no_value.length() > 0) {
520 ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
521 return -EINVAL;
522 }
523 if (s->bucket_name.empty()) {
524 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
525 return -EINVAL;
526 }
527 bucket_name = s->bucket_name;
528 return 0;
529 }
530
531 public:
532 const char* name() const override { return "pubsub_notification_create_s3"; }
533 void execute(optional_yield) override;
534 };
535
536 void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
537 op_ret = get_params_from_body();
538 if (op_ret < 0) {
539 return;
540 }
541
542 ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
543 auto b = ps->get_bucket(bucket_info.bucket);
544 ceph_assert(b);
545
546 std::string data_bucket_prefix = "";
547 std::string data_oid_prefix = "";
548 bool push_only = true;
549 if (store->get_sync_module()) {
550 const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->get_sync_module().get());
551 if (psmodule) {
552 const auto& conf = psmodule->get_effective_conf();
553 data_bucket_prefix = conf["data_bucket_prefix"];
554 data_oid_prefix = conf["data_oid_prefix"];
555 // TODO: allow "push-only" on PS zone as well
556 push_only = false;
557 }
558 }
559
560 if(configurations.list.empty()) {
561 // get all topics on a bucket
562 rgw_pubsub_bucket_topics bucket_topics;
563 op_ret = b->get_topics(&bucket_topics);
564 if (op_ret < 0) {
565 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
566 return;
567 }
568
569 op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
570 return;
571 }
572
573 for (const auto& c : configurations.list) {
574 const auto& notif_name = c.id;
575 if (notif_name.empty()) {
576 ldpp_dout(this, 1) << "missing notification id" << dendl;
577 op_ret = -EINVAL;
578 return;
579 }
580 if (c.topic_arn.empty()) {
581 ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
582 op_ret = -EINVAL;
583 return;
584 }
585
586 const auto arn = rgw::ARN::parse(c.topic_arn);
587 if (!arn || arn->resource.empty()) {
588 ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
589 op_ret = -EINVAL;
590 return;
591 }
592
593 if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
594 ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
595 op_ret = -EINVAL;
596 return;
597 }
598
599 const auto topic_name = arn->resource;
600
601 // get topic information. destination information is stored in the topic
602 rgw_pubsub_topic topic_info;
603 op_ret = ps->get_topic(topic_name, &topic_info);
604 if (op_ret < 0) {
605 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
606 return;
607 }
608 // make sure that full topic configuration match
609 // TODO: use ARN match function
610
611 // create unique topic name. this has 2 reasons:
612 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
613 // (2) make topic clneaup easier, when notification is removed
614 const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
615 // generate the internal topic. destination is stored here for the "push-only" case
616 // when no subscription exists
617 // ARN is cached to make the "GET" method faster
618 op_ret = ps->create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
619 if (op_ret < 0) {
620 ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
621 "', ret=" << op_ret << dendl;
622 return;
623 }
624 ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
625 // generate the notification
626 rgw::notify::EventTypeList events;
627 op_ret = b->create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
628 if (op_ret < 0) {
629 ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
630 "', ret=" << op_ret << dendl;
631 // rollback generated topic (ignore return value)
632 ps->remove_topic(this, unique_topic_name, y);
633 return;
634 }
635 ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
636
637 if (!push_only) {
638 // generate the subscription with destination information from the original topic
639 rgw_pubsub_sub_dest dest = topic_info.dest;
640 dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
641 dest.oid_prefix = data_oid_prefix + notif_name + "/";
642 auto sub = ps->get_sub(notif_name);
643 op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name);
644 if (op_ret < 0) {
645 ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
646 // rollback generated notification (ignore return value)
647 b->remove_notification(this, unique_topic_name, y);
648 // rollback generated topic (ignore return value)
649 ps->remove_topic(this, unique_topic_name, y);
650 return;
651 }
652 ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
653 }
654 }
655 }
656
657 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
658 class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
659 private:
660 std::string notif_name;
661
662 int get_params() override {
663 bool exists;
664 notif_name = s->info.args.get("notification", &exists);
665 if (!exists) {
666 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
667 return -EINVAL;
668 }
669 if (s->bucket_name.empty()) {
670 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
671 return -EINVAL;
672 }
673 bucket_name = s->bucket_name;
674 return 0;
675 }
676
677 public:
678 void execute(optional_yield y) override;
679 const char* name() const override { return "pubsub_notification_delete_s3"; }
680 };
681
682 void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
683 op_ret = get_params();
684 if (op_ret < 0) {
685 return;
686 }
687
688 ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
689 auto b = ps->get_bucket(bucket_info.bucket);
690 ceph_assert(b);
691
692 // get all topics on a bucket
693 rgw_pubsub_bucket_topics bucket_topics;
694 op_ret = b->get_topics(&bucket_topics);
695 if (op_ret < 0) {
696 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
697 return;
698 }
699
700 if (!notif_name.empty()) {
701 // delete a specific notification
702 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
703 if (unique_topic) {
704 // remove the auto generated subscription according to notification name (if exist)
705 const auto unique_topic_name = unique_topic->get().topic.name;
706 auto sub = ps->get_sub(notif_name);
707 op_ret = sub->unsubscribe(this, unique_topic_name, y);
708 if (op_ret < 0 && op_ret != -ENOENT) {
709 ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
710 return;
711 }
712 op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
713 return;
714 }
715 // notification to be removed is not found - considered success
716 ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl;
717 return;
718 }
719
720 op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
721 }
722
723 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
724 class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
725 private:
726 std::string notif_name;
727 rgw_pubsub_s3_notifications notifications;
728
729 int get_params() override {
730 bool exists;
731 notif_name = s->info.args.get("notification", &exists);
732 if (!exists) {
733 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
734 return -EINVAL;
735 }
736 if (s->bucket_name.empty()) {
737 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
738 return -EINVAL;
739 }
740 bucket_name = s->bucket_name;
741 return 0;
742 }
743
744 public:
745 void execute(optional_yield y) override;
746 void send_response() override {
747 if (op_ret) {
748 set_req_state_err(s, op_ret);
749 }
750 dump_errno(s);
751 end_header(s, this, "application/xml");
752
753 if (op_ret < 0) {
754 return;
755 }
756 notifications.dump_xml(s->formatter);
757 rgw_flush_formatter_and_reset(s, s->formatter);
758 }
759 const char* name() const override { return "pubsub_notifications_get_s3"; }
760 };
761
762 void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
763 ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
764 auto b = ps->get_bucket(bucket_info.bucket);
765 ceph_assert(b);
766
767 // get all topics on a bucket
768 rgw_pubsub_bucket_topics bucket_topics;
769 op_ret = b->get_topics(&bucket_topics);
770 if (op_ret < 0) {
771 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
772 return;
773 }
774 if (!notif_name.empty()) {
775 // get info of a specific notification
776 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
777 if (unique_topic) {
778 notifications.list.emplace_back(unique_topic->get());
779 return;
780 }
781 op_ret = -ENOENT;
782 ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
783 return;
784 }
785 // loop through all topics of the bucket
786 for (const auto& topic : bucket_topics.topics) {
787 if (topic.second.s3_id.empty()) {
788 // not an s3 notification
789 continue;
790 }
791 notifications.list.emplace_back(topic.second);
792 }
793 }
794
795 RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
796 return new RGWPSListNotifs_ObjStore_S3();
797 }
798
799 RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
800 return new RGWPSCreateNotif_ObjStore_S3();
801 }
802
803 RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
804 return new RGWPSDeleteNotif_ObjStore_S3();
805 }
806
807 RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
808 return new RGWPSListNotifs_ObjStore_S3();
809 }
810
811 RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
812 return new RGWPSCreateNotif_ObjStore_S3();
813 }
814
815 RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
816 return new RGWPSDeleteNotif_ObjStore_S3();
817 }
818