]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "services/svc_zone.h"
11fdf7f2 5#include "rgw_b64.h"
9f95a23c 6#include "rgw_sal.h"
f67539c2 7#include "rgw_sal_rados.h"
11fdf7f2
TL
8#include "rgw_pubsub.h"
9#include "rgw_tools.h"
eafe8130
TL
10#include "rgw_xml.h"
11#include "rgw_arn.h"
12#include "rgw_pubsub_push.h"
eafe8130
TL
13#include <regex>
14#include <algorithm>
11fdf7f2
TL
15
16#define dout_subsys ceph_subsys_rgw
17
20effc67 18using namespace std;
92f5a8d4
TL
19void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
20 char buf[64];
21 const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
22 if (len > 0) {
23 id.assign(buf, len);
24 }
25}
26
eafe8130
TL
27bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
28 XMLObjIter iter = obj->find("FilterRule");
29 XMLObj *o;
30
31 const auto throw_if_missing = true;
32 auto prefix_not_set = true;
33 auto suffix_not_set = true;
34 auto regex_not_set = true;
35 std::string name;
36
37 while ((o = iter.get_next())) {
38 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
39 if (name == "prefix" && prefix_not_set) {
40 prefix_not_set = false;
41 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
42 } else if (name == "suffix" && suffix_not_set) {
43 suffix_not_set = false;
44 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
45 } else if (name == "regex" && regex_not_set) {
46 regex_not_set = false;
47 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
48 } else {
49 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
50 }
51 }
52 return true;
53}
54
55void rgw_s3_key_filter::dump_xml(Formatter *f) const {
56 if (!prefix_rule.empty()) {
57 f->open_object_section("FilterRule");
58 ::encode_xml("Name", "prefix", f);
59 ::encode_xml("Value", prefix_rule, f);
60 f->close_section();
61 }
62 if (!suffix_rule.empty()) {
63 f->open_object_section("FilterRule");
64 ::encode_xml("Name", "suffix", f);
65 ::encode_xml("Value", suffix_rule, f);
66 f->close_section();
67 }
68 if (!regex_rule.empty()) {
69 f->open_object_section("FilterRule");
70 ::encode_xml("Name", "regex", f);
71 ::encode_xml("Value", regex_rule, f);
72 f->close_section();
73 }
74}
75
76bool rgw_s3_key_filter::has_content() const {
77 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
78}
79
9f95a23c 80bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
f67539c2 81 kv.clear();
eafe8130
TL
82 XMLObjIter iter = obj->find("FilterRule");
83 XMLObj *o;
84
85 const auto throw_if_missing = true;
86
87 std::string key;
88 std::string value;
89
90 while ((o = iter.get_next())) {
91 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
92 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
f67539c2 93 kv.emplace(key, value);
eafe8130
TL
94 }
95 return true;
96}
97
9f95a23c 98void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
f67539c2 99 for (const auto& key_value : kv) {
eafe8130
TL
100 f->open_object_section("FilterRule");
101 ::encode_xml("Name", key_value.first, f);
102 ::encode_xml("Value", key_value.second, f);
103 f->close_section();
104 }
105}
106
9f95a23c 107bool rgw_s3_key_value_filter::has_content() const {
f67539c2 108 return !kv.empty();
eafe8130
TL
109}
110
111bool rgw_s3_filter::decode_xml(XMLObj* obj) {
112 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
113 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
9f95a23c 114 RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
eafe8130
TL
115 return true;
116}
117
118void rgw_s3_filter::dump_xml(Formatter *f) const {
119 if (key_filter.has_content()) {
120 ::encode_xml("S3Key", key_filter, f);
121 }
122 if (metadata_filter.has_content()) {
123 ::encode_xml("S3Metadata", metadata_filter, f);
124 }
9f95a23c
TL
125 if (tag_filter.has_content()) {
126 ::encode_xml("S3Tags", tag_filter, f);
127 }
eafe8130
TL
128}
129
130bool rgw_s3_filter::has_content() const {
131 return key_filter.has_content() ||
9f95a23c
TL
132 metadata_filter.has_content() ||
133 tag_filter.has_content();
eafe8130
TL
134}
135
136bool match(const rgw_s3_key_filter& filter, const std::string& key) {
137 const auto key_size = key.size();
138 const auto prefix_size = filter.prefix_rule.size();
139 if (prefix_size != 0) {
140 // prefix rule exists
141 if (prefix_size > key_size) {
142 // if prefix is longer than key, we fail
143 return false;
144 }
145 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
146 return false;
147 }
148 }
149 const auto suffix_size = filter.suffix_rule.size();
150 if (suffix_size != 0) {
151 // suffix rule exists
152 if (suffix_size > key_size) {
153 // if suffix is longer than key, we fail
154 return false;
155 }
156 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
157 return false;
158 }
159 }
160 if (!filter.regex_rule.empty()) {
161 // TODO add regex chaching in the filter
162 const std::regex base_regex(filter.regex_rule);
163 if (!std::regex_match(key, base_regex)) {
164 return false;
165 }
166 }
167 return true;
168}
169
f67539c2 170bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) {
9f95a23c
TL
171 // all filter pairs must exist with the same value in the object's metadata/tags
172 // object metadata/tags may include items not in the filter
f67539c2 173 return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end());
eafe8130
TL
174}
175
20effc67
TL
176bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv) {
177 // all filter pairs must exist with the same value in the object's metadata/tags
178 // object metadata/tags may include items not in the filter
179 for (auto& filter : filter.kv) {
180 auto result = kv.equal_range(filter.first);
181 if (std::any_of(result.first, result.second, [&filter](const pair<string,string>& p) { return p.second == filter.second;}))
182 continue;
183 else
184 return false;
185 }
186 return true;
187}
188
eafe8130
TL
189bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
190 // if event list exists, and none of the events in the list matches the event type, filter the message
191 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
192 return false;
193 }
194 return true;
195}
196
197void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
198 l.clear();
199
200 XMLObjIter iter = obj->find(name);
201 XMLObj *o;
202
203 while ((o = iter.get_next())) {
204 std::string val;
205 decode_xml_obj(val, o);
206 l.push_back(rgw::notify::from_string(val));
207 }
208}
209
210bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
211 const auto throw_if_missing = true;
212 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
213
214 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
215
216 RGWXMLDecoder::decode_xml("Filter", filter, obj);
217
218 do_decode_xml_obj(events, "Event", obj);
219 if (events.empty()) {
220 // if no events are provided, we assume all events
221 events.push_back(rgw::notify::ObjectCreated);
222 events.push_back(rgw::notify::ObjectRemoved);
223 }
224 return true;
225}
226
227void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
228 ::encode_xml("Id", id, f);
229 ::encode_xml("Topic", topic_arn.c_str(), f);
230 if (filter.has_content()) {
231 ::encode_xml("Filter", filter, f);
232 }
233 for (const auto& event : events) {
234 ::encode_xml("Event", rgw::notify::to_string(event), f);
235 }
236}
237
238bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
239 do_decode_xml_obj(list, "TopicConfiguration", obj);
eafe8130
TL
240 return true;
241}
242
243rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
244 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
245
246void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
247 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
248}
249
f67539c2 250void rgw_pubsub_s3_event::dump(Formatter *f) const {
eafe8130
TL
251 encode_json("eventVersion", eventVersion, f);
252 encode_json("eventSource", eventSource, f);
253 encode_json("awsRegion", awsRegion, f);
254 utime_t ut(eventTime);
255 encode_json("eventTime", ut, f);
256 encode_json("eventName", eventName, f);
257 {
258 Formatter::ObjectSection s(*f, "userIdentity");
259 encode_json("principalId", userIdentity, f);
260 }
261 {
262 Formatter::ObjectSection s(*f, "requestParameters");
263 encode_json("sourceIPAddress", sourceIPAddress, f);
264 }
265 {
266 Formatter::ObjectSection s(*f, "responseElements");
267 encode_json("x-amz-request-id", x_amz_request_id, f);
268 encode_json("x-amz-id-2", x_amz_id_2, f);
269 }
270 {
271 Formatter::ObjectSection s(*f, "s3");
272 encode_json("s3SchemaVersion", s3SchemaVersion, f);
273 encode_json("configurationId", configurationId, f);
274 {
275 Formatter::ObjectSection sub_s(*f, "bucket");
276 encode_json("name", bucket_name, f);
277 {
278 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
279 encode_json("principalId", bucket_ownerIdentity, f);
280 }
281 encode_json("arn", bucket_arn, f);
282 encode_json("id", bucket_id, f);
283 }
284 {
285 Formatter::ObjectSection sub_s(*f, "object");
286 encode_json("key", object_key, f);
287 encode_json("size", object_size, f);
a4b75251 288 encode_json("eTag", object_etag, f);
eafe8130
TL
289 encode_json("versionId", object_versionId, f);
290 encode_json("sequencer", object_sequencer, f);
291 encode_json("metadata", x_meta_map, f);
9f95a23c 292 encode_json("tags", tags, f);
eafe8130
TL
293 }
294 }
295 encode_json("eventId", id, f);
9f95a23c 296 encode_json("opaqueData", opaque_data, f);
eafe8130 297}
11fdf7f2
TL
298
299void rgw_pubsub_event::dump(Formatter *f) const
300{
301 encode_json("id", id, f);
eafe8130 302 encode_json("event", event_name, f);
11fdf7f2
TL
303 utime_t ut(timestamp);
304 encode_json("timestamp", ut, f);
305 encode_json("info", info, f);
306}
307
308void rgw_pubsub_topic::dump(Formatter *f) const
309{
310 encode_json("user", user, f);
311 encode_json("name", name, f);
eafe8130
TL
312 encode_json("dest", dest, f);
313 encode_json("arn", arn, f);
9f95a23c 314 encode_json("opaqueData", opaque_data, f);
eafe8130
TL
315}
316
317void rgw_pubsub_topic::dump_xml(Formatter *f) const
318{
319 encode_xml("User", user, f);
320 encode_xml("Name", name, f);
321 encode_xml("EndPoint", dest, f);
322 encode_xml("TopicArn", arn, f);
9f95a23c 323 encode_xml("OpaqueData", opaque_data, f);
eafe8130
TL
324}
325
f67539c2
TL
326void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) {
327 f->open_object_section("entry");
328 encode_xml("key", key, f);
329 encode_xml("value", value, f);
330 f->close_section(); // entry
331}
332
333void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
334{
335 f->open_array_section("Attributes");
336 std::string str_user;
337 user.to_str(str_user);
338 encode_xml_key_value_entry("User", str_user, f);
339 encode_xml_key_value_entry("Name", name, f);
340 encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f);
341 encode_xml_key_value_entry("TopicArn", arn, f);
342 encode_xml_key_value_entry("OpaqueData", opaque_data, f);
343 f->close_section(); // Attributes
344}
345
eafe8130
TL
346void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
347{
348 f->open_array_section(name);
349 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
350 f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
351 }
352 f->close_section();
11fdf7f2
TL
353}
354
355void rgw_pubsub_topic_filter::dump(Formatter *f) const
356{
357 encode_json("topic", topic, f);
358 encode_json("events", events, f);
359}
360
361void rgw_pubsub_topic_subs::dump(Formatter *f) const
362{
363 encode_json("topic", topic, f);
364 encode_json("subs", subs, f);
365}
366
367void rgw_pubsub_bucket_topics::dump(Formatter *f) const
368{
369 Formatter::ArraySection s(*f, "topics");
370 for (auto& t : topics) {
371 encode_json(t.first.c_str(), t.second, f);
372 }
373}
374
f67539c2 375void rgw_pubsub_topics::dump(Formatter *f) const
11fdf7f2
TL
376{
377 Formatter::ArraySection s(*f, "topics");
378 for (auto& t : topics) {
379 encode_json(t.first.c_str(), t.second, f);
380 }
381}
382
f67539c2 383void rgw_pubsub_topics::dump_xml(Formatter *f) const
eafe8130
TL
384{
385 for (auto& t : topics) {
386 encode_xml("member", t.second.topic, f);
387 }
388}
389
11fdf7f2
TL
390void rgw_pubsub_sub_dest::dump(Formatter *f) const
391{
392 encode_json("bucket_name", bucket_name, f);
393 encode_json("oid_prefix", oid_prefix, f);
394 encode_json("push_endpoint", push_endpoint, f);
eafe8130
TL
395 encode_json("push_endpoint_args", push_endpoint_args, f);
396 encode_json("push_endpoint_topic", arn_topic, f);
f67539c2
TL
397 encode_json("stored_secret", stored_secret, f);
398 encode_json("persistent", persistent, f);
eafe8130
TL
399}
400
401void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
402{
f67539c2
TL
403 // first 2 members are omitted here since they
404 // dont apply to AWS compliant topics
eafe8130
TL
405 encode_xml("EndpointAddress", push_endpoint, f);
406 encode_xml("EndpointArgs", push_endpoint_args, f);
407 encode_xml("EndpointTopic", arn_topic, f);
f67539c2
TL
408 encode_xml("HasStoredSecret", stored_secret, f);
409 encode_xml("Persistent", persistent, f);
410}
411
412std::string rgw_pubsub_sub_dest::to_json_str() const
413{
414 // first 2 members are omitted here since they
415 // dont apply to AWS compliant topics
416 JSONFormatter f;
417 f.open_object_section("");
418 encode_json("EndpointAddress", push_endpoint, &f);
419 encode_json("EndpointArgs", push_endpoint_args, &f);
420 encode_json("EndpointTopic", arn_topic, &f);
421 encode_json("HasStoredSecret", stored_secret, &f);
422 encode_json("Persistent", persistent, &f);
423 f.close_section();
424 std::stringstream ss;
425 f.flush(ss);
426 return ss.str();
11fdf7f2
TL
427}
428
429void rgw_pubsub_sub_config::dump(Formatter *f) const
430{
431 encode_json("user", user, f);
432 encode_json("name", name, f);
433 encode_json("topic", topic, f);
434 encode_json("dest", dest, f);
eafe8130 435 encode_json("s3_id", s3_id, f);
11fdf7f2
TL
436}
437
20effc67 438RGWPubSub::RGWPubSub(rgw::sal::RadosStore* _store, const std::string& _tenant) :
9f95a23c 439 store(_store),
f67539c2 440 tenant(_tenant),
9f95a23c 441 obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
f67539c2 442 get_meta_obj(&meta_obj);
9f95a23c 443}
11fdf7f2 444
b3b6e05e
TL
445int RGWPubSub::remove(const DoutPrefixProvider *dpp,
446 const rgw_raw_obj& obj,
f67539c2
TL
447 RGWObjVersionTracker *objv_tracker,
448 optional_yield y)
11fdf7f2 449{
b3b6e05e 450 int ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y);
11fdf7f2
TL
451 if (ret < 0) {
452 return ret;
453 }
454
455 return 0;
456}
457
f67539c2 458int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
11fdf7f2 459{
f67539c2 460 int ret = read(meta_obj, result, objv_tracker);
eafe8130
TL
461 if (ret < 0) {
462 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
463 return ret;
464 }
465 return 0;
466}
467
b3b6e05e 468int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
f67539c2 469 RGWObjVersionTracker *objv_tracker, optional_yield y)
11fdf7f2 470{
b3b6e05e 471 int ret = write(dpp, meta_obj, topics, objv_tracker, y);
11fdf7f2 472 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 473 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
474 return ret;
475 }
476 return 0;
477}
478
f67539c2 479int RGWPubSub::get_topics(rgw_pubsub_topics *result)
11fdf7f2 480{
f67539c2 481 return read_topics(result, nullptr);
11fdf7f2
TL
482}
483
f67539c2 484int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
11fdf7f2
TL
485{
486 int ret = ps->read(bucket_meta_obj, result, objv_tracker);
487 if (ret < 0 && ret != -ENOENT) {
eafe8130 488 ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
489 return ret;
490 }
491 return 0;
492}
493
b3b6e05e 494int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
f67539c2
TL
495 RGWObjVersionTracker *objv_tracker,
496 optional_yield y)
11fdf7f2 497{
b3b6e05e 498 int ret = ps->write(dpp, bucket_meta_obj, topics, objv_tracker, y);
11fdf7f2 499 if (ret < 0) {
eafe8130 500 ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
501 return ret;
502 }
503
504 return 0;
505}
506
f67539c2 507int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
11fdf7f2
TL
508{
509 return read_topics(result, nullptr);
510}
511
f67539c2 512int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
11fdf7f2 513{
f67539c2
TL
514 rgw_pubsub_topics topics;
515 int ret = get_topics(&topics);
11fdf7f2 516 if (ret < 0) {
eafe8130 517 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
518 return ret;
519 }
520
521 auto iter = topics.topics.find(name);
522 if (iter == topics.topics.end()) {
eafe8130 523 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
11fdf7f2
TL
524 return -ENOENT;
525 }
526
527 *result = iter->second;
528 return 0;
529}
530
f67539c2 531int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
11fdf7f2 532{
f67539c2
TL
533 rgw_pubsub_topics topics;
534 int ret = get_topics(&topics);
eafe8130
TL
535 if (ret < 0) {
536 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
537 return ret;
538 }
539
540 auto iter = topics.topics.find(name);
541 if (iter == topics.topics.end()) {
542 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
543 return -ENOENT;
544 }
545
546 *result = iter->second.topic;
547 return 0;
548}
549
b3b6e05e
TL
550int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
551 return create_notification(dpp, topic_name, events, std::nullopt, "", y);
eafe8130
TL
552}
553
b3b6e05e 554int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
f67539c2 555 rgw_pubsub_topic_subs topic_info;
11fdf7f2 556
f67539c2 557 int ret = ps->get_topic(topic_name, &topic_info);
11fdf7f2 558 if (ret < 0) {
b3b6e05e 559 ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
11fdf7f2
TL
560 return ret;
561 }
b3b6e05e 562 ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl;
11fdf7f2
TL
563
564 RGWObjVersionTracker objv_tracker;
565 rgw_pubsub_bucket_topics bucket_topics;
566
567 ret = read_topics(&bucket_topics, &objv_tracker);
eafe8130 568 if (ret < 0) {
b3b6e05e 569 ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" <<
eafe8130 570 bucket.name << "': ret=" << ret << dendl;
11fdf7f2
TL
571 return ret;
572 }
b3b6e05e 573 ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
eafe8130 574 bucket.name << "'" << dendl;
11fdf7f2
TL
575
576 auto& topic_filter = bucket_topics.topics[topic_name];
f67539c2 577 topic_filter.topic = topic_info.topic;
11fdf7f2 578 topic_filter.events = events;
eafe8130
TL
579 topic_filter.s3_id = notif_name;
580 if (s3_filter) {
581 topic_filter.s3_filter = *s3_filter;
582 }
11fdf7f2 583
b3b6e05e 584 ret = write_topics(dpp, bucket_topics, &objv_tracker, y);
11fdf7f2 585 if (ret < 0) {
b3b6e05e 586 ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
11fdf7f2
TL
587 return ret;
588 }
eafe8130 589
b3b6e05e 590 ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
11fdf7f2
TL
591
592 return 0;
593}
594
b3b6e05e 595int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y)
11fdf7f2 596{
f67539c2 597 rgw_pubsub_topic_subs topic_info;
11fdf7f2 598
f67539c2 599 int ret = ps->get_topic(topic_name, &topic_info);
11fdf7f2 600 if (ret < 0) {
b3b6e05e 601 ldpp_dout(dpp, 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
11fdf7f2
TL
602 return ret;
603 }
604
605 RGWObjVersionTracker objv_tracker;
606 rgw_pubsub_bucket_topics bucket_topics;
607
608 ret = read_topics(&bucket_topics, &objv_tracker);
eafe8130 609 if (ret < 0) {
b3b6e05e 610 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
611 return ret;
612 }
613
614 bucket_topics.topics.erase(topic_name);
615
522d829b
TL
616 if (bucket_topics.topics.empty()) {
617 // no more topics - delete the notification object of the bucket
618 ret = ps->remove(dpp, bucket_meta_obj, &objv_tracker, y);
619 if (ret < 0 && ret != -ENOENT) {
20effc67 620 ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
522d829b
TL
621 return ret;
622 }
623 return 0;
624 }
625
626 // write back the notifications without the deleted one
b3b6e05e 627 ret = write_topics(dpp, bucket_topics, &objv_tracker, y);
11fdf7f2 628 if (ret < 0) {
b3b6e05e 629 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
630 return ret;
631 }
632
633 return 0;
634}
635
b3b6e05e 636int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y)
f67539c2
TL
637{
638 // get all topics on a bucket
639 rgw_pubsub_bucket_topics bucket_topics;
640 auto ret = get_topics(&bucket_topics);
641 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 642 ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl;
f67539c2
TL
643 return ret ;
644 }
645
646 // remove all auto-genrated topics
647 for (const auto& topic : bucket_topics.topics) {
648 const auto& topic_name = topic.first;
b3b6e05e 649 ret = ps->remove_topic(dpp, topic_name, y);
f67539c2 650 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 651 ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl;
f67539c2
TL
652 }
653 }
654
522d829b 655 // delete the notification object of the bucket
b3b6e05e 656 ret = ps->remove(dpp, bucket_meta_obj, nullptr, y);
f67539c2 657 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 658 ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
f67539c2
TL
659 return ret;
660 }
661
662 return 0;
eafe8130
TL
663}
664
b3b6e05e
TL
665int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) {
666 return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y);
f67539c2
TL
667}
668
b3b6e05e 669int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
11fdf7f2 670 RGWObjVersionTracker objv_tracker;
f67539c2 671 rgw_pubsub_topics topics;
11fdf7f2 672
f67539c2 673 int ret = read_topics(&topics, &objv_tracker);
11fdf7f2 674 if (ret < 0 && ret != -ENOENT) {
eafe8130 675 // its not an error if not topics exist, we create one
b3b6e05e 676 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
677 return ret;
678 }
eafe8130 679
11fdf7f2 680 rgw_pubsub_topic_subs& new_topic = topics.topics[name];
f67539c2 681 new_topic.topic.user = rgw_user("", tenant);
11fdf7f2 682 new_topic.topic.name = name;
eafe8130
TL
683 new_topic.topic.dest = dest;
684 new_topic.topic.arn = arn;
9f95a23c 685 new_topic.topic.opaque_data = opaque_data;
11fdf7f2 686
b3b6e05e 687 ret = write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 688 if (ret < 0) {
b3b6e05e 689 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
690 return ret;
691 }
692
693 return 0;
694}
695
b3b6e05e 696int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y)
11fdf7f2
TL
697{
698 RGWObjVersionTracker objv_tracker;
f67539c2 699 rgw_pubsub_topics topics;
11fdf7f2 700
f67539c2 701 int ret = read_topics(&topics, &objv_tracker);
11fdf7f2 702 if (ret < 0 && ret != -ENOENT) {
b3b6e05e 703 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2 704 return ret;
eafe8130
TL
705 } else if (ret == -ENOENT) {
706 // its not an error if no topics exist, just a no-op
b3b6e05e 707 ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
eafe8130 708 return 0;
11fdf7f2
TL
709 }
710
711 topics.topics.erase(name);
712
b3b6e05e 713 ret = write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 714 if (ret < 0) {
b3b6e05e 715 ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
11fdf7f2
TL
716 return ret;
717 }
718
719 return 0;
720}
721
f67539c2 722int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
11fdf7f2
TL
723{
724 int ret = ps->read(sub_meta_obj, result, objv_tracker);
725 if (ret < 0 && ret != -ENOENT) {
eafe8130 726 ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
11fdf7f2
TL
727 return ret;
728 }
729 return 0;
730}
731
b3b6e05e
TL
732int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp,
733 const rgw_pubsub_sub_config& sub_conf,
f67539c2
TL
734 RGWObjVersionTracker *objv_tracker,
735 optional_yield y)
11fdf7f2 736{
b3b6e05e 737 int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y);
11fdf7f2 738 if (ret < 0) {
b3b6e05e 739 ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
11fdf7f2
TL
740 return ret;
741 }
742
743 return 0;
744}
745
b3b6e05e 746int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker,
f67539c2 747 optional_yield y)
11fdf7f2 748{
b3b6e05e 749 int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y);
11fdf7f2 750 if (ret < 0) {
b3b6e05e 751 ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
11fdf7f2
TL
752 return ret;
753 }
754
755 return 0;
756}
757
f67539c2 758int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
11fdf7f2
TL
759{
760 return read_sub(result, nullptr);
761}
762
b3b6e05e 763int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
11fdf7f2 764{
f67539c2
TL
765 RGWObjVersionTracker objv_tracker;
766 rgw_pubsub_topics topics;
11fdf7f2 767
f67539c2 768 int ret = ps->read_topics(&topics, &objv_tracker);
11fdf7f2 769 if (ret < 0) {
b3b6e05e 770 ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
eafe8130 771 return ret != -ENOENT ? ret : -EINVAL;
11fdf7f2
TL
772 }
773
774 auto iter = topics.topics.find(topic);
775 if (iter == topics.topics.end()) {
b3b6e05e 776 ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
eafe8130 777 return -EINVAL;
11fdf7f2
TL
778 }
779
780 auto& t = iter->second;
781
782 rgw_pubsub_sub_config sub_conf;
783
f67539c2 784 sub_conf.user = rgw_user("", ps->tenant);
11fdf7f2
TL
785 sub_conf.name = sub;
786 sub_conf.topic = topic;
787 sub_conf.dest = dest;
eafe8130 788 sub_conf.s3_id = s3_id;
11fdf7f2
TL
789
790 t.subs.insert(sub);
791
b3b6e05e 792 ret = ps->write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 793 if (ret < 0) {
b3b6e05e 794 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
795 return ret;
796 }
797
b3b6e05e 798 ret = write_sub(dpp, sub_conf, nullptr, y);
11fdf7f2 799 if (ret < 0) {
b3b6e05e 800 ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
11fdf7f2
TL
801 return ret;
802 }
803 return 0;
804}
805
b3b6e05e 806int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y)
11fdf7f2
TL
807{
808 string topic = _topic;
809 RGWObjVersionTracker sobjv_tracker;
11fdf7f2
TL
810
811 if (topic.empty()) {
812 rgw_pubsub_sub_config sub_conf;
813 int ret = read_sub(&sub_conf, &sobjv_tracker);
814 if (ret < 0) {
b3b6e05e 815 ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
11fdf7f2
TL
816 return ret;
817 }
818 topic = sub_conf.topic;
819 }
820
821 RGWObjVersionTracker objv_tracker;
f67539c2 822 rgw_pubsub_topics topics;
11fdf7f2 823
f67539c2 824 int ret = ps->read_topics(&topics, &objv_tracker);
11fdf7f2 825 if (ret < 0) {
eafe8130 826 // not an error - could be that topic was already deleted
b3b6e05e 827 ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
eafe8130 828 } else {
11fdf7f2
TL
829 auto iter = topics.topics.find(topic);
830 if (iter != topics.topics.end()) {
831 auto& t = iter->second;
832
833 t.subs.erase(sub);
834
b3b6e05e 835 ret = ps->write_topics(dpp, topics, &objv_tracker, y);
11fdf7f2 836 if (ret < 0) {
b3b6e05e 837 ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
838 return ret;
839 }
840 }
841 }
842
b3b6e05e 843 ret = remove_sub(dpp, &sobjv_tracker, y);
11fdf7f2 844 if (ret < 0) {
b3b6e05e 845 ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
11fdf7f2
TL
846 return ret;
847 }
848 return 0;
849}
850
eafe8130 851template<typename EventType>
f67539c2 852void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
11fdf7f2
TL
853{
854 encode_json("next_marker", next_marker, f);
855 encode_json("is_truncated", is_truncated, f);
856
eafe8130 857 Formatter::ArraySection s(*f, EventType::json_type_plural);
11fdf7f2 858 for (auto& event : events) {
92f5a8d4 859 encode_json("", event, f);
11fdf7f2
TL
860 }
861}
862
eafe8130 863template<typename EventType>
b3b6e05e 864int RGWPubSub::SubWithEvents<EventType>::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events)
11fdf7f2 865{
9f95a23c 866 RGWRados *store = ps->store->getRados();
11fdf7f2
TL
867 rgw_pubsub_sub_config sub_conf;
868 int ret = get_conf(&sub_conf);
869 if (ret < 0) {
b3b6e05e 870 ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
11fdf7f2
TL
871 return ret;
872 }
873
874 RGWBucketInfo bucket_info;
875 string tenant;
9f95a23c 876 ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
11fdf7f2 877 if (ret == -ENOENT) {
eafe8130 878 list.is_truncated = false;
11fdf7f2
TL
879 return 0;
880 }
881 if (ret < 0) {
b3b6e05e 882 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
883 return ret;
884 }
885
886 RGWRados::Bucket target(store, bucket_info);
887 RGWRados::Bucket::List list_op(&target);
888
889 list_op.params.prefix = sub_conf.dest.oid_prefix;
890 list_op.params.marker = marker;
891
eafe8130 892 std::vector<rgw_bucket_dir_entry> objs;
11fdf7f2 893
b3b6e05e 894 ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield);
11fdf7f2 895 if (ret < 0) {
b3b6e05e 896 ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
897 return ret;
898 }
eafe8130
TL
899 if (list.is_truncated) {
900 list.next_marker = list_op.get_next_marker().name;
11fdf7f2
TL
901 }
902
903 for (auto& obj : objs) {
904 bufferlist bl64;
905 bufferlist bl;
906 bl64.append(obj.meta.user_data);
907 try {
908 bl.decode_base64(bl64);
909 } catch (buffer::error& err) {
b3b6e05e 910 ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl;
11fdf7f2
TL
911 continue;
912 }
eafe8130 913 EventType event;
11fdf7f2
TL
914
915 auto iter = bl.cbegin();
916 try {
917 decode(event, iter);
918 } catch (buffer::error& err) {
b3b6e05e 919 ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl;
11fdf7f2
TL
920 continue;
921 };
922
eafe8130 923 list.events.push_back(event);
11fdf7f2
TL
924 }
925 return 0;
926}
927
eafe8130 928template<typename EventType>
b3b6e05e 929int RGWPubSub::SubWithEvents<EventType>::remove_event(const DoutPrefixProvider *dpp, const string& event_id)
11fdf7f2 930{
20effc67 931 rgw::sal::RadosStore* store = ps->store;
11fdf7f2
TL
932 rgw_pubsub_sub_config sub_conf;
933 int ret = get_conf(&sub_conf);
934 if (ret < 0) {
b3b6e05e 935 ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
11fdf7f2
TL
936 return ret;
937 }
938
939 RGWBucketInfo bucket_info;
940 string tenant;
9f95a23c 941 ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
11fdf7f2 942 if (ret < 0) {
b3b6e05e 943 ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
944 return ret;
945 }
946
947 rgw_bucket& bucket = bucket_info.bucket;
948
949 RGWObjectCtx obj_ctx(store);
950 rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id);
951
952 obj_ctx.set_atomic(obj);
953
9f95a23c 954 RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj);
11fdf7f2
TL
955 RGWRados::Object::Delete del_op(&del_target);
956
957 del_op.params.bucket_owner = bucket_info.owner;
958 del_op.params.versioning_status = bucket_info.versioning_status();
959
b3b6e05e 960 ret = del_op.delete_obj(null_yield, dpp);
11fdf7f2 961 if (ret < 0) {
b3b6e05e 962 ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
11fdf7f2
TL
963 }
964 return 0;
965}
eafe8130 966
f67539c2
TL
967void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
968 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
9f95a23c
TL
969}
970
f67539c2 971void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
9f95a23c
TL
972 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
973}
974
f67539c2 975void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
9f95a23c
TL
976 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
977}
978
eafe8130 979template<typename EventType>
f67539c2 980void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
eafe8130
TL
981 list.dump(f);
982}
983
984// explicit instantiation for the only two possible types
985// no need to move implementation to header
f67539c2
TL
986template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
987template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
eafe8130 988