]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_pubsub.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "services/svc_zone.h"
5 #include "rgw_b64.h"
6 #include "rgw_sal.h"
7 #include "rgw_pubsub.h"
8 #include "rgw_tools.h"
9 #include "rgw_xml.h"
10 #include "rgw_arn.h"
11 #include "rgw_pubsub_push.h"
12 #include "rgw_rados.h"
13 #include <regex>
14 #include <algorithm>
15
16 #define dout_subsys ceph_subsys_rgw
17
18 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
19 char buf[64];
20 const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
21 if (len > 0) {
22 id.assign(buf, len);
23 }
24 }
25
26 bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
27 XMLObjIter iter = obj->find("FilterRule");
28 XMLObj *o;
29
30 const auto throw_if_missing = true;
31 auto prefix_not_set = true;
32 auto suffix_not_set = true;
33 auto regex_not_set = true;
34 std::string name;
35
36 while ((o = iter.get_next())) {
37 RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
38 if (name == "prefix" && prefix_not_set) {
39 prefix_not_set = false;
40 RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
41 } else if (name == "suffix" && suffix_not_set) {
42 suffix_not_set = false;
43 RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
44 } else if (name == "regex" && regex_not_set) {
45 regex_not_set = false;
46 RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
47 } else {
48 throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
49 }
50 }
51 return true;
52 }
53
54 void rgw_s3_key_filter::dump_xml(Formatter *f) const {
55 if (!prefix_rule.empty()) {
56 f->open_object_section("FilterRule");
57 ::encode_xml("Name", "prefix", f);
58 ::encode_xml("Value", prefix_rule, f);
59 f->close_section();
60 }
61 if (!suffix_rule.empty()) {
62 f->open_object_section("FilterRule");
63 ::encode_xml("Name", "suffix", f);
64 ::encode_xml("Value", suffix_rule, f);
65 f->close_section();
66 }
67 if (!regex_rule.empty()) {
68 f->open_object_section("FilterRule");
69 ::encode_xml("Name", "regex", f);
70 ::encode_xml("Value", regex_rule, f);
71 f->close_section();
72 }
73 }
74
75 bool rgw_s3_key_filter::has_content() const {
76 return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
77 }
78
79 bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
80 kvl.clear();
81 XMLObjIter iter = obj->find("FilterRule");
82 XMLObj *o;
83
84 const auto throw_if_missing = true;
85
86 std::string key;
87 std::string value;
88
89 while ((o = iter.get_next())) {
90 RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
91 RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
92 kvl.emplace(key, value);
93 }
94 return true;
95 }
96
97 void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
98 for (const auto& key_value : kvl) {
99 f->open_object_section("FilterRule");
100 ::encode_xml("Name", key_value.first, f);
101 ::encode_xml("Value", key_value.second, f);
102 f->close_section();
103 }
104 }
105
106 bool rgw_s3_key_value_filter::has_content() const {
107 return !kvl.empty();
108 }
109
110 bool rgw_s3_filter::decode_xml(XMLObj* obj) {
111 RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
112 RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
113 RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
114 return true;
115 }
116
117 void rgw_s3_filter::dump_xml(Formatter *f) const {
118 if (key_filter.has_content()) {
119 ::encode_xml("S3Key", key_filter, f);
120 }
121 if (metadata_filter.has_content()) {
122 ::encode_xml("S3Metadata", metadata_filter, f);
123 }
124 if (tag_filter.has_content()) {
125 ::encode_xml("S3Tags", tag_filter, f);
126 }
127 }
128
129 bool rgw_s3_filter::has_content() const {
130 return key_filter.has_content() ||
131 metadata_filter.has_content() ||
132 tag_filter.has_content();
133 }
134
135 bool match(const rgw_s3_key_filter& filter, const std::string& key) {
136 const auto key_size = key.size();
137 const auto prefix_size = filter.prefix_rule.size();
138 if (prefix_size != 0) {
139 // prefix rule exists
140 if (prefix_size > key_size) {
141 // if prefix is longer than key, we fail
142 return false;
143 }
144 if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
145 return false;
146 }
147 }
148 const auto suffix_size = filter.suffix_rule.size();
149 if (suffix_size != 0) {
150 // suffix rule exists
151 if (suffix_size > key_size) {
152 // if suffix is longer than key, we fail
153 return false;
154 }
155 if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
156 return false;
157 }
158 }
159 if (!filter.regex_rule.empty()) {
160 // TODO add regex chaching in the filter
161 const std::regex base_regex(filter.regex_rule);
162 if (!std::regex_match(key, base_regex)) {
163 return false;
164 }
165 }
166 return true;
167 }
168
169 bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl) {
170 // all filter pairs must exist with the same value in the object's metadata/tags
171 // object metadata/tags may include items not in the filter
172 return std::includes(kvl.begin(), kvl.end(), filter.kvl.begin(), filter.kvl.end());
173 }
174
175 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
176 // if event list exists, and none of the events in the list matches the event type, filter the message
177 if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
178 return false;
179 }
180 return true;
181 }
182
183 void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
184 l.clear();
185
186 XMLObjIter iter = obj->find(name);
187 XMLObj *o;
188
189 while ((o = iter.get_next())) {
190 std::string val;
191 decode_xml_obj(val, o);
192 l.push_back(rgw::notify::from_string(val));
193 }
194 }
195
196 bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
197 const auto throw_if_missing = true;
198 RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
199
200 RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
201
202 RGWXMLDecoder::decode_xml("Filter", filter, obj);
203
204 do_decode_xml_obj(events, "Event", obj);
205 if (events.empty()) {
206 // if no events are provided, we assume all events
207 events.push_back(rgw::notify::ObjectCreated);
208 events.push_back(rgw::notify::ObjectRemoved);
209 }
210 return true;
211 }
212
213 void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
214 ::encode_xml("Id", id, f);
215 ::encode_xml("Topic", topic_arn.c_str(), f);
216 if (filter.has_content()) {
217 ::encode_xml("Filter", filter, f);
218 }
219 for (const auto& event : events) {
220 ::encode_xml("Event", rgw::notify::to_string(event), f);
221 }
222 }
223
224 bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
225 do_decode_xml_obj(list, "TopicConfiguration", obj);
226 if (list.empty()) {
227 throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist");
228 }
229 return true;
230 }
231
232 rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
233 id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
234
235 void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
236 do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
237 }
238
239 void rgw_pubsub_s3_record::dump(Formatter *f) const {
240 encode_json("eventVersion", eventVersion, f);
241 encode_json("eventSource", eventSource, f);
242 encode_json("awsRegion", awsRegion, f);
243 utime_t ut(eventTime);
244 encode_json("eventTime", ut, f);
245 encode_json("eventName", eventName, f);
246 {
247 Formatter::ObjectSection s(*f, "userIdentity");
248 encode_json("principalId", userIdentity, f);
249 }
250 {
251 Formatter::ObjectSection s(*f, "requestParameters");
252 encode_json("sourceIPAddress", sourceIPAddress, f);
253 }
254 {
255 Formatter::ObjectSection s(*f, "responseElements");
256 encode_json("x-amz-request-id", x_amz_request_id, f);
257 encode_json("x-amz-id-2", x_amz_id_2, f);
258 }
259 {
260 Formatter::ObjectSection s(*f, "s3");
261 encode_json("s3SchemaVersion", s3SchemaVersion, f);
262 encode_json("configurationId", configurationId, f);
263 {
264 Formatter::ObjectSection sub_s(*f, "bucket");
265 encode_json("name", bucket_name, f);
266 {
267 Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity");
268 encode_json("principalId", bucket_ownerIdentity, f);
269 }
270 encode_json("arn", bucket_arn, f);
271 encode_json("id", bucket_id, f);
272 }
273 {
274 Formatter::ObjectSection sub_s(*f, "object");
275 encode_json("key", object_key, f);
276 encode_json("size", object_size, f);
277 encode_json("etag", object_etag, f);
278 encode_json("versionId", object_versionId, f);
279 encode_json("sequencer", object_sequencer, f);
280 encode_json("metadata", x_meta_map, f);
281 encode_json("tags", tags, f);
282 }
283 }
284 encode_json("eventId", id, f);
285 encode_json("opaqueData", opaque_data, f);
286 }
287
288 void rgw_pubsub_event::dump(Formatter *f) const
289 {
290 encode_json("id", id, f);
291 encode_json("event", event_name, f);
292 utime_t ut(timestamp);
293 encode_json("timestamp", ut, f);
294 encode_json("info", info, f);
295 }
296
297 void rgw_pubsub_topic::dump(Formatter *f) const
298 {
299 encode_json("user", user, f);
300 encode_json("name", name, f);
301 encode_json("dest", dest, f);
302 encode_json("arn", arn, f);
303 encode_json("opaqueData", opaque_data, f);
304 }
305
306 void rgw_pubsub_topic::dump_xml(Formatter *f) const
307 {
308 encode_xml("User", user, f);
309 encode_xml("Name", name, f);
310 encode_xml("EndPoint", dest, f);
311 encode_xml("TopicArn", arn, f);
312 encode_xml("OpaqueData", opaque_data, f);
313 }
314
315 void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
316 {
317 f->open_array_section(name);
318 for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
319 f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
320 }
321 f->close_section();
322 }
323
324 void rgw_pubsub_topic_filter::dump(Formatter *f) const
325 {
326 encode_json("topic", topic, f);
327 encode_json("events", events, f);
328 }
329
330 void rgw_pubsub_topic_subs::dump(Formatter *f) const
331 {
332 encode_json("topic", topic, f);
333 encode_json("subs", subs, f);
334 }
335
336 void rgw_pubsub_bucket_topics::dump(Formatter *f) const
337 {
338 Formatter::ArraySection s(*f, "topics");
339 for (auto& t : topics) {
340 encode_json(t.first.c_str(), t.second, f);
341 }
342 }
343
344 void rgw_pubsub_user_topics::dump(Formatter *f) const
345 {
346 Formatter::ArraySection s(*f, "topics");
347 for (auto& t : topics) {
348 encode_json(t.first.c_str(), t.second, f);
349 }
350 }
351
352 void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
353 {
354 for (auto& t : topics) {
355 encode_xml("member", t.second.topic, f);
356 }
357 }
358
359 void rgw_pubsub_sub_dest::dump(Formatter *f) const
360 {
361 encode_json("bucket_name", bucket_name, f);
362 encode_json("oid_prefix", oid_prefix, f);
363 encode_json("push_endpoint", push_endpoint, f);
364 encode_json("push_endpoint_args", push_endpoint_args, f);
365 encode_json("push_endpoint_topic", arn_topic, f);
366 }
367
368 void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
369 {
370 encode_xml("EndpointAddress", push_endpoint, f);
371 encode_xml("EndpointArgs", push_endpoint_args, f);
372 encode_xml("EndpointTopic", arn_topic, f);
373 }
374
375 void rgw_pubsub_sub_config::dump(Formatter *f) const
376 {
377 encode_json("user", user, f);
378 encode_json("name", name, f);
379 encode_json("topic", topic, f);
380 encode_json("dest", dest, f);
381 encode_json("s3_id", s3_id, f);
382 }
383
384 RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) :
385 store(_store),
386 user(_user),
387 obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
388 get_user_meta_obj(&user_meta_obj);
389 }
390
391 int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
392 {
393 int ret = rgw_delete_system_obj(store->svc()->sysobj, obj.pool, obj.oid, objv_tracker);
394 if (ret < 0) {
395 return ret;
396 }
397
398 return 0;
399 }
400
401 int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
402 {
403 int ret = read(user_meta_obj, result, objv_tracker);
404 if (ret < 0) {
405 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
406 return ret;
407 }
408 return 0;
409 }
410
411 int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
412 {
413 int ret = write(user_meta_obj, topics, objv_tracker);
414 if (ret < 0 && ret != -ENOENT) {
415 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
416 return ret;
417 }
418 return 0;
419 }
420
421 int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
422 {
423 return read_user_topics(result, nullptr);
424 }
425
426 int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
427 {
428 int ret = ps->read(bucket_meta_obj, result, objv_tracker);
429 if (ret < 0 && ret != -ENOENT) {
430 ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
431 return ret;
432 }
433 return 0;
434 }
435
436 int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
437 {
438 int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
439 if (ret < 0) {
440 ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
441 return ret;
442 }
443
444 return 0;
445 }
446
447 int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
448 {
449 return read_topics(result, nullptr);
450 }
451
452 int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
453 {
454 rgw_pubsub_user_topics topics;
455 int ret = get_user_topics(&topics);
456 if (ret < 0) {
457 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
458 return ret;
459 }
460
461 auto iter = topics.topics.find(name);
462 if (iter == topics.topics.end()) {
463 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
464 return -ENOENT;
465 }
466
467 *result = iter->second;
468 return 0;
469 }
470
471 int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
472 {
473 rgw_pubsub_user_topics topics;
474 int ret = get_user_topics(&topics);
475 if (ret < 0) {
476 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
477 return ret;
478 }
479
480 auto iter = topics.topics.find(name);
481 if (iter == topics.topics.end()) {
482 ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
483 return -ENOENT;
484 }
485
486 *result = iter->second.topic;
487 return 0;
488 }
489
490 int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
491 return create_notification(topic_name, events, std::nullopt, "");
492 }
493
494 int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
495 rgw_pubsub_topic_subs user_topic_info;
496 rgw::sal::RGWRadosStore *store = ps->store;
497
498 int ret = ps->get_topic(topic_name, &user_topic_info);
499 if (ret < 0) {
500 ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
501 return ret;
502 }
503 ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl;
504
505 RGWObjVersionTracker objv_tracker;
506 rgw_pubsub_bucket_topics bucket_topics;
507
508 ret = read_topics(&bucket_topics, &objv_tracker);
509 if (ret < 0) {
510 ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
511 bucket.name << "': ret=" << ret << dendl;
512 return ret;
513 }
514 ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
515 bucket.name << "'" << dendl;
516
517 auto& topic_filter = bucket_topics.topics[topic_name];
518 topic_filter.topic = user_topic_info.topic;
519 topic_filter.events = events;
520 topic_filter.s3_id = notif_name;
521 if (s3_filter) {
522 topic_filter.s3_filter = *s3_filter;
523 }
524
525 ret = write_topics(bucket_topics, &objv_tracker);
526 if (ret < 0) {
527 ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
528 return ret;
529 }
530
531 ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;
532
533 return 0;
534 }
535
536 int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
537 {
538 rgw_pubsub_topic_subs user_topic_info;
539 rgw::sal::RGWRadosStore *store = ps->store;
540
541 int ret = ps->get_topic(topic_name, &user_topic_info);
542 if (ret < 0) {
543 ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
544 return ret;
545 }
546
547 RGWObjVersionTracker objv_tracker;
548 rgw_pubsub_bucket_topics bucket_topics;
549
550 ret = read_topics(&bucket_topics, &objv_tracker);
551 if (ret < 0) {
552 ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
553 return ret;
554 }
555
556 bucket_topics.topics.erase(topic_name);
557
558 ret = write_topics(bucket_topics, &objv_tracker);
559 if (ret < 0) {
560 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
561 return ret;
562 }
563
564 return 0;
565 }
566
567 int RGWUserPubSub::create_topic(const string& name) {
568 return create_topic(name, rgw_pubsub_sub_dest(), "", "");
569 }
570
571 int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) {
572 RGWObjVersionTracker objv_tracker;
573 rgw_pubsub_user_topics topics;
574
575 int ret = read_user_topics(&topics, &objv_tracker);
576 if (ret < 0 && ret != -ENOENT) {
577 // its not an error if not topics exist, we create one
578 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
579 return ret;
580 }
581
582 rgw_pubsub_topic_subs& new_topic = topics.topics[name];
583 new_topic.topic.user = user;
584 new_topic.topic.name = name;
585 new_topic.topic.dest = dest;
586 new_topic.topic.arn = arn;
587 new_topic.topic.opaque_data = opaque_data;
588
589 ret = write_user_topics(topics, &objv_tracker);
590 if (ret < 0) {
591 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
592 return ret;
593 }
594
595 return 0;
596 }
597
598 int RGWUserPubSub::remove_topic(const string& name)
599 {
600 RGWObjVersionTracker objv_tracker;
601 rgw_pubsub_user_topics topics;
602
603 int ret = read_user_topics(&topics, &objv_tracker);
604 if (ret < 0 && ret != -ENOENT) {
605 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
606 return ret;
607 } else if (ret == -ENOENT) {
608 // its not an error if no topics exist, just a no-op
609 ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
610 return 0;
611 }
612
613 topics.topics.erase(name);
614
615 ret = write_user_topics(topics, &objv_tracker);
616 if (ret < 0) {
617 ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
618 return ret;
619 }
620
621 return 0;
622 }
623
624 int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
625 {
626 int ret = ps->read(sub_meta_obj, result, objv_tracker);
627 if (ret < 0 && ret != -ENOENT) {
628 ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
629 return ret;
630 }
631 return 0;
632 }
633
634 int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
635 {
636 int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
637 if (ret < 0) {
638 ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
639 return ret;
640 }
641
642 return 0;
643 }
644
645 int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
646 {
647 int ret = ps->remove(sub_meta_obj, objv_tracker);
648 if (ret < 0) {
649 ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
650 return ret;
651 }
652
653 return 0;
654 }
655
656 int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
657 {
658 return read_sub(result, nullptr);
659 }
660
661 int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id)
662 {
663 RGWObjVersionTracker user_objv_tracker;
664 rgw_pubsub_user_topics topics;
665 rgw::sal::RGWRadosStore *store = ps->store;
666
667 int ret = ps->read_user_topics(&topics, &user_objv_tracker);
668 if (ret < 0) {
669 ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
670 return ret != -ENOENT ? ret : -EINVAL;
671 }
672
673 auto iter = topics.topics.find(topic);
674 if (iter == topics.topics.end()) {
675 ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
676 return -EINVAL;
677 }
678
679 auto& t = iter->second;
680
681 rgw_pubsub_sub_config sub_conf;
682
683 sub_conf.user = ps->user;
684 sub_conf.name = sub;
685 sub_conf.topic = topic;
686 sub_conf.dest = dest;
687 sub_conf.s3_id = s3_id;
688
689 t.subs.insert(sub);
690
691 ret = ps->write_user_topics(topics, &user_objv_tracker);
692 if (ret < 0) {
693 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
694 return ret;
695 }
696
697 ret = write_sub(sub_conf, nullptr);
698 if (ret < 0) {
699 ldout(store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
700 return ret;
701 }
702 return 0;
703 }
704
705 int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
706 {
707 string topic = _topic;
708 RGWObjVersionTracker sobjv_tracker;
709 rgw::sal::RGWRadosStore *store = ps->store;
710
711 if (topic.empty()) {
712 rgw_pubsub_sub_config sub_conf;
713 int ret = read_sub(&sub_conf, &sobjv_tracker);
714 if (ret < 0) {
715 ldout(store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
716 return ret;
717 }
718 topic = sub_conf.topic;
719 }
720
721 RGWObjVersionTracker objv_tracker;
722 rgw_pubsub_user_topics topics;
723
724 int ret = ps->read_user_topics(&topics, &objv_tracker);
725 if (ret < 0) {
726 // not an error - could be that topic was already deleted
727 ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
728 } else {
729 auto iter = topics.topics.find(topic);
730 if (iter != topics.topics.end()) {
731 auto& t = iter->second;
732
733 t.subs.erase(sub);
734
735 ret = ps->write_user_topics(topics, &objv_tracker);
736 if (ret < 0) {
737 ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
738 return ret;
739 }
740 }
741 }
742
743 ret = remove_sub(&sobjv_tracker);
744 if (ret < 0) {
745 ldout(store->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
746 return ret;
747 }
748 return 0;
749 }
750
751 template<typename EventType>
752 void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
753 {
754 encode_json("next_marker", next_marker, f);
755 encode_json("is_truncated", is_truncated, f);
756
757 Formatter::ArraySection s(*f, EventType::json_type_plural);
758 for (auto& event : events) {
759 encode_json("", event, f);
760 }
761 }
762
763 template<typename EventType>
764 int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
765 {
766 RGWRados *store = ps->store->getRados();
767 rgw_pubsub_sub_config sub_conf;
768 int ret = get_conf(&sub_conf);
769 if (ret < 0) {
770 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
771 return ret;
772 }
773
774 RGWBucketInfo bucket_info;
775 string tenant;
776 ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
777 if (ret == -ENOENT) {
778 list.is_truncated = false;
779 return 0;
780 }
781 if (ret < 0) {
782 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
783 return ret;
784 }
785
786 RGWRados::Bucket target(store, bucket_info);
787 RGWRados::Bucket::List list_op(&target);
788
789 list_op.params.prefix = sub_conf.dest.oid_prefix;
790 list_op.params.marker = marker;
791
792 std::vector<rgw_bucket_dir_entry> objs;
793
794 ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated, null_yield);
795 if (ret < 0) {
796 ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
797 return ret;
798 }
799 if (list.is_truncated) {
800 list.next_marker = list_op.get_next_marker().name;
801 }
802
803 for (auto& obj : objs) {
804 bufferlist bl64;
805 bufferlist bl;
806 bl64.append(obj.meta.user_data);
807 try {
808 bl.decode_base64(bl64);
809 } catch (buffer::error& err) {
810 ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl;
811 continue;
812 }
813 EventType event;
814
815 auto iter = bl.cbegin();
816 try {
817 decode(event, iter);
818 } catch (buffer::error& err) {
819 ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl;
820 continue;
821 };
822
823 list.events.push_back(event);
824 }
825 return 0;
826 }
827
828 template<typename EventType>
829 int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
830 {
831 rgw::sal::RGWRadosStore *store = ps->store;
832 rgw_pubsub_sub_config sub_conf;
833 int ret = get_conf(&sub_conf);
834 if (ret < 0) {
835 ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
836 return ret;
837 }
838
839 RGWBucketInfo bucket_info;
840 string tenant;
841 ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
842 if (ret < 0) {
843 ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
844 return ret;
845 }
846
847 rgw_bucket& bucket = bucket_info.bucket;
848
849 RGWObjectCtx obj_ctx(store);
850 rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id);
851
852 obj_ctx.set_atomic(obj);
853
854 RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj);
855 RGWRados::Object::Delete del_op(&del_target);
856
857 del_op.params.bucket_owner = bucket_info.owner;
858 del_op.params.versioning_status = bucket_info.versioning_status();
859
860 ret = del_op.delete_obj(null_yield);
861 if (ret < 0) {
862 ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
863 }
864 return 0;
865 }
866
867 void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
868 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
869 }
870
871 void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
872 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
873 }
874
875 void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
876 *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
877 }
878
879 template<typename EventType>
880 void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
881 list.dump(f);
882 }
883
884 // explicit instantiation for the only two possible types
885 // no need to move implementation to header
886 template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
887 template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
888