]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <algorithm> | |
5 | #include <boost/tokenizer.hpp> | |
6 | #include <optional> | |
7 | #include "rgw_rest_pubsub_common.h" | |
8 | #include "rgw_rest_pubsub.h" | |
9 | #include "rgw_pubsub_push.h" | |
10 | #include "rgw_pubsub.h" | |
11 | #include "rgw_sync_module_pubsub.h" | |
12 | #include "rgw_op.h" | |
13 | #include "rgw_rest.h" | |
14 | #include "rgw_rest_s3.h" | |
15 | #include "rgw_arn.h" | |
16 | #include "rgw_auth_s3.h" | |
17 | #include "services/svc_zone.h" | |
18 | ||
19 | #define dout_context g_ceph_context | |
20 | #define dout_subsys ceph_subsys_rgw | |
21 | ||
9f95a23c | 22 | |
eafe8130 TL |
23 | // command (AWS compliant): |
24 | // POST | |
25 | // Action=CreateTopic&Name=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]] | |
26 | class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp { | |
27 | public: | |
28 | int get_params() override { | |
29 | topic_name = s->info.args.get("Name"); | |
30 | if (topic_name.empty()) { | |
9f95a23c TL |
31 | ldout(s->cct, 1) << "CreateTopic Action 'Name' argument is missing" << dendl; |
32 | return -EINVAL; | |
eafe8130 TL |
33 | } |
34 | ||
9f95a23c TL |
35 | opaque_data = s->info.args.get("OpaqueData"); |
36 | ||
eafe8130 | 37 | dest.push_endpoint = s->info.args.get("push-endpoint"); |
9f95a23c TL |
38 | |
39 | if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { | |
40 | return -EINVAL; | |
41 | } | |
eafe8130 | 42 | for (const auto param : s->info.args.get_params()) { |
9f95a23c TL |
43 | if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { |
44 | continue; | |
45 | } | |
46 | dest.push_endpoint_args.append(param.first+"="+param.second+"&"); | |
eafe8130 TL |
47 | } |
48 | ||
49 | if (!dest.push_endpoint_args.empty()) { | |
9f95a23c TL |
50 | // remove last separator |
51 | dest.push_endpoint_args.pop_back(); | |
eafe8130 TL |
52 | } |
53 | ||
54 | // dest object only stores endpoint info | |
55 | // bucket to store events/records will be set only when subscription is created | |
56 | dest.bucket_name = ""; | |
57 | dest.oid_prefix = ""; | |
58 | dest.arn_topic = topic_name; | |
59 | // the topic ARN will be sent in the reply | |
60 | const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, | |
9f95a23c TL |
61 | store->svc()->zone->get_zonegroup().get_name(), |
62 | s->user->get_tenant(), topic_name); | |
eafe8130 TL |
63 | topic_arn = arn.to_string(); |
64 | return 0; | |
65 | } | |
66 | ||
67 | void send_response() override { | |
68 | if (op_ret) { | |
69 | set_req_state_err(s, op_ret); | |
70 | } | |
71 | dump_errno(s); | |
72 | end_header(s, this, "application/xml"); | |
73 | ||
74 | if (op_ret < 0) { | |
75 | return; | |
76 | } | |
77 | ||
78 | const auto f = s->formatter; | |
79 | f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/"); | |
80 | f->open_object_section("CreateTopicResult"); | |
81 | encode_xml("TopicArn", topic_arn, f); | |
82 | f->close_section(); | |
83 | f->open_object_section("ResponseMetadata"); | |
84 | encode_xml("RequestId", s->req_id, f); | |
85 | f->close_section(); | |
86 | f->close_section(); | |
87 | rgw_flush_formatter_and_reset(s, f); | |
88 | } | |
89 | }; | |
90 | ||
91 | // command (AWS compliant): | |
92 | // POST | |
93 | // Action=ListTopics | |
94 | class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp { | |
95 | public: | |
96 | void send_response() override { | |
97 | if (op_ret) { | |
98 | set_req_state_err(s, op_ret); | |
99 | } | |
100 | dump_errno(s); | |
101 | end_header(s, this, "application/xml"); | |
102 | ||
103 | if (op_ret < 0) { | |
104 | return; | |
105 | } | |
106 | ||
107 | const auto f = s->formatter; | |
108 | f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/"); | |
109 | f->open_object_section("ListTopicsResult"); | |
110 | encode_xml("Topics", result, f); | |
111 | f->close_section(); | |
112 | f->open_object_section("ResponseMetadata"); | |
113 | encode_xml("RequestId", s->req_id, f); | |
114 | f->close_section(); | |
115 | f->close_section(); | |
116 | rgw_flush_formatter_and_reset(s, f); | |
117 | } | |
118 | }; | |
119 | ||
120 | // command (extension to AWS): | |
121 | // POST | |
122 | // Action=GetTopic&TopicArn=<topic-arn> | |
123 | class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp { | |
124 | public: | |
125 | int get_params() override { | |
126 | const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); | |
127 | ||
128 | if (!topic_arn || topic_arn->resource.empty()) { | |
129 | ldout(s->cct, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl; | |
130 | return -EINVAL; | |
131 | } | |
132 | ||
133 | topic_name = topic_arn->resource; | |
134 | return 0; | |
135 | } | |
136 | ||
137 | void send_response() override { | |
138 | if (op_ret) { | |
139 | set_req_state_err(s, op_ret); | |
140 | } | |
141 | dump_errno(s); | |
142 | end_header(s, this, "application/xml"); | |
143 | ||
144 | if (op_ret < 0) { | |
145 | return; | |
146 | } | |
147 | ||
148 | const auto f = s->formatter; | |
149 | f->open_object_section("GetTopicResponse"); | |
150 | f->open_object_section("GetTopicResult"); | |
151 | encode_xml("Topic", result.topic, f); | |
152 | f->close_section(); | |
153 | f->open_object_section("ResponseMetadata"); | |
154 | encode_xml("RequestId", s->req_id, f); | |
155 | f->close_section(); | |
156 | f->close_section(); | |
157 | rgw_flush_formatter_and_reset(s, f); | |
158 | } | |
159 | }; | |
160 | ||
161 | // command (AWS compliant): | |
162 | // POST | |
163 | // Action=DeleteTopic&TopicArn=<topic-arn> | |
164 | class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp { | |
165 | public: | |
166 | int get_params() override { | |
167 | const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); | |
168 | ||
169 | if (!topic_arn || topic_arn->resource.empty()) { | |
9f95a23c TL |
170 | ldout(s->cct, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl; |
171 | return -EINVAL; | |
eafe8130 TL |
172 | } |
173 | ||
174 | topic_name = topic_arn->resource; | |
175 | return 0; | |
176 | } | |
177 | ||
178 | void send_response() override { | |
179 | if (op_ret) { | |
180 | set_req_state_err(s, op_ret); | |
181 | } | |
182 | dump_errno(s); | |
183 | end_header(s, this, "application/xml"); | |
184 | ||
185 | if (op_ret < 0) { | |
186 | return; | |
187 | } | |
188 | ||
189 | const auto f = s->formatter; | |
190 | f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/"); | |
191 | f->open_object_section("ResponseMetadata"); | |
192 | encode_xml("RequestId", s->req_id, f); | |
193 | f->close_section(); | |
194 | f->close_section(); | |
195 | rgw_flush_formatter_and_reset(s, f); | |
196 | } | |
197 | }; | |
198 | ||
199 | namespace { | |
200 | // utility classes and functions for handling parameters with the following format: | |
201 | // Attributes.entry.{N}.{key|value}={VALUE} | |
202 | // N - any unsigned number | |
203 | // VALUE - url encoded string | |
204 | ||
// an Attribute holds a key and a value
// the ctor and set() update one of them, selected by the "type" argument
// if type is neither "key" nor "value" the call is a no-op
class Attribute {
  std::string key;
  std::string value;
public:
  Attribute(const std::string& type, const std::string& key_or_value) {
    set(type, key_or_value);
  }
  void set(const std::string& type, const std::string& key_or_value) {
    // pick the field named by "type"; nullptr means there is nothing to update
    std::string* const field =
      (type == "key") ? &key :
      (type == "value") ? &value :
      nullptr;
    if (field != nullptr) {
      *field = key_or_value;
    }
  }
  const std::string& get_key() const { return key; }
  const std::string& get_value() const { return value; }
};

// attributes indexed by the number (N) of their entry
using AttributeMap = std::map<unsigned, Attribute>;
227 | ||
228 | // aggregate the attributes into a map | |
229 | // the key and value are associated by the index (N) | |
230 | // no assumptions are made on the order in which these parameters are added | |
231 | void update_attribute_map(const std::string& input, AttributeMap& map) { | |
232 | const boost::char_separator<char> sep("."); | |
233 | const boost::tokenizer tokens(input, sep); | |
234 | auto token = tokens.begin(); | |
235 | if (*token != "Attributes") { | |
236 | return; | |
237 | } | |
238 | ++token; | |
239 | ||
240 | if (*token != "entry") { | |
241 | return; | |
242 | } | |
243 | ++token; | |
244 | ||
245 | unsigned idx; | |
246 | try { | |
247 | idx = std::stoul(*token); | |
248 | } catch (const std::invalid_argument&) { | |
249 | return; | |
250 | } | |
251 | ++token; | |
252 | ||
253 | std::string key_or_value = ""; | |
254 | // get the rest of the string regardless of dots | |
255 | // this is to allow dots in the value | |
256 | while (token != tokens.end()) { | |
257 | key_or_value.append(*token+"."); | |
258 | ++token; | |
259 | } | |
260 | // remove last separator | |
261 | key_or_value.pop_back(); | |
262 | ||
263 | auto pos = key_or_value.find("="); | |
264 | if (pos != string::npos) { | |
265 | const auto key_or_value_lhs = key_or_value.substr(0, pos); | |
266 | const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1)); | |
267 | const auto map_it = map.find(idx); | |
268 | if (map_it == map.end()) { | |
269 | // new entry | |
270 | map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs))); | |
271 | } else { | |
272 | // existing entry | |
273 | map_it->second.set(key_or_value_lhs, key_or_value_rhs); | |
274 | } | |
275 | } | |
276 | } | |
277 | } | |
278 | ||
279 | void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() { | |
280 | if (post_body.size() > 0) { | |
281 | ldout(s->cct, 10) << "Content of POST: " << post_body << dendl; | |
282 | ||
283 | if (post_body.find("Action") != string::npos) { | |
284 | const boost::char_separator<char> sep("&"); | |
285 | const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep); | |
286 | AttributeMap map; | |
287 | for (const auto& t : tokens) { | |
288 | auto pos = t.find("="); | |
289 | if (pos != string::npos) { | |
290 | const auto key = t.substr(0, pos); | |
291 | if (key == "Action") { | |
292 | s->info.args.append(key, t.substr(pos + 1, t.size() - 1)); | |
293 | } else if (key == "Name" || key == "TopicArn") { | |
294 | const auto value = url_decode(t.substr(pos + 1, t.size() - 1)); | |
295 | s->info.args.append(key, value); | |
296 | } else { | |
297 | update_attribute_map(t, map); | |
298 | } | |
299 | } | |
300 | } | |
301 | // update the regular args with the content of the attribute map | |
302 | for (const auto attr : map) { | |
303 | s->info.args.append(attr.second.get_key(), attr.second.get_value()); | |
304 | } | |
305 | } | |
306 | const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body); | |
307 | s->info.args.append("PayloadHash", payload_hash); | |
308 | } | |
309 | } | |
310 | ||
311 | RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() { | |
312 | rgw_topic_parse_input(); | |
313 | ||
314 | if (s->info.args.exists("Action")) { | |
315 | const auto action = s->info.args.get("Action"); | |
316 | if (action.compare("CreateTopic") == 0) | |
317 | return new RGWPSCreateTopic_ObjStore_AWS(); | |
318 | if (action.compare("DeleteTopic") == 0) | |
319 | return new RGWPSDeleteTopic_ObjStore_AWS; | |
320 | if (action.compare("ListTopics") == 0) | |
321 | return new RGWPSListTopics_ObjStore_AWS(); | |
322 | if (action.compare("GetTopic") == 0) | |
323 | return new RGWPSGetTopic_ObjStore_AWS(); | |
324 | } | |
325 | ||
326 | return nullptr; | |
327 | } | |
328 | ||
329 | int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp) { | |
330 | /*if (s->info.args.exists("Action") && s->info.args.get("Action").find("Topic") != std::string::npos) { | |
331 | // TODO: some topic specific authorization | |
332 | return 0; | |
333 | }*/ | |
334 | return RGW_Auth_S3::authorize(dpp, store, auth_registry, s); | |
335 | } | |
336 | ||
337 | ||
338 | namespace { | |
// return a unique topic by prefixing with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
  std::string unique_topic;
  unique_topic.reserve(notification.size() + 1 + topic.size());
  unique_topic.append(notification);
  unique_topic.append("_");
  unique_topic.append(topic);
  return unique_topic;
}
343 | ||
// extract the topic from a unique topic of the form: <notification>_<topic>
// returns an empty string if unique_topic does not start with "<notification>_"
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
  const std::string prefix = notification + "_";
  // the prefix must be at the start of the string. finding it anywhere else
  // would cut the string at the wrong place
  if (unique_topic.compare(0, prefix.size(), prefix) != 0) {
    return "";
  }
  return unique_topic.substr(prefix.size());
}
351 | ||
352 | // from list of bucket topics, find the one that was auto-generated by a notification | |
353 | auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) { | |
354 | auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; }); | |
355 | return it != bucket_topics.topics.end() ? | |
356 | std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second): | |
357 | std::nullopt; | |
358 | } | |
359 | } | |
360 | ||
361 | // command (S3 compliant): PUT /<bucket name>?notification | |
362 | // a "notification" and a subscription will be auto-generated | |
363 | // actual configuration is XML encoded in the body of the message | |
364 | class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { | |
365 | rgw_pubsub_s3_notifications configurations; | |
366 | ||
367 | int get_params_from_body() { | |
368 | const auto max_size = s->cct->_conf->rgw_max_put_param_size; | |
369 | int r; | |
370 | bufferlist data; | |
371 | std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false); | |
372 | ||
373 | if (r < 0) { | |
374 | ldout(s->cct, 1) << "failed to read XML payload" << dendl; | |
375 | return r; | |
376 | } | |
377 | if (data.length() == 0) { | |
378 | ldout(s->cct, 1) << "XML payload missing" << dendl; | |
379 | return -EINVAL; | |
380 | } | |
381 | ||
382 | RGWXMLDecoder::XMLParser parser; | |
383 | ||
384 | if (!parser.init()){ | |
385 | ldout(s->cct, 1) << "failed to initialize XML parser" << dendl; | |
386 | return -EINVAL; | |
387 | } | |
388 | if (!parser.parse(data.c_str(), data.length(), 1)) { | |
389 | ldout(s->cct, 1) << "failed to parse XML payload" << dendl; | |
390 | return -ERR_MALFORMED_XML; | |
391 | } | |
392 | try { | |
393 | // NotificationConfigurations is mandatory | |
394 | RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true); | |
395 | } catch (RGWXMLDecoder::err& err) { | |
396 | ldout(s->cct, 1) << "failed to parse XML payload. error: " << err << dendl; | |
397 | return -ERR_MALFORMED_XML; | |
398 | } | |
399 | return 0; | |
400 | } | |
401 | ||
402 | int get_params() override { | |
403 | bool exists; | |
404 | const auto no_value = s->info.args.get("notification", &exists); | |
405 | if (!exists) { | |
406 | ldout(s->cct, 1) << "missing required param 'notification'" << dendl; | |
407 | return -EINVAL; | |
408 | } | |
409 | if (no_value.length() > 0) { | |
410 | ldout(s->cct, 1) << "param 'notification' should not have any value" << dendl; | |
411 | return -EINVAL; | |
412 | } | |
413 | if (s->bucket_name.empty()) { | |
414 | ldout(s->cct, 1) << "request must be on a bucket" << dendl; | |
415 | return -EINVAL; | |
416 | } | |
417 | bucket_name = s->bucket_name; | |
418 | return 0; | |
419 | } | |
420 | ||
421 | public: | |
422 | const char* name() const override { return "pubsub_notification_create_s3"; } | |
423 | void execute() override; | |
424 | }; | |
425 | ||
426 | void RGWPSCreateNotif_ObjStore_S3::execute() { | |
427 | op_ret = get_params_from_body(); | |
428 | if (op_ret < 0) { | |
429 | return; | |
430 | } | |
431 | ||
432 | ups.emplace(store, s->owner.get_id()); | |
433 | auto b = ups->get_bucket(bucket_info.bucket); | |
434 | ceph_assert(b); | |
435 | std::string data_bucket_prefix = ""; | |
436 | std::string data_oid_prefix = ""; | |
437 | bool push_only = true; | |
9f95a23c TL |
438 | if (store->getRados()->get_sync_module()) { |
439 | const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get()); | |
eafe8130 | 440 | if (psmodule) { |
9f95a23c TL |
441 | const auto& conf = psmodule->get_effective_conf(); |
442 | data_bucket_prefix = conf["data_bucket_prefix"]; | |
443 | data_oid_prefix = conf["data_oid_prefix"]; | |
444 | // TODO: allow "push-only" on PS zone as well | |
445 | push_only = false; | |
eafe8130 TL |
446 | } |
447 | } | |
448 | ||
449 | for (const auto& c : configurations.list) { | |
450 | const auto& notif_name = c.id; | |
451 | if (notif_name.empty()) { | |
452 | ldout(s->cct, 1) << "missing notification id" << dendl; | |
453 | op_ret = -EINVAL; | |
454 | return; | |
455 | } | |
456 | if (c.topic_arn.empty()) { | |
457 | ldout(s->cct, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl; | |
458 | op_ret = -EINVAL; | |
459 | return; | |
460 | } | |
461 | ||
462 | const auto arn = rgw::ARN::parse(c.topic_arn); | |
463 | if (!arn || arn->resource.empty()) { | |
464 | ldout(s->cct, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl; | |
465 | op_ret = -EINVAL; | |
466 | return; | |
467 | } | |
468 | ||
469 | if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) { | |
470 | ldout(s->cct, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl; | |
471 | op_ret = -EINVAL; | |
472 | return; | |
473 | } | |
474 | ||
475 | const auto topic_name = arn->resource; | |
476 | ||
477 | // get topic information. destination information is stored in the topic | |
478 | rgw_pubsub_topic topic_info; | |
479 | op_ret = ups->get_topic(topic_name, &topic_info); | |
480 | if (op_ret < 0) { | |
481 | ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; | |
482 | return; | |
483 | } | |
484 | // make sure that full topic configuration match | |
485 | // TODO: use ARN match function | |
486 | ||
487 | // create unique topic name. this has 2 reasons: | |
488 | // (1) topics cannot be shared between different S3 notifications because they hold the filter information | |
489 | // (2) make topic clneaup easier, when notification is removed | |
490 | const auto unique_topic_name = topic_to_unique(topic_name, notif_name); | |
491 | // generate the internal topic. destination is stored here for the "push-only" case | |
492 | // when no subscription exists | |
493 | // ARN is cached to make the "GET" method faster | |
9f95a23c | 494 | op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data); |
eafe8130 TL |
495 | if (op_ret < 0) { |
496 | ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << | |
497 | "', ret=" << op_ret << dendl; | |
498 | return; | |
499 | } | |
500 | ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl; | |
501 | // generate the notification | |
502 | rgw::notify::EventTypeList events; | |
503 | op_ret = b->create_notification(unique_topic_name, c.events, std::make_optional(c.filter), notif_name); | |
504 | if (op_ret < 0) { | |
505 | ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name << | |
506 | "', ret=" << op_ret << dendl; | |
507 | // rollback generated topic (ignore return value) | |
508 | ups->remove_topic(unique_topic_name); | |
509 | return; | |
510 | } | |
511 | ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl; | |
512 | ||
513 | if (!push_only) { | |
514 | // generate the subscription with destination information from the original topic | |
515 | rgw_pubsub_sub_dest dest = topic_info.dest; | |
516 | dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name; | |
517 | dest.oid_prefix = data_oid_prefix + notif_name + "/"; | |
518 | auto sub = ups->get_sub(notif_name); | |
519 | op_ret = sub->subscribe(unique_topic_name, dest, notif_name); | |
520 | if (op_ret < 0) { | |
521 | ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl; | |
522 | // rollback generated notification (ignore return value) | |
523 | b->remove_notification(unique_topic_name); | |
524 | // rollback generated topic (ignore return value) | |
525 | ups->remove_topic(unique_topic_name); | |
526 | return; | |
527 | } | |
528 | ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl; | |
529 | } | |
530 | } | |
531 | } | |
532 | ||
533 | // command (extension to S3): DELETE /bucket?notification[=<notification-id>] | |
534 | class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp { | |
535 | private: | |
536 | std::string notif_name; | |
537 | ||
538 | int get_params() override { | |
539 | bool exists; | |
540 | notif_name = s->info.args.get("notification", &exists); | |
541 | if (!exists) { | |
542 | ldout(s->cct, 1) << "missing required param 'notification'" << dendl; | |
543 | return -EINVAL; | |
544 | } | |
545 | if (s->bucket_name.empty()) { | |
546 | ldout(s->cct, 1) << "request must be on a bucket" << dendl; | |
547 | return -EINVAL; | |
548 | } | |
549 | bucket_name = s->bucket_name; | |
550 | return 0; | |
551 | } | |
552 | ||
553 | void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) { | |
554 | op_ret = b->remove_notification(topic_name); | |
555 | if (op_ret < 0) { | |
556 | ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl; | |
557 | } | |
558 | op_ret = ups->remove_topic(topic_name); | |
559 | if (op_ret < 0) { | |
560 | ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl; | |
561 | } | |
562 | } | |
563 | ||
564 | public: | |
565 | void execute() override; | |
566 | const char* name() const override { return "pubsub_notification_delete_s3"; } | |
567 | }; | |
568 | ||
569 | void RGWPSDeleteNotif_ObjStore_S3::execute() { | |
570 | op_ret = get_params(); | |
571 | if (op_ret < 0) { | |
572 | return; | |
573 | } | |
574 | ||
575 | ups.emplace(store, s->owner.get_id()); | |
576 | auto b = ups->get_bucket(bucket_info.bucket); | |
577 | ceph_assert(b); | |
578 | ||
579 | // get all topics on a bucket | |
580 | rgw_pubsub_bucket_topics bucket_topics; | |
581 | op_ret = b->get_topics(&bucket_topics); | |
582 | if (op_ret < 0) { | |
583 | ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; | |
584 | return; | |
585 | } | |
586 | ||
587 | if (!notif_name.empty()) { | |
588 | // delete a specific notification | |
589 | const auto unique_topic = find_unique_topic(bucket_topics, notif_name); | |
590 | if (unique_topic) { | |
591 | // remove the auto generated subscription according to notification name (if exist) | |
592 | const auto unique_topic_name = unique_topic->get().topic.name; | |
593 | auto sub = ups->get_sub(notif_name); | |
594 | op_ret = sub->unsubscribe(unique_topic_name); | |
595 | if (op_ret < 0 && op_ret != -ENOENT) { | |
596 | ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl; | |
597 | return; | |
598 | } | |
599 | remove_notification_by_topic(unique_topic_name, b); | |
600 | return; | |
601 | } | |
602 | // notification to be removed is not found - considered success | |
603 | ldout(s->cct, 20) << "notification '" << notif_name << "' already removed" << dendl; | |
604 | return; | |
605 | } | |
606 | ||
607 | // delete all notification of on a bucket | |
608 | for (const auto& topic : bucket_topics.topics) { | |
609 | // remove the auto generated subscription of the topic (if exist) | |
610 | rgw_pubsub_topic_subs topic_subs; | |
611 | op_ret = ups->get_topic(topic.first, &topic_subs); | |
612 | for (const auto& topic_sub_name : topic_subs.subs) { | |
613 | auto sub = ups->get_sub(topic_sub_name); | |
614 | rgw_pubsub_sub_config sub_conf; | |
615 | op_ret = sub->get_conf(&sub_conf); | |
616 | if (op_ret < 0) { | |
617 | ldout(s->cct, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl; | |
618 | return; | |
619 | } | |
620 | if (!sub_conf.s3_id.empty()) { | |
621 | // S3 notification, has autogenerated subscription | |
622 | const auto& sub_topic_name = sub_conf.topic; | |
623 | op_ret = sub->unsubscribe(sub_topic_name); | |
624 | if (op_ret < 0) { | |
625 | ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl; | |
626 | return; | |
627 | } | |
628 | } | |
629 | } | |
630 | remove_notification_by_topic(topic.first, b); | |
631 | } | |
632 | } | |
633 | ||
634 | // command (S3 compliant): GET /bucket?notification[=<notification-id>] | |
635 | class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp { | |
636 | private: | |
637 | std::string notif_name; | |
638 | rgw_pubsub_s3_notifications notifications; | |
639 | ||
640 | int get_params() override { | |
641 | bool exists; | |
642 | notif_name = s->info.args.get("notification", &exists); | |
643 | if (!exists) { | |
644 | ldout(s->cct, 1) << "missing required param 'notification'" << dendl; | |
645 | return -EINVAL; | |
646 | } | |
647 | if (s->bucket_name.empty()) { | |
648 | ldout(s->cct, 1) << "request must be on a bucket" << dendl; | |
649 | return -EINVAL; | |
650 | } | |
651 | bucket_name = s->bucket_name; | |
652 | return 0; | |
653 | } | |
654 | ||
655 | public: | |
656 | void execute() override; | |
657 | void send_response() override { | |
658 | if (op_ret) { | |
659 | set_req_state_err(s, op_ret); | |
660 | } | |
661 | dump_errno(s); | |
662 | end_header(s, this, "application/xml"); | |
663 | ||
664 | if (op_ret < 0) { | |
665 | return; | |
666 | } | |
667 | notifications.dump_xml(s->formatter); | |
668 | rgw_flush_formatter_and_reset(s, s->formatter); | |
669 | } | |
670 | const char* name() const override { return "pubsub_notifications_get_s3"; } | |
671 | }; | |
672 | ||
673 | void RGWPSListNotifs_ObjStore_S3::execute() { | |
674 | ups.emplace(store, s->owner.get_id()); | |
675 | auto b = ups->get_bucket(bucket_info.bucket); | |
676 | ceph_assert(b); | |
677 | ||
678 | // get all topics on a bucket | |
679 | rgw_pubsub_bucket_topics bucket_topics; | |
680 | op_ret = b->get_topics(&bucket_topics); | |
681 | if (op_ret < 0) { | |
682 | ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; | |
683 | return; | |
684 | } | |
685 | if (!notif_name.empty()) { | |
686 | // get info of a specific notification | |
687 | const auto unique_topic = find_unique_topic(bucket_topics, notif_name); | |
688 | if (unique_topic) { | |
689 | notifications.list.emplace_back(unique_topic->get()); | |
690 | return; | |
691 | } | |
692 | op_ret = -ENOENT; | |
693 | ldout(s->cct, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl; | |
694 | return; | |
695 | } | |
696 | // loop through all topics of the bucket | |
697 | for (const auto& topic : bucket_topics.topics) { | |
698 | if (topic.second.s3_id.empty()) { | |
699 | // not an s3 notification | |
700 | continue; | |
701 | } | |
702 | notifications.list.emplace_back(topic.second); | |
703 | } | |
704 | } | |
705 | ||
706 | RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() { | |
707 | return new RGWPSListNotifs_ObjStore_S3(); | |
708 | } | |
709 | ||
710 | RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() { | |
711 | return new RGWPSCreateNotif_ObjStore_S3(); | |
712 | } | |
713 | ||
714 | RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() { | |
715 | return new RGWPSDeleteNotif_ObjStore_S3(); | |
716 | } | |
717 | ||
718 | RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() { | |
719 | return new RGWPSListNotifs_ObjStore_S3(); | |
720 | } | |
721 | ||
722 | RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() { | |
723 | return new RGWPSCreateNotif_ObjStore_S3(); | |
724 | } | |
725 | ||
726 | RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() { | |
727 | return new RGWPSDeleteNotif_ObjStore_S3(); | |
728 | } | |
729 |