]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.h
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2
TL
4#ifndef CEPH_RGW_PUBSUB_H
5#define CEPH_RGW_PUBSUB_H
6
7#include "rgw_common.h"
8#include "rgw_tools.h"
9#include "rgw_zone.h"
eafe8130
TL
10#include "rgw_rados.h"
11#include "rgw_notify_event_type.h"
11fdf7f2
TL
12#include "services/svc_sys_obj.h"
13
eafe8130
TL
14class XMLObj;
15
16struct rgw_s3_key_filter {
17 std::string prefix_rule;
18 std::string suffix_rule;
19 std::string regex_rule;
20
21 bool has_content() const;
22
23 bool decode_xml(XMLObj *obj);
24 void dump_xml(Formatter *f) const;
25
26 void encode(bufferlist& bl) const {
27 ENCODE_START(1, 1, bl);
28 encode(prefix_rule, bl);
29 encode(suffix_rule, bl);
30 encode(regex_rule, bl);
31 ENCODE_FINISH(bl);
32 }
33
34 void decode(bufferlist::const_iterator& bl) {
35 DECODE_START(1, bl);
36 decode(prefix_rule, bl);
37 decode(suffix_rule, bl);
38 decode(regex_rule, bl);
39 DECODE_FINISH(bl);
40 }
41};
42WRITE_CLASS_ENCODER(rgw_s3_key_filter)
43
44using Metadata = std::map<std::string, std::string>;
45
46struct rgw_s3_metadata_filter {
47 Metadata metadata;
48
49 bool has_content() const;
50
51 bool decode_xml(XMLObj *obj);
52 void dump_xml(Formatter *f) const;
53
54 void encode(bufferlist& bl) const {
55 ENCODE_START(1, 1, bl);
56 encode(metadata, bl);
57 ENCODE_FINISH(bl);
58 }
59 void decode(bufferlist::const_iterator& bl) {
60 DECODE_START(1, bl);
61 decode(metadata, bl);
62 DECODE_FINISH(bl);
63 }
64};
65WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
66
67struct rgw_s3_filter {
68 rgw_s3_key_filter key_filter;
69 rgw_s3_metadata_filter metadata_filter;
70
71 bool has_content() const;
72
73 bool decode_xml(XMLObj *obj);
74 void dump_xml(Formatter *f) const;
75
76 void encode(bufferlist& bl) const {
77 ENCODE_START(1, 1, bl);
78 encode(key_filter, bl);
79 encode(metadata_filter, bl);
80 ENCODE_FINISH(bl);
81 }
82
83 void decode(bufferlist::const_iterator& bl) {
84 DECODE_START(1, bl);
85 decode(key_filter, bl);
86 decode(metadata_filter, bl);
87 DECODE_FINISH(bl);
88 }
89};
90WRITE_CLASS_ENCODER(rgw_s3_filter)
91
92using OptionalFilter = std::optional<rgw_s3_filter>;
93
94class rgw_pubsub_topic_filter;
95/* S3 notification configuration
96 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
97<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
98 <TopicConfiguration>
99 <Filter>
100 <S3Key>
101 <FilterRule>
102 <Name>suffix</Name>
103 <Value>jpg</Value>
104 </FilterRule>
105 </S3Key>
106 <S3Metadata>
107 <FilterRule>
108 <Name></Name>
109 <Value></Value>
110 </FilterRule>
92f5a8d4 111 </S3Metadata>
eafe8130
TL
112 </Filter>
113 <Id>notification1</Id>
114 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
115 <Event>s3:ObjectCreated:*</Event>
116 <Event>s3:ObjectRemoved:*</Event>
117 </TopicConfiguration>
118</NotificationConfiguration>
119*/
120struct rgw_pubsub_s3_notification {
121 // notification id
122 std::string id;
123 // types of events
124 rgw::notify::EventTypeList events;
125 // topic ARN
126 std::string topic_arn;
127 // filter rules
128 rgw_s3_filter filter;
129
130 bool decode_xml(XMLObj *obj);
131 void dump_xml(Formatter *f) const;
132
133 rgw_pubsub_s3_notification() = default;
134 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
135 rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
136};
137
138// return true if the key matches the prefix/suffix/regex rules of the key filter
139bool match(const rgw_s3_key_filter& filter, const std::string& key);
140// return true if the key matches the metadata rules of the metadata filter
141bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
142// return true if the event type matches (equal or contained in) one of the events in the list
143bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
144
145struct rgw_pubsub_s3_notifications {
146 std::list<rgw_pubsub_s3_notification> list;
147 bool decode_xml(XMLObj *obj);
148 void dump_xml(Formatter *f) const;
149};
150
151/* S3 event records structure
152 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
153{
154"Records":[
155 {
156 "eventVersion":""
157 "eventSource":"",
158 "awsRegion":"",
159 "eventTime":"",
160 "eventName":"",
161 "userIdentity":{
162 "principalId":""
163 },
164 "requestParameters":{
165 "sourceIPAddress":""
166 },
167 "responseElements":{
168 "x-amz-request-id":"",
169 "x-amz-id-2":""
170 },
171 "s3":{
172 "s3SchemaVersion":"1.0",
173 "configurationId":"",
174 "bucket":{
175 "name":"",
176 "ownerIdentity":{
177 "principalId":""
178 },
179 "arn":""
180 "id": ""
181 },
182 "object":{
183 "key":"",
184 "size": ,
185 "eTag":"",
186 "versionId":"",
187 "sequencer": "",
188 "metadata": ""
189 }
190 },
191 "eventId":"",
192 }
193]
194}*/
195
196struct rgw_pubsub_s3_record {
eafe8130 197 constexpr static const char* const json_type_plural = "Records";
92f5a8d4 198 std::string eventVersion = "2.2";
eafe8130 199 // aws:s3
92f5a8d4 200 std::string eventSource = "ceph:s3";
eafe8130
TL
201 // zonegroup
202 std::string awsRegion;
203 // time of the request
204 ceph::real_time eventTime;
205 // type of the event
206 std::string eventName;
92f5a8d4 207 // user that sent the request
eafe8130
TL
208 std::string userIdentity;
209 // IP address of source of the request (not implemented)
210 std::string sourceIPAddress;
211 // request ID (not implemented)
212 std::string x_amz_request_id;
213 // radosgw that received the request
214 std::string x_amz_id_2;
92f5a8d4 215 std::string s3SchemaVersion = "1.0";
eafe8130
TL
216 // ID received in the notification request
217 std::string configurationId;
218 // bucket name
219 std::string bucket_name;
92f5a8d4 220 // bucket owner
eafe8130
TL
221 std::string bucket_ownerIdentity;
222 // bucket ARN
223 std::string bucket_arn;
224 // object key
225 std::string object_key;
92f5a8d4
TL
226 // object size
227 uint64_t object_size = 0;
eafe8130
TL
228 // object etag
229 std::string object_etag;
230 // object version id bucket is versioned
231 std::string object_versionId;
232 // hexadecimal value used to determine event order for specific key
233 std::string object_sequencer;
234 // this is an rgw extension (not S3 standard)
235 // used to store a globally unique identifier of the event
92f5a8d4 236 // that could be used for acking or any other identification of the event
eafe8130
TL
237 std::string id;
238 // this is an rgw extension holding the internal bucket id
239 std::string bucket_id;
240 // meta data
241 std::map<std::string, std::string> x_meta_map;
242
243 void encode(bufferlist& bl) const {
244 ENCODE_START(2, 1, bl);
245 encode(eventVersion, bl);
246 encode(eventSource, bl);
247 encode(awsRegion, bl);
248 encode(eventTime, bl);
249 encode(eventName, bl);
250 encode(userIdentity, bl);
251 encode(sourceIPAddress, bl);
252 encode(x_amz_request_id, bl);
253 encode(x_amz_id_2, bl);
254 encode(s3SchemaVersion, bl);
255 encode(configurationId, bl);
256 encode(bucket_name, bl);
257 encode(bucket_ownerIdentity, bl);
258 encode(bucket_arn, bl);
259 encode(object_key, bl);
260 encode(object_size, bl);
261 encode(object_etag, bl);
262 encode(object_versionId, bl);
263 encode(object_sequencer, bl);
264 encode(id, bl);
265 encode(bucket_id, bl);
266 encode(x_meta_map, bl);
267 ENCODE_FINISH(bl);
268 }
269
270 void decode(bufferlist::const_iterator& bl) {
271 DECODE_START(2, bl);
272 decode(eventVersion, bl);
273 decode(eventSource, bl);
274 decode(awsRegion, bl);
275 decode(eventTime, bl);
276 decode(eventName, bl);
277 decode(userIdentity, bl);
278 decode(sourceIPAddress, bl);
279 decode(x_amz_request_id, bl);
280 decode(x_amz_id_2, bl);
281 decode(s3SchemaVersion, bl);
282 decode(configurationId, bl);
283 decode(bucket_name, bl);
284 decode(bucket_ownerIdentity, bl);
285 decode(bucket_arn, bl);
286 decode(object_key, bl);
287 decode(object_size, bl);
288 decode(object_etag, bl);
289 decode(object_versionId, bl);
290 decode(object_sequencer, bl);
291 decode(id, bl);
292 if (struct_v >= 2) {
293 decode(bucket_id, bl);
294 decode(x_meta_map, bl);
295 }
296 DECODE_FINISH(bl);
297 }
298
299 void dump(Formatter *f) const;
300};
301WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
11fdf7f2
TL
302
303struct rgw_pubsub_event {
eafe8130
TL
304 constexpr static const char* const json_type_plural = "events";
305 std::string id;
306 std::string event_name;
307 std::string source;
11fdf7f2
TL
308 ceph::real_time timestamp;
309 JSONFormattable info;
310
311 void encode(bufferlist& bl) const {
312 ENCODE_START(1, 1, bl);
313 encode(id, bl);
eafe8130 314 encode(event_name, bl);
11fdf7f2
TL
315 encode(source, bl);
316 encode(timestamp, bl);
317 encode(info, bl);
318 ENCODE_FINISH(bl);
319 }
320
321 void decode(bufferlist::const_iterator& bl) {
322 DECODE_START(1, bl);
323 decode(id, bl);
eafe8130 324 decode(event_name, bl);
11fdf7f2
TL
325 decode(source, bl);
326 decode(timestamp, bl);
327 decode(info, bl);
328 DECODE_FINISH(bl);
329 }
330
331 void dump(Formatter *f) const;
332};
333WRITE_CLASS_ENCODER(rgw_pubsub_event)
334
92f5a8d4
TL
335// settign a unique ID for an event/record based on object hash and timestamp
336void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
337
11fdf7f2 338struct rgw_pubsub_sub_dest {
eafe8130
TL
339 std::string bucket_name;
340 std::string oid_prefix;
341 std::string push_endpoint;
342 std::string push_endpoint_args;
343 std::string arn_topic;
11fdf7f2
TL
344
345 void encode(bufferlist& bl) const {
eafe8130 346 ENCODE_START(3, 1, bl);
11fdf7f2
TL
347 encode(bucket_name, bl);
348 encode(oid_prefix, bl);
349 encode(push_endpoint, bl);
350 encode(push_endpoint_args, bl);
eafe8130 351 encode(arn_topic, bl);
11fdf7f2
TL
352 ENCODE_FINISH(bl);
353 }
354
355 void decode(bufferlist::const_iterator& bl) {
eafe8130 356 DECODE_START(3, bl);
11fdf7f2
TL
357 decode(bucket_name, bl);
358 decode(oid_prefix, bl);
359 decode(push_endpoint, bl);
360 if (struct_v >= 2) {
361 decode(push_endpoint_args, bl);
362 }
eafe8130
TL
363 if (struct_v >= 3) {
364 decode(arn_topic, bl);
365 }
11fdf7f2
TL
366 DECODE_FINISH(bl);
367 }
368
369 void dump(Formatter *f) const;
eafe8130 370 void dump_xml(Formatter *f) const;
11fdf7f2
TL
371};
372WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
373
374struct rgw_pubsub_sub_config {
375 rgw_user user;
eafe8130
TL
376 std::string name;
377 std::string topic;
11fdf7f2 378 rgw_pubsub_sub_dest dest;
eafe8130 379 std::string s3_id;
11fdf7f2
TL
380
381 void encode(bufferlist& bl) const {
eafe8130 382 ENCODE_START(2, 1, bl);
11fdf7f2
TL
383 encode(user, bl);
384 encode(name, bl);
385 encode(topic, bl);
386 encode(dest, bl);
eafe8130 387 encode(s3_id, bl);
11fdf7f2
TL
388 ENCODE_FINISH(bl);
389 }
390
391 void decode(bufferlist::const_iterator& bl) {
eafe8130 392 DECODE_START(2, bl);
11fdf7f2
TL
393 decode(user, bl);
394 decode(name, bl);
395 decode(topic, bl);
396 decode(dest, bl);
eafe8130
TL
397 if (struct_v >= 2) {
398 decode(s3_id, bl);
399 }
11fdf7f2
TL
400 DECODE_FINISH(bl);
401 }
402
403 void dump(Formatter *f) const;
404};
405WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
406
407struct rgw_pubsub_topic {
408 rgw_user user;
eafe8130
TL
409 std::string name;
410 rgw_pubsub_sub_dest dest;
411 std::string arn;
11fdf7f2
TL
412
413 void encode(bufferlist& bl) const {
eafe8130 414 ENCODE_START(2, 1, bl);
11fdf7f2
TL
415 encode(user, bl);
416 encode(name, bl);
eafe8130
TL
417 encode(dest, bl);
418 encode(arn, bl);
11fdf7f2
TL
419 ENCODE_FINISH(bl);
420 }
421
422 void decode(bufferlist::const_iterator& bl) {
eafe8130 423 DECODE_START(2, bl);
11fdf7f2
TL
424 decode(user, bl);
425 decode(name, bl);
eafe8130
TL
426 if (struct_v >= 2) {
427 decode(dest, bl);
428 decode(arn, bl);
429 }
11fdf7f2
TL
430 DECODE_FINISH(bl);
431 }
432
433 string to_str() const {
434 return user.to_str() + "/" + name;
435 }
436
437 void dump(Formatter *f) const;
eafe8130 438 void dump_xml(Formatter *f) const;
11fdf7f2
TL
439
440 bool operator<(const rgw_pubsub_topic& t) const {
441 return to_str().compare(t.to_str());
442 }
443};
444WRITE_CLASS_ENCODER(rgw_pubsub_topic)
445
446struct rgw_pubsub_topic_subs {
447 rgw_pubsub_topic topic;
eafe8130 448 std::set<std::string> subs;
11fdf7f2
TL
449
450 void encode(bufferlist& bl) const {
451 ENCODE_START(1, 1, bl);
452 encode(topic, bl);
453 encode(subs, bl);
454 ENCODE_FINISH(bl);
455 }
456
457 void decode(bufferlist::const_iterator& bl) {
458 DECODE_START(1, bl);
459 decode(topic, bl);
460 decode(subs, bl);
461 DECODE_FINISH(bl);
462 }
463
464 void dump(Formatter *f) const;
465};
466WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
467
468struct rgw_pubsub_topic_filter {
469 rgw_pubsub_topic topic;
eafe8130
TL
470 rgw::notify::EventTypeList events;
471 std::string s3_id;
472 rgw_s3_filter s3_filter;
11fdf7f2
TL
473
474 void encode(bufferlist& bl) const {
eafe8130 475 ENCODE_START(3, 1, bl);
11fdf7f2 476 encode(topic, bl);
eafe8130
TL
477 // events are stored as a vector of strings
478 std::vector<std::string> tmp_events;
479 const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
480 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
481 encode(tmp_events, bl);
482 encode(s3_id, bl);
483 encode(s3_filter, bl);
11fdf7f2
TL
484 ENCODE_FINISH(bl);
485 }
486
487 void decode(bufferlist::const_iterator& bl) {
eafe8130 488 DECODE_START(3, bl);
11fdf7f2 489 decode(topic, bl);
eafe8130
TL
490 // events are stored as a vector of strings
491 events.clear();
492 std::vector<std::string> tmp_events;
493 decode(tmp_events, bl);
494 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
495 if (struct_v >= 2) {
496 decode(s3_id, bl);
497 }
498 if (struct_v >= 3) {
499 decode(s3_filter, bl);
500 }
11fdf7f2
TL
501 DECODE_FINISH(bl);
502 }
503
504 void dump(Formatter *f) const;
505};
506WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
507
508struct rgw_pubsub_bucket_topics {
eafe8130 509 std::map<std::string, rgw_pubsub_topic_filter> topics;
11fdf7f2
TL
510
511 void encode(bufferlist& bl) const {
512 ENCODE_START(1, 1, bl);
513 encode(topics, bl);
514 ENCODE_FINISH(bl);
515 }
516
517 void decode(bufferlist::const_iterator& bl) {
518 DECODE_START(1, bl);
519 decode(topics, bl);
520 DECODE_FINISH(bl);
521 }
522
523 void dump(Formatter *f) const;
524};
525WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
526
527struct rgw_pubsub_user_topics {
eafe8130 528 std::map<std::string, rgw_pubsub_topic_subs> topics;
11fdf7f2
TL
529
530 void encode(bufferlist& bl) const {
531 ENCODE_START(1, 1, bl);
532 encode(topics, bl);
533 ENCODE_FINISH(bl);
534 }
535
536 void decode(bufferlist::const_iterator& bl) {
537 DECODE_START(1, bl);
538 decode(topics, bl);
539 DECODE_FINISH(bl);
540 }
541
542 void dump(Formatter *f) const;
eafe8130 543 void dump_xml(Formatter *f) const;
11fdf7f2
TL
544};
545WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
546
// common prefix of the object names used for pubsub metadata
// (user, bucket and subscription metadata objects)
// const: defined in a header, so every translation unit has its own copy and
// it must not be modified
static const std::string pubsub_user_oid_prefix = "pubsub.user.";
548
549class RGWUserPubSub
550{
551 friend class Bucket;
552
553 RGWRados *store;
554 rgw_user user;
555 RGWSysObjectCtx obj_ctx;
556
557 rgw_raw_obj user_meta_obj;
558
eafe8130 559 std::string user_meta_oid() const {
11fdf7f2
TL
560 return pubsub_user_oid_prefix + user.to_str();
561 }
562
eafe8130 563 std::string bucket_meta_oid(const rgw_bucket& bucket) const {
11fdf7f2
TL
564 return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
565 }
566
eafe8130 567 std::string sub_meta_oid(const string& name) const {
11fdf7f2
TL
568 return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
569 }
570
571 template <class T>
572 int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
573
574 template <class T>
575 int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
576
577 int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
578
579 int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
580 int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
eafe8130 581
11fdf7f2
TL
582public:
583 RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
584 user(_user),
585 obj_ctx(store->svc.sysobj->init_obj_ctx()) {
586 get_user_meta_obj(&user_meta_obj);
587 }
588
589 class Bucket {
590 friend class RGWUserPubSub;
591 RGWUserPubSub *ps;
592 rgw_bucket bucket;
593 rgw_raw_obj bucket_meta_obj;
594
eafe8130
TL
595 // read the list of topics associated with a bucket and populate into result
596 // use version tacker to enforce atomicity between read/write
597 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 598 int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
eafe8130
TL
599 // set the list of topics associated with a bucket
600 // use version tacker to enforce atomicity between read/write
601 // return 0 on success, error code otherwise
11fdf7f2
TL
602 int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
603 public:
604 Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
605 ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
606 }
607
eafe8130
TL
608 // read the list of topics associated with a bucket and populate into result
609 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 610 int get_topics(rgw_pubsub_bucket_topics *result);
eafe8130
TL
611 // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
612 // assigning a notification name is optional (needed for S3 compatible notifications)
613 // if the topic already exist on the bucket, the filter event list may be updated
614 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
615 // return -ENOENT if the topic does not exists
616 // return 0 on success, error code otherwise
617 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
618 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
619 // remove a topic and filter from bucket
620 // if the topic does not exists on the bucket it is a no-op (considered success)
621 // return -ENOENT if the topic does not exists
622 // return 0 on success, error code otherwise
11fdf7f2
TL
623 int remove_notification(const string& topic_name);
624 };
625
eafe8130 626 // base class for subscription
11fdf7f2
TL
627 class Sub {
628 friend class RGWUserPubSub;
eafe8130
TL
629 protected:
630 RGWUserPubSub* const ps;
631 const std::string sub;
11fdf7f2
TL
632 rgw_raw_obj sub_meta_obj;
633
634 int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
635 int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
636 int remove_sub(RGWObjVersionTracker *objv_tracker);
637 public:
eafe8130 638 Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
11fdf7f2
TL
639 ps->get_sub_meta_obj(sub, &sub_meta_obj);
640 }
641
eafe8130
TL
642 virtual ~Sub() = default;
643
644 int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id="");
11fdf7f2 645 int unsubscribe(const string& topic_name);
eafe8130
TL
646 int get_conf(rgw_pubsub_sub_config* result);
647
648 static const int DEFAULT_MAX_EVENTS = 100;
649 // followint virtual methods should only be called in derived
650 virtual int list_events(const string& marker, int max_events) {ceph_assert(false);}
651 virtual int remove_event(const string& event_id) {ceph_assert(false);}
652 virtual void dump(Formatter* f) const {ceph_assert(false);}
653 };
11fdf7f2 654
eafe8130
TL
655 // subscription with templated list of events to support both S3 compliant and Ceph specific events
656 template<typename EventType>
657 class SubWithEvents : public Sub {
658 private:
11fdf7f2 659 struct list_events_result {
eafe8130 660 std::string next_marker;
11fdf7f2 661 bool is_truncated{false};
11fdf7f2 662 void dump(Formatter *f) const;
eafe8130
TL
663 std::vector<EventType> events;
664 } list;
11fdf7f2 665
eafe8130
TL
666 public:
667 SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
668
669 virtual ~SubWithEvents() = default;
670
671 int list_events(const string& marker, int max_events) override;
672 int remove_event(const string& event_id) override;
673 void dump(Formatter* f) const override;
11fdf7f2
TL
674 };
675
676 using BucketRef = std::shared_ptr<Bucket>;
677 using SubRef = std::shared_ptr<Sub>;
678
679 BucketRef get_bucket(const rgw_bucket& bucket) {
680 return std::make_shared<Bucket>(this, bucket);
681 }
682
683 SubRef get_sub(const string& sub) {
684 return std::make_shared<Sub>(this, sub);
685 }
eafe8130
TL
686
687 SubRef get_sub_with_events(const string& sub) {
688 auto tmpsub = Sub(this, sub);
689 rgw_pubsub_sub_config conf;
690 if (tmpsub.get_conf(&conf) < 0) {
691 return nullptr;
692 }
693 if (conf.s3_id.empty()) {
694 return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
695 }
696 return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
697 }
698
11fdf7f2
TL
699 void get_user_meta_obj(rgw_raw_obj *obj) const {
700 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid());
701 }
702
703 void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
704 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
705 }
706
707 void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
708 *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name));
709 }
710
eafe8130
TL
711 // get all topics defined for the user and populate them into "result"
712 // return 0 on success or if no topics exist, error code otherwise
11fdf7f2 713 int get_user_topics(rgw_pubsub_user_topics *result);
eafe8130
TL
714 // get a topic with its subscriptions by its name and populate it into "result"
715 // return -ENOENT if the topic does not exists
716 // return 0 on success, error code otherwise
11fdf7f2 717 int get_topic(const string& name, rgw_pubsub_topic_subs *result);
eafe8130
TL
718 // get a topic with by its name and populate it into "result"
719 // return -ENOENT if the topic does not exists
720 // return 0 on success, error code otherwise
721 int get_topic(const string& name, rgw_pubsub_topic *result);
722 // create a topic with a name only
723 // if the topic already exists it is a no-op (considered success)
724 // return 0 on success, error code otherwise
11fdf7f2 725 int create_topic(const string& name);
eafe8130
TL
726 // create a topic with push destination information and ARN
727 // if the topic already exists the destination and ARN values may be updated (considered succsess)
728 // return 0 on success, error code otherwise
729 int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
730 // remove a topic according to its name
731 // if the topic does not exists it is a no-op (considered success)
732 // return 0 on success, error code otherwise
11fdf7f2
TL
733 int remove_topic(const string& name);
734};
735
736template <class T>
737int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
738{
739 bufferlist bl;
740 int ret = rgw_get_system_obj(store, obj_ctx,
741 obj.pool, obj.oid,
742 bl,
743 objv_tracker,
744 nullptr, nullptr, nullptr);
745 if (ret < 0) {
746 return ret;
747 }
748
749 auto iter = bl.cbegin();
750 try {
751 decode(*result, iter);
752 } catch (buffer::error& err) {
753 return -EIO;
754 }
755
756 return 0;
757}
758
759template <class T>
760int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
761{
762 bufferlist bl;
763 encode(info, bl);
764
765 int ret = rgw_put_system_obj(store, obj.pool, obj.oid,
766 bl, false, objv_tracker,
767 real_time());
768 if (ret < 0) {
769 return ret;
770 }
771
eafe8130 772 obj_ctx.invalidate(const_cast<rgw_raw_obj&>(obj));
11fdf7f2
TL
773 return 0;
774}
775
776#endif