]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_rest_pubsub.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <algorithm>
5#include <boost/tokenizer.hpp>
6#include <optional>
eafe8130
TL
7#include "rgw_rest_pubsub.h"
8#include "rgw_pubsub_push.h"
9#include "rgw_pubsub.h"
eafe8130
TL
10#include "rgw_op.h"
11#include "rgw_rest.h"
12#include "rgw_rest_s3.h"
13#include "rgw_arn.h"
14#include "rgw_auth_s3.h"
f67539c2 15#include "rgw_notify.h"
eafe8130 16#include "services/svc_zone.h"
1e59de90
TL
17#include "common/dout.h"
18#include "rgw_url.h"
eafe8130
TL
19
20#define dout_context g_ceph_context
21#define dout_subsys ceph_subsys_rgw
22
f67539c2 23static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
9f95a23c 24
1e59de90
TL
25bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
26 const auto is_secure = rgw_transport_is_secure(cct, env);
27 if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
28 ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
29 return true;
30 }
31 return is_secure;
32}
33
34// make sure that endpoint is a valid URL
35// make sure that if user/password are passed inside URL, it is over secure connection
36// update rgw_pubsub_dest to indicate that a password is stored in the URL
37bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
38 if (dest.push_endpoint.empty()) {
39 return true;
40 }
41 std::string user;
42 std::string password;
43 if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
44 ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
45 return false;
46 }
47 // this should be verified inside parse_url()
48 ceph_assert(user.empty() == password.empty());
49 if (!user.empty()) {
50 dest.stored_secret = true;
51 if (!verify_transport_security(cct, env)) {
52 ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
53 return false;
54 }
55 }
56 return true;
57}
58
59bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
60 return topic.dest.stored_secret;
61}
62
63bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
64 for (const auto& topic : topics.topics) {
65 if (topic_has_endpoint_secret(topic.second)) return true;
66 }
67 return false;
68}
69
eafe8130
TL
70// command (AWS compliant):
71// POST
f67539c2 72// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
1e59de90
TL
73class RGWPSCreateTopicOp : public RGWOp {
74 private:
75 std::string topic_name;
76 rgw_pubsub_dest dest;
77 std::string topic_arn;
78 std::string opaque_data;
79
80 int get_params() {
eafe8130
TL
81 topic_name = s->info.args.get("Name");
82 if (topic_name.empty()) {
b3b6e05e 83 ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
9f95a23c 84 return -EINVAL;
eafe8130
TL
85 }
86
9f95a23c
TL
87 opaque_data = s->info.args.get("OpaqueData");
88
eafe8130 89 dest.push_endpoint = s->info.args.get("push-endpoint");
522d829b 90 s->info.args.get_bool("persistent", &dest.persistent, false);
9f95a23c
TL
91
92 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
93 return -EINVAL;
94 }
f67539c2 95 for (const auto& param : s->info.args.get_params()) {
9f95a23c
TL
96 if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
97 continue;
98 }
99 dest.push_endpoint_args.append(param.first+"="+param.second+"&");
eafe8130
TL
100 }
101
102 if (!dest.push_endpoint_args.empty()) {
9f95a23c
TL
103 // remove last separator
104 dest.push_endpoint_args.pop_back();
eafe8130 105 }
f67539c2
TL
106 if (!dest.push_endpoint.empty() && dest.persistent) {
107 const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
108 if (ret < 0) {
b3b6e05e 109 ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
f67539c2
TL
110 return ret;
111 }
112 }
eafe8130
TL
113
114 // dest object only stores endpoint info
eafe8130
TL
115 dest.arn_topic = topic_name;
116 // the topic ARN will be sent in the reply
117 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
1e59de90 118 driver->get_zone()->get_zonegroup().get_name(),
9f95a23c 119 s->user->get_tenant(), topic_name);
eafe8130
TL
120 topic_arn = arn.to_string();
121 return 0;
122 }
123
1e59de90
TL
124 public:
125 int verify_permission(optional_yield) override {
126 return 0;
127 }
128
129 void pre_exec() override {
130 rgw_bucket_object_pre_exec(s);
131 }
132 void execute(optional_yield) override;
133
134 const char* name() const override { return "pubsub_topic_create"; }
135 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
136 uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
137
eafe8130
TL
138 void send_response() override {
139 if (op_ret) {
140 set_req_state_err(s, op_ret);
141 }
142 dump_errno(s);
143 end_header(s, this, "application/xml");
144
145 if (op_ret < 0) {
146 return;
147 }
148
149 const auto f = s->formatter;
f67539c2 150 f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS);
eafe8130
TL
151 f->open_object_section("CreateTopicResult");
152 encode_xml("TopicArn", topic_arn, f);
f67539c2 153 f->close_section(); // CreateTopicResult
eafe8130
TL
154 f->open_object_section("ResponseMetadata");
155 encode_xml("RequestId", s->req_id, f);
f67539c2
TL
156 f->close_section(); // ResponseMetadata
157 f->close_section(); // CreateTopicResponse
eafe8130
TL
158 rgw_flush_formatter_and_reset(s, f);
159 }
160};
161
1e59de90
TL
162void RGWPSCreateTopicOp::execute(optional_yield y) {
163 op_ret = get_params();
164 if (op_ret < 0) {
165 return;
166 }
167
168 const RGWPubSub ps(driver, s->owner.get_id().tenant);
169 op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
170 if (op_ret < 0) {
171 ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
172 return;
173 }
174 ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
175}
176
eafe8130
TL
177// command (AWS compliant):
178// POST
179// Action=ListTopics
1e59de90
TL
180class RGWPSListTopicsOp : public RGWOp {
181private:
182 rgw_pubsub_topics result;
183
eafe8130 184public:
1e59de90
TL
185 int verify_permission(optional_yield) override {
186 return 0;
187 }
188 void pre_exec() override {
189 rgw_bucket_object_pre_exec(s);
190 }
191 void execute(optional_yield) override;
192
193 const char* name() const override { return "pubsub_topics_list"; }
194 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
195 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
196
eafe8130
TL
197 void send_response() override {
198 if (op_ret) {
199 set_req_state_err(s, op_ret);
200 }
201 dump_errno(s);
202 end_header(s, this, "application/xml");
203
204 if (op_ret < 0) {
205 return;
206 }
207
208 const auto f = s->formatter;
f67539c2 209 f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS);
eafe8130
TL
210 f->open_object_section("ListTopicsResult");
211 encode_xml("Topics", result, f);
f67539c2 212 f->close_section(); // ListTopicsResult
eafe8130
TL
213 f->open_object_section("ResponseMetadata");
214 encode_xml("RequestId", s->req_id, f);
f67539c2
TL
215 f->close_section(); // ResponseMetadat
216 f->close_section(); // ListTopicsResponse
eafe8130
TL
217 rgw_flush_formatter_and_reset(s, f);
218 }
219};
220
1e59de90
TL
221void RGWPSListTopicsOp::execute(optional_yield y) {
222 const RGWPubSub ps(driver, s->owner.get_id().tenant);
223 op_ret = ps.get_topics(this, result, y);
224 // if there are no topics it is not considered an error
225 op_ret = op_ret == -ENOENT ? 0 : op_ret;
226 if (op_ret < 0) {
227 ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
228 return;
229 }
230 if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
231 ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
232 op_ret = -EPERM;
233 return;
234 }
235 ldpp_dout(this, 20) << "successfully got topics" << dendl;
236}
237
eafe8130
TL
238// command (extension to AWS):
239// POST
240// Action=GetTopic&TopicArn=<topic-arn>
1e59de90
TL
241class RGWPSGetTopicOp : public RGWOp {
242 private:
243 std::string topic_name;
244 rgw_pubsub_topic result;
245
246 int get_params() {
eafe8130
TL
247 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
248
249 if (!topic_arn || topic_arn->resource.empty()) {
b3b6e05e 250 ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
eafe8130
TL
251 return -EINVAL;
252 }
253
254 topic_name = topic_arn->resource;
255 return 0;
256 }
257
1e59de90
TL
258 public:
259 int verify_permission(optional_yield y) override {
260 return 0;
261 }
262 void pre_exec() override {
263 rgw_bucket_object_pre_exec(s);
264 }
265 void execute(optional_yield y) override;
266
267 const char* name() const override { return "pubsub_topic_get"; }
268 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
269 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
270
eafe8130
TL
271 void send_response() override {
272 if (op_ret) {
273 set_req_state_err(s, op_ret);
274 }
275 dump_errno(s);
276 end_header(s, this, "application/xml");
277
278 if (op_ret < 0) {
279 return;
280 }
281
282 const auto f = s->formatter;
283 f->open_object_section("GetTopicResponse");
284 f->open_object_section("GetTopicResult");
1e59de90 285 encode_xml("Topic", result, f);
eafe8130
TL
286 f->close_section();
287 f->open_object_section("ResponseMetadata");
288 encode_xml("RequestId", s->req_id, f);
289 f->close_section();
290 f->close_section();
291 rgw_flush_formatter_and_reset(s, f);
292 }
293};
294
1e59de90
TL
295void RGWPSGetTopicOp::execute(optional_yield y) {
296 op_ret = get_params();
297 if (op_ret < 0) {
298 return;
299 }
300 const RGWPubSub ps(driver, s->owner.get_id().tenant);
301 op_ret = ps.get_topic(this, topic_name, result, y);
302 if (op_ret < 0) {
303 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
304 return;
305 }
306 if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
307 ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
308 op_ret = -EPERM;
309 return;
310 }
311 ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
312}
313
f67539c2
TL
314// command (AWS compliant):
315// POST
316// Action=GetTopicAttributes&TopicArn=<topic-arn>
1e59de90
TL
317class RGWPSGetTopicAttributesOp : public RGWOp {
318 private:
319 std::string topic_name;
320 rgw_pubsub_topic result;
321
322 int get_params() {
f67539c2
TL
323 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
324
325 if (!topic_arn || topic_arn->resource.empty()) {
b3b6e05e 326 ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
f67539c2
TL
327 return -EINVAL;
328 }
329
330 topic_name = topic_arn->resource;
331 return 0;
332 }
333
1e59de90
TL
334 public:
335 int verify_permission(optional_yield y) override {
336 return 0;
337 }
338 void pre_exec() override {
339 rgw_bucket_object_pre_exec(s);
340 }
341 void execute(optional_yield y) override;
342
343 const char* name() const override { return "pubsub_topic_get"; }
344 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
345 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
346
f67539c2
TL
347 void send_response() override {
348 if (op_ret) {
349 set_req_state_err(s, op_ret);
350 }
351 dump_errno(s);
352 end_header(s, this, "application/xml");
353
354 if (op_ret < 0) {
355 return;
356 }
357
358 const auto f = s->formatter;
359 f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
360 f->open_object_section("GetTopicAttributesResult");
1e59de90 361 result.dump_xml_as_attributes(f);
f67539c2
TL
362 f->close_section(); // GetTopicAttributesResult
363 f->open_object_section("ResponseMetadata");
364 encode_xml("RequestId", s->req_id, f);
365 f->close_section(); // ResponseMetadata
366 f->close_section(); // GetTopicAttributesResponse
367 rgw_flush_formatter_and_reset(s, f);
368 }
369};
370
1e59de90
TL
371void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
372 op_ret = get_params();
373 if (op_ret < 0) {
374 return;
375 }
376 const RGWPubSub ps(driver, s->owner.get_id().tenant);
377 op_ret = ps.get_topic(this, topic_name, result, y);
378 if (op_ret < 0) {
379 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
380 return;
381 }
382 if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
383 ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
384 op_ret = -EPERM;
385 return;
386 }
387 ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
388}
389
eafe8130
TL
390// command (AWS compliant):
391// POST
392// Action=DeleteTopic&TopicArn=<topic-arn>
1e59de90
TL
393class RGWPSDeleteTopicOp : public RGWOp {
394 private:
395 std::string topic_name;
396
397 int get_params() {
eafe8130
TL
398 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
399
400 if (!topic_arn || topic_arn->resource.empty()) {
b3b6e05e 401 ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
9f95a23c 402 return -EINVAL;
eafe8130
TL
403 }
404
405 topic_name = topic_arn->resource;
f67539c2
TL
406
407 // upon deletion it is not known if topic is persistent or not
408 // will try to delete the persistent topic anyway
409 const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
410 if (ret == -ENOENT) {
411 // topic was not persistent, or already deleted
412 return 0;
413 }
414 if (ret < 0) {
b3b6e05e 415 ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
f67539c2
TL
416 return ret;
417 }
418
eafe8130
TL
419 return 0;
420 }
1e59de90
TL
421
422 public:
423 int verify_permission(optional_yield) override {
424 return 0;
425 }
426 void pre_exec() override {
427 rgw_bucket_object_pre_exec(s);
428 }
429 void execute(optional_yield y) override;
430
431 const char* name() const override { return "pubsub_topic_delete"; }
432 RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
433 uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
434
eafe8130
TL
435 void send_response() override {
436 if (op_ret) {
437 set_req_state_err(s, op_ret);
438 }
439 dump_errno(s);
440 end_header(s, this, "application/xml");
441
442 if (op_ret < 0) {
443 return;
444 }
445
446 const auto f = s->formatter;
f67539c2 447 f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS);
eafe8130
TL
448 f->open_object_section("ResponseMetadata");
449 encode_xml("RequestId", s->req_id, f);
f67539c2
TL
450 f->close_section(); // ResponseMetadata
451 f->close_section(); // DeleteTopicResponse
eafe8130
TL
452 rgw_flush_formatter_and_reset(s, f);
453 }
454};
455
1e59de90
TL
456void RGWPSDeleteTopicOp::execute(optional_yield y) {
457 op_ret = get_params();
458 if (op_ret < 0) {
eafe8130
TL
459 return;
460 }
1e59de90
TL
461 const RGWPubSub ps(driver, s->owner.get_id().tenant);
462 op_ret = ps.remove_topic(this, topic_name, y);
463 if (op_ret < 0) {
464 ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
465 return;
eafe8130 466 }
1e59de90 467 ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
eafe8130
TL
468}
469
1e59de90
TL
470using op_generator = RGWOp*(*)();
471static const std::unordered_map<std::string, op_generator> op_generators = {
472 {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
473 {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
474 {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
475 {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
476 {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
477};
478
479bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s)
480{
481 if (s->info.args.exists("Action")) {
482 const std::string action_name = s->info.args.get("Action");
483 return op_generators.contains(action_name);
eafe8130 484 }
1e59de90 485 return false;
eafe8130
TL
486}
487
1e59de90
TL
488RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
489{
490 s->dialect = "sns";
491 s->prot_flags = RGW_REST_STS;
eafe8130
TL
492
493 if (s->info.args.exists("Action")) {
1e59de90
TL
494 const std::string action_name = s->info.args.get("Action");
495 const auto action_it = op_generators.find(action_name);
496 if (action_it != op_generators.end()) {
497 return action_it->second();
498 }
499 ldpp_dout(s, 10) << "unknown action '" << action_name << "' for Topic handler" << dendl;
500 } else {
501 ldpp_dout(s, 10) << "missing action argument in Topic handler" << dendl;
eafe8130 502 }
eafe8130
TL
503 return nullptr;
504}
505
f67539c2 506int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
1e59de90
TL
507 const auto rc = RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y);
508 if (rc < 0) {
509 return rc;
510 }
511 if (s->auth.identity->is_anonymous()) {
512 ldpp_dout(dpp, 1) << "anonymous user not allowed in topic operations" << dendl;
513 return -ERR_INVALID_REQUEST;
514 }
515 return 0;
eafe8130
TL
516}
517
eafe8130
TL
518namespace {
// build a unique topic name by prefixing the topic with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
  std::string unique_name(notification);
  unique_name += "_";
  unique_name += topic;
  return unique_name;
}
523
// extract the topic from a unique topic of the form: <notification>_<topic>
// returns an empty string when "unique_topic" does not start with "<notification>_"
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
  const std::string prefix = notification + "_";
  // the prefix must be anchored at the beginning of the name: finding it anywhere
  // else and then stripping a prefix-sized chunk would return an unrelated string
  if (unique_topic.compare(0, prefix.length(), prefix) != 0) {
    return "";
  }
  return unique_topic.substr(prefix.length());
}
531
532// from list of bucket topics, find the one that was auto-generated by a notification
533auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
534 auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
535 return it != bucket_topics.topics.end() ?
536 std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
537 std::nullopt;
538}
539}
540
1e59de90
TL
541int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
542 int op_ret = b.remove_notification(dpp, topic_name, y);
20effc67
TL
543 if (op_ret < 0) {
544 ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
545 }
546 op_ret = ps.remove_topic(dpp, topic_name, y);
547 if (op_ret < 0) {
548 ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
549 }
550 return op_ret;
551}
552
1e59de90 553int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
20effc67
TL
554 // delete all notifications of on a bucket
555 for (const auto& topic : bucket_topics.topics) {
1e59de90 556 const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
20effc67
TL
557 if (op_ret < 0) {
558 return op_ret;
559 }
560 }
561 return 0;
562}
563
eafe8130
TL
564// command (S3 compliant): PUT /<bucket name>?notification
565// a "notification" and a subscription will be auto-generated
566// actual configuration is XML encoded in the body of the message
1e59de90
TL
567class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
568 int verify_params() override {
569 bool exists;
570 const auto no_value = s->info.args.get("notification", &exists);
571 if (!exists) {
572 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
573 return -EINVAL;
574 }
575 if (no_value.length() > 0) {
576 ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
577 return -EINVAL;
578 }
579 if (s->bucket_name.empty()) {
580 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
581 return -EINVAL;
582 }
583 return 0;
584 }
eafe8130 585
1e59de90 586 int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
eafe8130
TL
587 const auto max_size = s->cct->_conf->rgw_max_put_param_size;
588 int r;
589 bufferlist data;
20effc67 590 std::tie(r, data) = read_all_input(s, max_size, false);
eafe8130
TL
591
592 if (r < 0) {
b3b6e05e 593 ldpp_dout(this, 1) << "failed to read XML payload" << dendl;
eafe8130
TL
594 return r;
595 }
596 if (data.length() == 0) {
b3b6e05e 597 ldpp_dout(this, 1) << "XML payload missing" << dendl;
eafe8130
TL
598 return -EINVAL;
599 }
600
601 RGWXMLDecoder::XMLParser parser;
602
603 if (!parser.init()){
b3b6e05e 604 ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl;
eafe8130
TL
605 return -EINVAL;
606 }
607 if (!parser.parse(data.c_str(), data.length(), 1)) {
b3b6e05e 608 ldpp_dout(this, 1) << "failed to parse XML payload" << dendl;
eafe8130
TL
609 return -ERR_MALFORMED_XML;
610 }
611 try {
612 // NotificationConfigurations is mandatory
20effc67 613 // It can be empty which means we delete all the notifications
eafe8130
TL
614 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
615 } catch (RGWXMLDecoder::err& err) {
b3b6e05e 616 ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl;
eafe8130
TL
617 return -ERR_MALFORMED_XML;
618 }
619 return 0;
620 }
1e59de90
TL
621public:
622 int verify_permission(optional_yield y) override;
eafe8130 623
1e59de90
TL
624 void pre_exec() override {
625 rgw_bucket_object_pre_exec(s);
eafe8130
TL
626 }
627
eafe8130 628 const char* name() const override { return "pubsub_notification_create_s3"; }
1e59de90
TL
629 RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
630 uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
631
632
f67539c2 633 void execute(optional_yield) override;
eafe8130
TL
634};
635
1e59de90
TL
636void RGWPSCreateNotifOp::execute(optional_yield y) {
637 op_ret = verify_params();
eafe8130
TL
638 if (op_ret < 0) {
639 return;
640 }
641
1e59de90
TL
642 rgw_pubsub_s3_notifications configurations;
643 op_ret = get_params_from_body(configurations);
644 if (op_ret < 0) {
645 return;
646 }
20effc67 647
1e59de90
TL
648 std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
649 std::unique_ptr<rgw::sal::Bucket> bucket;
650 op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
651 if (op_ret < 0) {
652 ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
653 return;
eafe8130
TL
654 }
655
1e59de90
TL
656 const RGWPubSub ps(driver, s->owner.get_id().tenant);
657 const RGWPubSub::Bucket b(ps, bucket.get());
658
20effc67
TL
659 if(configurations.list.empty()) {
660 // get all topics on a bucket
661 rgw_pubsub_bucket_topics bucket_topics;
1e59de90 662 op_ret = b.get_topics(this, bucket_topics, y);
20effc67 663 if (op_ret < 0) {
1e59de90 664 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
20effc67
TL
665 return;
666 }
667
1e59de90 668 op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
20effc67
TL
669 return;
670 }
671
eafe8130
TL
672 for (const auto& c : configurations.list) {
673 const auto& notif_name = c.id;
674 if (notif_name.empty()) {
b3b6e05e 675 ldpp_dout(this, 1) << "missing notification id" << dendl;
eafe8130
TL
676 op_ret = -EINVAL;
677 return;
678 }
679 if (c.topic_arn.empty()) {
b3b6e05e 680 ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
eafe8130
TL
681 op_ret = -EINVAL;
682 return;
683 }
684
685 const auto arn = rgw::ARN::parse(c.topic_arn);
686 if (!arn || arn->resource.empty()) {
b3b6e05e 687 ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
eafe8130
TL
688 op_ret = -EINVAL;
689 return;
690 }
691
692 if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
b3b6e05e 693 ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
eafe8130
TL
694 op_ret = -EINVAL;
695 return;
696 }
697
698 const auto topic_name = arn->resource;
699
700 // get topic information. destination information is stored in the topic
701 rgw_pubsub_topic topic_info;
1e59de90 702 op_ret = ps.get_topic(this, topic_name, topic_info, y);
eafe8130 703 if (op_ret < 0) {
b3b6e05e 704 ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
eafe8130
TL
705 return;
706 }
707 // make sure that full topic configuration match
708 // TODO: use ARN match function
709
710 // create unique topic name. this has 2 reasons:
711 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
712 // (2) make topic clneaup easier, when notification is removed
713 const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
714 // generate the internal topic. destination is stored here for the "push-only" case
715 // when no subscription exists
716 // ARN is cached to make the "GET" method faster
1e59de90 717 op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
eafe8130 718 if (op_ret < 0) {
b3b6e05e 719 ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
eafe8130
TL
720 "', ret=" << op_ret << dendl;
721 return;
722 }
b3b6e05e 723 ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
eafe8130
TL
724 // generate the notification
725 rgw::notify::EventTypeList events;
1e59de90 726 op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
eafe8130 727 if (op_ret < 0) {
b3b6e05e 728 ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
eafe8130
TL
729 "', ret=" << op_ret << dendl;
730 // rollback generated topic (ignore return value)
1e59de90 731 ps.remove_topic(this, unique_topic_name, y);
eafe8130
TL
732 return;
733 }
b3b6e05e 734 ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
eafe8130
TL
735 }
736}
737
1e59de90
TL
738int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
739 if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
740 return -EACCES;
741 }
eafe8130 742
1e59de90
TL
743 return 0;
744}
745
746// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
747class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
748 int get_params(std::string& notif_name) const {
eafe8130
TL
749 bool exists;
750 notif_name = s->info.args.get("notification", &exists);
751 if (!exists) {
b3b6e05e 752 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
eafe8130
TL
753 return -EINVAL;
754 }
755 if (s->bucket_name.empty()) {
b3b6e05e 756 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
eafe8130
TL
757 return -EINVAL;
758 }
eafe8130
TL
759 return 0;
760 }
761
eafe8130 762public:
1e59de90
TL
763 int verify_permission(optional_yield y) override;
764
765 void pre_exec() override {
766 rgw_bucket_object_pre_exec(s);
767 }
768
eafe8130 769 const char* name() const override { return "pubsub_notification_delete_s3"; }
1e59de90
TL
770 RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
771 uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
772
773 void execute(optional_yield y) override;
eafe8130
TL
774};
775
1e59de90
TL
776void RGWPSDeleteNotifOp::execute(optional_yield y) {
777 std::string notif_name;
778 op_ret = get_params(notif_name);
779 if (op_ret < 0) {
780 return;
781 }
782
783 std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
784 std::unique_ptr<rgw::sal::Bucket> bucket;
785 op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
eafe8130 786 if (op_ret < 0) {
1e59de90 787 ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
eafe8130
TL
788 return;
789 }
790
1e59de90
TL
791 const RGWPubSub ps(driver, s->owner.get_id().tenant);
792 const RGWPubSub::Bucket b(ps, bucket.get());
eafe8130
TL
793
794 // get all topics on a bucket
795 rgw_pubsub_bucket_topics bucket_topics;
1e59de90 796 op_ret = b.get_topics(this, bucket_topics, y);
eafe8130 797 if (op_ret < 0) {
1e59de90 798 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
eafe8130
TL
799 return;
800 }
801
802 if (!notif_name.empty()) {
803 // delete a specific notification
804 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
805 if (unique_topic) {
eafe8130 806 const auto unique_topic_name = unique_topic->get().topic.name;
1e59de90 807 op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
eafe8130
TL
808 return;
809 }
810 // notification to be removed is not found - considered success
b3b6e05e 811 ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl;
eafe8130
TL
812 return;
813 }
814
1e59de90
TL
815 op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
816}
817
818int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
819 if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
820 return -EACCES;
821 }
822
823 return 0;
eafe8130
TL
824}
825
826// command (S3 compliant): GET /bucket?notification[=<notification-id>]
1e59de90 827class RGWPSListNotifsOp : public RGWOp {
eafe8130
TL
828 rgw_pubsub_s3_notifications notifications;
829
1e59de90 830 int get_params(std::string& notif_name) const {
eafe8130
TL
831 bool exists;
832 notif_name = s->info.args.get("notification", &exists);
833 if (!exists) {
b3b6e05e 834 ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
eafe8130
TL
835 return -EINVAL;
836 }
837 if (s->bucket_name.empty()) {
b3b6e05e 838 ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
eafe8130
TL
839 return -EINVAL;
840 }
eafe8130
TL
841 return 0;
842 }
843
844public:
1e59de90
TL
845 int verify_permission(optional_yield y) override;
846
847 void pre_exec() override {
848 rgw_bucket_object_pre_exec(s);
849 }
850
851 const char* name() const override { return "pubsub_notifications_get_s3"; }
852 RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
853 uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
854
f67539c2 855 void execute(optional_yield y) override;
eafe8130
TL
856 void send_response() override {
857 if (op_ret) {
858 set_req_state_err(s, op_ret);
859 }
860 dump_errno(s);
861 end_header(s, this, "application/xml");
862
863 if (op_ret < 0) {
864 return;
865 }
866 notifications.dump_xml(s->formatter);
867 rgw_flush_formatter_and_reset(s, s->formatter);
868 }
eafe8130
TL
869};
870
1e59de90
TL
871void RGWPSListNotifsOp::execute(optional_yield y) {
872 std::string notif_name;
873 op_ret = get_params(notif_name);
874 if (op_ret < 0) {
875 return;
876 }
877
878 std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
879 std::unique_ptr<rgw::sal::Bucket> bucket;
880 op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y);
881 if (op_ret < 0) {
882 ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl;
883 return;
884 }
885
886 const RGWPubSub ps(driver, s->owner.get_id().tenant);
887 const RGWPubSub::Bucket b(ps, bucket.get());
eafe8130
TL
888
889 // get all topics on a bucket
890 rgw_pubsub_bucket_topics bucket_topics;
1e59de90 891 op_ret = b.get_topics(this, bucket_topics, y);
eafe8130 892 if (op_ret < 0) {
1e59de90 893 ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
eafe8130
TL
894 return;
895 }
896 if (!notif_name.empty()) {
897 // get info of a specific notification
898 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
899 if (unique_topic) {
900 notifications.list.emplace_back(unique_topic->get());
901 return;
902 }
903 op_ret = -ENOENT;
b3b6e05e 904 ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
eafe8130
TL
905 return;
906 }
907 // loop through all topics of the bucket
908 for (const auto& topic : bucket_topics.topics) {
909 if (topic.second.s3_id.empty()) {
910 // not an s3 notification
911 continue;
912 }
913 notifications.list.emplace_back(topic.second);
914 }
915}
916
1e59de90
TL
917int RGWPSListNotifsOp::verify_permission(optional_yield y) {
918 if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) {
919 return -EACCES;
920 }
921
922 return 0;
923}
924
eafe8130 925RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
1e59de90 926 return new RGWPSListNotifsOp();
eafe8130
TL
927}
928
929RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
1e59de90 930 return new RGWPSCreateNotifOp();
eafe8130
TL
931}
932
933RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
1e59de90 934 return new RGWPSDeleteNotifOp();
eafe8130
TL
935}
936
937RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
1e59de90 938 return new RGWPSListNotifsOp();
eafe8130
TL
939}
940
941RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
1e59de90 942 return new RGWPSCreateNotifOp();
eafe8130
TL
943}
944
945RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
1e59de90 946 return new RGWPSDeleteNotifOp();
eafe8130
TL
947}
948