]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.h
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_PUBSUB_H
5 #define CEPH_RGW_PUBSUB_H
6
7 #include "rgw_common.h"
8 #include "rgw_tools.h"
9 #include "rgw_zone.h"
10 #include "rgw_rados.h"
11 #include "rgw_notify_event_type.h"
12 #include "services/svc_sys_obj.h"
13
14 class XMLObj;
15
16 struct rgw_s3_key_filter {
17 std::string prefix_rule;
18 std::string suffix_rule;
19 std::string regex_rule;
20
21 bool has_content() const;
22
23 bool decode_xml(XMLObj *obj);
24 void dump_xml(Formatter *f) const;
25
26 void encode(bufferlist& bl) const {
27 ENCODE_START(1, 1, bl);
28 encode(prefix_rule, bl);
29 encode(suffix_rule, bl);
30 encode(regex_rule, bl);
31 ENCODE_FINISH(bl);
32 }
33
34 void decode(bufferlist::const_iterator& bl) {
35 DECODE_START(1, bl);
36 decode(prefix_rule, bl);
37 decode(suffix_rule, bl);
38 decode(regex_rule, bl);
39 DECODE_FINISH(bl);
40 }
41 };
42 WRITE_CLASS_ENCODER(rgw_s3_key_filter)
43
44 using Metadata = std::map<std::string, std::string>;
45
46 struct rgw_s3_metadata_filter {
47 Metadata metadata;
48
49 bool has_content() const;
50
51 bool decode_xml(XMLObj *obj);
52 void dump_xml(Formatter *f) const;
53
54 void encode(bufferlist& bl) const {
55 ENCODE_START(1, 1, bl);
56 encode(metadata, bl);
57 ENCODE_FINISH(bl);
58 }
59 void decode(bufferlist::const_iterator& bl) {
60 DECODE_START(1, bl);
61 decode(metadata, bl);
62 DECODE_FINISH(bl);
63 }
64 };
65 WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
66
67 struct rgw_s3_filter {
68 rgw_s3_key_filter key_filter;
69 rgw_s3_metadata_filter metadata_filter;
70
71 bool has_content() const;
72
73 bool decode_xml(XMLObj *obj);
74 void dump_xml(Formatter *f) const;
75
76 void encode(bufferlist& bl) const {
77 ENCODE_START(1, 1, bl);
78 encode(key_filter, bl);
79 encode(metadata_filter, bl);
80 ENCODE_FINISH(bl);
81 }
82
83 void decode(bufferlist::const_iterator& bl) {
84 DECODE_START(1, bl);
85 decode(key_filter, bl);
86 decode(metadata_filter, bl);
87 DECODE_FINISH(bl);
88 }
89 };
90 WRITE_CLASS_ENCODER(rgw_s3_filter)
91
92 using OptionalFilter = std::optional<rgw_s3_filter>;
93
94 class rgw_pubsub_topic_filter;
95 /* S3 notification configuration
96 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
97 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
98 <TopicConfiguration>
99 <Filter>
100 <S3Key>
101 <FilterRule>
102 <Name>suffix</Name>
103 <Value>jpg</Value>
104 </FilterRule>
105 </S3Key>
106 <S3Metadata>
107 <FilterRule>
108 <Name></Name>
109 <Value></Value>
110 </FilterRule>
111 </S3Metadata>
112 </Filter>
113 <Id>notification1</Id>
114 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
115 <Event>s3:ObjectCreated:*</Event>
116 <Event>s3:ObjectRemoved:*</Event>
117 </TopicConfiguration>
118 </NotificationConfiguration>
119 */
120 struct rgw_pubsub_s3_notification {
121 // notification id
122 std::string id;
123 // types of events
124 rgw::notify::EventTypeList events;
125 // topic ARN
126 std::string topic_arn;
127 // filter rules
128 rgw_s3_filter filter;
129
130 bool decode_xml(XMLObj *obj);
131 void dump_xml(Formatter *f) const;
132
133 rgw_pubsub_s3_notification() = default;
134 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
135 rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
136 };
137
138 // return true if the key matches the prefix/suffix/regex rules of the key filter
139 bool match(const rgw_s3_key_filter& filter, const std::string& key);
140 // return true if the key matches the metadata rules of the metadata filter
141 bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
142 // return true if the event type matches (equal or contained in) one of the events in the list
143 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
144
145 struct rgw_pubsub_s3_notifications {
146 std::list<rgw_pubsub_s3_notification> list;
147 bool decode_xml(XMLObj *obj);
148 void dump_xml(Formatter *f) const;
149 };
150
151 /* S3 event records structure
152 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
153 {
154 "Records":[
155 {
156 "eventVersion":"",
157 "eventSource":"",
158 "awsRegion":"",
159 "eventTime":"",
160 "eventName":"",
161 "userIdentity":{
162 "principalId":""
163 },
164 "requestParameters":{
165 "sourceIPAddress":""
166 },
167 "responseElements":{
168 "x-amz-request-id":"",
169 "x-amz-id-2":""
170 },
171 "s3":{
172 "s3SchemaVersion":"1.0",
173 "configurationId":"",
174 "bucket":{
175 "name":"",
176 "ownerIdentity":{
177 "principalId":""
178 },
179 "arn":"",
180 "id": ""
181 },
182 "object":{
183 "key":"",
184 "size": ,
185 "eTag":"",
186 "versionId":"",
187 "sequencer": "",
188 "metadata": ""
189 }
190 },
191 "eventId":""
192 }
193 ]
194 }*/
195
196 struct rgw_pubsub_s3_record {
197 constexpr static const char* const json_type_single = "Record";
198 constexpr static const char* const json_type_plural = "Records";
199 // 2.1
200 std::string eventVersion;
201 // aws:s3
202 std::string eventSource;
203 // zonegroup
204 std::string awsRegion;
205 // time of the request
206 ceph::real_time eventTime;
207 // type of the event
208 std::string eventName;
209 // user that sent the requet (not implemented)
210 std::string userIdentity;
211 // IP address of source of the request (not implemented)
212 std::string sourceIPAddress;
213 // request ID (not implemented)
214 std::string x_amz_request_id;
215 // radosgw that received the request
216 std::string x_amz_id_2;
217 // 1.0
218 std::string s3SchemaVersion;
219 // ID received in the notification request
220 std::string configurationId;
221 // bucket name
222 std::string bucket_name;
223 // bucket owner (not implemented)
224 std::string bucket_ownerIdentity;
225 // bucket ARN
226 std::string bucket_arn;
227 // object key
228 std::string object_key;
229 // object size (not implemented)
230 uint64_t object_size;
231 // object etag
232 std::string object_etag;
233 // object version id bucket is versioned
234 std::string object_versionId;
235 // hexadecimal value used to determine event order for specific key
236 std::string object_sequencer;
237 // this is an rgw extension (not S3 standard)
238 // used to store a globally unique identifier of the event
239 // that could be used for acking
240 std::string id;
241 // this is an rgw extension holding the internal bucket id
242 std::string bucket_id;
243 // meta data
244 std::map<std::string, std::string> x_meta_map;
245
246 void encode(bufferlist& bl) const {
247 ENCODE_START(2, 1, bl);
248 encode(eventVersion, bl);
249 encode(eventSource, bl);
250 encode(awsRegion, bl);
251 encode(eventTime, bl);
252 encode(eventName, bl);
253 encode(userIdentity, bl);
254 encode(sourceIPAddress, bl);
255 encode(x_amz_request_id, bl);
256 encode(x_amz_id_2, bl);
257 encode(s3SchemaVersion, bl);
258 encode(configurationId, bl);
259 encode(bucket_name, bl);
260 encode(bucket_ownerIdentity, bl);
261 encode(bucket_arn, bl);
262 encode(object_key, bl);
263 encode(object_size, bl);
264 encode(object_etag, bl);
265 encode(object_versionId, bl);
266 encode(object_sequencer, bl);
267 encode(id, bl);
268 encode(bucket_id, bl);
269 encode(x_meta_map, bl);
270 ENCODE_FINISH(bl);
271 }
272
273 void decode(bufferlist::const_iterator& bl) {
274 DECODE_START(2, bl);
275 decode(eventVersion, bl);
276 decode(eventSource, bl);
277 decode(awsRegion, bl);
278 decode(eventTime, bl);
279 decode(eventName, bl);
280 decode(userIdentity, bl);
281 decode(sourceIPAddress, bl);
282 decode(x_amz_request_id, bl);
283 decode(x_amz_id_2, bl);
284 decode(s3SchemaVersion, bl);
285 decode(configurationId, bl);
286 decode(bucket_name, bl);
287 decode(bucket_ownerIdentity, bl);
288 decode(bucket_arn, bl);
289 decode(object_key, bl);
290 decode(object_size, bl);
291 decode(object_etag, bl);
292 decode(object_versionId, bl);
293 decode(object_sequencer, bl);
294 decode(id, bl);
295 if (struct_v >= 2) {
296 decode(bucket_id, bl);
297 decode(x_meta_map, bl);
298 }
299 DECODE_FINISH(bl);
300 }
301
302 void dump(Formatter *f) const;
303 };
304 WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
305
306 struct rgw_pubsub_event {
307 constexpr static const char* const json_type_single = "event";
308 constexpr static const char* const json_type_plural = "events";
309 std::string id;
310 std::string event_name;
311 std::string source;
312 ceph::real_time timestamp;
313 JSONFormattable info;
314
315 void encode(bufferlist& bl) const {
316 ENCODE_START(1, 1, bl);
317 encode(id, bl);
318 encode(event_name, bl);
319 encode(source, bl);
320 encode(timestamp, bl);
321 encode(info, bl);
322 ENCODE_FINISH(bl);
323 }
324
325 void decode(bufferlist::const_iterator& bl) {
326 DECODE_START(1, bl);
327 decode(id, bl);
328 decode(event_name, bl);
329 decode(source, bl);
330 decode(timestamp, bl);
331 decode(info, bl);
332 DECODE_FINISH(bl);
333 }
334
335 void dump(Formatter *f) const;
336 };
337 WRITE_CLASS_ENCODER(rgw_pubsub_event)
338
339 struct rgw_pubsub_sub_dest {
340 std::string bucket_name;
341 std::string oid_prefix;
342 std::string push_endpoint;
343 std::string push_endpoint_args;
344 std::string arn_topic;
345
346 void encode(bufferlist& bl) const {
347 ENCODE_START(3, 1, bl);
348 encode(bucket_name, bl);
349 encode(oid_prefix, bl);
350 encode(push_endpoint, bl);
351 encode(push_endpoint_args, bl);
352 encode(arn_topic, bl);
353 ENCODE_FINISH(bl);
354 }
355
356 void decode(bufferlist::const_iterator& bl) {
357 DECODE_START(3, bl);
358 decode(bucket_name, bl);
359 decode(oid_prefix, bl);
360 decode(push_endpoint, bl);
361 if (struct_v >= 2) {
362 decode(push_endpoint_args, bl);
363 }
364 if (struct_v >= 3) {
365 decode(arn_topic, bl);
366 }
367 DECODE_FINISH(bl);
368 }
369
370 void dump(Formatter *f) const;
371 void dump_xml(Formatter *f) const;
372 };
373 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
374
375 struct rgw_pubsub_sub_config {
376 rgw_user user;
377 std::string name;
378 std::string topic;
379 rgw_pubsub_sub_dest dest;
380 std::string s3_id;
381
382 void encode(bufferlist& bl) const {
383 ENCODE_START(2, 1, bl);
384 encode(user, bl);
385 encode(name, bl);
386 encode(topic, bl);
387 encode(dest, bl);
388 encode(s3_id, bl);
389 ENCODE_FINISH(bl);
390 }
391
392 void decode(bufferlist::const_iterator& bl) {
393 DECODE_START(2, bl);
394 decode(user, bl);
395 decode(name, bl);
396 decode(topic, bl);
397 decode(dest, bl);
398 if (struct_v >= 2) {
399 decode(s3_id, bl);
400 }
401 DECODE_FINISH(bl);
402 }
403
404 void dump(Formatter *f) const;
405 };
406 WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
407
408 struct rgw_pubsub_topic {
409 rgw_user user;
410 std::string name;
411 rgw_pubsub_sub_dest dest;
412 std::string arn;
413
414 void encode(bufferlist& bl) const {
415 ENCODE_START(2, 1, bl);
416 encode(user, bl);
417 encode(name, bl);
418 encode(dest, bl);
419 encode(arn, bl);
420 ENCODE_FINISH(bl);
421 }
422
423 void decode(bufferlist::const_iterator& bl) {
424 DECODE_START(2, bl);
425 decode(user, bl);
426 decode(name, bl);
427 if (struct_v >= 2) {
428 decode(dest, bl);
429 decode(arn, bl);
430 }
431 DECODE_FINISH(bl);
432 }
433
434 string to_str() const {
435 return user.to_str() + "/" + name;
436 }
437
438 void dump(Formatter *f) const;
439 void dump_xml(Formatter *f) const;
440
441 bool operator<(const rgw_pubsub_topic& t) const {
442 return to_str().compare(t.to_str());
443 }
444 };
445 WRITE_CLASS_ENCODER(rgw_pubsub_topic)
446
447 struct rgw_pubsub_topic_subs {
448 rgw_pubsub_topic topic;
449 std::set<std::string> subs;
450
451 void encode(bufferlist& bl) const {
452 ENCODE_START(1, 1, bl);
453 encode(topic, bl);
454 encode(subs, bl);
455 ENCODE_FINISH(bl);
456 }
457
458 void decode(bufferlist::const_iterator& bl) {
459 DECODE_START(1, bl);
460 decode(topic, bl);
461 decode(subs, bl);
462 DECODE_FINISH(bl);
463 }
464
465 void dump(Formatter *f) const;
466 };
467 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
468
469 struct rgw_pubsub_topic_filter {
470 rgw_pubsub_topic topic;
471 rgw::notify::EventTypeList events;
472 std::string s3_id;
473 rgw_s3_filter s3_filter;
474
475 void encode(bufferlist& bl) const {
476 ENCODE_START(3, 1, bl);
477 encode(topic, bl);
478 // events are stored as a vector of strings
479 std::vector<std::string> tmp_events;
480 const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
481 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
482 encode(tmp_events, bl);
483 encode(s3_id, bl);
484 encode(s3_filter, bl);
485 ENCODE_FINISH(bl);
486 }
487
488 void decode(bufferlist::const_iterator& bl) {
489 DECODE_START(3, bl);
490 decode(topic, bl);
491 // events are stored as a vector of strings
492 events.clear();
493 std::vector<std::string> tmp_events;
494 decode(tmp_events, bl);
495 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
496 if (struct_v >= 2) {
497 decode(s3_id, bl);
498 }
499 if (struct_v >= 3) {
500 decode(s3_filter, bl);
501 }
502 DECODE_FINISH(bl);
503 }
504
505 void dump(Formatter *f) const;
506 };
507 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
508
509 struct rgw_pubsub_bucket_topics {
510 std::map<std::string, rgw_pubsub_topic_filter> topics;
511
512 void encode(bufferlist& bl) const {
513 ENCODE_START(1, 1, bl);
514 encode(topics, bl);
515 ENCODE_FINISH(bl);
516 }
517
518 void decode(bufferlist::const_iterator& bl) {
519 DECODE_START(1, bl);
520 decode(topics, bl);
521 DECODE_FINISH(bl);
522 }
523
524 void dump(Formatter *f) const;
525 };
526 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
527
528 struct rgw_pubsub_user_topics {
529 std::map<std::string, rgw_pubsub_topic_subs> topics;
530
531 void encode(bufferlist& bl) const {
532 ENCODE_START(1, 1, bl);
533 encode(topics, bl);
534 ENCODE_FINISH(bl);
535 }
536
537 void decode(bufferlist::const_iterator& bl) {
538 DECODE_START(1, bl);
539 decode(topics, bl);
540 DECODE_FINISH(bl);
541 }
542
543 void dump(Formatter *f) const;
544 void dump_xml(Formatter *f) const;
545 };
546 WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
547
548 static std::string pubsub_user_oid_prefix = "pubsub.user.";
549
550 class RGWUserPubSub
551 {
552 friend class Bucket;
553
554 RGWRados *store;
555 rgw_user user;
556 RGWSysObjectCtx obj_ctx;
557
558 rgw_raw_obj user_meta_obj;
559
560 std::string user_meta_oid() const {
561 return pubsub_user_oid_prefix + user.to_str();
562 }
563
564 std::string bucket_meta_oid(const rgw_bucket& bucket) const {
565 return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
566 }
567
568 std::string sub_meta_oid(const string& name) const {
569 return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
570 }
571
572 template <class T>
573 int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
574
575 template <class T>
576 int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
577
578 int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
579
580 int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
581 int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
582
583 public:
584 RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
585 user(_user),
586 obj_ctx(store->svc.sysobj->init_obj_ctx()) {
587 get_user_meta_obj(&user_meta_obj);
588 }
589
590 class Bucket {
591 friend class RGWUserPubSub;
592 RGWUserPubSub *ps;
593 rgw_bucket bucket;
594 rgw_raw_obj bucket_meta_obj;
595
596 // read the list of topics associated with a bucket and populate into result
597 // use version tacker to enforce atomicity between read/write
598 // return 0 on success or if no topic was associated with the bucket, error code otherwise
599 int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
600 // set the list of topics associated with a bucket
601 // use version tacker to enforce atomicity between read/write
602 // return 0 on success, error code otherwise
603 int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
604 public:
605 Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
606 ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
607 }
608
609 // read the list of topics associated with a bucket and populate into result
610 // return 0 on success or if no topic was associated with the bucket, error code otherwise
611 int get_topics(rgw_pubsub_bucket_topics *result);
612 // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
613 // assigning a notification name is optional (needed for S3 compatible notifications)
614 // if the topic already exist on the bucket, the filter event list may be updated
615 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
616 // return -ENOENT if the topic does not exists
617 // return 0 on success, error code otherwise
618 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
619 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
620 // remove a topic and filter from bucket
621 // if the topic does not exists on the bucket it is a no-op (considered success)
622 // return -ENOENT if the topic does not exists
623 // return 0 on success, error code otherwise
624 int remove_notification(const string& topic_name);
625 };
626
627 // base class for subscription
628 class Sub {
629 friend class RGWUserPubSub;
630 protected:
631 RGWUserPubSub* const ps;
632 const std::string sub;
633 rgw_raw_obj sub_meta_obj;
634
635 int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
636 int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
637 int remove_sub(RGWObjVersionTracker *objv_tracker);
638 public:
639 Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
640 ps->get_sub_meta_obj(sub, &sub_meta_obj);
641 }
642
643 virtual ~Sub() = default;
644
645 int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id="");
646 int unsubscribe(const string& topic_name);
647 int get_conf(rgw_pubsub_sub_config* result);
648
649 static const int DEFAULT_MAX_EVENTS = 100;
650 // followint virtual methods should only be called in derived
651 virtual int list_events(const string& marker, int max_events) {ceph_assert(false);}
652 virtual int remove_event(const string& event_id) {ceph_assert(false);}
653 virtual void dump(Formatter* f) const {ceph_assert(false);}
654 };
655
656 // subscription with templated list of events to support both S3 compliant and Ceph specific events
657 template<typename EventType>
658 class SubWithEvents : public Sub {
659 private:
660 struct list_events_result {
661 std::string next_marker;
662 bool is_truncated{false};
663 void dump(Formatter *f) const;
664 std::vector<EventType> events;
665 } list;
666
667 public:
668 SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
669
670 virtual ~SubWithEvents() = default;
671
672 int list_events(const string& marker, int max_events) override;
673 int remove_event(const string& event_id) override;
674 void dump(Formatter* f) const override;
675 };
676
677 using BucketRef = std::shared_ptr<Bucket>;
678 using SubRef = std::shared_ptr<Sub>;
679
680 BucketRef get_bucket(const rgw_bucket& bucket) {
681 return std::make_shared<Bucket>(this, bucket);
682 }
683
684 SubRef get_sub(const string& sub) {
685 return std::make_shared<Sub>(this, sub);
686 }
687
688 SubRef get_sub_with_events(const string& sub) {
689 auto tmpsub = Sub(this, sub);
690 rgw_pubsub_sub_config conf;
691 if (tmpsub.get_conf(&conf) < 0) {
692 return nullptr;
693 }
694 if (conf.s3_id.empty()) {
695 return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
696 }
697 return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
698 }
699
700 void get_user_meta_obj(rgw_raw_obj *obj) const {
701 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid());
702 }
703
704 void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
705 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
706 }
707
708 void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
709 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name));
710 }
711
712 // get all topics defined for the user and populate them into "result"
713 // return 0 on success or if no topics exist, error code otherwise
714 int get_user_topics(rgw_pubsub_user_topics *result);
715 // get a topic with its subscriptions by its name and populate it into "result"
716 // return -ENOENT if the topic does not exists
717 // return 0 on success, error code otherwise
718 int get_topic(const string& name, rgw_pubsub_topic_subs *result);
719 // get a topic with by its name and populate it into "result"
720 // return -ENOENT if the topic does not exists
721 // return 0 on success, error code otherwise
722 int get_topic(const string& name, rgw_pubsub_topic *result);
723 // create a topic with a name only
724 // if the topic already exists it is a no-op (considered success)
725 // return 0 on success, error code otherwise
726 int create_topic(const string& name);
727 // create a topic with push destination information and ARN
728 // if the topic already exists the destination and ARN values may be updated (considered succsess)
729 // return 0 on success, error code otherwise
730 int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
731 // remove a topic according to its name
732 // if the topic does not exists it is a no-op (considered success)
733 // return 0 on success, error code otherwise
734 int remove_topic(const string& name);
735 };
736
737 template <class T>
738 int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
739 {
740 bufferlist bl;
741 int ret = rgw_get_system_obj(store, obj_ctx,
742 obj.pool, obj.oid,
743 bl,
744 objv_tracker,
745 nullptr, nullptr, nullptr);
746 if (ret < 0) {
747 return ret;
748 }
749
750 auto iter = bl.cbegin();
751 try {
752 decode(*result, iter);
753 } catch (buffer::error& err) {
754 return -EIO;
755 }
756
757 return 0;
758 }
759
760 template <class T>
761 int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
762 {
763 bufferlist bl;
764 encode(info, bl);
765
766 int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
767 bl, false, objv_tracker,
768 real_time());
769 if (ret < 0) {
770 return ret;
771 }
772
773 obj_ctx.invalidate(const_cast<rgw_raw_obj&>(obj));
774 return 0;
775 }
776
777 #endif