]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <algorithm> | |
5 | #include <boost/tokenizer.hpp> | |
6 | #include <optional> | |
eafe8130 TL |
7 | #include "rgw_rest_pubsub.h" |
8 | #include "rgw_pubsub_push.h" | |
9 | #include "rgw_pubsub.h" | |
eafe8130 TL |
10 | #include "rgw_op.h" |
11 | #include "rgw_rest.h" | |
12 | #include "rgw_rest_s3.h" | |
13 | #include "rgw_arn.h" | |
14 | #include "rgw_auth_s3.h" | |
f67539c2 | 15 | #include "rgw_notify.h" |
eafe8130 | 16 | #include "services/svc_zone.h" |
1e59de90 TL |
17 | #include "common/dout.h" |
18 | #include "rgw_url.h" | |
eafe8130 TL |
19 | |
20 | #define dout_context g_ceph_context | |
21 | #define dout_subsys ceph_subsys_rgw | |
22 | ||
// XML namespace used for the root element of the SNS-style responses in this file
static const char* AWS_SNS_NS = "https://sns.amazonaws.com/doc/2010-03-31/";
9f95a23c | 24 | |
1e59de90 TL |
25 | bool verify_transport_security(CephContext *cct, const RGWEnv& env) { |
26 | const auto is_secure = rgw_transport_is_secure(cct, env); | |
27 | if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) { | |
28 | ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl; | |
29 | return true; | |
30 | } | |
31 | return is_secure; | |
32 | } | |
33 | ||
34 | // make sure that endpoint is a valid URL | |
35 | // make sure that if user/password are passed inside URL, it is over secure connection | |
36 | // update rgw_pubsub_dest to indicate that a password is stored in the URL | |
37 | bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) { | |
38 | if (dest.push_endpoint.empty()) { | |
39 | return true; | |
40 | } | |
41 | std::string user; | |
42 | std::string password; | |
43 | if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) { | |
44 | ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl; | |
45 | return false; | |
46 | } | |
47 | // this should be verified inside parse_url() | |
48 | ceph_assert(user.empty() == password.empty()); | |
49 | if (!user.empty()) { | |
50 | dest.stored_secret = true; | |
51 | if (!verify_transport_security(cct, env)) { | |
52 | ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl; | |
53 | return false; | |
54 | } | |
55 | } | |
56 | return true; | |
57 | } | |
58 | ||
59 | bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) { | |
60 | return topic.dest.stored_secret; | |
61 | } | |
62 | ||
63 | bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) { | |
64 | for (const auto& topic : topics.topics) { | |
65 | if (topic_has_endpoint_secret(topic.second)) return true; | |
66 | } | |
67 | return false; | |
68 | } | |
69 | ||
eafe8130 TL |
70 | // command (AWS compliant): |
71 | // POST | |
f67539c2 | 72 | // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]] |
1e59de90 TL |
73 | class RGWPSCreateTopicOp : public RGWOp { |
74 | private: | |
75 | std::string topic_name; | |
76 | rgw_pubsub_dest dest; | |
77 | std::string topic_arn; | |
78 | std::string opaque_data; | |
79 | ||
80 | int get_params() { | |
eafe8130 TL |
81 | topic_name = s->info.args.get("Name"); |
82 | if (topic_name.empty()) { | |
b3b6e05e | 83 | ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl; |
9f95a23c | 84 | return -EINVAL; |
eafe8130 TL |
85 | } |
86 | ||
9f95a23c TL |
87 | opaque_data = s->info.args.get("OpaqueData"); |
88 | ||
eafe8130 | 89 | dest.push_endpoint = s->info.args.get("push-endpoint"); |
522d829b | 90 | s->info.args.get_bool("persistent", &dest.persistent, false); |
9f95a23c TL |
91 | |
92 | if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { | |
93 | return -EINVAL; | |
94 | } | |
f67539c2 | 95 | for (const auto& param : s->info.args.get_params()) { |
9f95a23c TL |
96 | if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { |
97 | continue; | |
98 | } | |
99 | dest.push_endpoint_args.append(param.first+"="+param.second+"&"); | |
eafe8130 TL |
100 | } |
101 | ||
102 | if (!dest.push_endpoint_args.empty()) { | |
9f95a23c TL |
103 | // remove last separator |
104 | dest.push_endpoint_args.pop_back(); | |
eafe8130 | 105 | } |
f67539c2 TL |
106 | if (!dest.push_endpoint.empty() && dest.persistent) { |
107 | const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield); | |
108 | if (ret < 0) { | |
b3b6e05e | 109 | ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl; |
f67539c2 TL |
110 | return ret; |
111 | } | |
112 | } | |
eafe8130 TL |
113 | |
114 | // dest object only stores endpoint info | |
eafe8130 TL |
115 | dest.arn_topic = topic_name; |
116 | // the topic ARN will be sent in the reply | |
117 | const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, | |
1e59de90 | 118 | driver->get_zone()->get_zonegroup().get_name(), |
9f95a23c | 119 | s->user->get_tenant(), topic_name); |
eafe8130 TL |
120 | topic_arn = arn.to_string(); |
121 | return 0; | |
122 | } | |
123 | ||
1e59de90 TL |
124 | public: |
125 | int verify_permission(optional_yield) override { | |
126 | return 0; | |
127 | } | |
128 | ||
129 | void pre_exec() override { | |
130 | rgw_bucket_object_pre_exec(s); | |
131 | } | |
132 | void execute(optional_yield) override; | |
133 | ||
134 | const char* name() const override { return "pubsub_topic_create"; } | |
135 | RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; } | |
136 | uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } | |
137 | ||
eafe8130 TL |
138 | void send_response() override { |
139 | if (op_ret) { | |
140 | set_req_state_err(s, op_ret); | |
141 | } | |
142 | dump_errno(s); | |
143 | end_header(s, this, "application/xml"); | |
144 | ||
145 | if (op_ret < 0) { | |
146 | return; | |
147 | } | |
148 | ||
149 | const auto f = s->formatter; | |
f67539c2 | 150 | f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS); |
eafe8130 TL |
151 | f->open_object_section("CreateTopicResult"); |
152 | encode_xml("TopicArn", topic_arn, f); | |
f67539c2 | 153 | f->close_section(); // CreateTopicResult |
eafe8130 TL |
154 | f->open_object_section("ResponseMetadata"); |
155 | encode_xml("RequestId", s->req_id, f); | |
f67539c2 TL |
156 | f->close_section(); // ResponseMetadata |
157 | f->close_section(); // CreateTopicResponse | |
eafe8130 TL |
158 | rgw_flush_formatter_and_reset(s, f); |
159 | } | |
160 | }; | |
161 | ||
1e59de90 TL |
162 | void RGWPSCreateTopicOp::execute(optional_yield y) { |
163 | op_ret = get_params(); | |
164 | if (op_ret < 0) { | |
165 | return; | |
166 | } | |
167 | ||
168 | const RGWPubSub ps(driver, s->owner.get_id().tenant); | |
169 | op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y); | |
170 | if (op_ret < 0) { | |
171 | ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl; | |
172 | return; | |
173 | } | |
174 | ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl; | |
175 | } | |
176 | ||
eafe8130 TL |
177 | // command (AWS compliant): |
178 | // POST | |
179 | // Action=ListTopics | |
1e59de90 TL |
180 | class RGWPSListTopicsOp : public RGWOp { |
181 | private: | |
182 | rgw_pubsub_topics result; | |
183 | ||
eafe8130 | 184 | public: |
1e59de90 TL |
185 | int verify_permission(optional_yield) override { |
186 | return 0; | |
187 | } | |
188 | void pre_exec() override { | |
189 | rgw_bucket_object_pre_exec(s); | |
190 | } | |
191 | void execute(optional_yield) override; | |
192 | ||
193 | const char* name() const override { return "pubsub_topics_list"; } | |
194 | RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; } | |
195 | uint32_t op_mask() override { return RGW_OP_TYPE_READ; } | |
196 | ||
eafe8130 TL |
197 | void send_response() override { |
198 | if (op_ret) { | |
199 | set_req_state_err(s, op_ret); | |
200 | } | |
201 | dump_errno(s); | |
202 | end_header(s, this, "application/xml"); | |
203 | ||
204 | if (op_ret < 0) { | |
205 | return; | |
206 | } | |
207 | ||
208 | const auto f = s->formatter; | |
f67539c2 | 209 | f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS); |
eafe8130 TL |
210 | f->open_object_section("ListTopicsResult"); |
211 | encode_xml("Topics", result, f); | |
f67539c2 | 212 | f->close_section(); // ListTopicsResult |
eafe8130 TL |
213 | f->open_object_section("ResponseMetadata"); |
214 | encode_xml("RequestId", s->req_id, f); | |
f67539c2 TL |
215 | f->close_section(); // ResponseMetadat |
216 | f->close_section(); // ListTopicsResponse | |
eafe8130 TL |
217 | rgw_flush_formatter_and_reset(s, f); |
218 | } | |
219 | }; | |
220 | ||
1e59de90 TL |
221 | void RGWPSListTopicsOp::execute(optional_yield y) { |
222 | const RGWPubSub ps(driver, s->owner.get_id().tenant); | |
223 | op_ret = ps.get_topics(this, result, y); | |
224 | // if there are no topics it is not considered an error | |
225 | op_ret = op_ret == -ENOENT ? 0 : op_ret; | |
226 | if (op_ret < 0) { | |
227 | ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl; | |
228 | return; | |
229 | } | |
230 | if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) { | |
231 | ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl; | |
232 | op_ret = -EPERM; | |
233 | return; | |
234 | } | |
235 | ldpp_dout(this, 20) << "successfully got topics" << dendl; | |
236 | } | |
237 | ||
eafe8130 TL |
238 | // command (extension to AWS): |
239 | // POST | |
240 | // Action=GetTopic&TopicArn=<topic-arn> | |
1e59de90 TL |
241 | class RGWPSGetTopicOp : public RGWOp { |
242 | private: | |
243 | std::string topic_name; | |
244 | rgw_pubsub_topic result; | |
245 | ||
246 | int get_params() { | |
eafe8130 TL |
247 | const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); |
248 | ||
249 | if (!topic_arn || topic_arn->resource.empty()) { | |
b3b6e05e | 250 | ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl; |
eafe8130 TL |
251 | return -EINVAL; |
252 | } | |
253 | ||
254 | topic_name = topic_arn->resource; | |
255 | return 0; | |
256 | } | |
257 | ||
1e59de90 TL |
258 | public: |
259 | int verify_permission(optional_yield y) override { | |
260 | return 0; | |
261 | } | |
262 | void pre_exec() override { | |
263 | rgw_bucket_object_pre_exec(s); | |
264 | } | |
265 | void execute(optional_yield y) override; | |
266 | ||
267 | const char* name() const override { return "pubsub_topic_get"; } | |
268 | RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; } | |
269 | uint32_t op_mask() override { return RGW_OP_TYPE_READ; } | |
270 | ||
eafe8130 TL |
271 | void send_response() override { |
272 | if (op_ret) { | |
273 | set_req_state_err(s, op_ret); | |
274 | } | |
275 | dump_errno(s); | |
276 | end_header(s, this, "application/xml"); | |
277 | ||
278 | if (op_ret < 0) { | |
279 | return; | |
280 | } | |
281 | ||
282 | const auto f = s->formatter; | |
283 | f->open_object_section("GetTopicResponse"); | |
284 | f->open_object_section("GetTopicResult"); | |
1e59de90 | 285 | encode_xml("Topic", result, f); |
eafe8130 TL |
286 | f->close_section(); |
287 | f->open_object_section("ResponseMetadata"); | |
288 | encode_xml("RequestId", s->req_id, f); | |
289 | f->close_section(); | |
290 | f->close_section(); | |
291 | rgw_flush_formatter_and_reset(s, f); | |
292 | } | |
293 | }; | |
294 | ||
1e59de90 TL |
295 | void RGWPSGetTopicOp::execute(optional_yield y) { |
296 | op_ret = get_params(); | |
297 | if (op_ret < 0) { | |
298 | return; | |
299 | } | |
300 | const RGWPubSub ps(driver, s->owner.get_id().tenant); | |
301 | op_ret = ps.get_topic(this, topic_name, result, y); | |
302 | if (op_ret < 0) { | |
303 | ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; | |
304 | return; | |
305 | } | |
306 | if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) { | |
307 | ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl; | |
308 | op_ret = -EPERM; | |
309 | return; | |
310 | } | |
311 | ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl; | |
312 | } | |
313 | ||
f67539c2 TL |
314 | // command (AWS compliant): |
315 | // POST | |
316 | // Action=GetTopicAttributes&TopicArn=<topic-arn> | |
1e59de90 TL |
317 | class RGWPSGetTopicAttributesOp : public RGWOp { |
318 | private: | |
319 | std::string topic_name; | |
320 | rgw_pubsub_topic result; | |
321 | ||
322 | int get_params() { | |
f67539c2 TL |
323 | const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); |
324 | ||
325 | if (!topic_arn || topic_arn->resource.empty()) { | |
b3b6e05e | 326 | ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl; |
f67539c2 TL |
327 | return -EINVAL; |
328 | } | |
329 | ||
330 | topic_name = topic_arn->resource; | |
331 | return 0; | |
332 | } | |
333 | ||
1e59de90 TL |
334 | public: |
335 | int verify_permission(optional_yield y) override { | |
336 | return 0; | |
337 | } | |
338 | void pre_exec() override { | |
339 | rgw_bucket_object_pre_exec(s); | |
340 | } | |
341 | void execute(optional_yield y) override; | |
342 | ||
343 | const char* name() const override { return "pubsub_topic_get"; } | |
344 | RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; } | |
345 | uint32_t op_mask() override { return RGW_OP_TYPE_READ; } | |
346 | ||
f67539c2 TL |
347 | void send_response() override { |
348 | if (op_ret) { | |
349 | set_req_state_err(s, op_ret); | |
350 | } | |
351 | dump_errno(s); | |
352 | end_header(s, this, "application/xml"); | |
353 | ||
354 | if (op_ret < 0) { | |
355 | return; | |
356 | } | |
357 | ||
358 | const auto f = s->formatter; | |
359 | f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS); | |
360 | f->open_object_section("GetTopicAttributesResult"); | |
1e59de90 | 361 | result.dump_xml_as_attributes(f); |
f67539c2 TL |
362 | f->close_section(); // GetTopicAttributesResult |
363 | f->open_object_section("ResponseMetadata"); | |
364 | encode_xml("RequestId", s->req_id, f); | |
365 | f->close_section(); // ResponseMetadata | |
366 | f->close_section(); // GetTopicAttributesResponse | |
367 | rgw_flush_formatter_and_reset(s, f); | |
368 | } | |
369 | }; | |
370 | ||
1e59de90 TL |
371 | void RGWPSGetTopicAttributesOp::execute(optional_yield y) { |
372 | op_ret = get_params(); | |
373 | if (op_ret < 0) { | |
374 | return; | |
375 | } | |
376 | const RGWPubSub ps(driver, s->owner.get_id().tenant); | |
377 | op_ret = ps.get_topic(this, topic_name, result, y); | |
378 | if (op_ret < 0) { | |
379 | ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; | |
380 | return; | |
381 | } | |
382 | if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) { | |
383 | ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl; | |
384 | op_ret = -EPERM; | |
385 | return; | |
386 | } | |
387 | ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl; | |
388 | } | |
389 | ||
eafe8130 TL |
390 | // command (AWS compliant): |
391 | // POST | |
392 | // Action=DeleteTopic&TopicArn=<topic-arn> | |
1e59de90 TL |
393 | class RGWPSDeleteTopicOp : public RGWOp { |
394 | private: | |
395 | std::string topic_name; | |
396 | ||
397 | int get_params() { | |
eafe8130 TL |
398 | const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); |
399 | ||
400 | if (!topic_arn || topic_arn->resource.empty()) { | |
b3b6e05e | 401 | ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl; |
9f95a23c | 402 | return -EINVAL; |
eafe8130 TL |
403 | } |
404 | ||
405 | topic_name = topic_arn->resource; | |
f67539c2 TL |
406 | |
407 | // upon deletion it is not known if topic is persistent or not | |
408 | // will try to delete the persistent topic anyway | |
409 | const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield); | |
410 | if (ret == -ENOENT) { | |
411 | // topic was not persistent, or already deleted | |
412 | return 0; | |
413 | } | |
414 | if (ret < 0) { | |
b3b6e05e | 415 | ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl; |
f67539c2 TL |
416 | return ret; |
417 | } | |
418 | ||
eafe8130 TL |
419 | return 0; |
420 | } | |
1e59de90 TL |
421 | |
422 | public: | |
423 | int verify_permission(optional_yield) override { | |
424 | return 0; | |
425 | } | |
426 | void pre_exec() override { | |
427 | rgw_bucket_object_pre_exec(s); | |
428 | } | |
429 | void execute(optional_yield y) override; | |
430 | ||
431 | const char* name() const override { return "pubsub_topic_delete"; } | |
432 | RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; } | |
433 | uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } | |
434 | ||
eafe8130 TL |
435 | void send_response() override { |
436 | if (op_ret) { | |
437 | set_req_state_err(s, op_ret); | |
438 | } | |
439 | dump_errno(s); | |
440 | end_header(s, this, "application/xml"); | |
441 | ||
442 | if (op_ret < 0) { | |
443 | return; | |
444 | } | |
445 | ||
446 | const auto f = s->formatter; | |
f67539c2 | 447 | f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS); |
eafe8130 TL |
448 | f->open_object_section("ResponseMetadata"); |
449 | encode_xml("RequestId", s->req_id, f); | |
f67539c2 TL |
450 | f->close_section(); // ResponseMetadata |
451 | f->close_section(); // DeleteTopicResponse | |
eafe8130 TL |
452 | rgw_flush_formatter_and_reset(s, f); |
453 | } | |
454 | }; | |
455 | ||
1e59de90 TL |
456 | void RGWPSDeleteTopicOp::execute(optional_yield y) { |
457 | op_ret = get_params(); | |
458 | if (op_ret < 0) { | |
eafe8130 TL |
459 | return; |
460 | } | |
1e59de90 TL |
461 | const RGWPubSub ps(driver, s->owner.get_id().tenant); |
462 | op_ret = ps.remove_topic(this, topic_name, y); | |
463 | if (op_ret < 0) { | |
464 | ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl; | |
465 | return; | |
eafe8130 | 466 | } |
1e59de90 | 467 | ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl; |
eafe8130 TL |
468 | } |
469 | ||
1e59de90 TL |
470 | using op_generator = RGWOp*(*)(); |
471 | static const std::unordered_map<std::string, op_generator> op_generators = { | |
472 | {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}}, | |
473 | {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}}, | |
474 | {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}}, | |
475 | {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}}, | |
476 | {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}} | |
477 | }; | |
478 | ||
479 | bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s) | |
480 | { | |
481 | if (s->info.args.exists("Action")) { | |
482 | const std::string action_name = s->info.args.get("Action"); | |
483 | return op_generators.contains(action_name); | |
eafe8130 | 484 | } |
1e59de90 | 485 | return false; |
eafe8130 TL |
486 | } |
487 | ||
1e59de90 TL |
488 | RGWOp *RGWHandler_REST_PSTopic_AWS::op_post() |
489 | { | |
490 | s->dialect = "sns"; | |
491 | s->prot_flags = RGW_REST_STS; | |
eafe8130 TL |
492 | |
493 | if (s->info.args.exists("Action")) { | |
1e59de90 TL |
494 | const std::string action_name = s->info.args.get("Action"); |
495 | const auto action_it = op_generators.find(action_name); | |
496 | if (action_it != op_generators.end()) { | |
497 | return action_it->second(); | |
498 | } | |
499 | ldpp_dout(s, 10) << "unknown action '" << action_name << "' for Topic handler" << dendl; | |
500 | } else { | |
501 | ldpp_dout(s, 10) << "missing action argument in Topic handler" << dendl; | |
eafe8130 | 502 | } |
eafe8130 TL |
503 | return nullptr; |
504 | } | |
505 | ||
f67539c2 | 506 | int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) { |
1e59de90 TL |
507 | const auto rc = RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y); |
508 | if (rc < 0) { | |
509 | return rc; | |
510 | } | |
511 | if (s->auth.identity->is_anonymous()) { | |
512 | ldpp_dout(dpp, 1) << "anonymous user not allowed in topic operations" << dendl; | |
513 | return -ERR_INVALID_REQUEST; | |
514 | } | |
515 | return 0; | |
eafe8130 TL |
516 | } |
517 | ||
eafe8130 TL |
518 | namespace { |
// build a unique topic name by prefixing the topic with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
  std::string unique_name;
  unique_name.reserve(notification.size() + 1 + topic.size());
  unique_name += notification;
  unique_name += '_';
  unique_name += topic;
  return unique_name;
}
523 | ||
// extract the topic from a unique topic of the form: <notification>_<topic>
// returns an empty string if "unique_topic" does not start with "<notification>_"
// (a match that is not at the beginning of the string does not count, since
// the topic is whatever follows the prefix)
[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
  const std::string prefix = notification + "_";
  if (unique_topic.compare(0, prefix.length(), prefix) != 0) {
    return "";
  }
  return unique_topic.substr(prefix.length());
}
531 | ||
532 | // from list of bucket topics, find the one that was auto-generated by a notification | |
533 | auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) { | |
534 | auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; }); | |
535 | return it != bucket_topics.topics.end() ? | |
536 | std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second): | |
537 | std::nullopt; | |
538 | } | |
539 | } | |
540 | ||
1e59de90 TL |
541 | int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) { |
542 | int op_ret = b.remove_notification(dpp, topic_name, y); | |
20effc67 TL |
543 | if (op_ret < 0) { |
544 | ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl; | |
545 | } | |
546 | op_ret = ps.remove_topic(dpp, topic_name, y); | |
547 | if (op_ret < 0) { | |
548 | ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl; | |
549 | } | |
550 | return op_ret; | |
551 | } | |
552 | ||
1e59de90 | 553 | int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) { |
20effc67 TL |
554 | // delete all notifications of on a bucket |
555 | for (const auto& topic : bucket_topics.topics) { | |
1e59de90 | 556 | const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps); |
20effc67 TL |
557 | if (op_ret < 0) { |
558 | return op_ret; | |
559 | } | |
560 | } | |
561 | return 0; | |
562 | } | |
563 | ||
eafe8130 TL |
564 | // command (S3 compliant): PUT /<bucket name>?notification |
565 | // a "notification" and a subscription will be auto-generated | |
566 | // actual configuration is XML encoded in the body of the message | |
1e59de90 TL |
567 | class RGWPSCreateNotifOp : public RGWDefaultResponseOp { |
568 | int verify_params() override { | |
569 | bool exists; | |
570 | const auto no_value = s->info.args.get("notification", &exists); | |
571 | if (!exists) { | |
572 | ldpp_dout(this, 1) << "missing required param 'notification'" << dendl; | |
573 | return -EINVAL; | |
574 | } | |
575 | if (no_value.length() > 0) { | |
576 | ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl; | |
577 | return -EINVAL; | |
578 | } | |
579 | if (s->bucket_name.empty()) { | |
580 | ldpp_dout(this, 1) << "request must be on a bucket" << dendl; | |
581 | return -EINVAL; | |
582 | } | |
583 | return 0; | |
584 | } | |
eafe8130 | 585 | |
1e59de90 | 586 | int get_params_from_body(rgw_pubsub_s3_notifications& configurations) { |
eafe8130 TL |
587 | const auto max_size = s->cct->_conf->rgw_max_put_param_size; |
588 | int r; | |
589 | bufferlist data; | |
20effc67 | 590 | std::tie(r, data) = read_all_input(s, max_size, false); |
eafe8130 TL |
591 | |
592 | if (r < 0) { | |
b3b6e05e | 593 | ldpp_dout(this, 1) << "failed to read XML payload" << dendl; |
eafe8130 TL |
594 | return r; |
595 | } | |
596 | if (data.length() == 0) { | |
b3b6e05e | 597 | ldpp_dout(this, 1) << "XML payload missing" << dendl; |
eafe8130 TL |
598 | return -EINVAL; |
599 | } | |
600 | ||
601 | RGWXMLDecoder::XMLParser parser; | |
602 | ||
603 | if (!parser.init()){ | |
b3b6e05e | 604 | ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl; |
eafe8130 TL |
605 | return -EINVAL; |
606 | } | |
607 | if (!parser.parse(data.c_str(), data.length(), 1)) { | |
b3b6e05e | 608 | ldpp_dout(this, 1) << "failed to parse XML payload" << dendl; |
eafe8130 TL |
609 | return -ERR_MALFORMED_XML; |
610 | } | |
611 | try { | |
612 | // NotificationConfigurations is mandatory | |
20effc67 | 613 | // It can be empty which means we delete all the notifications |
eafe8130 TL |
614 | RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true); |
615 | } catch (RGWXMLDecoder::err& err) { | |
b3b6e05e | 616 | ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl; |
eafe8130 TL |
617 | return -ERR_MALFORMED_XML; |
618 | } | |
619 | return 0; | |
620 | } | |
1e59de90 TL |
621 | public: |
622 | int verify_permission(optional_yield y) override; | |
eafe8130 | 623 | |
1e59de90 TL |
624 | void pre_exec() override { |
625 | rgw_bucket_object_pre_exec(s); | |
eafe8130 TL |
626 | } |
627 | ||
eafe8130 | 628 | const char* name() const override { return "pubsub_notification_create_s3"; } |
1e59de90 TL |
629 | RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; } |
630 | uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } | |
631 | ||
632 | ||
f67539c2 | 633 | void execute(optional_yield) override; |
eafe8130 TL |
634 | }; |
635 | ||
1e59de90 TL |
636 | void RGWPSCreateNotifOp::execute(optional_yield y) { |
637 | op_ret = verify_params(); | |
eafe8130 TL |
638 | if (op_ret < 0) { |
639 | return; | |
640 | } | |
641 | ||
1e59de90 TL |
642 | rgw_pubsub_s3_notifications configurations; |
643 | op_ret = get_params_from_body(configurations); | |
644 | if (op_ret < 0) { | |
645 | return; | |
646 | } | |
20effc67 | 647 | |
1e59de90 TL |
648 | std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id()); |
649 | std::unique_ptr<rgw::sal::Bucket> bucket; | |
650 | op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y); | |
651 | if (op_ret < 0) { | |
652 | ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl; | |
653 | return; | |
eafe8130 TL |
654 | } |
655 | ||
1e59de90 TL |
656 | const RGWPubSub ps(driver, s->owner.get_id().tenant); |
657 | const RGWPubSub::Bucket b(ps, bucket.get()); | |
658 | ||
20effc67 TL |
659 | if(configurations.list.empty()) { |
660 | // get all topics on a bucket | |
661 | rgw_pubsub_bucket_topics bucket_topics; | |
1e59de90 | 662 | op_ret = b.get_topics(this, bucket_topics, y); |
20effc67 | 663 | if (op_ret < 0) { |
1e59de90 | 664 | ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl; |
20effc67 TL |
665 | return; |
666 | } | |
667 | ||
1e59de90 | 668 | op_ret = delete_all_notifications(this, bucket_topics, b, y, ps); |
20effc67 TL |
669 | return; |
670 | } | |
671 | ||
eafe8130 TL |
672 | for (const auto& c : configurations.list) { |
673 | const auto& notif_name = c.id; | |
674 | if (notif_name.empty()) { | |
b3b6e05e | 675 | ldpp_dout(this, 1) << "missing notification id" << dendl; |
eafe8130 TL |
676 | op_ret = -EINVAL; |
677 | return; | |
678 | } | |
679 | if (c.topic_arn.empty()) { | |
b3b6e05e | 680 | ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl; |
eafe8130 TL |
681 | op_ret = -EINVAL; |
682 | return; | |
683 | } | |
684 | ||
685 | const auto arn = rgw::ARN::parse(c.topic_arn); | |
686 | if (!arn || arn->resource.empty()) { | |
b3b6e05e | 687 | ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl; |
eafe8130 TL |
688 | op_ret = -EINVAL; |
689 | return; | |
690 | } | |
691 | ||
692 | if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) { | |
b3b6e05e | 693 | ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl; |
eafe8130 TL |
694 | op_ret = -EINVAL; |
695 | return; | |
696 | } | |
697 | ||
698 | const auto topic_name = arn->resource; | |
699 | ||
700 | // get topic information. destination information is stored in the topic | |
701 | rgw_pubsub_topic topic_info; | |
1e59de90 | 702 | op_ret = ps.get_topic(this, topic_name, topic_info, y); |
eafe8130 | 703 | if (op_ret < 0) { |
b3b6e05e | 704 | ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; |
eafe8130 TL |
705 | return; |
706 | } | |
707 | // make sure that full topic configuration match | |
708 | // TODO: use ARN match function | |
709 | ||
710 | // create unique topic name. this has 2 reasons: | |
711 | // (1) topics cannot be shared between different S3 notifications because they hold the filter information | |
712 | // (2) make topic clneaup easier, when notification is removed | |
713 | const auto unique_topic_name = topic_to_unique(topic_name, notif_name); | |
714 | // generate the internal topic. destination is stored here for the "push-only" case | |
715 | // when no subscription exists | |
716 | // ARN is cached to make the "GET" method faster | |
1e59de90 | 717 | op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y); |
eafe8130 | 718 | if (op_ret < 0) { |
b3b6e05e | 719 | ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name << |
eafe8130 TL |
720 | "', ret=" << op_ret << dendl; |
721 | return; | |
722 | } | |
b3b6e05e | 723 | ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl; |
eafe8130 TL |
724 | // generate the notification |
725 | rgw::notify::EventTypeList events; | |
1e59de90 | 726 | op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y); |
eafe8130 | 727 | if (op_ret < 0) { |
b3b6e05e | 728 | ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name << |
eafe8130 TL |
729 | "', ret=" << op_ret << dendl; |
730 | // rollback generated topic (ignore return value) | |
1e59de90 | 731 | ps.remove_topic(this, unique_topic_name, y); |
eafe8130 TL |
732 | return; |
733 | } | |
b3b6e05e | 734 | ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl; |
eafe8130 TL |
735 | } |
736 | } | |
737 | ||
1e59de90 TL |
738 | int RGWPSCreateNotifOp::verify_permission(optional_yield y) { |
739 | if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) { | |
740 | return -EACCES; | |
741 | } | |
eafe8130 | 742 | |
1e59de90 TL |
743 | return 0; |
744 | } | |
745 | ||
746 | // command (extension to S3): DELETE /bucket?notification[=<notification-id>] | |
747 | class RGWPSDeleteNotifOp : public RGWDefaultResponseOp { | |
748 | int get_params(std::string& notif_name) const { | |
eafe8130 TL |
749 | bool exists; |
750 | notif_name = s->info.args.get("notification", &exists); | |
751 | if (!exists) { | |
b3b6e05e | 752 | ldpp_dout(this, 1) << "missing required param 'notification'" << dendl; |
eafe8130 TL |
753 | return -EINVAL; |
754 | } | |
755 | if (s->bucket_name.empty()) { | |
b3b6e05e | 756 | ldpp_dout(this, 1) << "request must be on a bucket" << dendl; |
eafe8130 TL |
757 | return -EINVAL; |
758 | } | |
eafe8130 TL |
759 | return 0; |
760 | } | |
761 | ||
eafe8130 | 762 | public: |
1e59de90 TL |
763 | int verify_permission(optional_yield y) override; |
764 | ||
765 | void pre_exec() override { | |
766 | rgw_bucket_object_pre_exec(s); | |
767 | } | |
768 | ||
eafe8130 | 769 | const char* name() const override { return "pubsub_notification_delete_s3"; } |
1e59de90 TL |
770 | RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; } |
771 | uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } | |
772 | ||
773 | void execute(optional_yield y) override; | |
eafe8130 TL |
774 | }; |
775 | ||
1e59de90 TL |
776 | void RGWPSDeleteNotifOp::execute(optional_yield y) { |
777 | std::string notif_name; | |
778 | op_ret = get_params(notif_name); | |
779 | if (op_ret < 0) { | |
780 | return; | |
781 | } | |
782 | ||
783 | std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id()); | |
784 | std::unique_ptr<rgw::sal::Bucket> bucket; | |
785 | op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y); | |
eafe8130 | 786 | if (op_ret < 0) { |
1e59de90 | 787 | ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl; |
eafe8130 TL |
788 | return; |
789 | } | |
790 | ||
1e59de90 TL |
791 | const RGWPubSub ps(driver, s->owner.get_id().tenant); |
792 | const RGWPubSub::Bucket b(ps, bucket.get()); | |
eafe8130 TL |
793 | |
794 | // get all topics on a bucket | |
795 | rgw_pubsub_bucket_topics bucket_topics; | |
1e59de90 | 796 | op_ret = b.get_topics(this, bucket_topics, y); |
eafe8130 | 797 | if (op_ret < 0) { |
1e59de90 | 798 | ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl; |
eafe8130 TL |
799 | return; |
800 | } | |
801 | ||
802 | if (!notif_name.empty()) { | |
803 | // delete a specific notification | |
804 | const auto unique_topic = find_unique_topic(bucket_topics, notif_name); | |
805 | if (unique_topic) { | |
eafe8130 | 806 | const auto unique_topic_name = unique_topic->get().topic.name; |
1e59de90 | 807 | op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps); |
eafe8130 TL |
808 | return; |
809 | } | |
810 | // notification to be removed is not found - considered success | |
b3b6e05e | 811 | ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl; |
eafe8130 TL |
812 | return; |
813 | } | |
814 | ||
1e59de90 TL |
815 | op_ret = delete_all_notifications(this, bucket_topics, b, y, ps); |
816 | } | |
817 | ||
818 | int RGWPSDeleteNotifOp::verify_permission(optional_yield y) { | |
819 | if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) { | |
820 | return -EACCES; | |
821 | } | |
822 | ||
823 | return 0; | |
eafe8130 TL |
824 | } |
825 | ||
826 | // command (S3 compliant): GET /bucket?notification[=<notification-id>] | |
1e59de90 | 827 | class RGWPSListNotifsOp : public RGWOp { |
eafe8130 TL |
828 | rgw_pubsub_s3_notifications notifications; |
829 | ||
1e59de90 | 830 | int get_params(std::string& notif_name) const { |
eafe8130 TL |
831 | bool exists; |
832 | notif_name = s->info.args.get("notification", &exists); | |
833 | if (!exists) { | |
b3b6e05e | 834 | ldpp_dout(this, 1) << "missing required param 'notification'" << dendl; |
eafe8130 TL |
835 | return -EINVAL; |
836 | } | |
837 | if (s->bucket_name.empty()) { | |
b3b6e05e | 838 | ldpp_dout(this, 1) << "request must be on a bucket" << dendl; |
eafe8130 TL |
839 | return -EINVAL; |
840 | } | |
eafe8130 TL |
841 | return 0; |
842 | } | |
843 | ||
844 | public: | |
1e59de90 TL |
845 | int verify_permission(optional_yield y) override; |
846 | ||
847 | void pre_exec() override { | |
848 | rgw_bucket_object_pre_exec(s); | |
849 | } | |
850 | ||
851 | const char* name() const override { return "pubsub_notifications_get_s3"; } | |
852 | RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; } | |
853 | uint32_t op_mask() override { return RGW_OP_TYPE_READ; } | |
854 | ||
f67539c2 | 855 | void execute(optional_yield y) override; |
eafe8130 TL |
856 | void send_response() override { |
857 | if (op_ret) { | |
858 | set_req_state_err(s, op_ret); | |
859 | } | |
860 | dump_errno(s); | |
861 | end_header(s, this, "application/xml"); | |
862 | ||
863 | if (op_ret < 0) { | |
864 | return; | |
865 | } | |
866 | notifications.dump_xml(s->formatter); | |
867 | rgw_flush_formatter_and_reset(s, s->formatter); | |
868 | } | |
eafe8130 TL |
869 | }; |
870 | ||
1e59de90 TL |
871 | void RGWPSListNotifsOp::execute(optional_yield y) { |
872 | std::string notif_name; | |
873 | op_ret = get_params(notif_name); | |
874 | if (op_ret < 0) { | |
875 | return; | |
876 | } | |
877 | ||
878 | std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id()); | |
879 | std::unique_ptr<rgw::sal::Bucket> bucket; | |
880 | op_ret = driver->get_bucket(this, user.get(), s->owner.get_id().tenant, s->bucket_name, &bucket, y); | |
881 | if (op_ret < 0) { | |
882 | ldpp_dout(this, 1) << "failed to get bucket '" << s->bucket_name << "' info, ret = " << op_ret << dendl; | |
883 | return; | |
884 | } | |
885 | ||
886 | const RGWPubSub ps(driver, s->owner.get_id().tenant); | |
887 | const RGWPubSub::Bucket b(ps, bucket.get()); | |
eafe8130 TL |
888 | |
889 | // get all topics on a bucket | |
890 | rgw_pubsub_bucket_topics bucket_topics; | |
1e59de90 | 891 | op_ret = b.get_topics(this, bucket_topics, y); |
eafe8130 | 892 | if (op_ret < 0) { |
1e59de90 | 893 | ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl; |
eafe8130 TL |
894 | return; |
895 | } | |
896 | if (!notif_name.empty()) { | |
897 | // get info of a specific notification | |
898 | const auto unique_topic = find_unique_topic(bucket_topics, notif_name); | |
899 | if (unique_topic) { | |
900 | notifications.list.emplace_back(unique_topic->get()); | |
901 | return; | |
902 | } | |
903 | op_ret = -ENOENT; | |
b3b6e05e | 904 | ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl; |
eafe8130 TL |
905 | return; |
906 | } | |
907 | // loop through all topics of the bucket | |
908 | for (const auto& topic : bucket_topics.topics) { | |
909 | if (topic.second.s3_id.empty()) { | |
910 | // not an s3 notification | |
911 | continue; | |
912 | } | |
913 | notifications.list.emplace_back(topic.second); | |
914 | } | |
915 | } | |
916 | ||
1e59de90 TL |
917 | int RGWPSListNotifsOp::verify_permission(optional_yield y) { |
918 | if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) { | |
919 | return -EACCES; | |
920 | } | |
921 | ||
922 | return 0; | |
923 | } | |
924 | ||
eafe8130 | 925 | RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() { |
1e59de90 | 926 | return new RGWPSListNotifsOp(); |
eafe8130 TL |
927 | } |
928 | ||
929 | RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() { | |
1e59de90 | 930 | return new RGWPSCreateNotifOp(); |
eafe8130 TL |
931 | } |
932 | ||
933 | RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() { | |
1e59de90 | 934 | return new RGWPSDeleteNotifOp(); |
eafe8130 TL |
935 | } |
936 | ||
937 | RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() { | |
1e59de90 | 938 | return new RGWPSListNotifsOp(); |
eafe8130 TL |
939 | } |
940 | ||
941 | RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() { | |
1e59de90 | 942 | return new RGWPSCreateNotifOp(); |
eafe8130 TL |
943 | } |
944 | ||
945 | RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() { | |
1e59de90 | 946 | return new RGWPSDeleteNotifOp(); |
eafe8130 TL |
947 | } |
948 |