]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_rest_pubsub.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_rest_pubsub.cc
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <algorithm>
5#include <boost/tokenizer.hpp>
6#include <optional>
7#include "rgw_rest_pubsub_common.h"
8#include "rgw_rest_pubsub.h"
9#include "rgw_pubsub_push.h"
10#include "rgw_pubsub.h"
11#include "rgw_sync_module_pubsub.h"
12#include "rgw_op.h"
13#include "rgw_rest.h"
14#include "rgw_rest_s3.h"
15#include "rgw_arn.h"
16#include "rgw_auth_s3.h"
17#include "services/svc_zone.h"
18
19#define dout_context g_ceph_context
20#define dout_subsys ceph_subsys_rgw
21
9f95a23c 22
eafe8130
TL
23// command (AWS compliant):
24// POST
25// Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
26class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
27public:
28 int get_params() override {
29 topic_name = s->info.args.get("Name");
30 if (topic_name.empty()) {
9f95a23c
TL
31 ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
32 return -EINVAL;
eafe8130
TL
33 }
34
9f95a23c
TL
35 opaque_data = s->info.args.get("OpaqueData");
36
eafe8130 37 dest.push_endpoint = s->info.args.get("push-endpoint");
9f95a23c
TL
38
39 if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
40 return -EINVAL;
41 }
eafe8130 42 for (const auto param : s->info.args.get_params()) {
9f95a23c
TL
43 if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
44 continue;
45 }
46 dest.push_endpoint_args.append(param.first+"="+param.second+"&");
eafe8130
TL
47 }
48
49 if (!dest.push_endpoint_args.empty()) {
9f95a23c
TL
50 // remove last separator
51 dest.push_endpoint_args.pop_back();
eafe8130
TL
52 }
53
54 // dest object only stores endpoint info
55 // bucket to store events/records will be set only when subscription is created
56 dest.bucket_name = "";
57 dest.oid_prefix = "";
58 dest.arn_topic = topic_name;
59 // the topic ARN will be sent in the reply
60 const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
9f95a23c
TL
61 store->svc()->zone->get_zonegroup().get_name(),
62 s->user->get_tenant(), topic_name);
eafe8130
TL
63 topic_arn = arn.to_string();
64 return 0;
65 }
66
67 void send_response() override {
68 if (op_ret) {
69 set_req_state_err(s, op_ret);
70 }
71 dump_errno(s);
72 end_header(s, this, "application/xml");
73
74 if (op_ret < 0) {
75 return;
76 }
77
78 const auto f = s->formatter;
79 f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
80 f->open_object_section("CreateTopicResult");
81 encode_xml("TopicArn", topic_arn, f);
82 f->close_section();
83 f->open_object_section("ResponseMetadata");
84 encode_xml("RequestId", s->req_id, f);
85 f->close_section();
86 f->close_section();
87 rgw_flush_formatter_and_reset(s, f);
88 }
89};
90
91// command (AWS compliant):
92// POST
93// Action=ListTopics
94class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
95public:
96 void send_response() override {
97 if (op_ret) {
98 set_req_state_err(s, op_ret);
99 }
100 dump_errno(s);
101 end_header(s, this, "application/xml");
102
103 if (op_ret < 0) {
104 return;
105 }
106
107 const auto f = s->formatter;
108 f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
109 f->open_object_section("ListTopicsResult");
110 encode_xml("Topics", result, f);
111 f->close_section();
112 f->open_object_section("ResponseMetadata");
113 encode_xml("RequestId", s->req_id, f);
114 f->close_section();
115 f->close_section();
116 rgw_flush_formatter_and_reset(s, f);
117 }
118};
119
120// command (extension to AWS):
121// POST
122// Action=GetTopic&TopicArn=<topic-arn>
123class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
124public:
125 int get_params() override {
126 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
127
128 if (!topic_arn || topic_arn->resource.empty()) {
129 ldout(s->cct, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
130 return -EINVAL;
131 }
132
133 topic_name = topic_arn->resource;
134 return 0;
135 }
136
137 void send_response() override {
138 if (op_ret) {
139 set_req_state_err(s, op_ret);
140 }
141 dump_errno(s);
142 end_header(s, this, "application/xml");
143
144 if (op_ret < 0) {
145 return;
146 }
147
148 const auto f = s->formatter;
149 f->open_object_section("GetTopicResponse");
150 f->open_object_section("GetTopicResult");
151 encode_xml("Topic", result.topic, f);
152 f->close_section();
153 f->open_object_section("ResponseMetadata");
154 encode_xml("RequestId", s->req_id, f);
155 f->close_section();
156 f->close_section();
157 rgw_flush_formatter_and_reset(s, f);
158 }
159};
160
161// command (AWS compliant):
162// POST
163// Action=DeleteTopic&TopicArn=<topic-arn>
164class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
165public:
166 int get_params() override {
167 const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
168
169 if (!topic_arn || topic_arn->resource.empty()) {
9f95a23c
TL
170 ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
171 return -EINVAL;
eafe8130
TL
172 }
173
174 topic_name = topic_arn->resource;
175 return 0;
176 }
177
178 void send_response() override {
179 if (op_ret) {
180 set_req_state_err(s, op_ret);
181 }
182 dump_errno(s);
183 end_header(s, this, "application/xml");
184
185 if (op_ret < 0) {
186 return;
187 }
188
189 const auto f = s->formatter;
190 f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
191 f->open_object_section("ResponseMetadata");
192 encode_xml("RequestId", s->req_id, f);
193 f->close_section();
194 f->close_section();
195 rgw_flush_formatter_and_reset(s, f);
196 }
197};
198
199namespace {
200// utility classes and functions for handling parameters with the following format:
201// Attributes.entry.{N}.{key|value}={VALUE}
202// N - any unsigned number
203// VALUE - url encoded string
204
205// and Attribute is holding key and value
206// ctor and set are done according to the "type" argument
207// if type is not "key" or "value" its a no-op
208class Attribute {
9f95a23c
TL
209 std::string key;
210 std::string value;
eafe8130 211public:
9f95a23c
TL
212 Attribute(const std::string& type, const std::string& key_or_value) {
213 set(type, key_or_value);
214 }
215 void set(const std::string& type, const std::string& key_or_value) {
216 if (type == "key") {
217 key = key_or_value;
218 } else if (type == "value") {
219 value = key_or_value;
eafe8130 220 }
9f95a23c
TL
221 }
222 const std::string& get_key() const { return key; }
223 const std::string& get_value() const { return value; }
eafe8130
TL
224};
225
226using AttributeMap = std::map<unsigned, Attribute>;
227
228// aggregate the attributes into a map
229// the key and value are associated by the index (N)
230// no assumptions are made on the order in which these parameters are added
231void update_attribute_map(const std::string& input, AttributeMap& map) {
232 const boost::char_separator<char> sep(".");
233 const boost::tokenizer tokens(input, sep);
234 auto token = tokens.begin();
235 if (*token != "Attributes") {
236 return;
237 }
238 ++token;
239
240 if (*token != "entry") {
241 return;
242 }
243 ++token;
244
245 unsigned idx;
246 try {
247 idx = std::stoul(*token);
248 } catch (const std::invalid_argument&) {
249 return;
250 }
251 ++token;
252
253 std::string key_or_value = "";
254 // get the rest of the string regardless of dots
255 // this is to allow dots in the value
256 while (token != tokens.end()) {
257 key_or_value.append(*token+".");
258 ++token;
259 }
260 // remove last separator
261 key_or_value.pop_back();
262
263 auto pos = key_or_value.find("=");
264 if (pos != string::npos) {
265 const auto key_or_value_lhs = key_or_value.substr(0, pos);
266 const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
267 const auto map_it = map.find(idx);
268 if (map_it == map.end()) {
269 // new entry
270 map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs)));
271 } else {
272 // existing entry
273 map_it->second.set(key_or_value_lhs, key_or_value_rhs);
274 }
275 }
276}
277}
278
279void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
280 if (post_body.size() > 0) {
281 ldout(s->cct, 10) << "Content of POST: " << post_body << dendl;
282
283 if (post_body.find("Action") != string::npos) {
284 const boost::char_separator<char> sep("&");
285 const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
286 AttributeMap map;
287 for (const auto& t : tokens) {
288 auto pos = t.find("=");
289 if (pos != string::npos) {
290 const auto key = t.substr(0, pos);
291 if (key == "Action") {
292 s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
293 } else if (key == "Name" || key == "TopicArn") {
294 const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
295 s->info.args.append(key, value);
296 } else {
297 update_attribute_map(t, map);
298 }
299 }
300 }
301 // update the regular args with the content of the attribute map
302 for (const auto attr : map) {
303 s->info.args.append(attr.second.get_key(), attr.second.get_value());
304 }
305 }
306 const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body);
307 s->info.args.append("PayloadHash", payload_hash);
308 }
309}
310
311RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
312 rgw_topic_parse_input();
313
314 if (s->info.args.exists("Action")) {
315 const auto action = s->info.args.get("Action");
316 if (action.compare("CreateTopic") == 0)
317 return new RGWPSCreateTopic_ObjStore_AWS();
318 if (action.compare("DeleteTopic") == 0)
319 return new RGWPSDeleteTopic_ObjStore_AWS;
320 if (action.compare("ListTopics") == 0)
321 return new RGWPSListTopics_ObjStore_AWS();
322 if (action.compare("GetTopic") == 0)
323 return new RGWPSGetTopic_ObjStore_AWS();
324 }
325
326 return nullptr;
327}
328
329int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp) {
330 /*if (s->info.args.exists("Action") && s->info.args.get("Action").find("Topic") != std::string::npos) {
331 // TODO: some topic specific authorization
332 return 0;
333 }*/
334 return RGW_Auth_S3::authorize(dpp, store, auth_registry, s);
335}
336
337
338namespace {
339// return a unique topic by prefexing with the notification name: <notification>_<topic>
340std::string topic_to_unique(const std::string& topic, const std::string& notification) {
341 return notification + "_" + topic;
342}
343
344// extract the topic from a unique topic of the form: <notification>_<topic>
345[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
346 if (unique_topic.find(notification + "_") == string::npos) {
347 return "";
348 }
349 return unique_topic.substr(notification.length() + 1);
350}
351
352// from list of bucket topics, find the one that was auto-generated by a notification
353auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
354 auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
355 return it != bucket_topics.topics.end() ?
356 std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
357 std::nullopt;
358}
359}
360
361// command (S3 compliant): PUT /<bucket name>?notification
362// a "notification" and a subscription will be auto-generated
363// actual configuration is XML encoded in the body of the message
364class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
365 rgw_pubsub_s3_notifications configurations;
366
367 int get_params_from_body() {
368 const auto max_size = s->cct->_conf->rgw_max_put_param_size;
369 int r;
370 bufferlist data;
371 std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
372
373 if (r < 0) {
374 ldout(s->cct, 1) << "failed to read XML payload" << dendl;
375 return r;
376 }
377 if (data.length() == 0) {
378 ldout(s->cct, 1) << "XML payload missing" << dendl;
379 return -EINVAL;
380 }
381
382 RGWXMLDecoder::XMLParser parser;
383
384 if (!parser.init()){
385 ldout(s->cct, 1) << "failed to initialize XML parser" << dendl;
386 return -EINVAL;
387 }
388 if (!parser.parse(data.c_str(), data.length(), 1)) {
389 ldout(s->cct, 1) << "failed to parse XML payload" << dendl;
390 return -ERR_MALFORMED_XML;
391 }
392 try {
393 // NotificationConfigurations is mandatory
394 RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
395 } catch (RGWXMLDecoder::err& err) {
396 ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl;
397 return -ERR_MALFORMED_XML;
398 }
399 return 0;
400 }
401
402 int get_params() override {
403 bool exists;
404 const auto no_value = s->info.args.get("notification", &exists);
405 if (!exists) {
406 ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
407 return -EINVAL;
408 }
409 if (no_value.length() > 0) {
410 ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl;
411 return -EINVAL;
412 }
413 if (s->bucket_name.empty()) {
414 ldout(s->cct, 1) << "request must be on a bucket" << dendl;
415 return -EINVAL;
416 }
417 bucket_name = s->bucket_name;
418 return 0;
419 }
420
421public:
422 const char* name() const override { return "pubsub_notification_create_s3"; }
423 void execute() override;
424};
425
426void RGWPSCreateNotif_ObjStore_S3::execute() {
427 op_ret = get_params_from_body();
428 if (op_ret < 0) {
429 return;
430 }
431
432 ups.emplace(store, s->owner.get_id());
433 auto b = ups->get_bucket(bucket_info.bucket);
434 ceph_assert(b);
435 std::string data_bucket_prefix = "";
436 std::string data_oid_prefix = "";
437 bool push_only = true;
9f95a23c
TL
438 if (store->getRados()->get_sync_module()) {
439 const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
eafe8130 440 if (psmodule) {
9f95a23c
TL
441 const auto& conf = psmodule->get_effective_conf();
442 data_bucket_prefix = conf["data_bucket_prefix"];
443 data_oid_prefix = conf["data_oid_prefix"];
444 // TODO: allow "push-only" on PS zone as well
445 push_only = false;
eafe8130
TL
446 }
447 }
448
449 for (const auto& c : configurations.list) {
450 const auto& notif_name = c.id;
451 if (notif_name.empty()) {
452 ldout(s->cct, 1) << "missing notification id" << dendl;
453 op_ret = -EINVAL;
454 return;
455 }
456 if (c.topic_arn.empty()) {
457 ldout(s->cct, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
458 op_ret = -EINVAL;
459 return;
460 }
461
462 const auto arn = rgw::ARN::parse(c.topic_arn);
463 if (!arn || arn->resource.empty()) {
464 ldout(s->cct, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
465 op_ret = -EINVAL;
466 return;
467 }
468
469 if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
470 ldout(s->cct, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
471 op_ret = -EINVAL;
472 return;
473 }
474
475 const auto topic_name = arn->resource;
476
477 // get topic information. destination information is stored in the topic
478 rgw_pubsub_topic topic_info;
479 op_ret = ups->get_topic(topic_name, &topic_info);
480 if (op_ret < 0) {
481 ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
482 return;
483 }
484 // make sure that full topic configuration match
485 // TODO: use ARN match function
486
487 // create unique topic name. this has 2 reasons:
488 // (1) topics cannot be shared between different S3 notifications because they hold the filter information
489 // (2) make topic clneaup easier, when notification is removed
490 const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
491 // generate the internal topic. destination is stored here for the "push-only" case
492 // when no subscription exists
493 // ARN is cached to make the "GET" method faster
9f95a23c 494 op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data);
eafe8130
TL
495 if (op_ret < 0) {
496 ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
497 "', ret=" << op_ret << dendl;
498 return;
499 }
500 ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
501 // generate the notification
502 rgw::notify::EventTypeList events;
503 op_ret = b->create_notification(unique_topic_name, c.events, std::make_optional(c.filter), notif_name);
504 if (op_ret < 0) {
505 ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
506 "', ret=" << op_ret << dendl;
507 // rollback generated topic (ignore return value)
508 ups->remove_topic(unique_topic_name);
509 return;
510 }
511 ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
512
513 if (!push_only) {
514 // generate the subscription with destination information from the original topic
515 rgw_pubsub_sub_dest dest = topic_info.dest;
516 dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
517 dest.oid_prefix = data_oid_prefix + notif_name + "/";
518 auto sub = ups->get_sub(notif_name);
519 op_ret = sub->subscribe(unique_topic_name, dest, notif_name);
520 if (op_ret < 0) {
521 ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
522 // rollback generated notification (ignore return value)
523 b->remove_notification(unique_topic_name);
524 // rollback generated topic (ignore return value)
525 ups->remove_topic(unique_topic_name);
526 return;
527 }
528 ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
529 }
530 }
531}
532
533// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
534class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
535private:
536 std::string notif_name;
537
538 int get_params() override {
539 bool exists;
540 notif_name = s->info.args.get("notification", &exists);
541 if (!exists) {
542 ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
543 return -EINVAL;
544 }
545 if (s->bucket_name.empty()) {
546 ldout(s->cct, 1) << "request must be on a bucket" << dendl;
547 return -EINVAL;
548 }
549 bucket_name = s->bucket_name;
550 return 0;
551 }
552
553 void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) {
554 op_ret = b->remove_notification(topic_name);
555 if (op_ret < 0) {
556 ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
557 }
558 op_ret = ups->remove_topic(topic_name);
559 if (op_ret < 0) {
560 ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
561 }
562 }
563
564public:
565 void execute() override;
566 const char* name() const override { return "pubsub_notification_delete_s3"; }
567};
568
569void RGWPSDeleteNotif_ObjStore_S3::execute() {
570 op_ret = get_params();
571 if (op_ret < 0) {
572 return;
573 }
574
575 ups.emplace(store, s->owner.get_id());
576 auto b = ups->get_bucket(bucket_info.bucket);
577 ceph_assert(b);
578
579 // get all topics on a bucket
580 rgw_pubsub_bucket_topics bucket_topics;
581 op_ret = b->get_topics(&bucket_topics);
582 if (op_ret < 0) {
583 ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
584 return;
585 }
586
587 if (!notif_name.empty()) {
588 // delete a specific notification
589 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
590 if (unique_topic) {
591 // remove the auto generated subscription according to notification name (if exist)
592 const auto unique_topic_name = unique_topic->get().topic.name;
593 auto sub = ups->get_sub(notif_name);
594 op_ret = sub->unsubscribe(unique_topic_name);
595 if (op_ret < 0 && op_ret != -ENOENT) {
596 ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
597 return;
598 }
599 remove_notification_by_topic(unique_topic_name, b);
600 return;
601 }
602 // notification to be removed is not found - considered success
603 ldout(s->cct, 20) << "notification '" << notif_name << "' already removed" << dendl;
604 return;
605 }
606
607 // delete all notification of on a bucket
608 for (const auto& topic : bucket_topics.topics) {
609 // remove the auto generated subscription of the topic (if exist)
610 rgw_pubsub_topic_subs topic_subs;
611 op_ret = ups->get_topic(topic.first, &topic_subs);
612 for (const auto& topic_sub_name : topic_subs.subs) {
613 auto sub = ups->get_sub(topic_sub_name);
614 rgw_pubsub_sub_config sub_conf;
615 op_ret = sub->get_conf(&sub_conf);
616 if (op_ret < 0) {
617 ldout(s->cct, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
618 return;
619 }
620 if (!sub_conf.s3_id.empty()) {
621 // S3 notification, has autogenerated subscription
622 const auto& sub_topic_name = sub_conf.topic;
623 op_ret = sub->unsubscribe(sub_topic_name);
624 if (op_ret < 0) {
625 ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
626 return;
627 }
628 }
629 }
630 remove_notification_by_topic(topic.first, b);
631 }
632}
633
634// command (S3 compliant): GET /bucket?notification[=<notification-id>]
635class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
636private:
637 std::string notif_name;
638 rgw_pubsub_s3_notifications notifications;
639
640 int get_params() override {
641 bool exists;
642 notif_name = s->info.args.get("notification", &exists);
643 if (!exists) {
644 ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
645 return -EINVAL;
646 }
647 if (s->bucket_name.empty()) {
648 ldout(s->cct, 1) << "request must be on a bucket" << dendl;
649 return -EINVAL;
650 }
651 bucket_name = s->bucket_name;
652 return 0;
653 }
654
655public:
656 void execute() override;
657 void send_response() override {
658 if (op_ret) {
659 set_req_state_err(s, op_ret);
660 }
661 dump_errno(s);
662 end_header(s, this, "application/xml");
663
664 if (op_ret < 0) {
665 return;
666 }
667 notifications.dump_xml(s->formatter);
668 rgw_flush_formatter_and_reset(s, s->formatter);
669 }
670 const char* name() const override { return "pubsub_notifications_get_s3"; }
671};
672
673void RGWPSListNotifs_ObjStore_S3::execute() {
674 ups.emplace(store, s->owner.get_id());
675 auto b = ups->get_bucket(bucket_info.bucket);
676 ceph_assert(b);
677
678 // get all topics on a bucket
679 rgw_pubsub_bucket_topics bucket_topics;
680 op_ret = b->get_topics(&bucket_topics);
681 if (op_ret < 0) {
682 ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
683 return;
684 }
685 if (!notif_name.empty()) {
686 // get info of a specific notification
687 const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
688 if (unique_topic) {
689 notifications.list.emplace_back(unique_topic->get());
690 return;
691 }
692 op_ret = -ENOENT;
693 ldout(s->cct, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
694 return;
695 }
696 // loop through all topics of the bucket
697 for (const auto& topic : bucket_topics.topics) {
698 if (topic.second.s3_id.empty()) {
699 // not an s3 notification
700 continue;
701 }
702 notifications.list.emplace_back(topic.second);
703 }
704}
705
706RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
707 return new RGWPSListNotifs_ObjStore_S3();
708}
709
710RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
711 return new RGWPSCreateNotif_ObjStore_S3();
712}
713
714RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
715 return new RGWPSDeleteNotif_ObjStore_S3();
716}
717
718RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
719 return new RGWPSListNotifs_ObjStore_S3();
720}
721
722RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
723 return new RGWPSCreateNotif_ObjStore_S3();
724}
725
726RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
727 return new RGWPSDeleteNotif_ObjStore_S3();
728}
729