]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.cc
f1bacc83710eb452015e501725f966703e2775a0
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "services/svc_zone.h"
5 #include "rgw_b64.h"
6 #include "rgw_sal.h"
7 #include "rgw_sal_rados.h"
8 #include "rgw_pubsub.h"
9 #include "rgw_tools.h"
10 #include "rgw_xml.h"
11 #include "rgw_arn.h"
12 #include "rgw_pubsub_push.h"
13 #include <regex>
14 #include <algorithm>
15
16 #define dout_subsys ceph_subsys_rgw
17
18 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
19 char buf[64];
20 const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
21 if (len > 0) {
22 id.assign(buf, len);
23 }
24 }
25
26 bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
27 XMLObjIter iter = obj->find("FilterRule");
28 XMLObj *o;
29
30 const auto throw_if_missing = true;
31 auto prefix_not_set = true;
32 auto suffix_not_set = true;
33 auto regex_not_set = true;
34 std::string name;
35
36 while ((o = iter.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
38 if (name == "prefix" && prefix_not_set) {
39 prefix_not_set = false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
41 } else if (name == "suffix" && suffix_not_set) {
42 suffix_not_set = false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
44 } else if (name == "regex" && regex_not_set) {
45 regex_not_set = false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
47 } else {
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
49 }
50 }
51 return true;
52 }
53
54 void rgw_s3_key_filter::dump_xml(Formatter *f) const {
55 if (!prefix_rule.empty()) {
56 f->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f);
58 ::encode_xml("Value", prefix_rule, f);
59 f->close_section();
60 }
61 if (!suffix_rule.empty()) {
62 f->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f);
64 ::encode_xml("Value", suffix_rule, f);
65 f->close_section();
66 }
67 if (!regex_rule.empty()) {
68 f->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f);
70 ::encode_xml("Value", regex_rule, f);
71 f->close_section();
72 }
73 }
74
75 bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
77 }
78
79 bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
80 kv.clear();
81 XMLObjIter iter = obj->find("FilterRule");
82 XMLObj *o;
83
84 const auto throw_if_missing = true;
85
86 std::string key;
87 std::string value;
88
89 while ((o = iter.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
91 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
92 kv.emplace(key, value);
93 }
94 return true;
95 }
96
97 void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
98 for (const auto& key_value : kv) {
99 f->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value.first, f);
101 ::encode_xml("Value", key_value.second, f);
102 f->close_section();
103 }
104 }
105
106 bool rgw_s3_key_value_filter::has_content() const {
107 return !kv.empty();
108 }
109
110 bool rgw_s3_filter::decode_xml(XMLObj* obj) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
113 RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
114 return true;
115 }
116
117 void rgw_s3_filter::dump_xml(Formatter *f) const {
118 if (key_filter.has_content()) {
119 ::encode_xml("S3Key", key_filter, f);
120 }
121 if (metadata_filter.has_content()) {
122 ::encode_xml("S3Metadata", metadata_filter, f);
123 }
124 if (tag_filter.has_content()) {
125 ::encode_xml("S3Tags", tag_filter, f);
126 }
127 }
128
129 bool rgw_s3_filter::has_content() const {
130 return key_filter.has_content() ||
131 metadata_filter.has_content() ||
132 tag_filter.has_content();
133 }
134
135 bool match(const rgw_s3_key_filter& filter, const std::string& key) {
136 const auto key_size = key.size();
137 const auto prefix_size = filter.prefix_rule.size();
138 if (prefix_size != 0) {
139 // prefix rule exists
140 if (prefix_size > key_size) {
141 // if prefix is longer than key, we fail
142 return false;
143 }
144 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
145 return false;
146 }
147 }
148 const auto suffix_size = filter.suffix_rule.size();
149 if (suffix_size != 0) {
150 // suffix rule exists
151 if (suffix_size > key_size) {
152 // if suffix is longer than key, we fail
153 return false;
154 }
155 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
156 return false;
157 }
158 }
159 if (!filter.regex_rule.empty()) {
160 // TODO add regex chaching in the filter
161 const std::regex base_regex(filter.regex_rule);
162 if (!std::regex_match(key, base_regex)) {
163 return false;
164 }
165 }
166 return true;
167 }
168
169 bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) {
170 // all filter pairs must exist with the same value in the object's metadata/tags
171 // object metadata/tags may include items not in the filter
172 return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end());
173 }
174
175 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
176 // if event list exists, and none of the events in the list matches the event type, filter the message
177 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
178 return false;
179 }
180 return true;
181 }
182
183 void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
184 l.clear();
185
186 XMLObjIter iter = obj->find(name);
187 XMLObj *o;
188
189 while ((o = iter.get_next())) {
190 std::string val;
191 decode_xml_obj(val, o);
192 l.push_back(rgw::notify::from_string(val));
193 }
194 }
195
196 bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
197 const auto throw_if_missing = true;
198 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
199
200 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
201
202 RGWXMLDecoder::decode_xml("Filter", filter, obj);
203
204 do_decode_xml_obj(events, "Event", obj);
205 if (events.empty()) {
206 // if no events are provided, we assume all events
207 events.push_back(rgw::notify::ObjectCreated);
208 events.push_back(rgw::notify::ObjectRemoved);
209 }
210 return true;
211 }
212
213 void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
214 ::encode_xml("Id", id, f);
215 ::encode_xml("Topic", topic_arn.c_str(), f);
216 if (filter.has_content()) {
217 ::encode_xml("Filter", filter, f);
218 }
219 for (const auto& event : events) {
220 ::encode_xml("Event", rgw::notify::to_string(event), f);
221 }
222 }
223
224 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
225 do_decode_xml_obj(list, "TopicConfiguration", obj);
226 if (list.empty()) {
227 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
228 }
229 return true;
230 }
231
232 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
233 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
234
235 void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
236 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
237 }
238
239 void rgw_pubsub_s3_event::dump(Formatter *f) const {
240 encode_json("eventVersion", eventVersion, f);
241 encode_json("eventSource", eventSource, f);
242 encode_json("awsRegion", awsRegion, f);
243 utime_t ut(eventTime);
244 encode_json("eventTime", ut, f);
245 encode_json("eventName", eventName, f);
246 {
247 Formatter::ObjectSection s(*f, "userIdentity");
248 encode_json("principalId", userIdentity, f);
249 }
250 {
251 Formatter::ObjectSection s(*f, "requestParameters");
252 encode_json("sourceIPAddress", sourceIPAddress, f);
253 }
254 {
255 Formatter::ObjectSection s(*f, "responseElements");
256 encode_json("x-amz-request-id", x_amz_request_id, f);
257 encode_json("x-amz-id-2", x_amz_id_2, f);
258 }
259 {
260 Formatter::ObjectSection s(*f, "s3");
261 encode_json("s3SchemaVersion", s3SchemaVersion, f);
262 encode_json("configurationId", configurationId, f);
263 {
264 Formatter::ObjectSection sub_s(*f, "bucket");
265 encode_json("name", bucket_name, f);
266 {
267 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
268 encode_json("principalId", bucket_ownerIdentity, f);
269 }
270 encode_json("arn", bucket_arn, f);
271 encode_json("id", bucket_id, f);
272 }
273 {
274 Formatter::ObjectSection sub_s(*f, "object");
275 encode_json("key", object_key, f);
276 encode_json("size", object_size, f);
277 encode_json("etag", object_etag, f);
278 encode_json("versionId", object_versionId, f);
279 encode_json("sequencer", object_sequencer, f);
280 encode_json("metadata", x_meta_map, f);
281 encode_json("tags", tags, f);
282 }
283 }
284 encode_json("eventId", id, f);
285 encode_json("opaqueData", opaque_data, f);
286 }
287
288 void rgw_pubsub_event::dump(Formatter *f) const
289 {
290 encode_json("id", id, f);
291 encode_json("event", event_name, f);
292 utime_t ut(timestamp);
293 encode_json("timestamp", ut, f);
294 encode_json("info", info, f);
295 }
296
297 void rgw_pubsub_topic::dump(Formatter *f) const
298 {
299 encode_json("user", user, f);
300 encode_json("name", name, f);
301 encode_json("dest", dest, f);
302 encode_json("arn", arn, f);
303 encode_json("opaqueData", opaque_data, f);
304 }
305
306 void rgw_pubsub_topic::dump_xml(Formatter *f) const
307 {
308 encode_xml("User", user, f);
309 encode_xml("Name", name, f);
310 encode_xml("EndPoint", dest, f);
311 encode_xml("TopicArn", arn, f);
312 encode_xml("OpaqueData", opaque_data, f);
313 }
314
315 void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) {
316 f->open_object_section("entry");
317 encode_xml("key", key, f);
318 encode_xml("value", value, f);
319 f->close_section(); // entry
320 }
321
322 void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
323 {
324 f->open_array_section("Attributes");
325 std::string str_user;
326 user.to_str(str_user);
327 encode_xml_key_value_entry("User", str_user, f);
328 encode_xml_key_value_entry("Name", name, f);
329 encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f);
330 encode_xml_key_value_entry("TopicArn", arn, f);
331 encode_xml_key_value_entry("OpaqueData", opaque_data, f);
332 f->close_section(); // Attributes
333 }
334
335 void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
336 {
337 f->open_array_section(name);
338 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
339 f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
340 }
341 f->close_section();
342 }
343
344 void rgw_pubsub_topic_filter::dump(Formatter *f) const
345 {
346 encode_json("topic", topic, f);
347 encode_json("events", events, f);
348 }
349
350 void rgw_pubsub_topic_subs::dump(Formatter *f) const
351 {
352 encode_json("topic", topic, f);
353 encode_json("subs", subs, f);
354 }
355
356 void rgw_pubsub_bucket_topics::dump(Formatter *f) const
357 {
358 Formatter::ArraySection s(*f, "topics");
359 for (auto& t : topics) {
360 encode_json(t.first.c_str(), t.second, f);
361 }
362 }
363
364 void rgw_pubsub_topics::dump(Formatter *f) const
365 {
366 Formatter::ArraySection s(*f, "topics");
367 for (auto& t : topics) {
368 encode_json(t.first.c_str(), t.second, f);
369 }
370 }
371
372 void rgw_pubsub_topics::dump_xml(Formatter *f) const
373 {
374 for (auto& t : topics) {
375 encode_xml("member", t.second.topic, f);
376 }
377 }
378
379 void rgw_pubsub_sub_dest::dump(Formatter *f) const
380 {
381 encode_json("bucket_name", bucket_name, f);
382 encode_json("oid_prefix", oid_prefix, f);
383 encode_json("push_endpoint", push_endpoint, f);
384 encode_json("push_endpoint_args", push_endpoint_args, f);
385 encode_json("push_endpoint_topic", arn_topic, f);
386 encode_json("stored_secret", stored_secret, f);
387 encode_json("persistent", persistent, f);
388 }
389
390 void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
391 {
392 // first 2 members are omitted here since they
393 // dont apply to AWS compliant topics
394 encode_xml("EndpointAddress", push_endpoint, f);
395 encode_xml("EndpointArgs", push_endpoint_args, f);
396 encode_xml("EndpointTopic", arn_topic, f);
397 encode_xml("HasStoredSecret", stored_secret, f);
398 encode_xml("Persistent", persistent, f);
399 }
400
401 std::string rgw_pubsub_sub_dest::to_json_str() const
402 {
403 // first 2 members are omitted here since they
404 // dont apply to AWS compliant topics
405 JSONFormatter f;
406 f.open_object_section("");
407 encode_json("EndpointAddress", push_endpoint, &f);
408 encode_json("EndpointArgs", push_endpoint_args, &f);
409 encode_json("EndpointTopic", arn_topic, &f);
410 encode_json("HasStoredSecret", stored_secret, &f);
411 encode_json("Persistent", persistent, &f);
412 f.close_section();
413 std::stringstream ss;
414 f.flush(ss);
415 return ss.str();
416 }
417
418 void rgw_pubsub_sub_config::dump(Formatter *f) const
419 {
420 encode_json("user", user, f);
421 encode_json("name", name, f);
422 encode_json("topic", topic, f);
423 encode_json("dest", dest, f);
424 encode_json("s3_id", s3_id, f);
425 }
426
427 RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) :
428 store(_store),
429 tenant(_tenant),
430 obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
431 get_meta_obj(&meta_obj);
432 }
433
434 int RGWPubSub::remove(const rgw_raw_obj& obj,
435 RGWObjVersionTracker *objv_tracker,
436 optional_yield y)
437 {
438 int ret = rgw_delete_system_obj(store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y);
439 if (ret < 0) {
440 return ret;
441 }
442
443 return 0;
444 }
445
446 int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
447 {
448 int ret = read(meta_obj, result, objv_tracker);
449 if (ret < 0) {
450 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
451 return ret;
452 }
453 return 0;
454 }
455
456 int RGWPubSub::write_topics(const rgw_pubsub_topics& topics,
457 RGWObjVersionTracker *objv_tracker, optional_yield y)
458 {
459 int ret = write(meta_obj, topics, objv_tracker, y);
460 if (ret < 0 && ret != -ENOENT) {
461 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
462 return ret;
463 }
464 return 0;
465 }
466
467 int RGWPubSub::get_topics(rgw_pubsub_topics *result)
468 {
469 return read_topics(result, nullptr);
470 }
471
472 int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
473 {
474 int ret = ps->read(bucket_meta_obj, result, objv_tracker);
475 if (ret < 0 && ret != -ENOENT) {
476 ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
477 return ret;
478 }
479 return 0;
480 }
481
482 int RGWPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
483 RGWObjVersionTracker *objv_tracker,
484 optional_yield y)
485 {
486 int ret = ps->write(bucket_meta_obj, topics, objv_tracker, y);
487 if (ret < 0) {
488 ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
489 return ret;
490 }
491
492 return 0;
493 }
494
495 int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
496 {
497 return read_topics(result, nullptr);
498 }
499
500 int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
501 {
502 rgw_pubsub_topics topics;
503 int ret = get_topics(&topics);
504 if (ret < 0) {
505 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
506 return ret;
507 }
508
509 auto iter = topics.topics.find(name);
510 if (iter == topics.topics.end()) {
511 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
512 return -ENOENT;
513 }
514
515 *result = iter->second;
516 return 0;
517 }
518
519 int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
520 {
521 rgw_pubsub_topics topics;
522 int ret = get_topics(&topics);
523 if (ret < 0) {
524 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
525 return ret;
526 }
527
528 auto iter = topics.topics.find(name);
529 if (iter == topics.topics.end()) {
530 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
531 return -ENOENT;
532 }
533
534 *result = iter->second.topic;
535 return 0;
536 }
537
538 int RGWPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
539 return create_notification(topic_name, events, std::nullopt, "", y);
540 }
541
542 int RGWPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
543 rgw_pubsub_topic_subs topic_info;
544 rgw::sal::RGWRadosStore *store = ps->store;
545
546 int ret = ps->get_topic(topic_name, &topic_info);
547 if (ret < 0) {
548 ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
549 return ret;
550 }
551 ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl;
552
553 RGWObjVersionTracker objv_tracker;
554 rgw_pubsub_bucket_topics bucket_topics;
555
556 ret = read_topics(&bucket_topics, &objv_tracker);
557 if (ret < 0) {
558 ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
559 bucket.name << "': ret=" << ret << dendl;
560 return ret;
561 }
562 ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
563 bucket.name << "'" << dendl;
564
565 auto& topic_filter = bucket_topics.topics[topic_name];
566 topic_filter.topic = topic_info.topic;
567 topic_filter.events = events;
568 topic_filter.s3_id = notif_name;
569 if (s3_filter) {
570 topic_filter.s3_filter = *s3_filter;
571 }
572
573 ret = write_topics(bucket_topics, &objv_tracker, y);
574 if (ret < 0) {
575 ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
576 return ret;
577 }
578
579 ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
580
581 return 0;
582 }
583
584 int RGWPubSub::Bucket::remove_notification(const string& topic_name, optional_yield y)
585 {
586 rgw_pubsub_topic_subs topic_info;
587 rgw::sal::RGWRadosStore *store = ps->store;
588
589 int ret = ps->get_topic(topic_name, &topic_info);
590 if (ret < 0) {
591 ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
592 return ret;
593 }
594
595 RGWObjVersionTracker objv_tracker;
596 rgw_pubsub_bucket_topics bucket_topics;
597
598 ret = read_topics(&bucket_topics, &objv_tracker);
599 if (ret < 0) {
600 ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
601 return ret;
602 }
603
604 bucket_topics.topics.erase(topic_name);
605
606 ret = write_topics(bucket_topics, &objv_tracker, y);
607 if (ret < 0) {
608 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
609 return ret;
610 }
611
612 return 0;
613 }
614
615 int RGWPubSub::Bucket::remove_notifications(optional_yield y)
616 {
617 // get all topics on a bucket
618 rgw_pubsub_bucket_topics bucket_topics;
619 auto ret = get_topics(&bucket_topics);
620 if (ret < 0 && ret != -ENOENT) {
621 ldout(ps->store->ctx(), 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl;
622 return ret ;
623 }
624
625 // remove all auto-genrated topics
626 for (const auto& topic : bucket_topics.topics) {
627 const auto& topic_name = topic.first;
628 ret = ps->remove_topic(topic_name, y);
629 if (ret < 0 && ret != -ENOENT) {
630 ldout(ps->store->ctx(), 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl;
631 }
632 }
633
634 // delete all notification of on a bucket
635 ret = ps->remove(bucket_meta_obj, nullptr, y);
636 if (ret < 0 && ret != -ENOENT) {
637 ldout(ps->store->ctx(), 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
638 return ret;
639 }
640
641 return 0;
642 }
643
644 int RGWPubSub::create_topic(const string& name, optional_yield y) {
645 return create_topic(name, rgw_pubsub_sub_dest(), "", "", y);
646 }
647
648 int RGWPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
649 RGWObjVersionTracker objv_tracker;
650 rgw_pubsub_topics topics;
651
652 int ret = read_topics(&topics, &objv_tracker);
653 if (ret < 0 && ret != -ENOENT) {
654 // its not an error if not topics exist, we create one
655 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
656 return ret;
657 }
658
659 rgw_pubsub_topic_subs& new_topic = topics.topics[name];
660 new_topic.topic.user = rgw_user("", tenant);
661 new_topic.topic.name = name;
662 new_topic.topic.dest = dest;
663 new_topic.topic.arn = arn;
664 new_topic.topic.opaque_data = opaque_data;
665
666 ret = write_topics(topics, &objv_tracker, y);
667 if (ret < 0) {
668 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
669 return ret;
670 }
671
672 return 0;
673 }
674
675 int RGWPubSub::remove_topic(const string& name, optional_yield y)
676 {
677 RGWObjVersionTracker objv_tracker;
678 rgw_pubsub_topics topics;
679
680 int ret = read_topics(&topics, &objv_tracker);
681 if (ret < 0 && ret != -ENOENT) {
682 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
683 return ret;
684 } else if (ret == -ENOENT) {
685 // its not an error if no topics exist, just a no-op
686 ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
687 return 0;
688 }
689
690 topics.topics.erase(name);
691
692 ret = write_topics(topics, &objv_tracker, y);
693 if (ret < 0) {
694 ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
695 return ret;
696 }
697
698 return 0;
699 }
700
701 int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
702 {
703 int ret = ps->read(sub_meta_obj, result, objv_tracker);
704 if (ret < 0 && ret != -ENOENT) {
705 ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
706 return ret;
707 }
708 return 0;
709 }
710
711 int RGWPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
712 RGWObjVersionTracker *objv_tracker,
713 optional_yield y)
714 {
715 int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker, y);
716 if (ret < 0) {
717 ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
718 return ret;
719 }
720
721 return 0;
722 }
723
724 int RGWPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker,
725 optional_yield y)
726 {
727 int ret = ps->remove(sub_meta_obj, objv_tracker, y);
728 if (ret < 0) {
729 ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
730 return ret;
731 }
732
733 return 0;
734 }
735
736 int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
737 {
738 return read_sub(result, nullptr);
739 }
740
741 int RGWPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
742 {
743 RGWObjVersionTracker objv_tracker;
744 rgw_pubsub_topics topics;
745 rgw::sal::RGWRadosStore *store = ps->store;
746
747 int ret = ps->read_topics(&topics, &objv_tracker);
748 if (ret < 0) {
749 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
750 return ret != -ENOENT ? ret : -EINVAL;
751 }
752
753 auto iter = topics.topics.find(topic);
754 if (iter == topics.topics.end()) {
755 ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
756 return -EINVAL;
757 }
758
759 auto& t = iter->second;
760
761 rgw_pubsub_sub_config sub_conf;
762
763 sub_conf.user = rgw_user("", ps->tenant);
764 sub_conf.name = sub;
765 sub_conf.topic = topic;
766 sub_conf.dest = dest;
767 sub_conf.s3_id = s3_id;
768
769 t.subs.insert(sub);
770
771 ret = ps->write_topics(topics, &objv_tracker, y);
772 if (ret < 0) {
773 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
774 return ret;
775 }
776
777 ret = write_sub(sub_conf, nullptr, y);
778 if (ret < 0) {
779 ldout(store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
780 return ret;
781 }
782 return 0;
783 }
784
785 int RGWPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
786 {
787 string topic = _topic;
788 RGWObjVersionTracker sobjv_tracker;
789 rgw::sal::RGWRadosStore *store = ps->store;
790
791 if (topic.empty()) {
792 rgw_pubsub_sub_config sub_conf;
793 int ret = read_sub(&sub_conf, &sobjv_tracker);
794 if (ret < 0) {
795 ldout(store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
796 return ret;
797 }
798 topic = sub_conf.topic;
799 }
800
801 RGWObjVersionTracker objv_tracker;
802 rgw_pubsub_topics topics;
803
804 int ret = ps->read_topics(&topics, &objv_tracker);
805 if (ret < 0) {
806 // not an error - could be that topic was already deleted
807 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
808 } else {
809 auto iter = topics.topics.find(topic);
810 if (iter != topics.topics.end()) {
811 auto& t = iter->second;
812
813 t.subs.erase(sub);
814
815 ret = ps->write_topics(topics, &objv_tracker, y);
816 if (ret < 0) {
817 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
818 return ret;
819 }
820 }
821 }
822
823 ret = remove_sub(&sobjv_tracker, y);
824 if (ret < 0) {
825 ldout(store->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
826 return ret;
827 }
828 return 0;
829 }
830
831 template<typename EventType>
832 void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
833 {
834 encode_json("next_marker", next_marker, f);
835 encode_json("is_truncated", is_truncated, f);
836
837 Formatter::ArraySection s(*f, EventType::json_type_plural);
838 for (auto& event : events) {
839 encode_json("", event, f);
840 }
841 }
842
843 template<typename EventType>
844 int RGWPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
845 {
846 RGWRados *store = ps->store->getRados();
847 rgw_pubsub_sub_config sub_conf;
848 int ret = get_conf(&sub_conf);
849 if (ret < 0) {
850 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
851 return ret;
852 }
853
854 RGWBucketInfo bucket_info;
855 string tenant;
856 ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
857 if (ret == -ENOENT) {
858 list.is_truncated = false;
859 return 0;
860 }
861 if (ret < 0) {
862 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
863 return ret;
864 }
865
866 RGWRados::Bucket target(store, bucket_info);
867 RGWRados::Bucket::List list_op(&target);
868
869 list_op.params.prefix = sub_conf.dest.oid_prefix;
870 list_op.params.marker = marker;
871
872 std::vector<rgw_bucket_dir_entry> objs;
873
874 ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated, null_yield);
875 if (ret < 0) {
876 ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
877 return ret;
878 }
879 if (list.is_truncated) {
880 list.next_marker = list_op.get_next_marker().name;
881 }
882
883 for (auto& obj : objs) {
884 bufferlist bl64;
885 bufferlist bl;
886 bl64.append(obj.meta.user_data);
887 try {
888 bl.decode_base64(bl64);
889 } catch (buffer::error& err) {
890 ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl;
891 continue;
892 }
893 EventType event;
894
895 auto iter = bl.cbegin();
896 try {
897 decode(event, iter);
898 } catch (buffer::error& err) {
899 ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl;
900 continue;
901 };
902
903 list.events.push_back(event);
904 }
905 return 0;
906 }
907
908 template<typename EventType>
909 int RGWPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
910 {
911 rgw::sal::RGWRadosStore *store = ps->store;
912 rgw_pubsub_sub_config sub_conf;
913 int ret = get_conf(&sub_conf);
914 if (ret < 0) {
915 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
916 return ret;
917 }
918
919 RGWBucketInfo bucket_info;
920 string tenant;
921 ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
922 if (ret < 0) {
923 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
924 return ret;
925 }
926
927 rgw_bucket& bucket = bucket_info.bucket;
928
929 RGWObjectCtx obj_ctx(store);
930 rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id);
931
932 obj_ctx.set_atomic(obj);
933
934 RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj);
935 RGWRados::Object::Delete del_op(&del_target);
936
937 del_op.params.bucket_owner = bucket_info.owner;
938 del_op.params.versioning_status = bucket_info.versioning_status();
939
940 ret = del_op.delete_obj(null_yield);
941 if (ret < 0) {
942 ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
943 }
944 return 0;
945 }
946
947 void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
948 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
949 }
950
951 void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
952 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
953 }
954
955 void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
956 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
957 }
958
959 template<typename EventType>
960 void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
961 list.dump(f);
962 }
963
964 // explicit instantiation for the only two possible types
965 // no need to move implementation to header
966 template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
967 template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
968