]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.h
Add patch for failing prerm scripts
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2
TL
4#ifndef CEPH_RGW_PUBSUB_H
5#define CEPH_RGW_PUBSUB_H
6
7#include "rgw_common.h"
8#include "rgw_tools.h"
9#include "rgw_zone.h"
eafe8130
TL
10#include "rgw_rados.h"
11#include "rgw_notify_event_type.h"
11fdf7f2
TL
12#include "services/svc_sys_obj.h"
13
eafe8130
TL
14class XMLObj;
15
16struct rgw_s3_key_filter {
17 std::string prefix_rule;
18 std::string suffix_rule;
19 std::string regex_rule;
20
21 bool has_content() const;
22
23 bool decode_xml(XMLObj *obj);
24 void dump_xml(Formatter *f) const;
25
26 void encode(bufferlist& bl) const {
27 ENCODE_START(1, 1, bl);
28 encode(prefix_rule, bl);
29 encode(suffix_rule, bl);
30 encode(regex_rule, bl);
31 ENCODE_FINISH(bl);
32 }
33
34 void decode(bufferlist::const_iterator& bl) {
35 DECODE_START(1, bl);
36 decode(prefix_rule, bl);
37 decode(suffix_rule, bl);
38 decode(regex_rule, bl);
39 DECODE_FINISH(bl);
40 }
41};
42WRITE_CLASS_ENCODER(rgw_s3_key_filter)
43
44using Metadata = std::map<std::string, std::string>;
45
46struct rgw_s3_metadata_filter {
47 Metadata metadata;
48
49 bool has_content() const;
50
51 bool decode_xml(XMLObj *obj);
52 void dump_xml(Formatter *f) const;
53
54 void encode(bufferlist& bl) const {
55 ENCODE_START(1, 1, bl);
56 encode(metadata, bl);
57 ENCODE_FINISH(bl);
58 }
59 void decode(bufferlist::const_iterator& bl) {
60 DECODE_START(1, bl);
61 decode(metadata, bl);
62 DECODE_FINISH(bl);
63 }
64};
65WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
66
67struct rgw_s3_filter {
68 rgw_s3_key_filter key_filter;
69 rgw_s3_metadata_filter metadata_filter;
70
71 bool has_content() const;
72
73 bool decode_xml(XMLObj *obj);
74 void dump_xml(Formatter *f) const;
75
76 void encode(bufferlist& bl) const {
77 ENCODE_START(1, 1, bl);
78 encode(key_filter, bl);
79 encode(metadata_filter, bl);
80 ENCODE_FINISH(bl);
81 }
82
83 void decode(bufferlist::const_iterator& bl) {
84 DECODE_START(1, bl);
85 decode(key_filter, bl);
86 decode(metadata_filter, bl);
87 DECODE_FINISH(bl);
88 }
89};
90WRITE_CLASS_ENCODER(rgw_s3_filter)
91
92using OptionalFilter = std::optional<rgw_s3_filter>;
93
94class rgw_pubsub_topic_filter;
95/* S3 notification configuration
96 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
97<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
98 <TopicConfiguration>
99 <Filter>
100 <S3Key>
101 <FilterRule>
102 <Name>suffix</Name>
103 <Value>jpg</Value>
104 </FilterRule>
105 </S3Key>
106 <S3Metadata>
107 <FilterRule>
108 <Name></Name>
109 <Value></Value>
110 </FilterRule>
111 </s3Metadata>
112 </Filter>
113 <Id>notification1</Id>
114 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
115 <Event>s3:ObjectCreated:*</Event>
116 <Event>s3:ObjectRemoved:*</Event>
117 </TopicConfiguration>
118</NotificationConfiguration>
119*/
120struct rgw_pubsub_s3_notification {
121 // notification id
122 std::string id;
123 // types of events
124 rgw::notify::EventTypeList events;
125 // topic ARN
126 std::string topic_arn;
127 // filter rules
128 rgw_s3_filter filter;
129
130 bool decode_xml(XMLObj *obj);
131 void dump_xml(Formatter *f) const;
132
133 rgw_pubsub_s3_notification() = default;
134 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
135 rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
136};
137
138// return true if the key matches the prefix/suffix/regex rules of the key filter
139bool match(const rgw_s3_key_filter& filter, const std::string& key);
140// return true if the key matches the metadata rules of the metadata filter
141bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
142// return true if the event type matches (equal or contained in) one of the events in the list
143bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
144
145struct rgw_pubsub_s3_notifications {
146 std::list<rgw_pubsub_s3_notification> list;
147 bool decode_xml(XMLObj *obj);
148 void dump_xml(Formatter *f) const;
149};
150
151/* S3 event records structure
152 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
153{
154"Records":[
155 {
156 "eventVersion":""
157 "eventSource":"",
158 "awsRegion":"",
159 "eventTime":"",
160 "eventName":"",
161 "userIdentity":{
162 "principalId":""
163 },
164 "requestParameters":{
165 "sourceIPAddress":""
166 },
167 "responseElements":{
168 "x-amz-request-id":"",
169 "x-amz-id-2":""
170 },
171 "s3":{
172 "s3SchemaVersion":"1.0",
173 "configurationId":"",
174 "bucket":{
175 "name":"",
176 "ownerIdentity":{
177 "principalId":""
178 },
179 "arn":""
180 "id": ""
181 },
182 "object":{
183 "key":"",
184 "size": ,
185 "eTag":"",
186 "versionId":"",
187 "sequencer": "",
188 "metadata": ""
189 }
190 },
191 "eventId":"",
192 }
193]
194}*/
195
196struct rgw_pubsub_s3_record {
197 constexpr static const char* const json_type_single = "Record";
198 constexpr static const char* const json_type_plural = "Records";
199 // 2.1
200 std::string eventVersion;
201 // aws:s3
202 std::string eventSource;
203 // zonegroup
204 std::string awsRegion;
205 // time of the request
206 ceph::real_time eventTime;
207 // type of the event
208 std::string eventName;
209 // user that sent the requet (not implemented)
210 std::string userIdentity;
211 // IP address of source of the request (not implemented)
212 std::string sourceIPAddress;
213 // request ID (not implemented)
214 std::string x_amz_request_id;
215 // radosgw that received the request
216 std::string x_amz_id_2;
217 // 1.0
218 std::string s3SchemaVersion;
219 // ID received in the notification request
220 std::string configurationId;
221 // bucket name
222 std::string bucket_name;
223 // bucket owner (not implemented)
224 std::string bucket_ownerIdentity;
225 // bucket ARN
226 std::string bucket_arn;
227 // object key
228 std::string object_key;
229 // object size (not implemented)
230 uint64_t object_size;
231 // object etag
232 std::string object_etag;
233 // object version id bucket is versioned
234 std::string object_versionId;
235 // hexadecimal value used to determine event order for specific key
236 std::string object_sequencer;
237 // this is an rgw extension (not S3 standard)
238 // used to store a globally unique identifier of the event
239 // that could be used for acking
240 std::string id;
241 // this is an rgw extension holding the internal bucket id
242 std::string bucket_id;
243 // meta data
244 std::map<std::string, std::string> x_meta_map;
245
246 void encode(bufferlist& bl) const {
247 ENCODE_START(2, 1, bl);
248 encode(eventVersion, bl);
249 encode(eventSource, bl);
250 encode(awsRegion, bl);
251 encode(eventTime, bl);
252 encode(eventName, bl);
253 encode(userIdentity, bl);
254 encode(sourceIPAddress, bl);
255 encode(x_amz_request_id, bl);
256 encode(x_amz_id_2, bl);
257 encode(s3SchemaVersion, bl);
258 encode(configurationId, bl);
259 encode(bucket_name, bl);
260 encode(bucket_ownerIdentity, bl);
261 encode(bucket_arn, bl);
262 encode(object_key, bl);
263 encode(object_size, bl);
264 encode(object_etag, bl);
265 encode(object_versionId, bl);
266 encode(object_sequencer, bl);
267 encode(id, bl);
268 encode(bucket_id, bl);
269 encode(x_meta_map, bl);
270 ENCODE_FINISH(bl);
271 }
272
273 void decode(bufferlist::const_iterator& bl) {
274 DECODE_START(2, bl);
275 decode(eventVersion, bl);
276 decode(eventSource, bl);
277 decode(awsRegion, bl);
278 decode(eventTime, bl);
279 decode(eventName, bl);
280 decode(userIdentity, bl);
281 decode(sourceIPAddress, bl);
282 decode(x_amz_request_id, bl);
283 decode(x_amz_id_2, bl);
284 decode(s3SchemaVersion, bl);
285 decode(configurationId, bl);
286 decode(bucket_name, bl);
287 decode(bucket_ownerIdentity, bl);
288 decode(bucket_arn, bl);
289 decode(object_key, bl);
290 decode(object_size, bl);
291 decode(object_etag, bl);
292 decode(object_versionId, bl);
293 decode(object_sequencer, bl);
294 decode(id, bl);
295 if (struct_v >= 2) {
296 decode(bucket_id, bl);
297 decode(x_meta_map, bl);
298 }
299 DECODE_FINISH(bl);
300 }
301
302 void dump(Formatter *f) const;
303};
304WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
11fdf7f2
TL
305
306struct rgw_pubsub_event {
eafe8130
TL
307 constexpr static const char* const json_type_single = "event";
308 constexpr static const char* const json_type_plural = "events";
309 std::string id;
310 std::string event_name;
311 std::string source;
11fdf7f2
TL
312 ceph::real_time timestamp;
313 JSONFormattable info;
314
315 void encode(bufferlist& bl) const {
316 ENCODE_START(1, 1, bl);
317 encode(id, bl);
eafe8130 318 encode(event_name, bl);
11fdf7f2
TL
319 encode(source, bl);
320 encode(timestamp, bl);
321 encode(info, bl);
322 ENCODE_FINISH(bl);
323 }
324
325 void decode(bufferlist::const_iterator& bl) {
326 DECODE_START(1, bl);
327 decode(id, bl);
eafe8130 328 decode(event_name, bl);
11fdf7f2
TL
329 decode(source, bl);
330 decode(timestamp, bl);
331 decode(info, bl);
332 DECODE_FINISH(bl);
333 }
334
335 void dump(Formatter *f) const;
336};
337WRITE_CLASS_ENCODER(rgw_pubsub_event)
338
339struct rgw_pubsub_sub_dest {
eafe8130
TL
340 std::string bucket_name;
341 std::string oid_prefix;
342 std::string push_endpoint;
343 std::string push_endpoint_args;
344 std::string arn_topic;
11fdf7f2
TL
345
346 void encode(bufferlist& bl) const {
eafe8130 347 ENCODE_START(3, 1, bl);
11fdf7f2
TL
348 encode(bucket_name, bl);
349 encode(oid_prefix, bl);
350 encode(push_endpoint, bl);
351 encode(push_endpoint_args, bl);
eafe8130 352 encode(arn_topic, bl);
11fdf7f2
TL
353 ENCODE_FINISH(bl);
354 }
355
356 void decode(bufferlist::const_iterator& bl) {
eafe8130 357 DECODE_START(3, bl);
11fdf7f2
TL
358 decode(bucket_name, bl);
359 decode(oid_prefix, bl);
360 decode(push_endpoint, bl);
361 if (struct_v >= 2) {
362 decode(push_endpoint_args, bl);
363 }
eafe8130
TL
364 if (struct_v >= 3) {
365 decode(arn_topic, bl);
366 }
11fdf7f2
TL
367 DECODE_FINISH(bl);
368 }
369
370 void dump(Formatter *f) const;
eafe8130 371 void dump_xml(Formatter *f) const;
11fdf7f2
TL
372};
373WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
374
375struct rgw_pubsub_sub_config {
376 rgw_user user;
eafe8130
TL
377 std::string name;
378 std::string topic;
11fdf7f2 379 rgw_pubsub_sub_dest dest;
eafe8130 380 std::string s3_id;
11fdf7f2
TL
381
382 void encode(bufferlist& bl) const {
eafe8130 383 ENCODE_START(2, 1, bl);
11fdf7f2
TL
384 encode(user, bl);
385 encode(name, bl);
386 encode(topic, bl);
387 encode(dest, bl);
eafe8130 388 encode(s3_id, bl);
11fdf7f2
TL
389 ENCODE_FINISH(bl);
390 }
391
392 void decode(bufferlist::const_iterator& bl) {
eafe8130 393 DECODE_START(2, bl);
11fdf7f2
TL
394 decode(user, bl);
395 decode(name, bl);
396 decode(topic, bl);
397 decode(dest, bl);
eafe8130
TL
398 if (struct_v >= 2) {
399 decode(s3_id, bl);
400 }
11fdf7f2
TL
401 DECODE_FINISH(bl);
402 }
403
404 void dump(Formatter *f) const;
405};
406WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
407
408struct rgw_pubsub_topic {
409 rgw_user user;
eafe8130
TL
410 std::string name;
411 rgw_pubsub_sub_dest dest;
412 std::string arn;
11fdf7f2
TL
413
414 void encode(bufferlist& bl) const {
eafe8130 415 ENCODE_START(2, 1, bl);
11fdf7f2
TL
416 encode(user, bl);
417 encode(name, bl);
eafe8130
TL
418 encode(dest, bl);
419 encode(arn, bl);
11fdf7f2
TL
420 ENCODE_FINISH(bl);
421 }
422
423 void decode(bufferlist::const_iterator& bl) {
eafe8130 424 DECODE_START(2, bl);
11fdf7f2
TL
425 decode(user, bl);
426 decode(name, bl);
eafe8130
TL
427 if (struct_v >= 2) {
428 decode(dest, bl);
429 decode(arn, bl);
430 }
11fdf7f2
TL
431 DECODE_FINISH(bl);
432 }
433
434 string to_str() const {
435 return user.to_str() + "/" + name;
436 }
437
438 void dump(Formatter *f) const;
eafe8130 439 void dump_xml(Formatter *f) const;
11fdf7f2
TL
440
441 bool operator<(const rgw_pubsub_topic& t) const {
442 return to_str().compare(t.to_str());
443 }
444};
445WRITE_CLASS_ENCODER(rgw_pubsub_topic)
446
447struct rgw_pubsub_topic_subs {
448 rgw_pubsub_topic topic;
eafe8130 449 std::set<std::string> subs;
11fdf7f2
TL
450
451 void encode(bufferlist& bl) const {
452 ENCODE_START(1, 1, bl);
453 encode(topic, bl);
454 encode(subs, bl);
455 ENCODE_FINISH(bl);
456 }
457
458 void decode(bufferlist::const_iterator& bl) {
459 DECODE_START(1, bl);
460 decode(topic, bl);
461 decode(subs, bl);
462 DECODE_FINISH(bl);
463 }
464
465 void dump(Formatter *f) const;
466};
467WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
468
469struct rgw_pubsub_topic_filter {
470 rgw_pubsub_topic topic;
eafe8130
TL
471 rgw::notify::EventTypeList events;
472 std::string s3_id;
473 rgw_s3_filter s3_filter;
11fdf7f2
TL
474
475 void encode(bufferlist& bl) const {
eafe8130 476 ENCODE_START(3, 1, bl);
11fdf7f2 477 encode(topic, bl);
eafe8130
TL
478 // events are stored as a vector of strings
479 std::vector<std::string> tmp_events;
480 const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
481 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
482 encode(tmp_events, bl);
483 encode(s3_id, bl);
484 encode(s3_filter, bl);
11fdf7f2
TL
485 ENCODE_FINISH(bl);
486 }
487
488 void decode(bufferlist::const_iterator& bl) {
eafe8130 489 DECODE_START(3, bl);
11fdf7f2 490 decode(topic, bl);
eafe8130
TL
491 // events are stored as a vector of strings
492 events.clear();
493 std::vector<std::string> tmp_events;
494 decode(tmp_events, bl);
495 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
496 if (struct_v >= 2) {
497 decode(s3_id, bl);
498 }
499 if (struct_v >= 3) {
500 decode(s3_filter, bl);
501 }
11fdf7f2
TL
502 DECODE_FINISH(bl);
503 }
504
505 void dump(Formatter *f) const;
506};
507WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
508
509struct rgw_pubsub_bucket_topics {
eafe8130 510 std::map<std::string, rgw_pubsub_topic_filter> topics;
11fdf7f2
TL
511
512 void encode(bufferlist& bl) const {
513 ENCODE_START(1, 1, bl);
514 encode(topics, bl);
515 ENCODE_FINISH(bl);
516 }
517
518 void decode(bufferlist::const_iterator& bl) {
519 DECODE_START(1, bl);
520 decode(topics, bl);
521 DECODE_FINISH(bl);
522 }
523
524 void dump(Formatter *f) const;
525};
526WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
527
528struct rgw_pubsub_user_topics {
eafe8130 529 std::map<std::string, rgw_pubsub_topic_subs> topics;
11fdf7f2
TL
530
531 void encode(bufferlist& bl) const {
532 ENCODE_START(1, 1, bl);
533 encode(topics, bl);
534 ENCODE_FINISH(bl);
535 }
536
537 void decode(bufferlist::const_iterator& bl) {
538 DECODE_START(1, bl);
539 decode(topics, bl);
540 DECODE_FINISH(bl);
541 }
542
543 void dump(Formatter *f) const;
eafe8130 544 void dump_xml(Formatter *f) const;
11fdf7f2
TL
545};
546WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
547
eafe8130 548static std::string pubsub_user_oid_prefix = "pubsub.user.";
11fdf7f2
TL
549
550class RGWUserPubSub
551{
552 friend class Bucket;
553
554 RGWRados *store;
555 rgw_user user;
556 RGWSysObjectCtx obj_ctx;
557
558 rgw_raw_obj user_meta_obj;
559
eafe8130 560 std::string user_meta_oid() const {
11fdf7f2
TL
561 return pubsub_user_oid_prefix + user.to_str();
562 }
563
eafe8130 564 std::string bucket_meta_oid(const rgw_bucket& bucket) const {
11fdf7f2
TL
565 return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
566 }
567
eafe8130 568 std::string sub_meta_oid(const string& name) const {
11fdf7f2
TL
569 return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
570 }
571
572 template <class T>
573 int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
574
575 template <class T>
576 int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
577
578 int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
579
580 int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
581 int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
eafe8130 582
11fdf7f2
TL
583public:
584 RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
585 user(_user),
586 obj_ctx(store->svc.sysobj->init_obj_ctx()) {
587 get_user_meta_obj(&user_meta_obj);
588 }
589
590 class Bucket {
591 friend class RGWUserPubSub;
592 RGWUserPubSub *ps;
593 rgw_bucket bucket;
594 rgw_raw_obj bucket_meta_obj;
595
eafe8130
TL
596 // read the list of topics associated with a bucket and populate into result
597 // use version tacker to enforce atomicity between read/write
598 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 599 int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
eafe8130
TL
600 // set the list of topics associated with a bucket
601 // use version tacker to enforce atomicity between read/write
602 // return 0 on success, error code otherwise
11fdf7f2
TL
603 int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
604 public:
605 Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
606 ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
607 }
608
eafe8130
TL
609 // read the list of topics associated with a bucket and populate into result
610 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 611 int get_topics(rgw_pubsub_bucket_topics *result);
eafe8130
TL
612 // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
613 // assigning a notification name is optional (needed for S3 compatible notifications)
614 // if the topic already exist on the bucket, the filter event list may be updated
615 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
616 // return -ENOENT if the topic does not exists
617 // return 0 on success, error code otherwise
618 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
619 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
620 // remove a topic and filter from bucket
621 // if the topic does not exists on the bucket it is a no-op (considered success)
622 // return -ENOENT if the topic does not exists
623 // return 0 on success, error code otherwise
11fdf7f2
TL
624 int remove_notification(const string& topic_name);
625 };
626
eafe8130 627 // base class for subscription
11fdf7f2
TL
628 class Sub {
629 friend class RGWUserPubSub;
eafe8130
TL
630 protected:
631 RGWUserPubSub* const ps;
632 const std::string sub;
11fdf7f2
TL
633 rgw_raw_obj sub_meta_obj;
634
635 int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
636 int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
637 int remove_sub(RGWObjVersionTracker *objv_tracker);
638 public:
eafe8130 639 Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
11fdf7f2
TL
640 ps->get_sub_meta_obj(sub, &sub_meta_obj);
641 }
642
eafe8130
TL
643 virtual ~Sub() = default;
644
645 int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id="");
11fdf7f2 646 int unsubscribe(const string& topic_name);
eafe8130
TL
647 int get_conf(rgw_pubsub_sub_config* result);
648
649 static const int DEFAULT_MAX_EVENTS = 100;
650 // followint virtual methods should only be called in derived
651 virtual int list_events(const string& marker, int max_events) {ceph_assert(false);}
652 virtual int remove_event(const string& event_id) {ceph_assert(false);}
653 virtual void dump(Formatter* f) const {ceph_assert(false);}
654 };
11fdf7f2 655
eafe8130
TL
656 // subscription with templated list of events to support both S3 compliant and Ceph specific events
657 template<typename EventType>
658 class SubWithEvents : public Sub {
659 private:
11fdf7f2 660 struct list_events_result {
eafe8130 661 std::string next_marker;
11fdf7f2 662 bool is_truncated{false};
11fdf7f2 663 void dump(Formatter *f) const;
eafe8130
TL
664 std::vector<EventType> events;
665 } list;
11fdf7f2 666
eafe8130
TL
667 public:
668 SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
669
670 virtual ~SubWithEvents() = default;
671
672 int list_events(const string& marker, int max_events) override;
673 int remove_event(const string& event_id) override;
674 void dump(Formatter* f) const override;
11fdf7f2
TL
675 };
676
677 using BucketRef = std::shared_ptr<Bucket>;
678 using SubRef = std::shared_ptr<Sub>;
679
680 BucketRef get_bucket(const rgw_bucket& bucket) {
681 return std::make_shared<Bucket>(this, bucket);
682 }
683
684 SubRef get_sub(const string& sub) {
685 return std::make_shared<Sub>(this, sub);
686 }
eafe8130
TL
687
688 SubRef get_sub_with_events(const string& sub) {
689 auto tmpsub = Sub(this, sub);
690 rgw_pubsub_sub_config conf;
691 if (tmpsub.get_conf(&conf) < 0) {
692 return nullptr;
693 }
694 if (conf.s3_id.empty()) {
695 return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
696 }
697 return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
698 }
699
11fdf7f2
TL
700 void get_user_meta_obj(rgw_raw_obj *obj) const {
701 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid());
702 }
703
704 void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
705 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
706 }
707
708 void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
709 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name));
710 }
711
eafe8130
TL
712 // get all topics defined for the user and populate them into "result"
713 // return 0 on success or if no topics exist, error code otherwise
11fdf7f2 714 int get_user_topics(rgw_pubsub_user_topics *result);
eafe8130
TL
715 // get a topic with its subscriptions by its name and populate it into "result"
716 // return -ENOENT if the topic does not exists
717 // return 0 on success, error code otherwise
11fdf7f2 718 int get_topic(const string& name, rgw_pubsub_topic_subs *result);
eafe8130
TL
719 // get a topic with by its name and populate it into "result"
720 // return -ENOENT if the topic does not exists
721 // return 0 on success, error code otherwise
722 int get_topic(const string& name, rgw_pubsub_topic *result);
723 // create a topic with a name only
724 // if the topic already exists it is a no-op (considered success)
725 // return 0 on success, error code otherwise
11fdf7f2 726 int create_topic(const string& name);
eafe8130
TL
727 // create a topic with push destination information and ARN
728 // if the topic already exists the destination and ARN values may be updated (considered succsess)
729 // return 0 on success, error code otherwise
730 int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
731 // remove a topic according to its name
732 // if the topic does not exists it is a no-op (considered success)
733 // return 0 on success, error code otherwise
11fdf7f2
TL
734 int remove_topic(const string& name);
735};
736
737template <class T>
738int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
739{
740 bufferlist bl;
741 int ret = rgw_get_system_obj(store, obj_ctx,
742 obj.pool, obj.oid,
743 bl,
744 objv_tracker,
745 nullptr, nullptr, nullptr);
746 if (ret < 0) {
747 return ret;
748 }
749
750 auto iter = bl.cbegin();
751 try {
752 decode(*result, iter);
753 } catch (buffer::error& err) {
754 return -EIO;
755 }
756
757 return 0;
758}
759
760template <class T>
761int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
762{
763 bufferlist bl;
764 encode(info, bl);
765
766 int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
767 bl, false, objv_tracker,
768 real_time());
769 if (ret < 0) {
770 return ret;
771 }
772
eafe8130 773 obj_ctx.invalidate(const_cast<rgw_raw_obj&>(obj));
11fdf7f2
TL
774 return 0;
775}
776
777#endif