]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.h
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2
TL
4#ifndef CEPH_RGW_PUBSUB_H
5#define CEPH_RGW_PUBSUB_H
6
9f95a23c
TL
7#include "rgw_sal.h"
8#include "services/svc_sys_obj.h"
11fdf7f2
TL
9#include "rgw_tools.h"
10#include "rgw_zone.h"
eafe8130
TL
11#include "rgw_rados.h"
12#include "rgw_notify_event_type.h"
9f95a23c 13#include <boost/container/flat_map.hpp>
11fdf7f2 14
eafe8130
TL
// forward declaration: this header only passes XMLObj by pointer (decode_xml)
class XMLObj;
16
17struct rgw_s3_key_filter {
18 std::string prefix_rule;
19 std::string suffix_rule;
20 std::string regex_rule;
21
22 bool has_content() const;
23
24 bool decode_xml(XMLObj *obj);
25 void dump_xml(Formatter *f) const;
26
27 void encode(bufferlist& bl) const {
28 ENCODE_START(1, 1, bl);
29 encode(prefix_rule, bl);
30 encode(suffix_rule, bl);
31 encode(regex_rule, bl);
32 ENCODE_FINISH(bl);
33 }
34
35 void decode(bufferlist::const_iterator& bl) {
36 DECODE_START(1, bl);
37 decode(prefix_rule, bl);
38 decode(suffix_rule, bl);
39 decode(regex_rule, bl);
40 DECODE_FINISH(bl);
41 }
42};
43WRITE_CLASS_ENCODER(rgw_s3_key_filter)
44
9f95a23c 45using KeyValueList = boost::container::flat_map<std::string, std::string>;
eafe8130 46
9f95a23c
TL
47struct rgw_s3_key_value_filter {
48 KeyValueList kvl;
eafe8130
TL
49
50 bool has_content() const;
51
52 bool decode_xml(XMLObj *obj);
53 void dump_xml(Formatter *f) const;
54
55 void encode(bufferlist& bl) const {
56 ENCODE_START(1, 1, bl);
9f95a23c 57 encode(kvl, bl);
eafe8130
TL
58 ENCODE_FINISH(bl);
59 }
60 void decode(bufferlist::const_iterator& bl) {
61 DECODE_START(1, bl);
9f95a23c 62 decode(kvl, bl);
eafe8130
TL
63 DECODE_FINISH(bl);
64 }
65};
9f95a23c 66WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
eafe8130
TL
67
68struct rgw_s3_filter {
69 rgw_s3_key_filter key_filter;
9f95a23c
TL
70 rgw_s3_key_value_filter metadata_filter;
71 rgw_s3_key_value_filter tag_filter;
eafe8130
TL
72
73 bool has_content() const;
74
75 bool decode_xml(XMLObj *obj);
76 void dump_xml(Formatter *f) const;
77
78 void encode(bufferlist& bl) const {
9f95a23c 79 ENCODE_START(2, 1, bl);
eafe8130
TL
80 encode(key_filter, bl);
81 encode(metadata_filter, bl);
9f95a23c 82 encode(tag_filter, bl);
eafe8130
TL
83 ENCODE_FINISH(bl);
84 }
85
86 void decode(bufferlist::const_iterator& bl) {
9f95a23c 87 DECODE_START(2, bl);
eafe8130
TL
88 decode(key_filter, bl);
89 decode(metadata_filter, bl);
9f95a23c
TL
90 if (struct_v >= 2) {
91 decode(tag_filter, bl);
92 }
eafe8130
TL
93 DECODE_FINISH(bl);
94 }
95};
96WRITE_CLASS_ENCODER(rgw_s3_filter)
97
98using OptionalFilter = std::optional<rgw_s3_filter>;
99
9f95a23c 100struct rgw_pubsub_topic_filter;
eafe8130
TL
101/* S3 notification configuration
102 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
103<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
104 <TopicConfiguration>
105 <Filter>
106 <S3Key>
107 <FilterRule>
108 <Name>suffix</Name>
109 <Value>jpg</Value>
110 </FilterRule>
111 </S3Key>
112 <S3Metadata>
113 <FilterRule>
114 <Name></Name>
115 <Value></Value>
116 </FilterRule>
92f5a8d4 117 </S3Metadata>
9f95a23c
TL
118 <S3Tags>
119 <FilterRule>
120 <Name></Name>
121 <Value></Value>
122 </FilterRule>
123 </S3Tags>
eafe8130
TL
124 </Filter>
125 <Id>notification1</Id>
126 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
127 <Event>s3:ObjectCreated:*</Event>
128 <Event>s3:ObjectRemoved:*</Event>
129 </TopicConfiguration>
130</NotificationConfiguration>
131*/
132struct rgw_pubsub_s3_notification {
133 // notification id
134 std::string id;
135 // types of events
136 rgw::notify::EventTypeList events;
137 // topic ARN
138 std::string topic_arn;
139 // filter rules
140 rgw_s3_filter filter;
141
142 bool decode_xml(XMLObj *obj);
143 void dump_xml(Formatter *f) const;
144
145 rgw_pubsub_s3_notification() = default;
146 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
9f95a23c 147 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
eafe8130
TL
148};
149
150// return true if the key matches the prefix/suffix/regex rules of the key filter
151bool match(const rgw_s3_key_filter& filter, const std::string& key);
9f95a23c
TL
152// return true if the key matches the metadata/tags rules of the metadata/tags filter
153bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl);
eafe8130
TL
154// return true if the event type matches (equal or contained in) one of the events in the list
155bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
156
157struct rgw_pubsub_s3_notifications {
158 std::list<rgw_pubsub_s3_notification> list;
159 bool decode_xml(XMLObj *obj);
160 void dump_xml(Formatter *f) const;
161};
162
163/* S3 event records structure
164 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
165{
166"Records":[
167 {
168 "eventVersion":"",
169 "eventSource":"",
170 "awsRegion":"",
171 "eventTime":"",
172 "eventName":"",
173 "userIdentity":{
174 "principalId":""
175 },
176 "requestParameters":{
177 "sourceIPAddress":""
178 },
179 "responseElements":{
180 "x-amz-request-id":"",
181 "x-amz-id-2":""
182 },
183 "s3":{
184 "s3SchemaVersion":"1.0",
185 "configurationId":"",
186 "bucket":{
187 "name":"",
188 "ownerIdentity":{
189 "principalId":""
190 },
191 "arn":"",
192 "id": ""
193 },
194 "object":{
195 "key":"",
196 "size": ,
197 "eTag":"",
198 "versionId":"",
199 "sequencer": "",
200 "metadata": "",
9f95a23c 201 "tags": ""
eafe8130
TL
202 }
203 },
204 "eventId":""
205 }
206]
207}*/
208
209struct rgw_pubsub_s3_record {
eafe8130 210 constexpr static const char* const json_type_plural = "Records";
92f5a8d4 211 std::string eventVersion = "2.2";
eafe8130 212 // aws:s3
92f5a8d4 213 std::string eventSource = "ceph:s3";
eafe8130
TL
214 // zonegroup
215 std::string awsRegion;
216 // time of the request
217 ceph::real_time eventTime;
218 // type of the event
219 std::string eventName;
92f5a8d4 220 // user that sent the request
eafe8130
TL
221 std::string userIdentity;
222 // IP address of source of the request (not implemented)
223 std::string sourceIPAddress;
224 // request ID (not implemented)
225 std::string x_amz_request_id;
226 // radosgw that received the request
227 std::string x_amz_id_2;
92f5a8d4 228 std::string s3SchemaVersion = "1.0";
eafe8130
TL
229 // ID received in the notification request
230 std::string configurationId;
231 // bucket name
232 std::string bucket_name;
92f5a8d4 233 // bucket owner
eafe8130
TL
234 std::string bucket_ownerIdentity;
235 // bucket ARN
236 std::string bucket_arn;
237 // object key
238 std::string object_key;
92f5a8d4
TL
239 // object size
240 uint64_t object_size = 0;
eafe8130
TL
241 // object etag
242 std::string object_etag;
243 // object version id bucket is versioned
244 std::string object_versionId;
245 // hexadecimal value used to determine event order for specific key
246 std::string object_sequencer;
247 // this is an rgw extension (not S3 standard)
248 // used to store a globally unique identifier of the event
92f5a8d4 249 // that could be used for acking or any other identification of the event
eafe8130
TL
250 std::string id;
251 // this is an rgw extension holding the internal bucket id
252 std::string bucket_id;
253 // meta data
9f95a23c
TL
254 KeyValueList x_meta_map;
255 // tags
256 KeyValueList tags;
257 // opaque data received from the topic
258 // could be used to identify the gateway
259 std::string opaque_data;
eafe8130
TL
260
261 void encode(bufferlist& bl) const {
9f95a23c 262 ENCODE_START(4, 1, bl);
eafe8130
TL
263 encode(eventVersion, bl);
264 encode(eventSource, bl);
265 encode(awsRegion, bl);
266 encode(eventTime, bl);
267 encode(eventName, bl);
268 encode(userIdentity, bl);
269 encode(sourceIPAddress, bl);
270 encode(x_amz_request_id, bl);
271 encode(x_amz_id_2, bl);
272 encode(s3SchemaVersion, bl);
273 encode(configurationId, bl);
274 encode(bucket_name, bl);
275 encode(bucket_ownerIdentity, bl);
276 encode(bucket_arn, bl);
277 encode(object_key, bl);
278 encode(object_size, bl);
279 encode(object_etag, bl);
280 encode(object_versionId, bl);
281 encode(object_sequencer, bl);
282 encode(id, bl);
283 encode(bucket_id, bl);
284 encode(x_meta_map, bl);
9f95a23c
TL
285 encode(tags, bl);
286 encode(opaque_data, bl);
eafe8130
TL
287 ENCODE_FINISH(bl);
288 }
289
290 void decode(bufferlist::const_iterator& bl) {
9f95a23c 291 DECODE_START(4, bl);
eafe8130
TL
292 decode(eventVersion, bl);
293 decode(eventSource, bl);
294 decode(awsRegion, bl);
295 decode(eventTime, bl);
296 decode(eventName, bl);
297 decode(userIdentity, bl);
298 decode(sourceIPAddress, bl);
299 decode(x_amz_request_id, bl);
300 decode(x_amz_id_2, bl);
301 decode(s3SchemaVersion, bl);
302 decode(configurationId, bl);
303 decode(bucket_name, bl);
304 decode(bucket_ownerIdentity, bl);
305 decode(bucket_arn, bl);
306 decode(object_key, bl);
307 decode(object_size, bl);
308 decode(object_etag, bl);
309 decode(object_versionId, bl);
310 decode(object_sequencer, bl);
311 decode(id, bl);
312 if (struct_v >= 2) {
9f95a23c
TL
313 decode(bucket_id, bl);
314 decode(x_meta_map, bl);
315 }
316 if (struct_v >= 3) {
317 decode(tags, bl);
318 }
319 if (struct_v >= 4) {
320 decode(opaque_data, bl);
eafe8130
TL
321 }
322 DECODE_FINISH(bl);
323 }
324
325 void dump(Formatter *f) const;
326};
327WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
11fdf7f2
TL
328
329struct rgw_pubsub_event {
eafe8130
TL
330 constexpr static const char* const json_type_plural = "events";
331 std::string id;
332 std::string event_name;
333 std::string source;
11fdf7f2
TL
334 ceph::real_time timestamp;
335 JSONFormattable info;
336
337 void encode(bufferlist& bl) const {
338 ENCODE_START(1, 1, bl);
339 encode(id, bl);
eafe8130 340 encode(event_name, bl);
11fdf7f2
TL
341 encode(source, bl);
342 encode(timestamp, bl);
343 encode(info, bl);
344 ENCODE_FINISH(bl);
345 }
346
347 void decode(bufferlist::const_iterator& bl) {
348 DECODE_START(1, bl);
349 decode(id, bl);
eafe8130 350 decode(event_name, bl);
11fdf7f2
TL
351 decode(source, bl);
352 decode(timestamp, bl);
353 decode(info, bl);
354 DECODE_FINISH(bl);
355 }
356
357 void dump(Formatter *f) const;
358};
359WRITE_CLASS_ENCODER(rgw_pubsub_event)
360
92f5a8d4
TL
361// settign a unique ID for an event/record based on object hash and timestamp
362void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
363
11fdf7f2 364struct rgw_pubsub_sub_dest {
eafe8130
TL
365 std::string bucket_name;
366 std::string oid_prefix;
367 std::string push_endpoint;
368 std::string push_endpoint_args;
369 std::string arn_topic;
9f95a23c 370 bool stored_secret = false;
11fdf7f2
TL
371
372 void encode(bufferlist& bl) const {
9f95a23c 373 ENCODE_START(4, 1, bl);
11fdf7f2
TL
374 encode(bucket_name, bl);
375 encode(oid_prefix, bl);
376 encode(push_endpoint, bl);
377 encode(push_endpoint_args, bl);
eafe8130 378 encode(arn_topic, bl);
9f95a23c 379 encode(stored_secret, bl);
11fdf7f2
TL
380 ENCODE_FINISH(bl);
381 }
382
383 void decode(bufferlist::const_iterator& bl) {
9f95a23c 384 DECODE_START(4, bl);
11fdf7f2
TL
385 decode(bucket_name, bl);
386 decode(oid_prefix, bl);
387 decode(push_endpoint, bl);
388 if (struct_v >= 2) {
389 decode(push_endpoint_args, bl);
390 }
eafe8130
TL
391 if (struct_v >= 3) {
392 decode(arn_topic, bl);
393 }
9f95a23c
TL
394 if (struct_v >= 4) {
395 decode(stored_secret, bl);
396 }
11fdf7f2
TL
397 DECODE_FINISH(bl);
398 }
399
400 void dump(Formatter *f) const;
eafe8130 401 void dump_xml(Formatter *f) const;
11fdf7f2
TL
402};
403WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
404
405struct rgw_pubsub_sub_config {
406 rgw_user user;
eafe8130
TL
407 std::string name;
408 std::string topic;
11fdf7f2 409 rgw_pubsub_sub_dest dest;
eafe8130 410 std::string s3_id;
11fdf7f2
TL
411
412 void encode(bufferlist& bl) const {
eafe8130 413 ENCODE_START(2, 1, bl);
11fdf7f2
TL
414 encode(user, bl);
415 encode(name, bl);
416 encode(topic, bl);
417 encode(dest, bl);
eafe8130 418 encode(s3_id, bl);
11fdf7f2
TL
419 ENCODE_FINISH(bl);
420 }
421
422 void decode(bufferlist::const_iterator& bl) {
eafe8130 423 DECODE_START(2, bl);
11fdf7f2
TL
424 decode(user, bl);
425 decode(name, bl);
426 decode(topic, bl);
427 decode(dest, bl);
eafe8130
TL
428 if (struct_v >= 2) {
429 decode(s3_id, bl);
430 }
11fdf7f2
TL
431 DECODE_FINISH(bl);
432 }
433
434 void dump(Formatter *f) const;
435};
436WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
437
438struct rgw_pubsub_topic {
439 rgw_user user;
eafe8130
TL
440 std::string name;
441 rgw_pubsub_sub_dest dest;
442 std::string arn;
9f95a23c 443 std::string opaque_data;
11fdf7f2
TL
444
445 void encode(bufferlist& bl) const {
9f95a23c 446 ENCODE_START(3, 1, bl);
11fdf7f2
TL
447 encode(user, bl);
448 encode(name, bl);
eafe8130
TL
449 encode(dest, bl);
450 encode(arn, bl);
9f95a23c 451 encode(opaque_data, bl);
11fdf7f2
TL
452 ENCODE_FINISH(bl);
453 }
454
455 void decode(bufferlist::const_iterator& bl) {
9f95a23c 456 DECODE_START(3, bl);
11fdf7f2
TL
457 decode(user, bl);
458 decode(name, bl);
eafe8130
TL
459 if (struct_v >= 2) {
460 decode(dest, bl);
461 decode(arn, bl);
462 }
9f95a23c
TL
463 if (struct_v >= 3) {
464 decode(opaque_data, bl);
465 }
11fdf7f2
TL
466 DECODE_FINISH(bl);
467 }
468
469 string to_str() const {
470 return user.to_str() + "/" + name;
471 }
472
473 void dump(Formatter *f) const;
eafe8130 474 void dump_xml(Formatter *f) const;
11fdf7f2
TL
475
476 bool operator<(const rgw_pubsub_topic& t) const {
477 return to_str().compare(t.to_str());
478 }
479};
480WRITE_CLASS_ENCODER(rgw_pubsub_topic)
481
482struct rgw_pubsub_topic_subs {
483 rgw_pubsub_topic topic;
eafe8130 484 std::set<std::string> subs;
11fdf7f2
TL
485
486 void encode(bufferlist& bl) const {
487 ENCODE_START(1, 1, bl);
488 encode(topic, bl);
489 encode(subs, bl);
490 ENCODE_FINISH(bl);
491 }
492
493 void decode(bufferlist::const_iterator& bl) {
494 DECODE_START(1, bl);
495 decode(topic, bl);
496 decode(subs, bl);
497 DECODE_FINISH(bl);
498 }
499
500 void dump(Formatter *f) const;
501};
502WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
503
504struct rgw_pubsub_topic_filter {
505 rgw_pubsub_topic topic;
eafe8130
TL
506 rgw::notify::EventTypeList events;
507 std::string s3_id;
508 rgw_s3_filter s3_filter;
11fdf7f2
TL
509
510 void encode(bufferlist& bl) const {
eafe8130 511 ENCODE_START(3, 1, bl);
11fdf7f2 512 encode(topic, bl);
eafe8130
TL
513 // events are stored as a vector of strings
514 std::vector<std::string> tmp_events;
515 const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
516 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
517 encode(tmp_events, bl);
518 encode(s3_id, bl);
519 encode(s3_filter, bl);
11fdf7f2
TL
520 ENCODE_FINISH(bl);
521 }
522
523 void decode(bufferlist::const_iterator& bl) {
eafe8130 524 DECODE_START(3, bl);
11fdf7f2 525 decode(topic, bl);
eafe8130
TL
526 // events are stored as a vector of strings
527 events.clear();
528 std::vector<std::string> tmp_events;
529 decode(tmp_events, bl);
530 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
531 if (struct_v >= 2) {
532 decode(s3_id, bl);
533 }
534 if (struct_v >= 3) {
535 decode(s3_filter, bl);
536 }
11fdf7f2
TL
537 DECODE_FINISH(bl);
538 }
539
540 void dump(Formatter *f) const;
541};
542WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
543
544struct rgw_pubsub_bucket_topics {
eafe8130 545 std::map<std::string, rgw_pubsub_topic_filter> topics;
11fdf7f2
TL
546
547 void encode(bufferlist& bl) const {
548 ENCODE_START(1, 1, bl);
549 encode(topics, bl);
550 ENCODE_FINISH(bl);
551 }
552
553 void decode(bufferlist::const_iterator& bl) {
554 DECODE_START(1, bl);
555 decode(topics, bl);
556 DECODE_FINISH(bl);
557 }
558
559 void dump(Formatter *f) const;
560};
561WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
562
563struct rgw_pubsub_user_topics {
eafe8130 564 std::map<std::string, rgw_pubsub_topic_subs> topics;
11fdf7f2
TL
565
566 void encode(bufferlist& bl) const {
567 ENCODE_START(1, 1, bl);
568 encode(topics, bl);
569 ENCODE_FINISH(bl);
570 }
571
572 void decode(bufferlist::const_iterator& bl) {
573 DECODE_START(1, bl);
574 decode(topics, bl);
575 DECODE_FINISH(bl);
576 }
577
578 void dump(Formatter *f) const;
eafe8130 579 void dump_xml(Formatter *f) const;
11fdf7f2
TL
580};
581WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
582
// Prefix of every pubsub metadata object id built in this header.
// Declared const: as a header-level static each translation unit gets its own
// copy, so a write in one unit would silently disagree with all the others.
static const std::string pubsub_user_oid_prefix = "pubsub.user.";
11fdf7f2
TL
584
585class RGWUserPubSub
586{
587 friend class Bucket;
588
9f95a23c 589 rgw::sal::RGWRadosStore *store;
11fdf7f2
TL
590 rgw_user user;
591 RGWSysObjectCtx obj_ctx;
592
593 rgw_raw_obj user_meta_obj;
594
eafe8130 595 std::string user_meta_oid() const {
11fdf7f2
TL
596 return pubsub_user_oid_prefix + user.to_str();
597 }
598
eafe8130 599 std::string bucket_meta_oid(const rgw_bucket& bucket) const {
11fdf7f2
TL
600 return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
601 }
602
eafe8130 603 std::string sub_meta_oid(const string& name) const {
11fdf7f2
TL
604 return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
605 }
606
607 template <class T>
608 int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
609
610 template <class T>
611 int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker);
612
613 int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
614
615 int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
616 int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
eafe8130 617
11fdf7f2 618public:
9f95a23c 619 RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user);
11fdf7f2
TL
620
621 class Bucket {
622 friend class RGWUserPubSub;
623 RGWUserPubSub *ps;
624 rgw_bucket bucket;
625 rgw_raw_obj bucket_meta_obj;
626
eafe8130
TL
627 // read the list of topics associated with a bucket and populate into result
628 // use version tacker to enforce atomicity between read/write
629 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 630 int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
eafe8130
TL
631 // set the list of topics associated with a bucket
632 // use version tacker to enforce atomicity between read/write
633 // return 0 on success, error code otherwise
11fdf7f2
TL
634 int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
635 public:
636 Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
637 ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
638 }
639
eafe8130
TL
640 // read the list of topics associated with a bucket and populate into result
641 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 642 int get_topics(rgw_pubsub_bucket_topics *result);
9f95a23c 643 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
eafe8130
TL
644 // assigning a notification name is optional (needed for S3 compatible notifications)
645 // if the topic already exist on the bucket, the filter event list may be updated
646 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
647 // return -ENOENT if the topic does not exists
648 // return 0 on success, error code otherwise
649 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
650 int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
651 // remove a topic and filter from bucket
652 // if the topic does not exists on the bucket it is a no-op (considered success)
653 // return -ENOENT if the topic does not exists
654 // return 0 on success, error code otherwise
11fdf7f2
TL
655 int remove_notification(const string& topic_name);
656 };
657
eafe8130 658 // base class for subscription
11fdf7f2
TL
659 class Sub {
660 friend class RGWUserPubSub;
eafe8130
TL
661 protected:
662 RGWUserPubSub* const ps;
663 const std::string sub;
11fdf7f2
TL
664 rgw_raw_obj sub_meta_obj;
665
666 int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
667 int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
668 int remove_sub(RGWObjVersionTracker *objv_tracker);
669 public:
eafe8130 670 Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
11fdf7f2
TL
671 ps->get_sub_meta_obj(sub, &sub_meta_obj);
672 }
673
eafe8130
TL
674 virtual ~Sub() = default;
675
676 int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id="");
11fdf7f2 677 int unsubscribe(const string& topic_name);
eafe8130
TL
678 int get_conf(rgw_pubsub_sub_config* result);
679
680 static const int DEFAULT_MAX_EVENTS = 100;
681 // followint virtual methods should only be called in derived
682 virtual int list_events(const string& marker, int max_events) {ceph_assert(false);}
683 virtual int remove_event(const string& event_id) {ceph_assert(false);}
684 virtual void dump(Formatter* f) const {ceph_assert(false);}
685 };
11fdf7f2 686
eafe8130
TL
687 // subscription with templated list of events to support both S3 compliant and Ceph specific events
688 template<typename EventType>
689 class SubWithEvents : public Sub {
690 private:
11fdf7f2 691 struct list_events_result {
eafe8130 692 std::string next_marker;
11fdf7f2 693 bool is_truncated{false};
11fdf7f2 694 void dump(Formatter *f) const;
eafe8130
TL
695 std::vector<EventType> events;
696 } list;
11fdf7f2 697
eafe8130
TL
698 public:
699 SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
700
701 virtual ~SubWithEvents() = default;
702
703 int list_events(const string& marker, int max_events) override;
704 int remove_event(const string& event_id) override;
705 void dump(Formatter* f) const override;
11fdf7f2
TL
706 };
707
708 using BucketRef = std::shared_ptr<Bucket>;
709 using SubRef = std::shared_ptr<Sub>;
710
711 BucketRef get_bucket(const rgw_bucket& bucket) {
712 return std::make_shared<Bucket>(this, bucket);
713 }
714
715 SubRef get_sub(const string& sub) {
716 return std::make_shared<Sub>(this, sub);
717 }
eafe8130
TL
718
719 SubRef get_sub_with_events(const string& sub) {
720 auto tmpsub = Sub(this, sub);
721 rgw_pubsub_sub_config conf;
722 if (tmpsub.get_conf(&conf) < 0) {
723 return nullptr;
724 }
725 if (conf.s3_id.empty()) {
726 return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
727 }
728 return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
729 }
11fdf7f2 730
9f95a23c
TL
731 void get_user_meta_obj(rgw_raw_obj *obj) const;
732 void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
11fdf7f2 733
9f95a23c 734 void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
11fdf7f2 735
eafe8130
TL
736 // get all topics defined for the user and populate them into "result"
737 // return 0 on success or if no topics exist, error code otherwise
11fdf7f2 738 int get_user_topics(rgw_pubsub_user_topics *result);
eafe8130
TL
739 // get a topic with its subscriptions by its name and populate it into "result"
740 // return -ENOENT if the topic does not exists
741 // return 0 on success, error code otherwise
11fdf7f2 742 int get_topic(const string& name, rgw_pubsub_topic_subs *result);
eafe8130
TL
743 // get a topic with by its name and populate it into "result"
744 // return -ENOENT if the topic does not exists
745 // return 0 on success, error code otherwise
746 int get_topic(const string& name, rgw_pubsub_topic *result);
747 // create a topic with a name only
748 // if the topic already exists it is a no-op (considered success)
749 // return 0 on success, error code otherwise
11fdf7f2 750 int create_topic(const string& name);
eafe8130
TL
751 // create a topic with push destination information and ARN
752 // if the topic already exists the destination and ARN values may be updated (considered succsess)
753 // return 0 on success, error code otherwise
9f95a23c 754 int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data);
eafe8130
TL
755 // remove a topic according to its name
756 // if the topic does not exists it is a no-op (considered success)
757 // return 0 on success, error code otherwise
11fdf7f2
TL
758 int remove_topic(const string& name);
759};
760
9f95a23c 761
11fdf7f2
TL
762template <class T>
763int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
764{
765 bufferlist bl;
9f95a23c 766 int ret = rgw_get_system_obj(obj_ctx,
11fdf7f2
TL
767 obj.pool, obj.oid,
768 bl,
769 objv_tracker,
9f95a23c 770 nullptr, null_yield, nullptr, nullptr);
11fdf7f2
TL
771 if (ret < 0) {
772 return ret;
773 }
774
775 auto iter = bl.cbegin();
776 try {
777 decode(*result, iter);
778 } catch (buffer::error& err) {
779 return -EIO;
780 }
781
782 return 0;
783}
784
785template <class T>
786int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker)
787{
788 bufferlist bl;
789 encode(info, bl);
790
9f95a23c 791 int ret = rgw_put_system_obj(obj_ctx, obj.pool, obj.oid,
11fdf7f2
TL
792 bl, false, objv_tracker,
793 real_time());
794 if (ret < 0) {
795 return ret;
796 }
797
9f95a23c 798 obj_ctx.invalidate(obj);
11fdf7f2
TL
799 return 0;
800}
801
802#endif