]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_pubsub.h
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
CommitLineData
eafe8130
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
11fdf7f2
TL
4#ifndef CEPH_RGW_PUBSUB_H
5#define CEPH_RGW_PUBSUB_H
6
9f95a23c
TL
7#include "rgw_sal.h"
8#include "services/svc_sys_obj.h"
11fdf7f2
TL
9#include "rgw_tools.h"
10#include "rgw_zone.h"
eafe8130 11#include "rgw_notify_event_type.h"
9f95a23c 12#include <boost/container/flat_map.hpp>
11fdf7f2 13
eafe8130
TL
14class XMLObj;
15
16struct rgw_s3_key_filter {
17 std::string prefix_rule;
18 std::string suffix_rule;
19 std::string regex_rule;
20
21 bool has_content() const;
22
23 bool decode_xml(XMLObj *obj);
24 void dump_xml(Formatter *f) const;
25
26 void encode(bufferlist& bl) const {
27 ENCODE_START(1, 1, bl);
28 encode(prefix_rule, bl);
29 encode(suffix_rule, bl);
30 encode(regex_rule, bl);
31 ENCODE_FINISH(bl);
32 }
33
34 void decode(bufferlist::const_iterator& bl) {
35 DECODE_START(1, bl);
36 decode(prefix_rule, bl);
37 decode(suffix_rule, bl);
38 decode(regex_rule, bl);
39 DECODE_FINISH(bl);
40 }
41};
42WRITE_CLASS_ENCODER(rgw_s3_key_filter)
43
f67539c2 44using KeyValueMap = boost::container::flat_map<std::string, std::string>;
eafe8130 45
9f95a23c 46struct rgw_s3_key_value_filter {
f67539c2 47 KeyValueMap kv;
eafe8130
TL
48
49 bool has_content() const;
50
51 bool decode_xml(XMLObj *obj);
52 void dump_xml(Formatter *f) const;
53
54 void encode(bufferlist& bl) const {
55 ENCODE_START(1, 1, bl);
f67539c2 56 encode(kv, bl);
eafe8130
TL
57 ENCODE_FINISH(bl);
58 }
59 void decode(bufferlist::const_iterator& bl) {
60 DECODE_START(1, bl);
f67539c2 61 decode(kv, bl);
eafe8130
TL
62 DECODE_FINISH(bl);
63 }
64};
9f95a23c 65WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
eafe8130
TL
66
67struct rgw_s3_filter {
68 rgw_s3_key_filter key_filter;
9f95a23c
TL
69 rgw_s3_key_value_filter metadata_filter;
70 rgw_s3_key_value_filter tag_filter;
eafe8130
TL
71
72 bool has_content() const;
73
74 bool decode_xml(XMLObj *obj);
75 void dump_xml(Formatter *f) const;
76
77 void encode(bufferlist& bl) const {
9f95a23c 78 ENCODE_START(2, 1, bl);
eafe8130
TL
79 encode(key_filter, bl);
80 encode(metadata_filter, bl);
9f95a23c 81 encode(tag_filter, bl);
eafe8130
TL
82 ENCODE_FINISH(bl);
83 }
84
85 void decode(bufferlist::const_iterator& bl) {
9f95a23c 86 DECODE_START(2, bl);
eafe8130
TL
87 decode(key_filter, bl);
88 decode(metadata_filter, bl);
9f95a23c
TL
89 if (struct_v >= 2) {
90 decode(tag_filter, bl);
91 }
eafe8130
TL
92 DECODE_FINISH(bl);
93 }
94};
95WRITE_CLASS_ENCODER(rgw_s3_filter)
96
97using OptionalFilter = std::optional<rgw_s3_filter>;
98
9f95a23c 99struct rgw_pubsub_topic_filter;
eafe8130
TL
100/* S3 notification configuration
101 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
102<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
103 <TopicConfiguration>
104 <Filter>
105 <S3Key>
106 <FilterRule>
107 <Name>suffix</Name>
108 <Value>jpg</Value>
109 </FilterRule>
110 </S3Key>
111 <S3Metadata>
112 <FilterRule>
113 <Name></Name>
114 <Value></Value>
115 </FilterRule>
92f5a8d4 116 </S3Metadata>
9f95a23c
TL
117 <S3Tags>
118 <FilterRule>
119 <Name></Name>
120 <Value></Value>
121 </FilterRule>
122 </S3Tags>
eafe8130
TL
123 </Filter>
124 <Id>notification1</Id>
125 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
126 <Event>s3:ObjectCreated:*</Event>
127 <Event>s3:ObjectRemoved:*</Event>
128 </TopicConfiguration>
129</NotificationConfiguration>
130*/
131struct rgw_pubsub_s3_notification {
132 // notification id
133 std::string id;
134 // types of events
135 rgw::notify::EventTypeList events;
136 // topic ARN
137 std::string topic_arn;
138 // filter rules
139 rgw_s3_filter filter;
140
141 bool decode_xml(XMLObj *obj);
142 void dump_xml(Formatter *f) const;
143
144 rgw_pubsub_s3_notification() = default;
145 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
9f95a23c 146 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
eafe8130
TL
147};
148
149// return true if the key matches the prefix/suffix/regex rules of the key filter
150bool match(const rgw_s3_key_filter& filter, const std::string& key);
9f95a23c 151// return true if the key matches the metadata/tags rules of the metadata/tags filter
f67539c2 152bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv);
eafe8130
TL
153// return true if the event type matches (equal or contained in) one of the events in the list
154bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
155
156struct rgw_pubsub_s3_notifications {
157 std::list<rgw_pubsub_s3_notification> list;
158 bool decode_xml(XMLObj *obj);
159 void dump_xml(Formatter *f) const;
160};
161
162/* S3 event records structure
163 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
164{
165"Records":[
166 {
167 "eventVersion":""
168 "eventSource":"",
169 "awsRegion":"",
170 "eventTime":"",
171 "eventName":"",
172 "userIdentity":{
173 "principalId":""
174 },
175 "requestParameters":{
176 "sourceIPAddress":""
177 },
178 "responseElements":{
179 "x-amz-request-id":"",
180 "x-amz-id-2":""
181 },
182 "s3":{
183 "s3SchemaVersion":"1.0",
184 "configurationId":"",
185 "bucket":{
186 "name":"",
187 "ownerIdentity":{
188 "principalId":""
189 },
190 "arn":""
191 "id": ""
192 },
193 "object":{
194 "key":"",
195 "size": ,
196 "eTag":"",
197 "versionId":"",
198 "sequencer": "",
199 "metadata": ""
9f95a23c 200 "tags": ""
eafe8130
TL
201 }
202 },
203 "eventId":"",
204 }
205]
206}*/
207
f67539c2 208struct rgw_pubsub_s3_event {
eafe8130 209 constexpr static const char* const json_type_plural = "Records";
92f5a8d4 210 std::string eventVersion = "2.2";
eafe8130 211 // aws:s3
92f5a8d4 212 std::string eventSource = "ceph:s3";
eafe8130
TL
213 // zonegroup
214 std::string awsRegion;
215 // time of the request
216 ceph::real_time eventTime;
217 // type of the event
218 std::string eventName;
92f5a8d4 219 // user that sent the request
eafe8130
TL
220 std::string userIdentity;
221 // IP address of source of the request (not implemented)
222 std::string sourceIPAddress;
223 // request ID (not implemented)
224 std::string x_amz_request_id;
225 // radosgw that received the request
226 std::string x_amz_id_2;
92f5a8d4 227 std::string s3SchemaVersion = "1.0";
eafe8130
TL
228 // ID received in the notification request
229 std::string configurationId;
230 // bucket name
231 std::string bucket_name;
92f5a8d4 232 // bucket owner
eafe8130
TL
233 std::string bucket_ownerIdentity;
234 // bucket ARN
235 std::string bucket_arn;
236 // object key
237 std::string object_key;
92f5a8d4
TL
238 // object size
239 uint64_t object_size = 0;
eafe8130
TL
240 // object etag
241 std::string object_etag;
242 // object version id bucket is versioned
243 std::string object_versionId;
244 // hexadecimal value used to determine event order for specific key
245 std::string object_sequencer;
246 // this is an rgw extension (not S3 standard)
247 // used to store a globally unique identifier of the event
92f5a8d4 248 // that could be used for acking or any other identification of the event
eafe8130
TL
249 std::string id;
250 // this is an rgw extension holding the internal bucket id
251 std::string bucket_id;
252 // meta data
f67539c2 253 KeyValueMap x_meta_map;
9f95a23c 254 // tags
f67539c2 255 KeyValueMap tags;
9f95a23c
TL
256 // opaque data received from the topic
257 // could be used to identify the gateway
258 std::string opaque_data;
eafe8130
TL
259
260 void encode(bufferlist& bl) const {
9f95a23c 261 ENCODE_START(4, 1, bl);
eafe8130
TL
262 encode(eventVersion, bl);
263 encode(eventSource, bl);
264 encode(awsRegion, bl);
265 encode(eventTime, bl);
266 encode(eventName, bl);
267 encode(userIdentity, bl);
268 encode(sourceIPAddress, bl);
269 encode(x_amz_request_id, bl);
270 encode(x_amz_id_2, bl);
271 encode(s3SchemaVersion, bl);
272 encode(configurationId, bl);
273 encode(bucket_name, bl);
274 encode(bucket_ownerIdentity, bl);
275 encode(bucket_arn, bl);
276 encode(object_key, bl);
277 encode(object_size, bl);
278 encode(object_etag, bl);
279 encode(object_versionId, bl);
280 encode(object_sequencer, bl);
281 encode(id, bl);
282 encode(bucket_id, bl);
283 encode(x_meta_map, bl);
9f95a23c
TL
284 encode(tags, bl);
285 encode(opaque_data, bl);
eafe8130
TL
286 ENCODE_FINISH(bl);
287 }
288
289 void decode(bufferlist::const_iterator& bl) {
9f95a23c 290 DECODE_START(4, bl);
eafe8130
TL
291 decode(eventVersion, bl);
292 decode(eventSource, bl);
293 decode(awsRegion, bl);
294 decode(eventTime, bl);
295 decode(eventName, bl);
296 decode(userIdentity, bl);
297 decode(sourceIPAddress, bl);
298 decode(x_amz_request_id, bl);
299 decode(x_amz_id_2, bl);
300 decode(s3SchemaVersion, bl);
301 decode(configurationId, bl);
302 decode(bucket_name, bl);
303 decode(bucket_ownerIdentity, bl);
304 decode(bucket_arn, bl);
305 decode(object_key, bl);
306 decode(object_size, bl);
307 decode(object_etag, bl);
308 decode(object_versionId, bl);
309 decode(object_sequencer, bl);
310 decode(id, bl);
311 if (struct_v >= 2) {
9f95a23c
TL
312 decode(bucket_id, bl);
313 decode(x_meta_map, bl);
314 }
315 if (struct_v >= 3) {
316 decode(tags, bl);
317 }
318 if (struct_v >= 4) {
319 decode(opaque_data, bl);
eafe8130
TL
320 }
321 DECODE_FINISH(bl);
322 }
323
324 void dump(Formatter *f) const;
325};
f67539c2 326WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
11fdf7f2
TL
327
328struct rgw_pubsub_event {
eafe8130
TL
329 constexpr static const char* const json_type_plural = "events";
330 std::string id;
331 std::string event_name;
332 std::string source;
11fdf7f2
TL
333 ceph::real_time timestamp;
334 JSONFormattable info;
335
336 void encode(bufferlist& bl) const {
337 ENCODE_START(1, 1, bl);
338 encode(id, bl);
eafe8130 339 encode(event_name, bl);
11fdf7f2
TL
340 encode(source, bl);
341 encode(timestamp, bl);
342 encode(info, bl);
343 ENCODE_FINISH(bl);
344 }
345
346 void decode(bufferlist::const_iterator& bl) {
347 DECODE_START(1, bl);
348 decode(id, bl);
eafe8130 349 decode(event_name, bl);
11fdf7f2
TL
350 decode(source, bl);
351 decode(timestamp, bl);
352 decode(info, bl);
353 DECODE_FINISH(bl);
354 }
355
356 void dump(Formatter *f) const;
357};
358WRITE_CLASS_ENCODER(rgw_pubsub_event)
359
f67539c2 360// settign a unique ID for an event based on object hash and timestamp
92f5a8d4
TL
361void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
362
11fdf7f2 363struct rgw_pubsub_sub_dest {
eafe8130
TL
364 std::string bucket_name;
365 std::string oid_prefix;
366 std::string push_endpoint;
367 std::string push_endpoint_args;
368 std::string arn_topic;
9f95a23c 369 bool stored_secret = false;
f67539c2 370 bool persistent = false;
11fdf7f2
TL
371
372 void encode(bufferlist& bl) const {
f67539c2 373 ENCODE_START(5, 1, bl);
11fdf7f2
TL
374 encode(bucket_name, bl);
375 encode(oid_prefix, bl);
376 encode(push_endpoint, bl);
377 encode(push_endpoint_args, bl);
eafe8130 378 encode(arn_topic, bl);
9f95a23c 379 encode(stored_secret, bl);
f67539c2 380 encode(persistent, bl);
11fdf7f2
TL
381 ENCODE_FINISH(bl);
382 }
383
384 void decode(bufferlist::const_iterator& bl) {
f67539c2 385 DECODE_START(5, bl);
11fdf7f2
TL
386 decode(bucket_name, bl);
387 decode(oid_prefix, bl);
388 decode(push_endpoint, bl);
389 if (struct_v >= 2) {
390 decode(push_endpoint_args, bl);
391 }
eafe8130
TL
392 if (struct_v >= 3) {
393 decode(arn_topic, bl);
394 }
9f95a23c
TL
395 if (struct_v >= 4) {
396 decode(stored_secret, bl);
397 }
f67539c2
TL
398 if (struct_v >= 5) {
399 decode(persistent, bl);
400 }
11fdf7f2
TL
401 DECODE_FINISH(bl);
402 }
403
404 void dump(Formatter *f) const;
eafe8130 405 void dump_xml(Formatter *f) const;
f67539c2 406 std::string to_json_str() const;
11fdf7f2
TL
407};
408WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
409
410struct rgw_pubsub_sub_config {
411 rgw_user user;
eafe8130
TL
412 std::string name;
413 std::string topic;
11fdf7f2 414 rgw_pubsub_sub_dest dest;
eafe8130 415 std::string s3_id;
11fdf7f2
TL
416
417 void encode(bufferlist& bl) const {
eafe8130 418 ENCODE_START(2, 1, bl);
11fdf7f2
TL
419 encode(user, bl);
420 encode(name, bl);
421 encode(topic, bl);
422 encode(dest, bl);
eafe8130 423 encode(s3_id, bl);
11fdf7f2
TL
424 ENCODE_FINISH(bl);
425 }
426
427 void decode(bufferlist::const_iterator& bl) {
eafe8130 428 DECODE_START(2, bl);
11fdf7f2
TL
429 decode(user, bl);
430 decode(name, bl);
431 decode(topic, bl);
432 decode(dest, bl);
eafe8130
TL
433 if (struct_v >= 2) {
434 decode(s3_id, bl);
435 }
11fdf7f2
TL
436 DECODE_FINISH(bl);
437 }
438
439 void dump(Formatter *f) const;
440};
441WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
442
443struct rgw_pubsub_topic {
444 rgw_user user;
eafe8130
TL
445 std::string name;
446 rgw_pubsub_sub_dest dest;
447 std::string arn;
9f95a23c 448 std::string opaque_data;
11fdf7f2
TL
449
450 void encode(bufferlist& bl) const {
9f95a23c 451 ENCODE_START(3, 1, bl);
11fdf7f2
TL
452 encode(user, bl);
453 encode(name, bl);
eafe8130
TL
454 encode(dest, bl);
455 encode(arn, bl);
9f95a23c 456 encode(opaque_data, bl);
11fdf7f2
TL
457 ENCODE_FINISH(bl);
458 }
459
460 void decode(bufferlist::const_iterator& bl) {
9f95a23c 461 DECODE_START(3, bl);
11fdf7f2
TL
462 decode(user, bl);
463 decode(name, bl);
eafe8130
TL
464 if (struct_v >= 2) {
465 decode(dest, bl);
466 decode(arn, bl);
467 }
9f95a23c
TL
468 if (struct_v >= 3) {
469 decode(opaque_data, bl);
470 }
11fdf7f2
TL
471 DECODE_FINISH(bl);
472 }
473
474 string to_str() const {
f67539c2 475 return user.tenant + "/" + name;
11fdf7f2
TL
476 }
477
478 void dump(Formatter *f) const;
eafe8130 479 void dump_xml(Formatter *f) const;
f67539c2 480 void dump_xml_as_attributes(Formatter *f) const;
11fdf7f2
TL
481
482 bool operator<(const rgw_pubsub_topic& t) const {
483 return to_str().compare(t.to_str());
484 }
485};
486WRITE_CLASS_ENCODER(rgw_pubsub_topic)
487
488struct rgw_pubsub_topic_subs {
489 rgw_pubsub_topic topic;
eafe8130 490 std::set<std::string> subs;
11fdf7f2
TL
491
492 void encode(bufferlist& bl) const {
493 ENCODE_START(1, 1, bl);
494 encode(topic, bl);
495 encode(subs, bl);
496 ENCODE_FINISH(bl);
497 }
498
499 void decode(bufferlist::const_iterator& bl) {
500 DECODE_START(1, bl);
501 decode(topic, bl);
502 decode(subs, bl);
503 DECODE_FINISH(bl);
504 }
505
506 void dump(Formatter *f) const;
507};
508WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
509
510struct rgw_pubsub_topic_filter {
511 rgw_pubsub_topic topic;
eafe8130
TL
512 rgw::notify::EventTypeList events;
513 std::string s3_id;
514 rgw_s3_filter s3_filter;
11fdf7f2
TL
515
516 void encode(bufferlist& bl) const {
eafe8130 517 ENCODE_START(3, 1, bl);
11fdf7f2 518 encode(topic, bl);
eafe8130
TL
519 // events are stored as a vector of strings
520 std::vector<std::string> tmp_events;
521 const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
522 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
523 encode(tmp_events, bl);
524 encode(s3_id, bl);
525 encode(s3_filter, bl);
11fdf7f2
TL
526 ENCODE_FINISH(bl);
527 }
528
529 void decode(bufferlist::const_iterator& bl) {
eafe8130 530 DECODE_START(3, bl);
11fdf7f2 531 decode(topic, bl);
eafe8130
TL
532 // events are stored as a vector of strings
533 events.clear();
534 std::vector<std::string> tmp_events;
535 decode(tmp_events, bl);
536 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
537 if (struct_v >= 2) {
538 decode(s3_id, bl);
539 }
540 if (struct_v >= 3) {
541 decode(s3_filter, bl);
542 }
11fdf7f2
TL
543 DECODE_FINISH(bl);
544 }
545
546 void dump(Formatter *f) const;
547};
548WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
549
550struct rgw_pubsub_bucket_topics {
eafe8130 551 std::map<std::string, rgw_pubsub_topic_filter> topics;
11fdf7f2
TL
552
553 void encode(bufferlist& bl) const {
554 ENCODE_START(1, 1, bl);
555 encode(topics, bl);
556 ENCODE_FINISH(bl);
557 }
558
559 void decode(bufferlist::const_iterator& bl) {
560 DECODE_START(1, bl);
561 decode(topics, bl);
562 DECODE_FINISH(bl);
563 }
564
565 void dump(Formatter *f) const;
566};
567WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
568
f67539c2 569struct rgw_pubsub_topics {
eafe8130 570 std::map<std::string, rgw_pubsub_topic_subs> topics;
11fdf7f2
TL
571
572 void encode(bufferlist& bl) const {
573 ENCODE_START(1, 1, bl);
574 encode(topics, bl);
575 ENCODE_FINISH(bl);
576 }
577
578 void decode(bufferlist::const_iterator& bl) {
579 DECODE_START(1, bl);
580 decode(topics, bl);
581 DECODE_FINISH(bl);
582 }
583
584 void dump(Formatter *f) const;
eafe8130 585 void dump_xml(Formatter *f) const;
11fdf7f2 586};
f67539c2 587WRITE_CLASS_ENCODER(rgw_pubsub_topics)
11fdf7f2 588
f67539c2 589static std::string pubsub_oid_prefix = "pubsub.";
11fdf7f2 590
f67539c2 591class RGWPubSub
11fdf7f2
TL
592{
593 friend class Bucket;
594
9f95a23c 595 rgw::sal::RGWRadosStore *store;
f67539c2 596 const std::string tenant;
11fdf7f2
TL
597 RGWSysObjectCtx obj_ctx;
598
f67539c2 599 rgw_raw_obj meta_obj;
11fdf7f2 600
f67539c2
TL
601 std::string meta_oid() const {
602 return pubsub_oid_prefix + tenant;
11fdf7f2
TL
603 }
604
eafe8130 605 std::string bucket_meta_oid(const rgw_bucket& bucket) const {
f67539c2 606 return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.bucket_id;
11fdf7f2
TL
607 }
608
eafe8130 609 std::string sub_meta_oid(const string& name) const {
f67539c2 610 return pubsub_oid_prefix + tenant + ".sub." + name;
11fdf7f2
TL
611 }
612
613 template <class T>
f67539c2 614 int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
11fdf7f2
TL
615
616 template <class T>
b3b6e05e 617 int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
f67539c2 618 RGWObjVersionTracker* obj_tracker, optional_yield y);
11fdf7f2 619
b3b6e05e 620 int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker,
f67539c2 621 optional_yield y);
11fdf7f2 622
f67539c2 623 int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
b3b6e05e 624 int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
f67539c2 625 RGWObjVersionTracker* objv_tracker, optional_yield y);
eafe8130 626
11fdf7f2 627public:
f67539c2 628 RGWPubSub(rgw::sal::RGWRadosStore *_store, const std::string& tenant);
11fdf7f2
TL
629
630 class Bucket {
f67539c2
TL
631 friend class RGWPubSub;
632 RGWPubSub *ps;
11fdf7f2
TL
633 rgw_bucket bucket;
634 rgw_raw_obj bucket_meta_obj;
635
eafe8130
TL
636 // read the list of topics associated with a bucket and populate into result
637 // use version tacker to enforce atomicity between read/write
638 // return 0 on success or if no topic was associated with the bucket, error code otherwise
f67539c2 639 int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
eafe8130
TL
640 // set the list of topics associated with a bucket
641 // use version tacker to enforce atomicity between read/write
642 // return 0 on success, error code otherwise
b3b6e05e 643 int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
f67539c2 644 RGWObjVersionTracker* objv_tracker, optional_yield y);
11fdf7f2 645 public:
f67539c2 646 Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
11fdf7f2
TL
647 ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
648 }
649
eafe8130
TL
650 // read the list of topics associated with a bucket and populate into result
651 // return 0 on success or if no topic was associated with the bucket, error code otherwise
11fdf7f2 652 int get_topics(rgw_pubsub_bucket_topics *result);
9f95a23c 653 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
eafe8130
TL
654 // assigning a notification name is optional (needed for S3 compatible notifications)
655 // if the topic already exist on the bucket, the filter event list may be updated
656 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
657 // return -ENOENT if the topic does not exists
658 // return 0 on success, error code otherwise
b3b6e05e
TL
659 int create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y);
660 int create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y);
eafe8130
TL
661 // remove a topic and filter from bucket
662 // if the topic does not exists on the bucket it is a no-op (considered success)
663 // return -ENOENT if the topic does not exists
664 // return 0 on success, error code otherwise
b3b6e05e 665 int remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y);
f67539c2
TL
666 // remove all notifications (and autogenerated topics) associated with the bucket
667 // return 0 on success or if no topic was associated with the bucket, error code otherwise
b3b6e05e 668 int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
11fdf7f2
TL
669 };
670
eafe8130 671 // base class for subscription
11fdf7f2 672 class Sub {
f67539c2 673 friend class RGWPubSub;
eafe8130 674 protected:
f67539c2 675 RGWPubSub* const ps;
eafe8130 676 const std::string sub;
11fdf7f2
TL
677 rgw_raw_obj sub_meta_obj;
678
f67539c2 679 int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
b3b6e05e 680 int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf,
f67539c2 681 RGWObjVersionTracker* objv_tracker, optional_yield y);
b3b6e05e 682 int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y);
11fdf7f2 683 public:
f67539c2 684 Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
11fdf7f2
TL
685 ps->get_sub_meta_obj(sub, &sub_meta_obj);
686 }
687
eafe8130
TL
688 virtual ~Sub() = default;
689
b3b6e05e 690 int subscribe(const DoutPrefixProvider *dpp, const string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y,
f67539c2 691 const std::string& s3_id="");
b3b6e05e 692 int unsubscribe(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y);
eafe8130
TL
693 int get_conf(rgw_pubsub_sub_config* result);
694
695 static const int DEFAULT_MAX_EVENTS = 100;
696 // followint virtual methods should only be called in derived
b3b6e05e
TL
697 virtual int list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) {ceph_assert(false);}
698 virtual int remove_event(const DoutPrefixProvider *dpp, const string& event_id) {ceph_assert(false);}
eafe8130
TL
699 virtual void dump(Formatter* f) const {ceph_assert(false);}
700 };
11fdf7f2 701
eafe8130
TL
702 // subscription with templated list of events to support both S3 compliant and Ceph specific events
703 template<typename EventType>
704 class SubWithEvents : public Sub {
705 private:
11fdf7f2 706 struct list_events_result {
eafe8130 707 std::string next_marker;
11fdf7f2 708 bool is_truncated{false};
11fdf7f2 709 void dump(Formatter *f) const;
eafe8130
TL
710 std::vector<EventType> events;
711 } list;
11fdf7f2 712
eafe8130 713 public:
f67539c2 714 SubWithEvents(RGWPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
eafe8130
TL
715
716 virtual ~SubWithEvents() = default;
717
b3b6e05e
TL
718 int list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) override;
719 int remove_event(const DoutPrefixProvider *dpp, const string& event_id) override;
eafe8130 720 void dump(Formatter* f) const override;
11fdf7f2
TL
721 };
722
723 using BucketRef = std::shared_ptr<Bucket>;
724 using SubRef = std::shared_ptr<Sub>;
725
726 BucketRef get_bucket(const rgw_bucket& bucket) {
727 return std::make_shared<Bucket>(this, bucket);
728 }
729
730 SubRef get_sub(const string& sub) {
731 return std::make_shared<Sub>(this, sub);
732 }
eafe8130
TL
733
734 SubRef get_sub_with_events(const string& sub) {
735 auto tmpsub = Sub(this, sub);
736 rgw_pubsub_sub_config conf;
737 if (tmpsub.get_conf(&conf) < 0) {
738 return nullptr;
739 }
740 if (conf.s3_id.empty()) {
741 return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
742 }
f67539c2 743 return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
eafe8130 744 }
11fdf7f2 745
f67539c2 746 void get_meta_obj(rgw_raw_obj *obj) const;
9f95a23c 747 void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
11fdf7f2 748
9f95a23c 749 void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
11fdf7f2 750
f67539c2 751 // get all topics (per tenant, if used)) and populate them into "result"
eafe8130 752 // return 0 on success or if no topics exist, error code otherwise
f67539c2 753 int get_topics(rgw_pubsub_topics *result);
eafe8130
TL
754 // get a topic with its subscriptions by its name and populate it into "result"
755 // return -ENOENT if the topic does not exists
756 // return 0 on success, error code otherwise
11fdf7f2 757 int get_topic(const string& name, rgw_pubsub_topic_subs *result);
eafe8130
TL
758 // get a topic with by its name and populate it into "result"
759 // return -ENOENT if the topic does not exists
760 // return 0 on success, error code otherwise
761 int get_topic(const string& name, rgw_pubsub_topic *result);
762 // create a topic with a name only
763 // if the topic already exists it is a no-op (considered success)
764 // return 0 on success, error code otherwise
b3b6e05e 765 int create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y);
eafe8130
TL
766 // create a topic with push destination information and ARN
767 // if the topic already exists the destination and ARN values may be updated (considered succsess)
768 // return 0 on success, error code otherwise
b3b6e05e 769 int create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
eafe8130
TL
770 // remove a topic according to its name
771 // if the topic does not exists it is a no-op (considered success)
772 // return 0 on success, error code otherwise
b3b6e05e 773 int remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y);
11fdf7f2
TL
774};
775
9f95a23c 776
11fdf7f2 777template <class T>
f67539c2 778int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
11fdf7f2
TL
779{
780 bufferlist bl;
9f95a23c 781 int ret = rgw_get_system_obj(obj_ctx,
11fdf7f2
TL
782 obj.pool, obj.oid,
783 bl,
784 objv_tracker,
9f95a23c 785 nullptr, null_yield, nullptr, nullptr);
11fdf7f2
TL
786 if (ret < 0) {
787 return ret;
788 }
789
790 auto iter = bl.cbegin();
791 try {
792 decode(*result, iter);
793 } catch (buffer::error& err) {
794 return -EIO;
795 }
796
797 return 0;
798}
799
800template <class T>
b3b6e05e 801int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
f67539c2 802 RGWObjVersionTracker* objv_tracker, optional_yield y)
11fdf7f2
TL
803{
804 bufferlist bl;
805 encode(info, bl);
806
b3b6e05e 807 int ret = rgw_put_system_obj(dpp, obj_ctx, obj.pool, obj.oid,
f67539c2
TL
808 bl, false, objv_tracker,
809 real_time(), y);
11fdf7f2
TL
810 if (ret < 0) {
811 return ret;
812 }
813
9f95a23c 814 obj_ctx.invalidate(obj);
11fdf7f2
TL
815 return 0;
816}
817
818#endif