]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.cc
import ceph 16.2.7
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "services/svc_zone.h"
11fdf7f2 5#include "rgw_b64.h"
9f95a23c 6#include "rgw_sal.h"
f67539c2 7#include "rgw_sal_rados.h"
11fdf7f2
TL
8#include "rgw_pubsub.h"
9#include "rgw_tools.h"
eafe8130
TL
10#include "rgw_xml.h"
11#include "rgw_arn.h"
12#include "rgw_pubsub_push.h"
eafe8130
TL
13#include <regex>
14#include <algorithm>
11fdf7f2
TL
15
16#define dout_subsys ceph_subsys_rgw
17
92f5a8d4
TL
18void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
19 char buf[64];
20 const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
21 if (len > 0) {
22 id.assign(buf, len);
23 }
24}
25
eafe8130
TL
26bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
27 XMLObjIter iter = obj->find("FilterRule");
28 XMLObj *o;
29
30 const auto throw_if_missing = true;
31 auto prefix_not_set = true;
32 auto suffix_not_set = true;
33 auto regex_not_set = true;
34 std::string name;
35
36 while ((o = iter.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
38 if (name == "prefix" && prefix_not_set) {
39 prefix_not_set = false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
41 } else if (name == "suffix" && suffix_not_set) {
42 suffix_not_set = false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
44 } else if (name == "regex" && regex_not_set) {
45 regex_not_set = false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
47 } else {
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
49 }
50 }
51 return true;
52}
53
54void rgw_s3_key_filter::dump_xml(Formatter *f) const {
55 if (!prefix_rule.empty()) {
56 f->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f);
58 ::encode_xml("Value", prefix_rule, f);
59 f->close_section();
60 }
61 if (!suffix_rule.empty()) {
62 f->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f);
64 ::encode_xml("Value", suffix_rule, f);
65 f->close_section();
66 }
67 if (!regex_rule.empty()) {
68 f->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f);
70 ::encode_xml("Value", regex_rule, f);
71 f->close_section();
72 }
73}
74
75bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
77}
78
9f95a23c 79bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
f67539c2 80 kv.clear();
eafe8130
TL
81 XMLObjIter iter = obj->find("FilterRule");
82 XMLObj *o;
83
84 const auto throw_if_missing = true;
85
86 std::string key;
87 std::string value;
88
89 while ((o = iter.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
91 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
f67539c2 92 kv.emplace(key, value);
eafe8130
TL
93 }
94 return true;
95}
96
9f95a23c 97void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
f67539c2 98 for (const auto& key_value : kv) {
eafe8130
TL
99 f->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value.first, f);
101 ::encode_xml("Value", key_value.second, f);
102 f->close_section();
103 }
104}
105
9f95a23c 106bool rgw_s3_key_value_filter::has_content() const {
f67539c2 107 return !kv.empty();
eafe8130
TL
108}
109
110bool rgw_s3_filter::decode_xml(XMLObj* obj) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
9f95a23c 113 RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
eafe8130
TL
114 return true;
115}
116
117void rgw_s3_filter::dump_xml(Formatter *f) const {
118 if (key_filter.has_content()) {
119 ::encode_xml("S3Key", key_filter, f);
120 }
121 if (metadata_filter.has_content()) {
122 ::encode_xml("S3Metadata", metadata_filter, f);
123 }
9f95a23c
TL
124 if (tag_filter.has_content()) {
125 ::encode_xml("S3Tags", tag_filter, f);
126 }
eafe8130
TL
127}
128
129bool rgw_s3_filter::has_content() const {
130 return key_filter.has_content() ||
9f95a23c
TL
131 metadata_filter.has_content() ||
132 tag_filter.has_content();
eafe8130
TL
133}
134
135bool match(const rgw_s3_key_filter& filter, const std::string& key) {
136 const auto key_size = key.size();
137 const auto prefix_size = filter.prefix_rule.size();
138 if (prefix_size != 0) {
139 // prefix rule exists
140 if (prefix_size > key_size) {
141 // if prefix is longer than key, we fail
142 return false;
143 }
144 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
145 return false;
146 }
147 }
148 const auto suffix_size = filter.suffix_rule.size();
149 if (suffix_size != 0) {
150 // suffix rule exists
151 if (suffix_size > key_size) {
152 // if suffix is longer than key, we fail
153 return false;
154 }
155 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
156 return false;
157 }
158 }
159 if (!filter.regex_rule.empty()) {
160 // TODO add regex chaching in the filter
161 const std::regex base_regex(filter.regex_rule);
162 if (!std::regex_match(key, base_regex)) {
163 return false;
164 }
165 }
166 return true;
167}
168
f67539c2 169bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) {
9f95a23c
TL
170 // all filter pairs must exist with the same value in the object's metadata/tags
171 // object metadata/tags may include items not in the filter
f67539c2 172 return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end());
eafe8130
TL
173}
174
175bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
176 // if event list exists, and none of the events in the list matches the event type, filter the message
177 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
178 return false;
179 }
180 return true;
181}
182
183void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
184 l.clear();
185
186 XMLObjIter iter = obj->find(name);
187 XMLObj *o;
188
189 while ((o = iter.get_next())) {
190 std::string val;
191 decode_xml_obj(val, o);
192 l.push_back(rgw::notify::from_string(val));
193 }
194}
195
196bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
197 const auto throw_if_missing = true;
198 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
199
200 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
201
202 RGWXMLDecoder::decode_xml("Filter", filter, obj);
203
204 do_decode_xml_obj(events, "Event", obj);
205 if (events.empty()) {
206 // if no events are provided, we assume all events
207 events.push_back(rgw::notify::ObjectCreated);
208 events.push_back(rgw::notify::ObjectRemoved);
209 }
210 return true;
211}
212
213void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
214 ::encode_xml("Id", id, f);
215 ::encode_xml("Topic", topic_arn.c_str(), f);
216 if (filter.has_content()) {
217 ::encode_xml("Filter", filter, f);
218 }
219 for (const auto& event : events) {
220 ::encode_xml("Event", rgw::notify::to_string(event), f);
221 }
222}
223
224bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
225 do_decode_xml_obj(list, "TopicConfiguration", obj);
226 if (list.empty()) {
227 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
228 }
229 return true;
230}
231
232rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
233 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
234
235void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
236 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
237}
238
f67539c2 239void rgw_pubsub_s3_event::dump(Formatter *f) const {
eafe8130
TL
240 encode_json("eventVersion", eventVersion, f);
241 encode_json("eventSource", eventSource, f);
242 encode_json("awsRegion", awsRegion, f);
243 utime_t ut(eventTime);
244 encode_json("eventTime", ut, f);
245 encode_json("eventName", eventName, f);
246 {
247 Formatter::ObjectSection s(*f, "userIdentity");
248 encode_json("principalId", userIdentity, f);
249 }
250 {
251 Formatter::ObjectSection s(*f, "requestParameters");
252 encode_json("sourceIPAddress", sourceIPAddress, f);
253 }
254 {
255 Formatter::ObjectSection s(*f, "responseElements");
256 encode_json("x-amz-request-id", x_amz_request_id, f);
257 encode_json("x-amz-id-2", x_amz_id_2, f);
258 }
259 {
260 Formatter::ObjectSection s(*f, "s3");
261 encode_json("s3SchemaVersion", s3SchemaVersion, f);
262 encode_json("configurationId", configurationId, f);
263 {
264 Formatter::ObjectSection sub_s(*f, "bucket");
265 encode_json("name", bucket_name, f);
266 {
267 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
268 encode_json("principalId", bucket_ownerIdentity, f);
269 }
270 encode_json("arn", bucket_arn, f);
271 encode_json("id", bucket_id, f);
272 }
273 {
274 Formatter::ObjectSection sub_s(*f, "object");
275 encode_json("key", object_key, f);
276 encode_json("size", object_size, f);
a4b75251 277 encode_json("eTag", object_etag, f);
eafe8130
TL
278 encode_json("versionId", object_versionId, f);
279 encode_json("sequencer", object_sequencer, f);
280 encode_json("metadata", x_meta_map, f);
9f95a23c 281 encode_json("tags", tags, f);
eafe8130
TL
282 }
283 }
284 encode_json("eventId", id, f);
9f95a23c 285 encode_json("opaqueData", opaque_data, f);
eafe8130 286}
11fdf7f2
TL
287
288void rgw_pubsub_event::dump(Formatter *f) const
289{
290 encode_json("id", id, f);
eafe8130 291 encode_json("event", event_name, f);
11fdf7f2
TL
292 utime_t ut(timestamp);
293 encode_json("timestamp", ut, f);
294 encode_json("info", info, f);
295}
296
297void rgw_pubsub_topic::dump(Formatter *f) const
298{
299 encode_json("user", user, f);
300 encode_json("name", name, f);
eafe8130
TL
301 encode_json("dest", dest, f);
302 encode_json("arn", arn, f);
9f95a23c 303 encode_json("opaqueData", opaque_data, f);
eafe8130
TL
304}
305
306void rgw_pubsub_topic::dump_xml(Formatter *f) const
307{
308 encode_xml("User", user, f);
309 encode_xml("Name", name, f);
310 encode_xml("EndPoint", dest, f);
311 encode_xml("TopicArn", arn, f);
9f95a23c 312 encode_xml("OpaqueData", opaque_data, f);
eafe8130
TL
313}
314
f67539c2
TL
315void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) {
316 f->open_object_section("entry");
317 encode_xml("key", key, f);
318 encode_xml("value", value, f);
319 f->close_section(); // entry
320}
321
322void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
323{
324 f->open_array_section("Attributes");
325 std::string str_user;
326 user.to_str(str_user);
327 encode_xml_key_value_entry("User", str_user, f);
328 encode_xml_key_value_entry("Name", name, f);
329 encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f);
330 encode_xml_key_value_entry("TopicArn", arn, f);
331 encode_xml_key_value_entry("OpaqueData", opaque_data, f);
332 f->close_section(); // Attributes
333}
334
eafe8130
TL
335void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
336{
337 f->open_array_section(name);
338 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
339 f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
340 }
341 f->close_section();
11fdf7f2
TL
342}
343
344void rgw_pubsub_topic_filter::dump(Formatter *f) const
345{
346 encode_json("topic", topic, f);
347 encode_json("events", events, f);
348}
349
350void rgw_pubsub_topic_subs::dump(Formatter *f) const
351{
352 encode_json("topic", topic, f);
353 encode_json("subs", subs, f);
354}
355
356void rgw_pubsub_bucket_topics::dump(Formatter *f) const
357{
358 Formatter::ArraySection s(*f, "topics");
359 for (auto& t : topics) {
360 encode_json(t.first.c_str(), t.second, f);
361 }
362}
363
f67539c2 364void rgw_pubsub_topics::dump(Formatter *f) const
11fdf7f2
TL
365{
366 Formatter::ArraySection s(*f, "topics");
367 for (auto& t : topics) {
368 encode_json(t.first.c_str(), t.second, f);
369 }
370}
371
f67539c2 372void rgw_pubsub_topics::dump_xml(Formatter *f) const
eafe8130
TL
373{
374 for (auto& t : topics) {
375 encode_xml("member", t.second.topic, f);
376 }
377}
378
11fdf7f2
TL
379void rgw_pubsub_sub_dest::dump(Formatter *f) const
380{
381 encode_json("bucket_name", bucket_name, f);
382 encode_json("oid_prefix", oid_prefix, f);
383 encode_json("push_endpoint", push_endpoint, f);
eafe8130
TL
384 encode_json("push_endpoint_args", push_endpoint_args, f);
385 encode_json("push_endpoint_topic", arn_topic, f);
f67539c2
TL
386 encode_json("stored_secret", stored_secret, f);
387 encode_json("persistent", persistent, f);
eafe8130
TL
388}
389
390void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
391{
f67539c2
TL
392 // first 2 members are omitted here since they
393 // dont apply to AWS compliant topics
eafe8130
TL
394 encode_xml("EndpointAddress", push_endpoint, f);
395 encode_xml("EndpointArgs", push_endpoint_args, f);
396 encode_xml("EndpointTopic", arn_topic, f);
f67539c2
TL
397 encode_xml("HasStoredSecret", stored_secret, f);
398 encode_xml("Persistent", persistent, f);
399}
400
401std::string rgw_pubsub_sub_dest::to_json_str() const
402{
403 // first 2 members are omitted here since they
404 // dont apply to AWS compliant topics
405 JSONFormatter f;
406 f.open_object_section("");
407 encode_json("EndpointAddress", push_endpoint, &f);
408 encode_json("EndpointArgs", push_endpoint_args, &f);
409 encode_json("EndpointTopic", arn_topic, &f);
410 encode_json("HasStoredSecret", stored_secret, &f);
411 encode_json("Persistent", persistent, &f);
412 f.close_section();
413 std::stringstream ss;
414 f.flush(ss);
415 return ss.str();
11fdf7f2
TL
416}
417
418void rgw_pubsub_sub_config::dump(Formatter *f) const
419{
420 encode_json("user", user, f);
421 encode_json("name", name, f);
422 encode_json("topic", topic, f);
423 encode_json("dest", dest, f);
eafe8130 424 encode_json("s3_id", s3_id, f);
11fdf7f2
TL
425}
426
f67539c2 427RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) :
9f95a23c 428 store(_store),
f67539c2 429 tenant(_tenant),
9f95a23c 430 obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
f67539c2 431 get_meta_obj(&meta_obj);
9f95a23c 432}
11fdf7f2 433
b3b6e05e
TL
434int RGWPubSub::remove(const DoutPrefixProvider *dpp,
435 const rgw_raw_obj& obj,
f67539c2
TL
436 RGWObjVersionTracker *objv_tracker,
437 optional_yield y)
11fdf7f2 438{
b3b6e05e 439 int ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y);
11fdf7f2
TL
440 if (ret < 0) {
441 return ret;
442 }
443
444 return 0;
445}
446
f67539c2 447int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
11fdf7f2 448{
f67539c2 449 int ret = read(meta_obj, result, objv_tracker);
eafe8130
TL
450 if (ret < 0) {
451 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
452 return ret;
453 }
454 return 0;
455}
456
b3b6e05e 457int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
f67539c2 458 RGWObjVersionTracker *objv_tracker, optional_yield y)
11fdf7f2 459{
b3b6e05e 460 int ret = write(dpp, meta_obj, topics, objv_tracker, y);
11fdf7f2 461 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 462 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
463 return ret;
464 }
465 return 0;
466}
467
f67539c2 468int RGWPubSub::get_topics(rgw_pubsub_topics *result)
11fdf7f2 469{
f67539c2 470 return read_topics(result, nullptr);
11fdf7f2
TL
471}
472
f67539c2 473int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
11fdf7f2
TL
474{
475 int ret = ps->read(bucket_meta_obj, result, objv_tracker);
476 if (ret < 0 && ret != -ENOENT) {
eafe8130 477 ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
478 return ret;
479 }
480 return 0;
481}
482
b3b6e05e 483int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
f67539c2
TL
484 RGWObjVersionTracker *objv_tracker,
485 optional_yield y)
11fdf7f2 486{
b3b6e05e 487 int ret = ps->write(dpp, bucket_meta_obj, topics, objv_tracker, y);
11fdf7f2 488 if (ret < 0) {
eafe8130 489 ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
490 return ret;
491 }
492
493 return 0;
494}
495
f67539c2 496int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
11fdf7f2
TL
497{
498 return read_topics(result, nullptr);
499}
500
f67539c2 501int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
11fdf7f2 502{
f67539c2
TL
503 rgw_pubsub_topics topics;
504 int ret = get_topics(&topics);
11fdf7f2 505 if (ret < 0) {
eafe8130 506 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
507 return ret;
508 }
509
510 auto iter = topics.topics.find(name);
511 if (iter == topics.topics.end()) {
eafe8130 512 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
11fdf7f2
TL
513 return -ENOENT;
514 }
515
516 *result = iter->second;
517 return 0;
518}
519
f67539c2 520int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
11fdf7f2 521{
f67539c2
TL
522 rgw_pubsub_topics topics;
523 int ret = get_topics(&topics);
eafe8130
TL
524 if (ret < 0) {
525 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
526 return ret;
527 }
528
529 auto iter = topics.topics.find(name);
530 if (iter == topics.topics.end()) {
531 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
532 return -ENOENT;
533 }
534
535 *result = iter->second.topic;
536 return 0;
537}
538
b3b6e05e
TL
539int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
540 return create_notification(dpp, topic_name, events, std::nullopt, "", y);
eafe8130
TL
541}
542
b3b6e05e 543int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
f67539c2 544 rgw_pubsub_topic_subs topic_info;
11fdf7f2 545
f67539c2 546 int ret = ps->get_topic(topic_name, &topic_info);
11fdf7f2 547 if (ret < 0) {
b3b6e05e 548 ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
11fdf7f2
TL
549 return ret;
550 }
b3b6e05e 551 ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl;
11fdf7f2
TL
552
553 RGWObjVersionTracker objv_tracker;
554 rgw_pubsub_bucket_topics bucket_topics;
555
556 ret = read_topics(&bucket_topics, &objv_tracker);
eafe8130 557 if (ret < 0) {
b3b6e05e 558 ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" <<
eafe8130 559 bucket.name << "': ret=" << ret << dendl;
11fdf7f2
TL
560 return ret;
561 }
b3b6e05e 562 ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
eafe8130 563 bucket.name << "'" << dendl;
11fdf7f2
TL
564
565 auto& topic_filter = bucket_topics.topics[topic_name];
f67539c2 566 topic_filter.topic = topic_info.topic;
11fdf7f2 567 topic_filter.events = events;
eafe8130
TL
568 topic_filter.s3_id = notif_name;
569 if (s3_filter) {
570 topic_filter.s3_filter = *s3_filter;
571 }
11fdf7f2 572
b3b6e05e 573 ret = write_topics(dpp, bucket_topics, &objv_tracker, y);
11fdf7f2 574 if (ret < 0) {
b3b6e05e 575 ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
11fdf7f2
TL
576 return ret;
577 }
eafe8130 578
b3b6e05e 579 ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
11fdf7f2
TL
580
581 return 0;
582}
583
b3b6e05e 584int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y)
11fdf7f2 585{
f67539c2 586 rgw_pubsub_topic_subs topic_info;
11fdf7f2 587
f67539c2 588 int ret = ps->get_topic(topic_name, &topic_info);
11fdf7f2 589 if (ret < 0) {
b3b6e05e 590 ldpp_dout(dpp, 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
11fdf7f2
TL
591 return ret;
592 }
593
594 RGWObjVersionTracker objv_tracker;
595 rgw_pubsub_bucket_topics bucket_topics;
596
597 ret = read_topics(&bucket_topics, &objv_tracker);
eafe8130 598 if (ret < 0) {
b3b6e05e 599 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
600 return ret;
601 }
602
603 bucket_topics.topics.erase(topic_name);
604
522d829b
TL
605 if (bucket_topics.topics.empty()) {
606 // no more topics - delete the notification object of the bucket
607 ret = ps->remove(dpp, bucket_meta_obj, &objv_tracker, y);
608 if (ret < 0 && ret != -ENOENT) {
609 ldout(ps->store->ctx(), 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
610 return ret;
611 }
612 return 0;
613 }
614
615 // write back the notifications without the deleted one
b3b6e05e 616 ret = write_topics(dpp, bucket_topics, &objv_tracker, y);
11fdf7f2 617 if (ret < 0) {
b3b6e05e 618 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
619 return ret;
620 }
621
622 return 0;
623}
624
b3b6e05e 625int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y)
f67539c2
TL
626{
627 // get all topics on a bucket
628 rgw_pubsub_bucket_topics bucket_topics;
629 auto ret = get_topics(&bucket_topics);
630 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 631 ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl;
f67539c2
TL
632 return ret ;
633 }
634
635 // remove all auto-genrated topics
636 for (const auto& topic : bucket_topics.topics) {
637 const auto& topic_name = topic.first;
b3b6e05e 638 ret = ps->remove_topic(dpp, topic_name, y);
f67539c2 639 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 640 ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl;
f67539c2
TL
641 }
642 }
643
522d829b 644 // delete the notification object of the bucket
b3b6e05e 645 ret = ps->remove(dpp, bucket_meta_obj, nullptr, y);
f67539c2 646 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 647 ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
f67539c2
TL
648 return ret;
649 }
650
651 return 0;
eafe8130
TL
652}
653
b3b6e05e
TL
654int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) {
655 return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y);
f67539c2
TL
656}
657
b3b6e05e 658int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
11fdf7f2 659 RGWObjVersionTracker objv_tracker;
f67539c2 660 rgw_pubsub_topics topics;
11fdf7f2 661
f67539c2 662 int ret = read_topics(&topics, &objv_tracker);
11fdf7f2 663 if (ret < 0 && ret != -ENOENT) {
eafe8130 664 // its not an error if not topics exist, we create one
b3b6e05e 665 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
666 return ret;
667 }
eafe8130 668
11fdf7f2 669 rgw_pubsub_topic_subs& new_topic = topics.topics[name];
f67539c2 670 new_topic.topic.user = rgw_user("", tenant);
11fdf7f2 671 new_topic.topic.name = name;
eafe8130
TL
672 new_topic.topic.dest = dest;
673 new_topic.topic.arn = arn;
9f95a23c 674 new_topic.topic.opaque_data = opaque_data;
11fdf7f2 675
b3b6e05e 676 ret = write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 677 if (ret < 0) {
b3b6e05e 678 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
679 return ret;
680 }
681
682 return 0;
683}
684
b3b6e05e 685int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y)
11fdf7f2
TL
686{
687 RGWObjVersionTracker objv_tracker;
f67539c2 688 rgw_pubsub_topics topics;
11fdf7f2 689
f67539c2 690 int ret = read_topics(&topics, &objv_tracker);
11fdf7f2 691 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 692 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2 693 return ret;
eafe8130
TL
694 } else if (ret == -ENOENT) {
695 // its not an error if no topics exist, just a no-op
b3b6e05e 696 ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
eafe8130 697 return 0;
11fdf7f2
TL
698 }
699
700 topics.topics.erase(name);
701
b3b6e05e 702 ret = write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 703 if (ret < 0) {
b3b6e05e 704 ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
11fdf7f2
TL
705 return ret;
706 }
707
708 return 0;
709}
710
f67539c2 711int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
11fdf7f2
TL
712{
713 int ret = ps->read(sub_meta_obj, result, objv_tracker);
714 if (ret < 0 && ret != -ENOENT) {
eafe8130 715 ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
11fdf7f2
TL
716 return ret;
717 }
718 return 0;
719}
720
b3b6e05e
TL
721int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp,
722 const rgw_pubsub_sub_config& sub_conf,
f67539c2
TL
723 RGWObjVersionTracker *objv_tracker,
724 optional_yield y)
11fdf7f2 725{
b3b6e05e 726 int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y);
11fdf7f2 727 if (ret < 0) {
b3b6e05e 728 ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
11fdf7f2
TL
729 return ret;
730 }
731
732 return 0;
733}
734
b3b6e05e 735int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker,
f67539c2 736 optional_yield y)
11fdf7f2 737{
b3b6e05e 738 int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y);
11fdf7f2 739 if (ret < 0) {
b3b6e05e 740 ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
11fdf7f2
TL
741 return ret;
742 }
743
744 return 0;
745}
746
f67539c2 747int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
11fdf7f2
TL
748{
749 return read_sub(result, nullptr);
750}
751
b3b6e05e 752int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
11fdf7f2 753{
f67539c2
TL
754 RGWObjVersionTracker objv_tracker;
755 rgw_pubsub_topics topics;
11fdf7f2 756
f67539c2 757 int ret = ps->read_topics(&topics, &objv_tracker);
11fdf7f2 758 if (ret < 0) {
b3b6e05e 759 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
eafe8130 760 return ret != -ENOENT ? ret : -EINVAL;
11fdf7f2
TL
761 }
762
763 auto iter = topics.topics.find(topic);
764 if (iter == topics.topics.end()) {
b3b6e05e 765 ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
eafe8130 766 return -EINVAL;
11fdf7f2
TL
767 }
768
769 auto& t = iter->second;
770
771 rgw_pubsub_sub_config sub_conf;
772
f67539c2 773 sub_conf.user = rgw_user("", ps->tenant);
11fdf7f2
TL
774 sub_conf.name = sub;
775 sub_conf.topic = topic;
776 sub_conf.dest = dest;
eafe8130 777 sub_conf.s3_id = s3_id;
11fdf7f2
TL
778
779 t.subs.insert(sub);
780
b3b6e05e 781 ret = ps->write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 782 if (ret < 0) {
b3b6e05e 783 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
784 return ret;
785 }
786
b3b6e05e 787 ret = write_sub(dpp, sub_conf, nullptr, y);
11fdf7f2 788 if (ret < 0) {
b3b6e05e 789 ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
11fdf7f2
TL
790 return ret;
791 }
792 return 0;
793}
794
b3b6e05e 795int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y)
11fdf7f2
TL
796{
797 string topic = _topic;
798 RGWObjVersionTracker sobjv_tracker;
11fdf7f2
TL
799
800 if (topic.empty()) {
801 rgw_pubsub_sub_config sub_conf;
802 int ret = read_sub(&sub_conf, &sobjv_tracker);
803 if (ret < 0) {
b3b6e05e 804 ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
11fdf7f2
TL
805 return ret;
806 }
807 topic = sub_conf.topic;
808 }
809
810 RGWObjVersionTracker objv_tracker;
f67539c2 811 rgw_pubsub_topics topics;
11fdf7f2 812
f67539c2 813 int ret = ps->read_topics(&topics, &objv_tracker);
11fdf7f2 814 if (ret < 0) {
eafe8130 815 // not an error - could be that topic was already deleted
b3b6e05e 816 ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
eafe8130 817 } else {
11fdf7f2
TL
818 auto iter = topics.topics.find(topic);
819 if (iter != topics.topics.end()) {
820 auto& t = iter->second;
821
822 t.subs.erase(sub);
823
b3b6e05e 824 ret = ps->write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 825 if (ret < 0) {
b3b6e05e 826 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
827 return ret;
828 }
829 }
830 }
831
b3b6e05e 832 ret = remove_sub(dpp, &sobjv_tracker, y);
11fdf7f2 833 if (ret < 0) {
b3b6e05e 834 ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
11fdf7f2
TL
835 return ret;
836 }
837 return 0;
838}
839
eafe8130 840template<typename EventType>
f67539c2 841void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
11fdf7f2
TL
842{
843 encode_json("next_marker", next_marker, f);
844 encode_json("is_truncated", is_truncated, f);
845
eafe8130 846 Formatter::ArraySection s(*f, EventType::json_type_plural);
11fdf7f2 847 for (auto& event : events) {
92f5a8d4 848 encode_json("", event, f);
11fdf7f2
TL
849 }
850}
851
eafe8130 852template<typename EventType>
b3b6e05e 853int RGWPubSub::SubWithEvents<EventType>::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events)
11fdf7f2 854{
9f95a23c 855 RGWRados *store = ps->store->getRados();
11fdf7f2
TL
856 rgw_pubsub_sub_config sub_conf;
857 int ret = get_conf(&sub_conf);
858 if (ret < 0) {
b3b6e05e 859 ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
11fdf7f2
TL
860 return ret;
861 }
862
863 RGWBucketInfo bucket_info;
864 string tenant;
9f95a23c 865 ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
11fdf7f2 866 if (ret == -ENOENT) {
eafe8130 867 list.is_truncated = false;
11fdf7f2
TL
868 return 0;
869 }
870 if (ret < 0) {
b3b6e05e 871 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
872 return ret;
873 }
874
875 RGWRados::Bucket target(store, bucket_info);
876 RGWRados::Bucket::List list_op(&target);
877
878 list_op.params.prefix = sub_conf.dest.oid_prefix;
879 list_op.params.marker = marker;
880
eafe8130 881 std::vector<rgw_bucket_dir_entry> objs;
11fdf7f2 882
b3b6e05e 883 ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield);
11fdf7f2 884 if (ret < 0) {
b3b6e05e 885 ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
886 return ret;
887 }
eafe8130
TL
888 if (list.is_truncated) {
889 list.next_marker = list_op.get_next_marker().name;
11fdf7f2
TL
890 }
891
892 for (auto& obj : objs) {
893 bufferlist bl64;
894 bufferlist bl;
895 bl64.append(obj.meta.user_data);
896 try {
897 bl.decode_base64(bl64);
898 } catch (buffer::error& err) {
b3b6e05e 899 ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl;
11fdf7f2
TL
900 continue;
901 }
eafe8130 902 EventType event;
11fdf7f2
TL
903
904 auto iter = bl.cbegin();
905 try {
906 decode(event, iter);
907 } catch (buffer::error& err) {
b3b6e05e 908 ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl;
11fdf7f2
TL
909 continue;
910 };
911
eafe8130 912 list.events.push_back(event);
11fdf7f2
TL
913 }
914 return 0;
915}
916
eafe8130 917template<typename EventType>
b3b6e05e 918int RGWPubSub::SubWithEvents<EventType>::remove_event(const DoutPrefixProvider *dpp, const string& event_id)
11fdf7f2 919{
9f95a23c 920 rgw::sal::RGWRadosStore *store = ps->store;
11fdf7f2
TL
921 rgw_pubsub_sub_config sub_conf;
922 int ret = get_conf(&sub_conf);
923 if (ret < 0) {
b3b6e05e 924 ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
11fdf7f2
TL
925 return ret;
926 }
927
928 RGWBucketInfo bucket_info;
929 string tenant;
9f95a23c 930 ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
11fdf7f2 931 if (ret < 0) {
b3b6e05e 932 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
933 return ret;
934 }
935
936 rgw_bucket& bucket = bucket_info.bucket;
937
938 RGWObjectCtx obj_ctx(store);
939 rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id);
940
941 obj_ctx.set_atomic(obj);
942
9f95a23c 943 RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj);
11fdf7f2
TL
944 RGWRados::Object::Delete del_op(&del_target);
945
946 del_op.params.bucket_owner = bucket_info.owner;
947 del_op.params.versioning_status = bucket_info.versioning_status();
948
b3b6e05e 949 ret = del_op.delete_obj(null_yield, dpp);
11fdf7f2 950 if (ret < 0) {
b3b6e05e 951 ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
11fdf7f2
TL
952 }
953 return 0;
954}
eafe8130 955
f67539c2
TL
956void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
957 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
9f95a23c
TL
958}
959
f67539c2 960void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
9f95a23c
TL
961 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
962}
963
f67539c2 964void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
9f95a23c
TL
965 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
966}
967
eafe8130 968template<typename EventType>
f67539c2 969void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
eafe8130
TL
970 list.dump(f);
971}
972
973// explicit instantiation for the only two possible types
974// no need to move implementation to header
f67539c2
TL
975template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
976template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
eafe8130 977