]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4#include "services/svc_zone.h"
11fdf7f2
TL
5#include "rgw_b64.h"
6#include "rgw_rados.h"
7#include "rgw_pubsub.h"
8#include "rgw_tools.h"
eafe8130
TL
9#include "rgw_xml.h"
10#include "rgw_arn.h"
11#include "rgw_pubsub_push.h"
12#include "rgw_rados.h"
13#include <regex>
14#include <algorithm>
11fdf7f2
TL
15
16#define dout_subsys ceph_subsys_rgw
17
92f5a8d4
TL
18void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
19 char buf[64];
20 const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
21 if (len > 0) {
22 id.assign(buf, len);
23 }
24}
25
eafe8130
TL
26bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
27 XMLObjIter iter = obj->find("FilterRule");
28 XMLObj *o;
29
30 const auto throw_if_missing = true;
31 auto prefix_not_set = true;
32 auto suffix_not_set = true;
33 auto regex_not_set = true;
34 std::string name;
35
36 while ((o = iter.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
38 if (name == "prefix" && prefix_not_set) {
39 prefix_not_set = false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
41 } else if (name == "suffix" && suffix_not_set) {
42 suffix_not_set = false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
44 } else if (name == "regex" && regex_not_set) {
45 regex_not_set = false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
47 } else {
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
49 }
50 }
51 return true;
52}
53
54void rgw_s3_key_filter::dump_xml(Formatter *f) const {
55 if (!prefix_rule.empty()) {
56 f->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f);
58 ::encode_xml("Value", prefix_rule, f);
59 f->close_section();
60 }
61 if (!suffix_rule.empty()) {
62 f->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f);
64 ::encode_xml("Value", suffix_rule, f);
65 f->close_section();
66 }
67 if (!regex_rule.empty()) {
68 f->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f);
70 ::encode_xml("Value", regex_rule, f);
71 f->close_section();
72 }
73}
74
75bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
77}
78
79bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
80 metadata.clear();
81 XMLObjIter iter = obj->find("FilterRule");
82 XMLObj *o;
83
84 const auto throw_if_missing = true;
85
86 std::string key;
87 std::string value;
88
89 while ((o = iter.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
91 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
92 metadata.emplace(key, value);
93 }
94 return true;
95}
96
97void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
98 for (const auto& key_value : metadata) {
99 f->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value.first, f);
101 ::encode_xml("Value", key_value.second, f);
102 f->close_section();
103 }
104}
105
106bool rgw_s3_metadata_filter::has_content() const {
107 return !metadata.empty();
108}
109
110bool rgw_s3_filter::decode_xml(XMLObj* obj) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
113 return true;
114}
115
116void rgw_s3_filter::dump_xml(Formatter *f) const {
117 if (key_filter.has_content()) {
118 ::encode_xml("S3Key", key_filter, f);
119 }
120 if (metadata_filter.has_content()) {
121 ::encode_xml("S3Metadata", metadata_filter, f);
122 }
123}
124
125bool rgw_s3_filter::has_content() const {
126 return key_filter.has_content() ||
127 metadata_filter.has_content();
128}
129
130bool match(const rgw_s3_key_filter& filter, const std::string& key) {
131 const auto key_size = key.size();
132 const auto prefix_size = filter.prefix_rule.size();
133 if (prefix_size != 0) {
134 // prefix rule exists
135 if (prefix_size > key_size) {
136 // if prefix is longer than key, we fail
137 return false;
138 }
139 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
140 return false;
141 }
142 }
143 const auto suffix_size = filter.suffix_rule.size();
144 if (suffix_size != 0) {
145 // suffix rule exists
146 if (suffix_size > key_size) {
147 // if suffix is longer than key, we fail
148 return false;
149 }
150 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
151 return false;
152 }
153 }
154 if (!filter.regex_rule.empty()) {
155 // TODO add regex chaching in the filter
156 const std::regex base_regex(filter.regex_rule);
157 if (!std::regex_match(key, base_regex)) {
158 return false;
159 }
160 }
161 return true;
162}
163
164bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
165 // all filter pairs must exist with the same value in the object's metadata
166 // object metadata may include items not in the filter
167 return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
168}
169
170bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
171 // if event list exists, and none of the events in the list matches the event type, filter the message
172 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
173 return false;
174 }
175 return true;
176}
177
178void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
179 l.clear();
180
181 XMLObjIter iter = obj->find(name);
182 XMLObj *o;
183
184 while ((o = iter.get_next())) {
185 std::string val;
186 decode_xml_obj(val, o);
187 l.push_back(rgw::notify::from_string(val));
188 }
189}
190
191bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
192 const auto throw_if_missing = true;
193 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
194
195 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
196
197 RGWXMLDecoder::decode_xml("Filter", filter, obj);
198
199 do_decode_xml_obj(events, "Event", obj);
200 if (events.empty()) {
201 // if no events are provided, we assume all events
202 events.push_back(rgw::notify::ObjectCreated);
203 events.push_back(rgw::notify::ObjectRemoved);
204 }
205 return true;
206}
207
208void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
209 ::encode_xml("Id", id, f);
210 ::encode_xml("Topic", topic_arn.c_str(), f);
211 if (filter.has_content()) {
212 ::encode_xml("Filter", filter, f);
213 }
214 for (const auto& event : events) {
215 ::encode_xml("Event", rgw::notify::to_string(event), f);
216 }
217}
218
219bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
220 do_decode_xml_obj(list, "TopicConfiguration", obj);
221 if (list.empty()) {
222 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
223 }
224 return true;
225}
226
227rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
228 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
229
230void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
231 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
232}
233
234void rgw_pubsub_s3_record::dump(Formatter *f) const {
235 encode_json("eventVersion", eventVersion, f);
236 encode_json("eventSource", eventSource, f);
237 encode_json("awsRegion", awsRegion, f);
238 utime_t ut(eventTime);
239 encode_json("eventTime", ut, f);
240 encode_json("eventName", eventName, f);
241 {
242 Formatter::ObjectSection s(*f, "userIdentity");
243 encode_json("principalId", userIdentity, f);
244 }
245 {
246 Formatter::ObjectSection s(*f, "requestParameters");
247 encode_json("sourceIPAddress", sourceIPAddress, f);
248 }
249 {
250 Formatter::ObjectSection s(*f, "responseElements");
251 encode_json("x-amz-request-id", x_amz_request_id, f);
252 encode_json("x-amz-id-2", x_amz_id_2, f);
253 }
254 {
255 Formatter::ObjectSection s(*f, "s3");
256 encode_json("s3SchemaVersion", s3SchemaVersion, f);
257 encode_json("configurationId", configurationId, f);
258 {
259 Formatter::ObjectSection sub_s(*f, "bucket");
260 encode_json("name", bucket_name, f);
261 {
262 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
263 encode_json("principalId", bucket_ownerIdentity, f);
264 }
265 encode_json("arn", bucket_arn, f);
266 encode_json("id", bucket_id, f);
267 }
268 {
269 Formatter::ObjectSection sub_s(*f, "object");
270 encode_json("key", object_key, f);
271 encode_json("size", object_size, f);
272 encode_json("etag", object_etag, f);
273 encode_json("versionId", object_versionId, f);
274 encode_json("sequencer", object_sequencer, f);
275 encode_json("metadata", x_meta_map, f);
276 }
277 }
278 encode_json("eventId", id, f);
279}
11fdf7f2
TL
280
281void rgw_pubsub_event::dump(Formatter *f) const
282{
283 encode_json("id", id, f);
eafe8130 284 encode_json("event", event_name, f);
11fdf7f2
TL
285 utime_t ut(timestamp);
286 encode_json("timestamp", ut, f);
287 encode_json("info", info, f);
288}
289
290void rgw_pubsub_topic::dump(Formatter *f) const
291{
292 encode_json("user", user, f);
293 encode_json("name", name, f);
eafe8130
TL
294 encode_json("dest", dest, f);
295 encode_json("arn", arn, f);
296}
297
298void rgw_pubsub_topic::dump_xml(Formatter *f) const
299{
300 encode_xml("User", user, f);
301 encode_xml("Name", name, f);
302 encode_xml("EndPoint", dest, f);
303 encode_xml("TopicArn", arn, f);
304}
305
306void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
307{
308 f->open_array_section(name);
309 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
310 f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
311 }
312 f->close_section();
11fdf7f2
TL
313}
314
315void rgw_pubsub_topic_filter::dump(Formatter *f) const
316{
317 encode_json("topic", topic, f);
318 encode_json("events", events, f);
319}
320
321void rgw_pubsub_topic_subs::dump(Formatter *f) const
322{
323 encode_json("topic", topic, f);
324 encode_json("subs", subs, f);
325}
326
327void rgw_pubsub_bucket_topics::dump(Formatter *f) const
328{
329 Formatter::ArraySection s(*f, "topics");
330 for (auto& t : topics) {
331 encode_json(t.first.c_str(), t.second, f);
332 }
333}
334
335void rgw_pubsub_user_topics::dump(Formatter *f) const
336{
337 Formatter::ArraySection s(*f, "topics");
338 for (auto& t : topics) {
339 encode_json(t.first.c_str(), t.second, f);
340 }
341}
342
eafe8130
TL
343void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
344{
345 for (auto& t : topics) {
346 encode_xml("member", t.second.topic, f);
347 }
348}
349
11fdf7f2
TL
350void rgw_pubsub_sub_dest::dump(Formatter *f) const
351{
352 encode_json("bucket_name", bucket_name, f);
353 encode_json("oid_prefix", oid_prefix, f);
354 encode_json("push_endpoint", push_endpoint, f);
eafe8130
TL
355 encode_json("push_endpoint_args", push_endpoint_args, f);
356 encode_json("push_endpoint_topic", arn_topic, f);
357}
358
359void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
360{
361 encode_xml("EndpointAddress", push_endpoint, f);
362 encode_xml("EndpointArgs", push_endpoint_args, f);
363 encode_xml("EndpointTopic", arn_topic, f);
11fdf7f2
TL
364}
365
366void rgw_pubsub_sub_config::dump(Formatter *f) const
367{
368 encode_json("user", user, f);
369 encode_json("name", name, f);
370 encode_json("topic", topic, f);
371 encode_json("dest", dest, f);
eafe8130 372 encode_json("s3_id", s3_id, f);
11fdf7f2
TL
373}
374
375
376int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
377{
378 int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker);
379 if (ret < 0) {
380 return ret;
381 }
382
383 return 0;
384}
385
386int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
387{
388 int ret = read(user_meta_obj, result, objv_tracker);
eafe8130
TL
389 if (ret < 0) {
390 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
391 return ret;
392 }
393 return 0;
394}
395
396int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
397{
398 int ret = write(user_meta_obj, topics, objv_tracker);
399 if (ret < 0 && ret != -ENOENT) {
eafe8130 400 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
401 return ret;
402 }
403 return 0;
404}
405
406int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
407{
408 return read_user_topics(result, nullptr);
409}
410
411int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
412{
413 int ret = ps->read(bucket_meta_obj, result, objv_tracker);
414 if (ret < 0 && ret != -ENOENT) {
eafe8130 415 ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
416 return ret;
417 }
418 return 0;
419}
420
421int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
422{
423 int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
424 if (ret < 0) {
eafe8130 425 ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
426 return ret;
427 }
428
429 return 0;
430}
431
432int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
433{
434 return read_topics(result, nullptr);
435}
436
437int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
438{
439 rgw_pubsub_user_topics topics;
440 int ret = get_user_topics(&topics);
441 if (ret < 0) {
eafe8130 442 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
443 return ret;
444 }
445
446 auto iter = topics.topics.find(name);
447 if (iter == topics.topics.end()) {
eafe8130 448 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
11fdf7f2
TL
449 return -ENOENT;
450 }
451
452 *result = iter->second;
453 return 0;
454}
455
eafe8130 456int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
11fdf7f2 457{
eafe8130
TL
458 rgw_pubsub_user_topics topics;
459 int ret = get_user_topics(&topics);
460 if (ret < 0) {
461 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
462 return ret;
463 }
464
465 auto iter = topics.topics.find(name);
466 if (iter == topics.topics.end()) {
467 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
468 return -ENOENT;
469 }
470
471 *result = iter->second.topic;
472 return 0;
473}
474
475int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
476 return create_notification(topic_name, events, std::nullopt, "");
477}
478
479int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
11fdf7f2
TL
480 rgw_pubsub_topic_subs user_topic_info;
481 RGWRados *store = ps->store;
482
483 int ret = ps->get_topic(topic_name, &user_topic_info);
484 if (ret < 0) {
eafe8130 485 ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
11fdf7f2
TL
486 return ret;
487 }
eafe8130 488 ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl;
11fdf7f2
TL
489
490 RGWObjVersionTracker objv_tracker;
491 rgw_pubsub_bucket_topics bucket_topics;
492
493 ret = read_topics(&bucket_topics, &objv_tracker);
eafe8130
TL
494 if (ret < 0) {
495 ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
496 bucket.name << "': ret=" << ret << dendl;
11fdf7f2
TL
497 return ret;
498 }
eafe8130
TL
499 ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
500 bucket.name << "'" << dendl;
11fdf7f2
TL
501
502 auto& topic_filter = bucket_topics.topics[topic_name];
503 topic_filter.topic = user_topic_info.topic;
504 topic_filter.events = events;
eafe8130
TL
505 topic_filter.s3_id = notif_name;
506 if (s3_filter) {
507 topic_filter.s3_filter = *s3_filter;
508 }
11fdf7f2
TL
509
510 ret = write_topics(bucket_topics, &objv_tracker);
511 if (ret < 0) {
eafe8130 512 ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
11fdf7f2
TL
513 return ret;
514 }
eafe8130
TL
515
516 ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
11fdf7f2
TL
517
518 return 0;
519}
520
521int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
522{
523 rgw_pubsub_topic_subs user_topic_info;
524 RGWRados *store = ps->store;
525
526 int ret = ps->get_topic(topic_name, &user_topic_info);
527 if (ret < 0) {
eafe8130 528 ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
11fdf7f2
TL
529 return ret;
530 }
531
532 RGWObjVersionTracker objv_tracker;
533 rgw_pubsub_bucket_topics bucket_topics;
534
535 ret = read_topics(&bucket_topics, &objv_tracker);
eafe8130
TL
536 if (ret < 0) {
537 ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
11fdf7f2
TL
538 return ret;
539 }
540
541 bucket_topics.topics.erase(topic_name);
542
543 ret = write_topics(bucket_topics, &objv_tracker);
544 if (ret < 0) {
eafe8130 545 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
546 return ret;
547 }
548
549 return 0;
550}
551
eafe8130
TL
552int RGWUserPubSub::create_topic(const string& name) {
553 return create_topic(name, rgw_pubsub_sub_dest(), "");
554}
555
556int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) {
11fdf7f2
TL
557 RGWObjVersionTracker objv_tracker;
558 rgw_pubsub_user_topics topics;
559
560 int ret = read_user_topics(&topics, &objv_tracker);
561 if (ret < 0 && ret != -ENOENT) {
eafe8130
TL
562 // its not an error if not topics exist, we create one
563 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2
TL
564 return ret;
565 }
eafe8130 566
11fdf7f2
TL
567 rgw_pubsub_topic_subs& new_topic = topics.topics[name];
568 new_topic.topic.user = user;
569 new_topic.topic.name = name;
eafe8130
TL
570 new_topic.topic.dest = dest;
571 new_topic.topic.arn = arn;
11fdf7f2
TL
572
573 ret = write_user_topics(topics, &objv_tracker);
574 if (ret < 0) {
eafe8130 575 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
576 return ret;
577 }
578
579 return 0;
580}
581
582int RGWUserPubSub::remove_topic(const string& name)
583{
584 RGWObjVersionTracker objv_tracker;
585 rgw_pubsub_user_topics topics;
586
587 int ret = read_user_topics(&topics, &objv_tracker);
588 if (ret < 0 && ret != -ENOENT) {
eafe8130 589 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
11fdf7f2 590 return ret;
eafe8130
TL
591 } else if (ret == -ENOENT) {
592 // its not an error if no topics exist, just a no-op
593 ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
594 return 0;
11fdf7f2
TL
595 }
596
597 topics.topics.erase(name);
598
599 ret = write_user_topics(topics, &objv_tracker);
600 if (ret < 0) {
eafe8130 601 ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
11fdf7f2
TL
602 return ret;
603 }
604
605 return 0;
606}
607
608int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
609{
610 int ret = ps->read(sub_meta_obj, result, objv_tracker);
611 if (ret < 0 && ret != -ENOENT) {
eafe8130 612 ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
11fdf7f2
TL
613 return ret;
614 }
615 return 0;
616}
617
618int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
619{
620 int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
621 if (ret < 0) {
eafe8130 622 ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
11fdf7f2
TL
623 return ret;
624 }
625
626 return 0;
627}
628
629int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
630{
631 int ret = ps->remove(sub_meta_obj, objv_tracker);
632 if (ret < 0) {
eafe8130 633 ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
11fdf7f2
TL
634 return ret;
635 }
636
637 return 0;
638}
639
640int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
641{
642 return read_sub(result, nullptr);
643}
644
eafe8130 645int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
11fdf7f2
TL
646{
647 RGWObjVersionTracker user_objv_tracker;
648 rgw_pubsub_user_topics topics;
649 RGWRados *store = ps->store;
650
651 int ret = ps->read_user_topics(&topics, &user_objv_tracker);
652 if (ret < 0) {
eafe8130
TL
653 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
654 return ret != -ENOENT ? ret : -EINVAL;
11fdf7f2
TL
655 }
656
657 auto iter = topics.topics.find(topic);
658 if (iter == topics.topics.end()) {
eafe8130
TL
659 ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
660 return -EINVAL;
11fdf7f2
TL
661 }
662
663 auto& t = iter->second;
664
665 rgw_pubsub_sub_config sub_conf;
666
667 sub_conf.user = ps->user;
668 sub_conf.name = sub;
669 sub_conf.topic = topic;
670 sub_conf.dest = dest;
eafe8130 671 sub_conf.s3_id = s3_id;
11fdf7f2
TL
672
673 t.subs.insert(sub);
674
675 ret = ps->write_user_topics(topics, &user_objv_tracker);
676 if (ret < 0) {
eafe8130 677 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
678 return ret;
679 }
680
681 ret = write_sub(sub_conf, nullptr);
682 if (ret < 0) {
eafe8130 683 ldout(store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
11fdf7f2
TL
684 return ret;
685 }
686 return 0;
687}
688
689int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
690{
691 string topic = _topic;
692 RGWObjVersionTracker sobjv_tracker;
693 RGWRados *store = ps->store;
694
695 if (topic.empty()) {
696 rgw_pubsub_sub_config sub_conf;
697 int ret = read_sub(&sub_conf, &sobjv_tracker);
698 if (ret < 0) {
eafe8130 699 ldout(store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
11fdf7f2
TL
700 return ret;
701 }
702 topic = sub_conf.topic;
703 }
704
705 RGWObjVersionTracker objv_tracker;
706 rgw_pubsub_user_topics topics;
707
708 int ret = ps->read_user_topics(&topics, &objv_tracker);
709 if (ret < 0) {
eafe8130
TL
710 // not an error - could be that topic was already deleted
711 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
712 } else {
11fdf7f2
TL
713 auto iter = topics.topics.find(topic);
714 if (iter != topics.topics.end()) {
715 auto& t = iter->second;
716
717 t.subs.erase(sub);
718
719 ret = ps->write_user_topics(topics, &objv_tracker);
720 if (ret < 0) {
eafe8130 721 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
11fdf7f2
TL
722 return ret;
723 }
724 }
725 }
726
727 ret = remove_sub(&sobjv_tracker);
728 if (ret < 0) {
eafe8130 729 ldout(store->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
11fdf7f2
TL
730 return ret;
731 }
732 return 0;
733}
734
eafe8130
TL
735template<typename EventType>
736void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
11fdf7f2
TL
737{
738 encode_json("next_marker", next_marker, f);
739 encode_json("is_truncated", is_truncated, f);
740
eafe8130 741 Formatter::ArraySection s(*f, EventType::json_type_plural);
11fdf7f2 742 for (auto& event : events) {
92f5a8d4 743 encode_json("", event, f);
11fdf7f2
TL
744 }
745}
746
eafe8130
TL
747template<typename EventType>
748int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
11fdf7f2
TL
749{
750 RGWRados *store = ps->store;
751 rgw_pubsub_sub_config sub_conf;
752 int ret = get_conf(&sub_conf);
753 if (ret < 0) {
eafe8130 754 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
11fdf7f2
TL
755 return ret;
756 }
757
758 RGWBucketInfo bucket_info;
759 string tenant;
760 RGWSysObjectCtx obj_ctx(store->svc.sysobj->init_obj_ctx());
761 ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
762 if (ret == -ENOENT) {
eafe8130 763 list.is_truncated = false;
11fdf7f2
TL
764 return 0;
765 }
766 if (ret < 0) {
eafe8130 767 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
768 return ret;
769 }
770
771 RGWRados::Bucket target(store, bucket_info);
772 RGWRados::Bucket::List list_op(&target);
773
774 list_op.params.prefix = sub_conf.dest.oid_prefix;
775 list_op.params.marker = marker;
776
eafe8130 777 std::vector<rgw_bucket_dir_entry> objs;
11fdf7f2 778
eafe8130 779 ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated);
11fdf7f2 780 if (ret < 0) {
eafe8130 781 ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
782 return ret;
783 }
eafe8130
TL
784 if (list.is_truncated) {
785 list.next_marker = list_op.get_next_marker().name;
11fdf7f2
TL
786 }
787
788 for (auto& obj : objs) {
789 bufferlist bl64;
790 bufferlist bl;
791 bl64.append(obj.meta.user_data);
792 try {
793 bl.decode_base64(bl64);
794 } catch (buffer::error& err) {
eafe8130 795 ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl;
11fdf7f2
TL
796 continue;
797 }
eafe8130 798 EventType event;
11fdf7f2
TL
799
800 auto iter = bl.cbegin();
801 try {
802 decode(event, iter);
803 } catch (buffer::error& err) {
eafe8130 804 ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl;
11fdf7f2
TL
805 continue;
806 };
807
eafe8130 808 list.events.push_back(event);
11fdf7f2
TL
809 }
810 return 0;
811}
812
eafe8130
TL
813template<typename EventType>
814int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
11fdf7f2
TL
815{
816 RGWRados *store = ps->store;
817 rgw_pubsub_sub_config sub_conf;
818 int ret = get_conf(&sub_conf);
819 if (ret < 0) {
eafe8130 820 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
11fdf7f2
TL
821 return ret;
822 }
823
824 RGWBucketInfo bucket_info;
825 string tenant;
826 RGWSysObjectCtx sysobj_ctx(store->svc.sysobj->init_obj_ctx());
827 ret = store->get_bucket_info(sysobj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
828 if (ret < 0) {
eafe8130 829 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
11fdf7f2
TL
830 return ret;
831 }
832
833 rgw_bucket& bucket = bucket_info.bucket;
834
835 RGWObjectCtx obj_ctx(store);
836 rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id);
837
838 obj_ctx.set_atomic(obj);
839
840 RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
841 RGWRados::Object::Delete del_op(&del_target);
842
843 del_op.params.bucket_owner = bucket_info.owner;
844 del_op.params.versioning_status = bucket_info.versioning_status();
845
846 ret = del_op.delete_obj();
847 if (ret < 0) {
eafe8130 848 ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
11fdf7f2
TL
849 }
850 return 0;
851}
eafe8130
TL
852
853template<typename EventType>
854void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
855 list.dump(f);
856}
857
858// explicit instantiation for the only two possible types
859// no need to move implementation to header
860template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
861template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
862