]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "services/svc_zone.h"
5 #include "rgw_b64.h"
6 #include "rgw_rados.h"
7 #include "rgw_pubsub.h"
8 #include "rgw_tools.h"
9 #include "rgw_xml.h"
10 #include "rgw_arn.h"
11 #include "rgw_pubsub_push.h"
12 #include "rgw_rados.h"
13 #include <regex>
14 #include <algorithm>
15
16 #define dout_subsys ceph_subsys_rgw
17
18 bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
19 XMLObjIter iter = obj->find("FilterRule");
20 XMLObj *o;
21
22 const auto throw_if_missing = true;
23 auto prefix_not_set = true;
24 auto suffix_not_set = true;
25 auto regex_not_set = true;
26 std::string name;
27
28 while ((o = iter.get_next())) {
29 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
30 if (name == "prefix" && prefix_not_set) {
31 prefix_not_set = false;
32 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
33 } else if (name == "suffix" && suffix_not_set) {
34 suffix_not_set = false;
35 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
36 } else if (name == "regex" && regex_not_set) {
37 regex_not_set = false;
38 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
39 } else {
40 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
41 }
42 }
43 return true;
44 }
45
46 void rgw_s3_key_filter::dump_xml(Formatter *f) const {
47 if (!prefix_rule.empty()) {
48 f->open_object_section("FilterRule");
49 ::encode_xml("Name", "prefix", f);
50 ::encode_xml("Value", prefix_rule, f);
51 f->close_section();
52 }
53 if (!suffix_rule.empty()) {
54 f->open_object_section("FilterRule");
55 ::encode_xml("Name", "suffix", f);
56 ::encode_xml("Value", suffix_rule, f);
57 f->close_section();
58 }
59 if (!regex_rule.empty()) {
60 f->open_object_section("FilterRule");
61 ::encode_xml("Name", "regex", f);
62 ::encode_xml("Value", regex_rule, f);
63 f->close_section();
64 }
65 }
66
67 bool rgw_s3_key_filter::has_content() const {
68 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
69 }
70
71 bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
72 metadata.clear();
73 XMLObjIter iter = obj->find("FilterRule");
74 XMLObj *o;
75
76 const auto throw_if_missing = true;
77
78 std::string key;
79 std::string value;
80
81 while ((o = iter.get_next())) {
82 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
83 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
84 metadata.emplace(key, value);
85 }
86 return true;
87 }
88
89 void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
90 for (const auto& key_value : metadata) {
91 f->open_object_section("FilterRule");
92 ::encode_xml("Name", key_value.first, f);
93 ::encode_xml("Value", key_value.second, f);
94 f->close_section();
95 }
96 }
97
98 bool rgw_s3_metadata_filter::has_content() const {
99 return !metadata.empty();
100 }
101
102 bool rgw_s3_filter::decode_xml(XMLObj* obj) {
103 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
104 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
105 return true;
106 }
107
108 void rgw_s3_filter::dump_xml(Formatter *f) const {
109 if (key_filter.has_content()) {
110 ::encode_xml("S3Key", key_filter, f);
111 }
112 if (metadata_filter.has_content()) {
113 ::encode_xml("S3Metadata", metadata_filter, f);
114 }
115 }
116
117 bool rgw_s3_filter::has_content() const {
118 return key_filter.has_content() ||
119 metadata_filter.has_content();
120 }
121
122 bool match(const rgw_s3_key_filter& filter, const std::string& key) {
123 const auto key_size = key.size();
124 const auto prefix_size = filter.prefix_rule.size();
125 if (prefix_size != 0) {
126 // prefix rule exists
127 if (prefix_size > key_size) {
128 // if prefix is longer than key, we fail
129 return false;
130 }
131 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
132 return false;
133 }
134 }
135 const auto suffix_size = filter.suffix_rule.size();
136 if (suffix_size != 0) {
137 // suffix rule exists
138 if (suffix_size > key_size) {
139 // if suffix is longer than key, we fail
140 return false;
141 }
142 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
143 return false;
144 }
145 }
146 if (!filter.regex_rule.empty()) {
147 // TODO add regex chaching in the filter
148 const std::regex base_regex(filter.regex_rule);
149 if (!std::regex_match(key, base_regex)) {
150 return false;
151 }
152 }
153 return true;
154 }
155
156 bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
157 // all filter pairs must exist with the same value in the object's metadata
158 // object metadata may include items not in the filter
159 return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
160 }
161
162 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
163 // if event list exists, and none of the events in the list matches the event type, filter the message
164 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
165 return false;
166 }
167 return true;
168 }
169
170 void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
171 l.clear();
172
173 XMLObjIter iter = obj->find(name);
174 XMLObj *o;
175
176 while ((o = iter.get_next())) {
177 std::string val;
178 decode_xml_obj(val, o);
179 l.push_back(rgw::notify::from_string(val));
180 }
181 }
182
183 bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
184 const auto throw_if_missing = true;
185 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
186
187 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
188
189 RGWXMLDecoder::decode_xml("Filter", filter, obj);
190
191 do_decode_xml_obj(events, "Event", obj);
192 if (events.empty()) {
193 // if no events are provided, we assume all events
194 events.push_back(rgw::notify::ObjectCreated);
195 events.push_back(rgw::notify::ObjectRemoved);
196 }
197 return true;
198 }
199
200 void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
201 ::encode_xml("Id", id, f);
202 ::encode_xml("Topic", topic_arn.c_str(), f);
203 if (filter.has_content()) {
204 ::encode_xml("Filter", filter, f);
205 }
206 for (const auto& event : events) {
207 ::encode_xml("Event", rgw::notify::to_string(event), f);
208 }
209 }
210
211 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
212 do_decode_xml_obj(list, "TopicConfiguration", obj);
213 if (list.empty()) {
214 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
215 }
216 return true;
217 }
218
219 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
220 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
221
222 void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
223 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
224 }
225
226 void rgw_pubsub_s3_record::dump(Formatter *f) const {
227 encode_json("eventVersion", eventVersion, f);
228 encode_json("eventSource", eventSource, f);
229 encode_json("awsRegion", awsRegion, f);
230 utime_t ut(eventTime);
231 encode_json("eventTime", ut, f);
232 encode_json("eventName", eventName, f);
233 {
234 Formatter::ObjectSection s(*f, "userIdentity");
235 encode_json("principalId", userIdentity, f);
236 }
237 {
238 Formatter::ObjectSection s(*f, "requestParameters");
239 encode_json("sourceIPAddress", sourceIPAddress, f);
240 }
241 {
242 Formatter::ObjectSection s(*f, "responseElements");
243 encode_json("x-amz-request-id", x_amz_request_id, f);
244 encode_json("x-amz-id-2", x_amz_id_2, f);
245 }
246 {
247 Formatter::ObjectSection s(*f, "s3");
248 encode_json("s3SchemaVersion", s3SchemaVersion, f);
249 encode_json("configurationId", configurationId, f);
250 {
251 Formatter::ObjectSection sub_s(*f, "bucket");
252 encode_json("name", bucket_name, f);
253 {
254 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
255 encode_json("principalId", bucket_ownerIdentity, f);
256 }
257 encode_json("arn", bucket_arn, f);
258 encode_json("id", bucket_id, f);
259 }
260 {
261 Formatter::ObjectSection sub_s(*f, "object");
262 encode_json("key", object_key, f);
263 encode_json("size", object_size, f);
264 encode_json("etag", object_etag, f);
265 encode_json("versionId", object_versionId, f);
266 encode_json("sequencer", object_sequencer, f);
267 encode_json("metadata", x_meta_map, f);
268 }
269 }
270 encode_json("eventId", id, f);
271 }
272
273 void rgw_pubsub_event::dump(Formatter *f) const
274 {
275 encode_json("id", id, f);
276 encode_json("event", event_name, f);
277 utime_t ut(timestamp);
278 encode_json("timestamp", ut, f);
279 encode_json("info", info, f);
280 }
281
282 void rgw_pubsub_topic::dump(Formatter *f) const
283 {
284 encode_json("user", user, f);
285 encode_json("name", name, f);
286 encode_json("dest", dest, f);
287 encode_json("arn", arn, f);
288 }
289
290 void rgw_pubsub_topic::dump_xml(Formatter *f) const
291 {
292 encode_xml("User", user, f);
293 encode_xml("Name", name, f);
294 encode_xml("EndPoint", dest, f);
295 encode_xml("TopicArn", arn, f);
296 }
297
298 void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
299 {
300 f->open_array_section(name);
301 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
302 f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
303 }
304 f->close_section();
305 }
306
307 void rgw_pubsub_topic_filter::dump(Formatter *f) const
308 {
309 encode_json("topic", topic, f);
310 encode_json("events", events, f);
311 }
312
313 void rgw_pubsub_topic_subs::dump(Formatter *f) const
314 {
315 encode_json("topic", topic, f);
316 encode_json("subs", subs, f);
317 }
318
319 void rgw_pubsub_bucket_topics::dump(Formatter *f) const
320 {
321 Formatter::ArraySection s(*f, "topics");
322 for (auto& t : topics) {
323 encode_json(t.first.c_str(), t.second, f);
324 }
325 }
326
327 void rgw_pubsub_user_topics::dump(Formatter *f) const
328 {
329 Formatter::ArraySection s(*f, "topics");
330 for (auto& t : topics) {
331 encode_json(t.first.c_str(), t.second, f);
332 }
333 }
334
335 void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
336 {
337 for (auto& t : topics) {
338 encode_xml("member", t.second.topic, f);
339 }
340 }
341
342 void rgw_pubsub_sub_dest::dump(Formatter *f) const
343 {
344 encode_json("bucket_name", bucket_name, f);
345 encode_json("oid_prefix", oid_prefix, f);
346 encode_json("push_endpoint", push_endpoint, f);
347 encode_json("push_endpoint_args", push_endpoint_args, f);
348 encode_json("push_endpoint_topic", arn_topic, f);
349 }
350
351 void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
352 {
353 encode_xml("EndpointAddress", push_endpoint, f);
354 encode_xml("EndpointArgs", push_endpoint_args, f);
355 encode_xml("EndpointTopic", arn_topic, f);
356 }
357
358 void rgw_pubsub_sub_config::dump(Formatter *f) const
359 {
360 encode_json("user", user, f);
361 encode_json("name", name, f);
362 encode_json("topic", topic, f);
363 encode_json("dest", dest, f);
364 encode_json("s3_id", s3_id, f);
365 }
366
367
368 int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
369 {
370 int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker);
371 if (ret < 0) {
372 return ret;
373 }
374
375 return 0;
376 }
377
378 int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
379 {
380 int ret = read(user_meta_obj, result, objv_tracker);
381 if (ret < 0) {
382 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
383 return ret;
384 }
385 return 0;
386 }
387
388 int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
389 {
390 int ret = write(user_meta_obj, topics, objv_tracker);
391 if (ret < 0 && ret != -ENOENT) {
392 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
393 return ret;
394 }
395 return 0;
396 }
397
398 int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
399 {
400 return read_user_topics(result, nullptr);
401 }
402
403 int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
404 {
405 int ret = ps->read(bucket_meta_obj, result, objv_tracker);
406 if (ret < 0 && ret != -ENOENT) {
407 ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
408 return ret;
409 }
410 return 0;
411 }
412
413 int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
414 {
415 int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
416 if (ret < 0) {
417 ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
418 return ret;
419 }
420
421 return 0;
422 }
423
424 int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
425 {
426 return read_topics(result, nullptr);
427 }
428
429 int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
430 {
431 rgw_pubsub_user_topics topics;
432 int ret = get_user_topics(&topics);
433 if (ret < 0) {
434 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
435 return ret;
436 }
437
438 auto iter = topics.topics.find(name);
439 if (iter == topics.topics.end()) {
440 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
441 return -ENOENT;
442 }
443
444 *result = iter->second;
445 return 0;
446 }
447
448 int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
449 {
450 rgw_pubsub_user_topics topics;
451 int ret = get_user_topics(&topics);
452 if (ret < 0) {
453 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
454 return ret;
455 }
456
457 auto iter = topics.topics.find(name);
458 if (iter == topics.topics.end()) {
459 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
460 return -ENOENT;
461 }
462
463 *result = iter->second.topic;
464 return 0;
465 }
466
467 int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
468 return create_notification(topic_name, events, std::nullopt, "");
469 }
470
471 int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
472 rgw_pubsub_topic_subs user_topic_info;
473 RGWRados *store = ps->store;
474
475 int ret = ps->get_topic(topic_name, &user_topic_info);
476 if (ret < 0) {
477 ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
478 return ret;
479 }
480 ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl;
481
482 RGWObjVersionTracker objv_tracker;
483 rgw_pubsub_bucket_topics bucket_topics;
484
485 ret = read_topics(&bucket_topics, &objv_tracker);
486 if (ret < 0) {
487 ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
488 bucket.name << "': ret=" << ret << dendl;
489 return ret;
490 }
491 ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
492 bucket.name << "'" << dendl;
493
494 auto& topic_filter = bucket_topics.topics[topic_name];
495 topic_filter.topic = user_topic_info.topic;
496 topic_filter.events = events;
497 topic_filter.s3_id = notif_name;
498 if (s3_filter) {
499 topic_filter.s3_filter = *s3_filter;
500 }
501
502 ret = write_topics(bucket_topics, &objv_tracker);
503 if (ret < 0) {
504 ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
505 return ret;
506 }
507
508 ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
509
510 return 0;
511 }
512
513 int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
514 {
515 rgw_pubsub_topic_subs user_topic_info;
516 RGWRados *store = ps->store;
517
518 int ret = ps->get_topic(topic_name, &user_topic_info);
519 if (ret < 0) {
520 ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
521 return ret;
522 }
523
524 RGWObjVersionTracker objv_tracker;
525 rgw_pubsub_bucket_topics bucket_topics;
526
527 ret = read_topics(&bucket_topics, &objv_tracker);
528 if (ret < 0) {
529 ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
530 return ret;
531 }
532
533 bucket_topics.topics.erase(topic_name);
534
535 ret = write_topics(bucket_topics, &objv_tracker);
536 if (ret < 0) {
537 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
538 return ret;
539 }
540
541 return 0;
542 }
543
544 int RGWUserPubSub::create_topic(const string& name) {
545 return create_topic(name, rgw_pubsub_sub_dest(), "");
546 }
547
548 int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) {
549 RGWObjVersionTracker objv_tracker;
550 rgw_pubsub_user_topics topics;
551
552 int ret = read_user_topics(&topics, &objv_tracker);
553 if (ret < 0 && ret != -ENOENT) {
554 // its not an error if not topics exist, we create one
555 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
556 return ret;
557 }
558
559 rgw_pubsub_topic_subs& new_topic = topics.topics[name];
560 new_topic.topic.user = user;
561 new_topic.topic.name = name;
562 new_topic.topic.dest = dest;
563 new_topic.topic.arn = arn;
564
565 ret = write_user_topics(topics, &objv_tracker);
566 if (ret < 0) {
567 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
568 return ret;
569 }
570
571 return 0;
572 }
573
574 int RGWUserPubSub::remove_topic(const string& name)
575 {
576 RGWObjVersionTracker objv_tracker;
577 rgw_pubsub_user_topics topics;
578
579 int ret = read_user_topics(&topics, &objv_tracker);
580 if (ret < 0 && ret != -ENOENT) {
581 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
582 return ret;
583 } else if (ret == -ENOENT) {
584 // its not an error if no topics exist, just a no-op
585 ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
586 return 0;
587 }
588
589 topics.topics.erase(name);
590
591 ret = write_user_topics(topics, &objv_tracker);
592 if (ret < 0) {
593 ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
594 return ret;
595 }
596
597 return 0;
598 }
599
600 int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
601 {
602 int ret = ps->read(sub_meta_obj, result, objv_tracker);
603 if (ret < 0 && ret != -ENOENT) {
604 ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
605 return ret;
606 }
607 return 0;
608 }
609
610 int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
611 {
612 int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
613 if (ret < 0) {
614 ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
615 return ret;
616 }
617
618 return 0;
619 }
620
621 int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
622 {
623 int ret = ps->remove(sub_meta_obj, objv_tracker);
624 if (ret < 0) {
625 ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
626 return ret;
627 }
628
629 return 0;
630 }
631
632 int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
633 {
634 return read_sub(result, nullptr);
635 }
636
637 int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
638 {
639 RGWObjVersionTracker user_objv_tracker;
640 rgw_pubsub_user_topics topics;
641 RGWRados *store = ps->store;
642
643 int ret = ps->read_user_topics(&topics, &user_objv_tracker);
644 if (ret < 0) {
645 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
646 return ret != -ENOENT ? ret : -EINVAL;
647 }
648
649 auto iter = topics.topics.find(topic);
650 if (iter == topics.topics.end()) {
651 ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
652 return -EINVAL;
653 }
654
655 auto& t = iter->second;
656
657 rgw_pubsub_sub_config sub_conf;
658
659 sub_conf.user = ps->user;
660 sub_conf.name = sub;
661 sub_conf.topic = topic;
662 sub_conf.dest = dest;
663 sub_conf.s3_id = s3_id;
664
665 t.subs.insert(sub);
666
667 ret = ps->write_user_topics(topics, &user_objv_tracker);
668 if (ret < 0) {
669 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
670 return ret;
671 }
672
673 ret = write_sub(sub_conf, nullptr);
674 if (ret < 0) {
675 ldout(store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
676 return ret;
677 }
678 return 0;
679 }
680
681 int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
682 {
683 string topic = _topic;
684 RGWObjVersionTracker sobjv_tracker;
685 RGWRados *store = ps->store;
686
687 if (topic.empty()) {
688 rgw_pubsub_sub_config sub_conf;
689 int ret = read_sub(&sub_conf, &sobjv_tracker);
690 if (ret < 0) {
691 ldout(store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
692 return ret;
693 }
694 topic = sub_conf.topic;
695 }
696
697 RGWObjVersionTracker objv_tracker;
698 rgw_pubsub_user_topics topics;
699
700 int ret = ps->read_user_topics(&topics, &objv_tracker);
701 if (ret < 0) {
702 // not an error - could be that topic was already deleted
703 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
704 } else {
705 auto iter = topics.topics.find(topic);
706 if (iter != topics.topics.end()) {
707 auto& t = iter->second;
708
709 t.subs.erase(sub);
710
711 ret = ps->write_user_topics(topics, &objv_tracker);
712 if (ret < 0) {
713 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
714 return ret;
715 }
716 }
717 }
718
719 ret = remove_sub(&sobjv_tracker);
720 if (ret < 0) {
721 ldout(store->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
722 return ret;
723 }
724 return 0;
725 }
726
727 template<typename EventType>
728 void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
729 {
730 encode_json("next_marker", next_marker, f);
731 encode_json("is_truncated", is_truncated, f);
732
733 Formatter::ArraySection s(*f, EventType::json_type_plural);
734 for (auto& event : events) {
735 encode_json(EventType::json_type_single, event, f);
736 }
737 }
738
739 template<typename EventType>
740 int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
741 {
742 RGWRados *store = ps->store;
743 rgw_pubsub_sub_config sub_conf;
744 int ret = get_conf(&sub_conf);
745 if (ret < 0) {
746 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
747 return ret;
748 }
749
750 RGWBucketInfo bucket_info;
751 string tenant;
752 RGWSysObjectCtx obj_ctx(store->svc.sysobj->init_obj_ctx());
753 ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
754 if (ret == -ENOENT) {
755 list.is_truncated = false;
756 return 0;
757 }
758 if (ret < 0) {
759 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
760 return ret;
761 }
762
763 RGWRados::Bucket target(store, bucket_info);
764 RGWRados::Bucket::List list_op(&target);
765
766 list_op.params.prefix = sub_conf.dest.oid_prefix;
767 list_op.params.marker = marker;
768
769 std::vector<rgw_bucket_dir_entry> objs;
770
771 ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated);
772 if (ret < 0) {
773 ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
774 return ret;
775 }
776 if (list.is_truncated) {
777 list.next_marker = list_op.get_next_marker().name;
778 }
779
780 for (auto& obj : objs) {
781 bufferlist bl64;
782 bufferlist bl;
783 bl64.append(obj.meta.user_data);
784 try {
785 bl.decode_base64(bl64);
786 } catch (buffer::error& err) {
787 ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl;
788 continue;
789 }
790 EventType event;
791
792 auto iter = bl.cbegin();
793 try {
794 decode(event, iter);
795 } catch (buffer::error& err) {
796 ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl;
797 continue;
798 };
799
800 list.events.push_back(event);
801 }
802 return 0;
803 }
804
805 template<typename EventType>
806 int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
807 {
808 RGWRados *store = ps->store;
809 rgw_pubsub_sub_config sub_conf;
810 int ret = get_conf(&sub_conf);
811 if (ret < 0) {
812 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
813 return ret;
814 }
815
816 RGWBucketInfo bucket_info;
817 string tenant;
818 RGWSysObjectCtx sysobj_ctx(store->svc.sysobj->init_obj_ctx());
819 ret = store->get_bucket_info(sysobj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
820 if (ret < 0) {
821 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
822 return ret;
823 }
824
825 rgw_bucket& bucket = bucket_info.bucket;
826
827 RGWObjectCtx obj_ctx(store);
828 rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id);
829
830 obj_ctx.set_atomic(obj);
831
832 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
833 RGWRados::Object::Delete del_op(&del_target);
834
835 del_op.params.bucket_owner = bucket_info.owner;
836 del_op.params.versioning_status = bucket_info.versioning_status();
837
838 ret = del_op.delete_obj();
839 if (ret < 0) {
840 ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
841 }
842 return 0;
843 }
844
845 template<typename EventType>
846 void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
847 list.dump(f);
848 }
849
850 // explicit instantiation for the only two possible types
851 // no need to move implementation to header
852 template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
853 template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
854