]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_rest_pubsub.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <algorithm>
5 #include <boost/tokenizer.hpp>
6 #include <optional>
7 #include "rgw_rest_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_pubsub.h"
10 #include "rgw_op.h"
11 #include "rgw_rest.h"
12 #include "rgw_rest_s3.h"
13 #include "rgw_arn.h"
14 #include "rgw_auth_s3.h"
15 #include "rgw_notify.h"
16 #include "services/svc_zone.h"
17 #include "common/dout.h"
18 #include "rgw_url.h"
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
22
23 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
24
25 bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
26 const auto is_secure = rgw_transport_is_secure(cct, env);
27 if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
28 ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
29 return true;
30 }
31 return is_secure;
32 }
33
34 // make sure that endpoint is a valid URL
35 // make sure that if user/password are passed inside URL, it is over secure connection
36 // update rgw_pubsub_dest to indicate that a password is stored in the URL
37 bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
38 if (dest.push_endpoint.empty()) {
39 return true;
40 }
41 std::string user;
42 std::string password;
43 if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
44 ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
45 return false;
46 }
47 // this should be verified inside parse_url()
48 ceph_assert(user.empty() == password.empty());
49 if (!user.empty()) {
50 dest.stored_secret = true;
51 if (!verify_transport_security(cct, env)) {
52 ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
53 return false;
54 }
55 }
56 return true;
57 }
58
59 bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
60 return topic.dest.stored_secret;
61 }
62
63 bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
64 for (const auto& topic : topics.topics) {
65 if (topic_has_endpoint_secret(topic.second)) return true;
66 }
67 return false;
68 }
69
70 // command (AWS compliant):
71 // POST
72 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
73 class RGWPSCreateTopicOp : public RGWOp {
74 private:
75 std::string topic_name;
76 rgw_pubsub_dest dest;
77 std::string topic_arn;
78 std::string opaque_data;
79
80 int get_params() {
81 topic_name = s->info.args.get("Name");
82 if (topic_name.empty()) {
83 ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
84 return -EINVAL;
85 }
86
87 opaque_data = s->info.args.get("OpaqueData");
88
89 dest.push_endpoint = s->info.args.get("push-endpoint");
90 s->info.args.get_bool("persistent", &dest.persistent, false);
91
92 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
93 return -EINVAL;
94 }
95 for (const auto& param : s->info.args.get_params()) {
96 if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
97 continue;
98 }
99 dest.push_endpoint_args.append(param.first+"="+param.second+"&");
100 }
101
102 if (!dest.push_endpoint_args.empty()) {
103 // remove last separator
104 dest.push_endpoint_args.pop_back();
105 }
106 if (!dest.push_endpoint.empty() && dest.persistent) {
107 const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
108 if (ret < 0) {
109 ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
110 return ret;
111 }
112 }
113
114 // dest object only stores endpoint info
115 dest.arn_topic = topic_name;
116 // the topic ARN will be sent in the reply
117 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
118 driver->get_zone()->get_zonegroup().get_name(),
119 s->user->get_tenant(), topic_name);
120 topic_arn = arn.to_string();
121 return 0;
122 }
123
124 public:
125 int verify_permission(optional_yield) override {
126 return 0;
127 }
128
129 void pre_exec() override {
130 rgw_bucket_object_pre_exec(s);
131 }
132 void execute(optional_yield) override;
133
134 const char* name() const override { return "pubsub_topic_create"; }
135 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
136 uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
137
138 void send_response() override {
139 if (op_ret) {
140 set_req_state_err(s, op_ret);
141 }
142 dump_errno(s);
143 end_header(s, this, "application/xml");
144
145 if (op_ret < 0) {
146 return;
147 }
148
149 const auto f = s->formatter;
150 f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS);
151 f->open_object_section("CreateTopicResult");
152 encode_xml("TopicArn", topic_arn, f);
153 f->close_section(); // CreateTopicResult
154 f->open_object_section("ResponseMetadata");
155 encode_xml("RequestId", s->req_id, f);
156 f->close_section(); // ResponseMetadata
157 f->close_section(); // CreateTopicResponse
158 rgw_flush_formatter_and_reset(s, f);
159 }
160 };
161
162 void RGWPSCreateTopicOp::execute(optional_yield y) {
163 op_ret = get_params();
164 if (op_ret < 0) {
165 return;
166 }
167
168 const RGWPubSub ps(driver, s->owner.get_id().tenant);
169 op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
170 if (op_ret < 0) {
171 ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
172 return;
173 }
174 ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
175 }
176
177 // command (AWS compliant):
178 // POST
179 // Action=ListTopics
180 class RGWPSListTopicsOp : public RGWOp {
181 private:
182 rgw_pubsub_topics result;
183
184 public:
185 int verify_permission(optional_yield) override {
186 return 0;
187 }
188 void pre_exec() override {
189 rgw_bucket_object_pre_exec(s);
190 }
191 void execute(optional_yield) override;
192
193 const char* name() const override { return "pubsub_topics_list"; }
194 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
195 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
196
197 void send_response() override {
198 if (op_ret) {
199 set_req_state_err(s, op_ret);
200 }
201 dump_errno(s);
202 end_header(s, this, "application/xml");
203
204 if (op_ret < 0) {
205 return;
206 }
207
208 const auto f = s->formatter;
209 f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS);
210 f->open_object_section("ListTopicsResult");
211 encode_xml("Topics", result, f);
212 f->close_section(); // ListTopicsResult
213 f->open_object_section("ResponseMetadata");
214 encode_xml("RequestId", s->req_id, f);
215 f->close_section(); // ResponseMetadat
216 f->close_section(); // ListTopicsResponse
217 rgw_flush_formatter_and_reset(s, f);
218 }
219 };
220
221 void RGWPSListTopicsOp::execute(optional_yield y) {
222 const RGWPubSub ps(driver, s->owner.get_id().tenant);
223 op_ret = ps.get_topics(this, result, y);
224 // if there are no topics it is not considered an error
225 op_ret = op_ret == -ENOENT ? 0 : op_ret;
226 if (op_ret < 0) {
227 ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
228 return;
229 }
230 if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
231 ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
232 op_ret = -EPERM;
233 return;
234 }
235 ldpp_dout(this, 20) << "successfully got topics" << dendl;
236 }
237
238 // command (extension to AWS):
239 // POST
240 // Action=GetTopic&TopicArn=<topic-arn>
241 class RGWPSGetTopicOp : public RGWOp {
242 private:
243 std::string topic_name;
244 rgw_pubsub_topic result;
245
246 int get_params() {
247 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
248
249 if (!topic_arn || topic_arn->resource.empty()) {
250 ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
251 return -EINVAL;
252 }
253
254 topic_name = topic_arn->resource;
255 return 0;
256 }
257
258 public:
259 int verify_permission(optional_yield y) override {
260 return 0;
261 }
262 void pre_exec() override {
263 rgw_bucket_object_pre_exec(s);
264 }
265 void execute(optional_yield y) override;
266
267 const char* name() const override { return "pubsub_topic_get"; }
268 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
269 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
270
271 void send_response() override {
272 if (op_ret) {
273 set_req_state_err(s, op_ret);
274 }
275 dump_errno(s);
276 end_header(s, this, "application/xml");
277
278 if (op_ret < 0) {
279 return;
280 }
281
282 const auto f = s->formatter;
283 f->open_object_section("GetTopicResponse");
284 f->open_object_section("GetTopicResult");
285 encode_xml("Topic", result, f);
286 f->close_section();
287 f->open_object_section("ResponseMetadata");
288 encode_xml("RequestId", s->req_id, f);
289 f->close_section();
290 f->close_section();
291 rgw_flush_formatter_and_reset(s, f);
292 }
293 };
294
295 void RGWPSGetTopicOp::execute(optional_yield y) {
296 op_ret = get_params();
297 if (op_ret < 0) {
298 return;
299 }
300 const RGWPubSub ps(driver, s->owner.get_id().tenant);
301 op_ret = ps.get_topic(this, topic_name, result, y);
302 if (op_ret < 0) {
303 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
304 return;
305 }
306 if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
307 ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
308 op_ret = -EPERM;
309 return;
310 }
311 ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
312 }
313
314 // command (AWS compliant):
315 // POST
316 // Action=GetTopicAttributes&TopicArn=<topic-arn>
317 class RGWPSGetTopicAttributesOp : public RGWOp {
318 private:
319 std::string topic_name;
320 rgw_pubsub_topic result;
321
322 int get_params() {
323 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
324
325 if (!topic_arn || topic_arn->resource.empty()) {
326 ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
327 return -EINVAL;
328 }
329
330 topic_name = topic_arn->resource;
331 return 0;
332 }
333
334 public:
335 int verify_permission(optional_yield y) override {
336 return 0;
337 }
338 void pre_exec() override {
339 rgw_bucket_object_pre_exec(s);
340 }
341 void execute(optional_yield y) override;
342
343 const char* name() const override { return "pubsub_topic_get"; }
344 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
345 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
346
347 void send_response() override {
348 if (op_ret) {
349 set_req_state_err(s, op_ret);
350 }
351 dump_errno(s);
352 end_header(s, this, "application/xml");
353
354 if (op_ret < 0) {
355 return;
356 }
357
358 const auto f = s->formatter;
359 f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
360 f->open_object_section("GetTopicAttributesResult");
361 result.dump_xml_as_attributes(f);
362 f->close_section(); // GetTopicAttributesResult
363 f->open_object_section("ResponseMetadata");
364 encode_xml("RequestId", s->req_id, f);
365 f->close_section(); // ResponseMetadata
366 f->close_section(); // GetTopicAttributesResponse
367 rgw_flush_formatter_and_reset(s, f);
368 }
369 };
370
371 void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
372 op_ret = get_params();
373 if (op_ret < 0) {
374 return;
375 }
376 const RGWPubSub ps(driver, s->owner.get_id().tenant);
377 op_ret = ps.get_topic(this, topic_name, result, y);
378 if (op_ret < 0) {
379 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
380 return;
381 }
382 if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
383 ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
384 op_ret = -EPERM;
385 return;
386 }
387 ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
388 }
389
390 // command (AWS compliant):
391 // POST
392 // Action=DeleteTopic&TopicArn=<topic-arn>
393 class RGWPSDeleteTopicOp : public RGWOp {
394 private:
395 std::string topic_name;
396
397 int get_params() {
398 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
399
400 if (!topic_arn || topic_arn->resource.empty()) {
401 ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
402 return -EINVAL;
403 }
404
405 topic_name = topic_arn->resource;
406
407 // upon deletion it is not known if topic is persistent or not
408 // will try to delete the persistent topic anyway
409 const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
410 if (ret == -ENOENT) {
411 // topic was not persistent, or already deleted
412 return 0;
413 }
414 if (ret < 0) {
415 ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
416 return ret;
417 }
418
419 return 0;
420 }
421
422 public:
423 int verify_permission(optional_yield) override {
424 return 0;
425 }
426 void pre_exec() override {
427 rgw_bucket_object_pre_exec(s);
428 }
429 void execute(optional_yield y) override;
430
431 const char* name() const override { return "pubsub_topic_delete"; }
432 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
433 uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
434
435 void send_response() override {
436 if (op_ret) {
437 set_req_state_err(s, op_ret);
438 }
439 dump_errno(s);
440 end_header(s, this, "application/xml");
441
442 if (op_ret < 0) {
443 return;
444 }
445
446 const auto f = s->formatter;
447 f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS);
448 f->open_object_section("ResponseMetadata");
449 encode_xml("RequestId", s->req_id, f);
450 f->close_section(); // ResponseMetadata
451 f->close_section(); // DeleteTopicResponse
452 rgw_flush_formatter_and_reset(s, f);
453 }
454 };
455
456 void RGWPSDeleteTopicOp::execute(optional_yield y) {
457 op_ret = get_params();
458 if (op_ret < 0) {
459 return;
460 }
461 const RGWPubSub ps(driver, s->owner.get_id().tenant);
462 op_ret = ps.remove_topic(this, topic_name, y);
463 if (op_ret < 0) {
464 ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
465 return;
466 }
467 ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
468 }
469
470 using op_generator = RGWOp*(*)();
471 static const std::unordered_map<std::string, op_generator> op_generators = {
472 {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
473 {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
474 {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
475 {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
476 {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
477 };
478
479 bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s)
480 {
481 if (s->info.args.exists("Action")) {
482 const std::string action_name = s->info.args.get("Action");
483 return op_generators.contains(action_name);
484 }
485 return false;
486 }
487
488 RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
489 {
490 s->dialect = "sns";
491 s->prot_flags = RGW_REST_STS;
492
493 if (s->info.args.exists("Action")) {
494 const std::string action_name = s->info.args.get("Action");
495 const auto action_it = op_generators.find(action_name);
496 if (action_it != op_generators.end()) {
497 return action_it->second();
498 }
499 ldpp_dout(s, 10) << "unknown action '" << action_name << "' for Topic handler" << dendl;
500 } else {
501 ldpp_dout(s, 10) << "missing action argument in Topic handler" << dendl;
502 }
503 return nullptr;
504 }
505
506 int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
507 const auto rc = RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y);
508 if (rc < 0) {
509 return rc;
510 }
511 if (s->auth.identity->is_anonymous()) {
512 ldpp_dout(dpp, 1) << "anonymous user not allowed in topic operations" << dendl;
513 return -ERR_INVALID_REQUEST;
514 }
515 return 0;
516 }
517
518 namespace {
519 // return a unique topic by prefexing with the notification name: <notification>_<topic>
520 std::string topic_to_unique(const std::string& topic, const std::string& notification) {
521 return notification + "_" + topic;
522 }
523
524 // extract the topic from a unique topic of the form: <notification>_<topic>
525 [[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
526 if (unique_topic.find(notification + "_") == std::string::npos) {
527 return "";
528 }
529 return unique_topic.substr(notification.length() + 1);
530 }
531
532 // from list of bucket topics, find the one that was auto-generated by a notification
533 auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
534 auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
535 return it != bucket_topics.topics.end() ?
536 std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
537 std::nullopt;
538 }
539 }
540
541 int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
542 int op_ret = b.remove_notification(dpp, topic_name, y);
543 if (op_ret < 0) {
544 ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
545 }
546 op_ret = ps.remove_topic(dpp, topic_name, y);
547 if (op_ret < 0) {
548 ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
549 }
550 return op_ret;
551 }
552
553 int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
554 // delete all notifications of on a bucket
555 for (const auto& topic : bucket_topics.topics) {
556 const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
557 if (op_ret < 0) {
558 return op_ret;
559 }
560 }
561 return 0;
562 }
563
564 // command (S3 compliant): PUT /<bucket name>?notification
565 // a "notification" and a subscription will be auto-generated
566 // actual configuration is XML encoded in the body of the message
567 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
568 int verify_params() override {
569 bool exists;
570 const auto no_value = s->info.args.get("notification", &exists);
571 if (!exists) {
572 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
573 return -EINVAL;
574 }
575 if (no_value.length() > 0) {
576 ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
577 return -EINVAL;
578 }
579 if (s->bucket_name.empty()) {
580 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
581 return -EINVAL;
582 }
583 return 0;
584 }
585
586 int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
587 const auto max_size = s->cct->_conf->rgw_max_put_param_size;
588 int r;
589 bufferlist data;
590 std::tie(r, data) = read_all_input(s, max_size, false);
591
592 if (r < 0) {
593 ldpp_dout(this, 1) << "failed to read XML payload" << dendl;
594 return r;
595 }
596 if (data.length() == 0) {
597 ldpp_dout(this, 1) << "XML payload missing" << dendl;
598 return -EINVAL;
599 }
600
601 RGWXMLDecoder::XMLParser parser;
602
603 if (!parser.init()){
604 ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl;
605 return -EINVAL;
606 }
607 if (!parser.parse(data.c_str(), data.length(), 1)) {
608 ldpp_dout(this, 1) << "failed to parse XML payload" << dendl;
609 return -ERR_MALFORMED_XML;
610 }
611 try {
612 // NotificationConfigurations is mandatory
613 // It can be empty which means we delete all the notifications
614 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
615 } catch (RGWXMLDecoder::err& err) {
616 ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl;
617 return -ERR_MALFORMED_XML;
618 }
619 return 0;
620 }
621 public:
622 int verify_permission(optional_yield y) override;
623
624 void pre_exec() override {
625 rgw_bucket_object_pre_exec(s);
626 }
627
628 const char* name() const override { return "pubsub_notification_create_s3"; }
629 RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
630 uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
631
632
633 void execute(optional_yield) override;
634 };
635
636 void RGWPSCreateNotifOp::execute(optional_yield y) {
637 op_ret = verify_params();
638 if (op_ret < 0) {
639 return;
640 }
641
642 rgw_pubsub_s3_notifications configurations;
643 op_ret = get_params_from_body(configurations);
644 if (op_ret < 0) {
645 return;
646 }
647
648 std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
649 std::unique_ptr<rgw::sal::Bucket> bucket;
650 op_ret = driver->get_bucket(this, user.get(), s->bucket_tenant, s->bucket_name, &bucket, y);
651 if (op_ret < 0) {
652 ldpp_dout(this, 1) << "failed to get bucket '" <<
653 (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
654 "' info, ret = " << op_ret << dendl;
655 return;
656 }
657
658 const RGWPubSub ps(driver, s->owner.get_id().tenant);
659 const RGWPubSub::Bucket b(ps, bucket.get());
660
661 if(configurations.list.empty()) {
662 // get all topics on a bucket
663 rgw_pubsub_bucket_topics bucket_topics;
664 op_ret = b.get_topics(this, bucket_topics, y);
665 if (op_ret < 0) {
666 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
667 return;
668 }
669
670 op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
671 return;
672 }
673
674 for (const auto& c : configurations.list) {
675 const auto& notif_name = c.id;
676 if (notif_name.empty()) {
677 ldpp_dout(this, 1) << "missing notification id" << dendl;
678 op_ret = -EINVAL;
679 return;
680 }
681 if (c.topic_arn.empty()) {
682 ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
683 op_ret = -EINVAL;
684 return;
685 }
686
687 const auto arn = rgw::ARN::parse(c.topic_arn);
688 if (!arn || arn->resource.empty()) {
689 ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
690 op_ret = -EINVAL;
691 return;
692 }
693
694 if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
695 ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
696 op_ret = -EINVAL;
697 return;
698 }
699
700 const auto topic_name = arn->resource;
701
702 // get topic information. destination information is stored in the topic
703 rgw_pubsub_topic topic_info;
704 op_ret = ps.get_topic(this, topic_name, topic_info, y);
705 if (op_ret < 0) {
706 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
707 return;
708 }
709 // make sure that full topic configuration match
710 // TODO: use ARN match function
711
712 // create unique topic name. this has 2 reasons:
713 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
714 // (2) make topic clneaup easier, when notification is removed
715 const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
716 // generate the internal topic. destination is stored here for the "push-only" case
717 // when no subscription exists
718 // ARN is cached to make the "GET" method faster
719 op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
720 if (op_ret < 0) {
721 ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
722 "', ret=" << op_ret << dendl;
723 return;
724 }
725 ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
726 // generate the notification
727 rgw::notify::EventTypeList events;
728 op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
729 if (op_ret < 0) {
730 ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
731 "', ret=" << op_ret << dendl;
732 // rollback generated topic (ignore return value)
733 ps.remove_topic(this, unique_topic_name, y);
734 return;
735 }
736 ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
737 }
738 }
739
740 int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
741 if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
742 return -EACCES;
743 }
744
745 return 0;
746 }
747
748 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
749 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
750 int get_params(std::string& notif_name) const {
751 bool exists;
752 notif_name = s->info.args.get("notification", &exists);
753 if (!exists) {
754 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
755 return -EINVAL;
756 }
757 if (s->bucket_name.empty()) {
758 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
759 return -EINVAL;
760 }
761 return 0;
762 }
763
764 public:
765 int verify_permission(optional_yield y) override;
766
767 void pre_exec() override {
768 rgw_bucket_object_pre_exec(s);
769 }
770
771 const char* name() const override { return "pubsub_notification_delete_s3"; }
772 RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
773 uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
774
775 void execute(optional_yield y) override;
776 };
777
778 void RGWPSDeleteNotifOp::execute(optional_yield y) {
779 std::string notif_name;
780 op_ret = get_params(notif_name);
781 if (op_ret < 0) {
782 return;
783 }
784
785 std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
786 std::unique_ptr<rgw::sal::Bucket> bucket;
787 op_ret = driver->get_bucket(this, user.get(), s->bucket_tenant, s->bucket_name, &bucket, y);
788 if (op_ret < 0) {
789 ldpp_dout(this, 1) << "failed to get bucket '" <<
790 (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
791 "' info, ret = " << op_ret << dendl;
792 return;
793 }
794
795 const RGWPubSub ps(driver, s->owner.get_id().tenant);
796 const RGWPubSub::Bucket b(ps, bucket.get());
797
798 // get all topics on a bucket
799 rgw_pubsub_bucket_topics bucket_topics;
800 op_ret = b.get_topics(this, bucket_topics, y);
801 if (op_ret < 0) {
802 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
803 return;
804 }
805
806 if (!notif_name.empty()) {
807 // delete a specific notification
808 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
809 if (unique_topic) {
810 const auto unique_topic_name = unique_topic->get().topic.name;
811 op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
812 return;
813 }
814 // notification to be removed is not found - considered success
815 ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl;
816 return;
817 }
818
819 op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
820 }
821
822 int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
823 if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
824 return -EACCES;
825 }
826
827 return 0;
828 }
829
830 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
831 class RGWPSListNotifsOp : public RGWOp {
832 rgw_pubsub_s3_notifications notifications;
833
834 int get_params(std::string& notif_name) const {
835 bool exists;
836 notif_name = s->info.args.get("notification", &exists);
837 if (!exists) {
838 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
839 return -EINVAL;
840 }
841 if (s->bucket_name.empty()) {
842 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
843 return -EINVAL;
844 }
845 return 0;
846 }
847
848 public:
849 int verify_permission(optional_yield y) override;
850
851 void pre_exec() override {
852 rgw_bucket_object_pre_exec(s);
853 }
854
855 const char* name() const override { return "pubsub_notifications_get_s3"; }
856 RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
857 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
858
859 void execute(optional_yield y) override;
860 void send_response() override {
861 if (op_ret) {
862 set_req_state_err(s, op_ret);
863 }
864 dump_errno(s);
865 end_header(s, this, "application/xml");
866
867 if (op_ret < 0) {
868 return;
869 }
870 notifications.dump_xml(s->formatter);
871 rgw_flush_formatter_and_reset(s, s->formatter);
872 }
873 };
874
875 void RGWPSListNotifsOp::execute(optional_yield y) {
876 std::string notif_name;
877 op_ret = get_params(notif_name);
878 if (op_ret < 0) {
879 return;
880 }
881
882 std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
883 std::unique_ptr<rgw::sal::Bucket> bucket;
884 op_ret = driver->get_bucket(this, user.get(), s->bucket_tenant, s->bucket_name, &bucket, y);
885 if (op_ret < 0) {
886 ldpp_dout(this, 1) << "failed to get bucket '" <<
887 (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
888 "' info, ret = " << op_ret << dendl;
889 return;
890 }
891
892 const RGWPubSub ps(driver, s->owner.get_id().tenant);
893 const RGWPubSub::Bucket b(ps, bucket.get());
894
895 // get all topics on a bucket
896 rgw_pubsub_bucket_topics bucket_topics;
897 op_ret = b.get_topics(this, bucket_topics, y);
898 if (op_ret < 0) {
899 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
900 return;
901 }
902 if (!notif_name.empty()) {
903 // get info of a specific notification
904 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
905 if (unique_topic) {
906 notifications.list.emplace_back(unique_topic->get());
907 return;
908 }
909 op_ret = -ENOENT;
910 ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
911 return;
912 }
913 // loop through all topics of the bucket
914 for (const auto& topic : bucket_topics.topics) {
915 if (topic.second.s3_id.empty()) {
916 // not an s3 notification
917 continue;
918 }
919 notifications.list.emplace_back(topic.second);
920 }
921 }
922
923 int RGWPSListNotifsOp::verify_permission(optional_yield y) {
924 if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) {
925 return -EACCES;
926 }
927
928 return 0;
929 }
930
931 RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
932 return new RGWPSListNotifsOp();
933 }
934
935 RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
936 return new RGWPSCreateNotifOp();
937 }
938
939 RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
940 return new RGWPSDeleteNotifOp();
941 }
942
943 RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
944 return new RGWPSListNotifsOp();
945 }
946
947 RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
948 return new RGWPSCreateNotifOp();
949 }
950
951 RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
952 return new RGWPSDeleteNotifOp();
953 }
954