]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.h
import quincy 17.2.0
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_PUBSUB_H
5 #define CEPH_RGW_PUBSUB_H
6
7 #include "services/svc_sys_obj.h"
8 #include "rgw_tools.h"
9 #include "rgw_zone.h"
10 #include "rgw_notify_event_type.h"
11 #include <boost/container/flat_map.hpp>
12
13 namespace rgw::sal { class RadosStore; }
14
15 class XMLObj;
16
// Object-key filter of an S3 notification: holds a prefix rule, a suffix rule
// and a regex rule, each kept as a plain string.
struct rgw_s3_key_filter {
  std::string prefix_rule;
  std::string suffix_rule;
  std::string regex_rule;

  bool has_content() const;

  // XML (de)serialization, defined out of line
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;

  // serialize the three rules: prefix, suffix, regex (in that order)
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(prefix_rule, bl);
    encode(suffix_rule, bl);
    encode(regex_rule, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the three rules in the same order used by encode()
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(prefix_rule, bl);
    decode(suffix_rule, bl);
    decode(regex_rule, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(rgw_s3_key_filter)
44
// string -> string map with a single value per key
using KeyValueMap = boost::container::flat_map<std::string, std::string>;
// string -> string map in which a key may carry several values
using KeyMultiValueMap = std::multimap<std::string, std::string>;
47
// Filter made of key/value pairs.
// rgw_s3_filter uses this type for both its metadata filter and its tag filter.
struct rgw_s3_key_value_filter {
  KeyValueMap kv;

  bool has_content() const;

  // XML (de)serialization, defined out of line
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;

  // serialize the key/value map
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(kv, bl);
    ENCODE_FINISH(bl);
  }
  // deserialize the key/value map
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(kv, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
68
// Complete S3 notification filter: a key filter plus key/value filters
// for metadata and for tags.
struct rgw_s3_filter {
  rgw_s3_key_filter key_filter;
  rgw_s3_key_value_filter metadata_filter;
  rgw_s3_key_value_filter tag_filter;

  bool has_content() const;

  // XML (de)serialization, defined out of line
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;

  // serialize key, metadata and tag filters (in that order)
  void encode(bufferlist& bl) const {
    ENCODE_START(2, 1, bl);
    encode(key_filter, bl);
    encode(metadata_filter, bl);
    encode(tag_filter, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the filters; tag_filter is only read when struct_v >= 2,
  // otherwise it is left untouched
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(2, bl);
    decode(key_filter, bl);
    decode(metadata_filter, bl);
    if (struct_v >= 2) {
      decode(tag_filter, bl);
    }
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(rgw_s3_filter)
98
// an S3 filter that may be absent
using OptionalFilter = std::optional<rgw_s3_filter>;

// forward declaration: rgw_pubsub_s3_notification can be constructed from it
struct rgw_pubsub_topic_filter;
102 /* S3 notification configuration
103 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
104 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
105 <TopicConfiguration>
106 <Filter>
107 <S3Key>
108 <FilterRule>
109 <Name>suffix</Name>
110 <Value>jpg</Value>
111 </FilterRule>
112 </S3Key>
113 <S3Metadata>
114 <FilterRule>
115 <Name></Name>
116 <Value></Value>
117 </FilterRule>
118 </S3Metadata>
119 <S3Tags>
120 <FilterRule>
121 <Name></Name>
122 <Value></Value>
123 </FilterRule>
124 </S3Tags>
125 </Filter>
126 <Id>notification1</Id>
127 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
128 <Event>s3:ObjectCreated:*</Event>
129 <Event>s3:ObjectRemoved:*</Event>
130 </TopicConfiguration>
131 </NotificationConfiguration>
132 */
// A single S3 notification: its id, the event types it covers, the ARN of the
// topic it refers to and its filter rules.
struct rgw_pubsub_s3_notification {
  // notification id
  std::string id;
  // types of events
  rgw::notify::EventTypeList events;
  // topic ARN
  std::string topic_arn;
  // filter rules
  rgw_s3_filter filter;

  // XML (de)serialization, defined out of line
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;

  rgw_pubsub_s3_notification() = default;
  // construct from rgw_pubsub_topic_filter (used by get/list notifications)
  explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
};
150
// return true if the key matches the prefix/suffix/regex rules of the key filter
bool match(const rgw_s3_key_filter& filter, const std::string& key);

// return true if the key/value pairs in "kv" match the metadata rules of the metadata filter
bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv);

// return true if the key/value pairs in "kv" match the tag rules of the tag filter
// (multimap overload: a key may carry several values)
bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv);

// return true if the event type matches (equal or contained in) one of the events in the list
bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
162
// A list of S3 notifications with XML (de)serialization (defined out of line).
struct rgw_pubsub_s3_notifications {
  std::list<rgw_pubsub_s3_notification> list;
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;
};
168
/* S3 event records structure
 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
{
  "Records":[
    {
      "eventVersion":"",
      "eventSource":"",
      "awsRegion":"",
      "eventTime":"",
      "eventName":"",
      "userIdentity":{
        "principalId":""
      },
      "requestParameters":{
        "sourceIPAddress":""
      },
      "responseElements":{
        "x-amz-request-id":"",
        "x-amz-id-2":""
      },
      "s3":{
        "s3SchemaVersion":"1.0",
        "configurationId":"",
        "bucket":{
          "name":"",
          "ownerIdentity":{
            "principalId":""
          },
          "arn":"",
          "id":""
        },
        "object":{
          "key":"",
          "size":0,
          "eTag":"",
          "versionId":"",
          "sequencer":"",
          "metadata":"",
          "tags":""
        }
      },
      "eventId":""
    }
  ]
}*/
214
// S3-style event record.
// The encoding has grown over time: bucket_id and x_meta_map are decoded from
// struct version 2 on, tags from version 3 on and opaque_data from version 4 on.
struct rgw_pubsub_s3_event {
  constexpr static const char* const json_type_plural = "Records";
  std::string eventVersion = "2.2";
  // event source; defaults to "ceph:s3"
  std::string eventSource = "ceph:s3";
  // zonegroup
  std::string awsRegion;
  // time of the request
  ceph::real_time eventTime;
  // type of the event
  std::string eventName;
  // user that sent the request
  std::string userIdentity;
  // IP address of source of the request (not implemented)
  std::string sourceIPAddress;
  // request ID (not implemented)
  std::string x_amz_request_id;
  // radosgw that received the request
  std::string x_amz_id_2;
  std::string s3SchemaVersion = "1.0";
  // ID received in the notification request
  std::string configurationId;
  // bucket name
  std::string bucket_name;
  // bucket owner
  std::string bucket_ownerIdentity;
  // bucket ARN
  std::string bucket_arn;
  // object key
  std::string object_key;
  // object size
  uint64_t object_size = 0;
  // object etag
  std::string object_etag;
  // object version id, if the bucket is versioned
  std::string object_versionId;
  // hexadecimal value used to determine event order for specific key
  std::string object_sequencer;
  // this is an rgw extension (not S3 standard)
  // used to store a globally unique identifier of the event
  // that could be used for acking or any other identification of the event
  std::string id;
  // this is an rgw extension holding the internal bucket id
  std::string bucket_id;
  // meta data
  KeyValueMap x_meta_map;
  // tags
  KeyMultiValueMap tags;
  // opaque data received from the topic
  // could be used to identify the gateway
  std::string opaque_data;

  // serialize all fields; the order below is the wire format and must match decode()
  void encode(bufferlist& bl) const {
    ENCODE_START(4, 1, bl);
    encode(eventVersion, bl);
    encode(eventSource, bl);
    encode(awsRegion, bl);
    encode(eventTime, bl);
    encode(eventName, bl);
    encode(userIdentity, bl);
    encode(sourceIPAddress, bl);
    encode(x_amz_request_id, bl);
    encode(x_amz_id_2, bl);
    encode(s3SchemaVersion, bl);
    encode(configurationId, bl);
    encode(bucket_name, bl);
    encode(bucket_ownerIdentity, bl);
    encode(bucket_arn, bl);
    encode(object_key, bl);
    encode(object_size, bl);
    encode(object_etag, bl);
    encode(object_versionId, bl);
    encode(object_sequencer, bl);
    encode(id, bl);
    encode(bucket_id, bl);
    encode(x_meta_map, bl);
    encode(tags, bl);
    encode(opaque_data, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize all fields; fields introduced in later struct versions are
  // only read when struct_v is high enough and are left untouched otherwise
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(4, bl);
    decode(eventVersion, bl);
    decode(eventSource, bl);
    decode(awsRegion, bl);
    decode(eventTime, bl);
    decode(eventName, bl);
    decode(userIdentity, bl);
    decode(sourceIPAddress, bl);
    decode(x_amz_request_id, bl);
    decode(x_amz_id_2, bl);
    decode(s3SchemaVersion, bl);
    decode(configurationId, bl);
    decode(bucket_name, bl);
    decode(bucket_ownerIdentity, bl);
    decode(bucket_arn, bl);
    decode(object_key, bl);
    decode(object_size, bl);
    decode(object_etag, bl);
    decode(object_versionId, bl);
    decode(object_sequencer, bl);
    decode(id, bl);
    if (struct_v >= 2) {
      decode(bucket_id, bl);
      decode(x_meta_map, bl);
    }
    if (struct_v >= 3) {
      decode(tags, bl);
    }
    if (struct_v >= 4) {
      decode(opaque_data, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
334
// Non-S3 event record: id, event name, source, timestamp and a free-form
// JSON-formattable "info" payload.
struct rgw_pubsub_event {
  constexpr static const char* const json_type_plural = "events";
  std::string id;
  std::string event_name;
  std::string source;
  ceph::real_time timestamp;
  JSONFormattable info;

  // serialize id, event_name, source, timestamp and info (in that order)
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(id, bl);
    encode(event_name, bl);
    encode(source, bl);
    encode(timestamp, bl);
    encode(info, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the fields in the same order used by encode()
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(id, bl);
    decode(event_name, bl);
    decode(source, bl);
    decode(timestamp, bl);
    decode(info, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_event)
366
// set a unique ID for an event, based on the object hash and a timestamp
// "id" is the output parameter that receives the generated ID
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
369
// Destination settings of a subscription/topic.
// The encoding has grown over time: push_endpoint_args is decoded from struct
// version 2 on, arn_topic from 3, stored_secret from 4 and persistent from 5.
struct rgw_pubsub_sub_dest {
  std::string bucket_name;
  std::string oid_prefix;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;
  bool stored_secret = false;
  bool persistent = false;

  // serialize all fields; the order below is the wire format and must match decode()
  void encode(bufferlist& bl) const {
    ENCODE_START(5, 1, bl);
    encode(bucket_name, bl);
    encode(oid_prefix, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    encode(stored_secret, bl);
    encode(persistent, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize all fields; fields introduced in later struct versions are
  // only read when struct_v is high enough and are left untouched otherwise
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(5, bl);
    decode(bucket_name, bl);
    decode(oid_prefix, bl);
    decode(push_endpoint, bl);
    if (struct_v >= 2) {
      decode(push_endpoint_args, bl);
    }
    if (struct_v >= 3) {
      decode(arn_topic, bl);
    }
    if (struct_v >= 4) {
      decode(stored_secret, bl);
    }
    if (struct_v >= 5) {
      decode(persistent, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
  void dump_xml(Formatter *f) const;
  std::string to_json_str() const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
416
// Configuration of a subscription: owning user, subscription name, topic name,
// destination and an S3 id (s3_id is decoded from struct version 2 on).
struct rgw_pubsub_sub_config {
  rgw_user user;
  std::string name;
  std::string topic;
  rgw_pubsub_sub_dest dest;
  // RGWPubSub::get_sub_with_events() treats an empty s3_id as a non-S3 subscription
  std::string s3_id;

  // serialize user, name, topic, dest and s3_id (in that order)
  void encode(bufferlist& bl) const {
    ENCODE_START(2, 1, bl);
    encode(user, bl);
    encode(name, bl);
    encode(topic, bl);
    encode(dest, bl);
    encode(s3_id, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the fields; s3_id is only read when struct_v >= 2
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(2, bl);
    decode(user, bl);
    decode(name, bl);
    decode(topic, bl);
    decode(dest, bl);
    if (struct_v >= 2) {
      decode(s3_id, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
449
450 struct rgw_pubsub_topic {
451 rgw_user user;
452 std::string name;
453 rgw_pubsub_sub_dest dest;
454 std::string arn;
455 std::string opaque_data;
456
457 void encode(bufferlist& bl) const {
458 ENCODE_START(3, 1, bl);
459 encode(user, bl);
460 encode(name, bl);
461 encode(dest, bl);
462 encode(arn, bl);
463 encode(opaque_data, bl);
464 ENCODE_FINISH(bl);
465 }
466
467 void decode(bufferlist::const_iterator& bl) {
468 DECODE_START(3, bl);
469 decode(user, bl);
470 decode(name, bl);
471 if (struct_v >= 2) {
472 decode(dest, bl);
473 decode(arn, bl);
474 }
475 if (struct_v >= 3) {
476 decode(opaque_data, bl);
477 }
478 DECODE_FINISH(bl);
479 }
480
481 std::string to_str() const {
482 return user.tenant + "/" + name;
483 }
484
485 void dump(Formatter *f) const;
486 void dump_xml(Formatter *f) const;
487 void dump_xml_as_attributes(Formatter *f) const;
488
489 bool operator<(const rgw_pubsub_topic& t) const {
490 return to_str().compare(t.to_str());
491 }
492 };
493 WRITE_CLASS_ENCODER(rgw_pubsub_topic)
494
// A topic together with a set of subscription names ("subs").
struct rgw_pubsub_topic_subs {
  rgw_pubsub_topic topic;
  std::set<std::string> subs;

  // serialize the topic followed by the set of subscription names
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(topic, bl);
    encode(subs, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the topic followed by the set of subscription names
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(topic, bl);
    decode(subs, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
516
// A topic together with the filter attached to it: the list of event types,
// an S3 id (decoded from struct version 2 on) and an S3 filter (decoded from
// struct version 3 on).
struct rgw_pubsub_topic_filter {
  rgw_pubsub_topic topic;
  rgw::notify::EventTypeList events;
  std::string s3_id;
  rgw_s3_filter s3_filter;

  // serialize topic, events (as strings), s3_id and s3_filter (in that order)
  void encode(bufferlist& bl) const {
    ENCODE_START(3, 1, bl);
    encode(topic, bl);
    // events are stored as a vector of std::strings
    std::vector<std::string> tmp_events;
    // the string form depends on s3_id: to_ceph_string() when it is empty, to_string() otherwise
    const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
    std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
    encode(tmp_events, bl);
    encode(s3_id, bl);
    encode(s3_filter, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the fields; the event list is rebuilt from its string form,
  // s3_id and s3_filter are only read when struct_v is high enough
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(3, bl);
    decode(topic, bl);
    // events are stored as a vector of std::strings
    // drop any previous content, since the decoded events are appended below
    events.clear();
    std::vector<std::string> tmp_events;
    decode(tmp_events, bl);
    std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
    if (struct_v >= 2) {
      decode(s3_id, bl);
    }
    if (struct_v >= 3) {
      decode(s3_filter, bl);
    }
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
556
// Map of string keys to topic filters, as stored for a bucket.
struct rgw_pubsub_bucket_topics {
  std::map<std::string, rgw_pubsub_topic_filter> topics;

  // serialize the map
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(topics, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the map
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(topics, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
575
// Map of string keys to topics with their subscriptions.
struct rgw_pubsub_topics {
  std::map<std::string, rgw_pubsub_topic_subs> topics;

  // serialize the map
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(topics, bl);
    ENCODE_FINISH(bl);
  }

  // deserialize the map
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(topics, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
  void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topics)
595
// prefix of the names (oids) of all pubsub metadata objects
// const: this header-level static gives every translation unit its own copy,
// so a modification made in one of them would silently not be seen by the others
static const std::string pubsub_oid_prefix = "pubsub.";
597
// Entry point for pubsub metadata of a single tenant: topics, per-bucket
// notifications (Bucket) and subscriptions (Sub / SubWithEvents).
class RGWPubSub
{
  friend class Bucket;

  rgw::sal::RadosStore* store;
  const std::string tenant;
  RGWSysObjectCtx obj_ctx;

  rgw_raw_obj meta_obj;

  // oid of the tenant's metadata object: "pubsub.<tenant>"
  std::string meta_oid() const {
    return pubsub_oid_prefix + tenant;
  }

  // oid of a bucket's metadata object: "pubsub.<tenant>.bucket.<bucket name>/<bucket marker>"
  std::string bucket_meta_oid(const rgw_bucket& bucket) const {
    return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.marker;
  }

  // oid of a subscription's metadata object: "pubsub.<tenant>.sub.<name>"
  std::string sub_meta_oid(const std::string& name) const {
    return pubsub_oid_prefix + tenant + ".sub." + name;
  }

  // read a system object and decode it into "data"
  // return 0 on success, -EIO if decoding fails, other error code otherwise
  template <class T>
  int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);

  // encode "info" and write it into a system object
  // return 0 on success, error code otherwise
  template <class T>
  int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
            RGWObjVersionTracker* obj_tracker, optional_yield y);

  int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker,
             optional_yield y);

  int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
  int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
                   RGWObjVersionTracker* objv_tracker, optional_yield y);

public:
  RGWPubSub(rgw::sal::RadosStore* _store, const std::string& tenant);

  // notifications associated with a single bucket
  class Bucket {
    friend class RGWPubSub;
    RGWPubSub *ps;
    rgw_bucket bucket;
    rgw_raw_obj bucket_meta_obj;

    // read the list of topics associated with a bucket and populate into result
    // use version tracker to enforce atomicity between read/write
    // return 0 on success or if no topic was associated with the bucket, error code otherwise
    int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
    // set the list of topics associated with a bucket
    // use version tracker to enforce atomicity between read/write
    // return 0 on success, error code otherwise
    int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
                     RGWObjVersionTracker* objv_tracker, optional_yield y);
  public:
    // "_ps" is dereferenced here (to resolve the bucket's metadata object), so it must not be null
    Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
      ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
    }

    // read the list of topics associated with a bucket and populate into result
    // return 0 on success or if no topic was associated with the bucket, error code otherwise
    int get_topics(rgw_pubsub_bucket_topics *result);
    // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
    // assigning a notification name is optional (needed for S3 compatible notifications)
    // if the topic already exists on the bucket, the filter event list may be updated
    // for S3 compliant notifications the version with: s3_filter and notif_name should be used
    // return -ENOENT if the topic does not exist
    // return 0 on success, error code otherwise
    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y);
    int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y);
    // remove a topic and filter from bucket
    // if the topic does not exist on the bucket it is a no-op (considered success)
    // return -ENOENT if the topic does not exist
    // return 0 on success, error code otherwise
    int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
    // remove all notifications (and autogenerated topics) associated with the bucket
    // return 0 on success or if no topic was associated with the bucket, error code otherwise
    int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
  };

  // base class for subscription
  class Sub {
    friend class RGWPubSub;
  protected:
    RGWPubSub* const ps;
    const std::string sub;
    rgw_raw_obj sub_meta_obj;

    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
    int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf,
                  RGWObjVersionTracker* objv_tracker, optional_yield y);
    int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y);
  public:
    // "_ps" is dereferenced here (to resolve the subscription's metadata object), so it must not be null
    Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
      ps->get_sub_meta_obj(sub, &sub_meta_obj);
    }

    virtual ~Sub() = default;

    int subscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y,
                  const std::string& s3_id="");
    int unsubscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
    int get_conf(rgw_pubsub_sub_config* result);

    static const int DEFAULT_MAX_EVENTS = 100;
    // the following virtual methods should only be called on derived classes
    // the base class implementations assert
    virtual int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) {ceph_assert(false);}
    virtual int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) {ceph_assert(false);}
    virtual void dump(Formatter* f) const {ceph_assert(false);}
  };

  // subscription with templated list of events to support both S3 compliant and Ceph specific events
  template<typename EventType>
  class SubWithEvents : public Sub {
  private:
    // result of listing events
    struct list_events_result {
      std::string next_marker;
      bool is_truncated{false};
      void dump(Formatter *f) const;
      std::vector<EventType> events;
    } list;

  public:
    SubWithEvents(RGWPubSub *_ps, const std::string& _sub) : Sub(_ps, _sub) {}

    virtual ~SubWithEvents() = default;

    int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) override;
    int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) override;
    void dump(Formatter* f) const override;
  };

  using BucketRef = std::shared_ptr<Bucket>;
  using SubRef = std::shared_ptr<Sub>;

  // create a Bucket handle for the given bucket
  BucketRef get_bucket(const rgw_bucket& bucket) {
    return std::make_shared<Bucket>(this, bucket);
  }

  // create a base Sub handle (without event listing/removal support)
  SubRef get_sub(const std::string& sub) {
    return std::make_shared<Sub>(this, sub);
  }

  // create a subscription handle that supports events
  // the subscription configuration is read first: an empty s3_id selects
  // Ceph specific events (rgw_pubsub_event), otherwise S3 events (rgw_pubsub_s3_event)
  // return nullptr if the subscription configuration cannot be read
  SubRef get_sub_with_events(const std::string& sub) {
    auto tmpsub = Sub(this, sub);
    rgw_pubsub_sub_config conf;
    if (tmpsub.get_conf(&conf) < 0) {
      return nullptr;
    }
    if (conf.s3_id.empty()) {
      return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
    }
    return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
  }

  void get_meta_obj(rgw_raw_obj *obj) const;
  void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;

  void get_sub_meta_obj(const std::string& name, rgw_raw_obj *obj) const;

  // get all topics (per tenant, if used) and populate them into "result"
  // return 0 on success or if no topics exist, error code otherwise
  int get_topics(rgw_pubsub_topics *result);
  // get a topic with its subscriptions by its name and populate it into "result"
  // return -ENOENT if the topic does not exist
  // return 0 on success, error code otherwise
  int get_topic(const std::string& name, rgw_pubsub_topic_subs *result);
  // get a topic by its name and populate it into "result"
  // return -ENOENT if the topic does not exist
  // return 0 on success, error code otherwise
  int get_topic(const std::string& name, rgw_pubsub_topic *result);
  // create a topic with a name only
  // if the topic already exists it is a no-op (considered success)
  // return 0 on success, error code otherwise
  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
  // create a topic with push destination information and ARN
  // if the topic already exists the destination and ARN values may be updated (considered success)
  // return 0 on success, error code otherwise
  int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
  // remove a topic according to its name
  // if the topic does not exist it is a no-op (considered success)
  // return 0 on success, error code otherwise
  int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
};
782
783
784 template <class T>
785 int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
786 {
787 bufferlist bl;
788 int ret = rgw_get_system_obj(obj_ctx,
789 obj.pool, obj.oid,
790 bl,
791 objv_tracker,
792 nullptr, null_yield, nullptr, nullptr);
793 if (ret < 0) {
794 return ret;
795 }
796
797 auto iter = bl.cbegin();
798 try {
799 decode(*result, iter);
800 } catch (buffer::error& err) {
801 return -EIO;
802 }
803
804 return 0;
805 }
806
807 template <class T>
808 int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
809 RGWObjVersionTracker* objv_tracker, optional_yield y)
810 {
811 bufferlist bl;
812 encode(info, bl);
813
814 int ret = rgw_put_system_obj(dpp, obj_ctx, obj.pool, obj.oid,
815 bl, false, objv_tracker,
816 real_time(), y);
817 if (ret < 0) {
818 return ret;
819 }
820
821 obj_ctx.invalidate(obj);
822 return 0;
823 }
824
825 #endif