]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #include "rgw_sal.h"
7 #include "rgw_tools.h"
8 #include "rgw_zone.h"
9 #include "rgw_notify_event_type.h"
10 #include <boost/container/flat_map.hpp>
11
12 class XMLObj;
13
14 struct rgw_s3_key_filter {
15 std::string prefix_rule;
16 std::string suffix_rule;
17 std::string regex_rule;
18
19 bool has_content() const;
20
21 void dump(Formatter *f) const;
22 bool decode_xml(XMLObj *obj);
23 void dump_xml(Formatter *f) const;
24
25 void encode(bufferlist& bl) const {
26 ENCODE_START(1, 1, bl);
27 encode(prefix_rule, bl);
28 encode(suffix_rule, bl);
29 encode(regex_rule, bl);
30 ENCODE_FINISH(bl);
31 }
32
33 void decode(bufferlist::const_iterator& bl) {
34 DECODE_START(1, bl);
35 decode(prefix_rule, bl);
36 decode(suffix_rule, bl);
37 decode(regex_rule, bl);
38 DECODE_FINISH(bl);
39 }
40 };
41 WRITE_CLASS_ENCODER(rgw_s3_key_filter)
42
43 using KeyValueMap = boost::container::flat_map<std::string, std::string>;
44 using KeyMultiValueMap = std::multimap<std::string, std::string>;
45
46 struct rgw_s3_key_value_filter {
47 KeyValueMap kv;
48
49 bool has_content() const;
50
51 void dump(Formatter *f) const;
52 bool decode_xml(XMLObj *obj);
53 void dump_xml(Formatter *f) const;
54
55 void encode(bufferlist& bl) const {
56 ENCODE_START(1, 1, bl);
57 encode(kv, bl);
58 ENCODE_FINISH(bl);
59 }
60 void decode(bufferlist::const_iterator& bl) {
61 DECODE_START(1, bl);
62 decode(kv, bl);
63 DECODE_FINISH(bl);
64 }
65 };
66 WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
67
68 struct rgw_s3_filter {
69 rgw_s3_key_filter key_filter;
70 rgw_s3_key_value_filter metadata_filter;
71 rgw_s3_key_value_filter tag_filter;
72
73 bool has_content() const;
74
75 void dump(Formatter *f) const;
76 bool decode_xml(XMLObj *obj);
77 void dump_xml(Formatter *f) const;
78
79 void encode(bufferlist& bl) const {
80 ENCODE_START(2, 1, bl);
81 encode(key_filter, bl);
82 encode(metadata_filter, bl);
83 encode(tag_filter, bl);
84 ENCODE_FINISH(bl);
85 }
86
87 void decode(bufferlist::const_iterator& bl) {
88 DECODE_START(2, bl);
89 decode(key_filter, bl);
90 decode(metadata_filter, bl);
91 if (struct_v >= 2) {
92 decode(tag_filter, bl);
93 }
94 DECODE_FINISH(bl);
95 }
96 };
97 WRITE_CLASS_ENCODER(rgw_s3_filter)
98
99 using OptionalFilter = std::optional<rgw_s3_filter>;
100
101 struct rgw_pubsub_topic_filter;
102 /* S3 notification configuration
103 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
104 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
105 <TopicConfiguration>
106 <Filter>
107 <S3Key>
108 <FilterRule>
109 <Name>suffix</Name>
110 <Value>jpg</Value>
111 </FilterRule>
112 </S3Key>
113 <S3Metadata>
114 <FilterRule>
115 <Name></Name>
116 <Value></Value>
117 </FilterRule>
118 </S3Metadata>
119 <S3Tags>
120 <FilterRule>
121 <Name></Name>
122 <Value></Value>
123 </FilterRule>
124 </S3Tags>
125 </Filter>
126 <Id>notification1</Id>
127 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
128 <Event>s3:ObjectCreated:*</Event>
129 <Event>s3:ObjectRemoved:*</Event>
130 </TopicConfiguration>
131 </NotificationConfiguration>
132 */
133 struct rgw_pubsub_s3_notification {
134 // notification id
135 std::string id;
136 // types of events
137 rgw::notify::EventTypeList events;
138 // topic ARN
139 std::string topic_arn;
140 // filter rules
141 rgw_s3_filter filter;
142
143 bool decode_xml(XMLObj *obj);
144 void dump_xml(Formatter *f) const;
145
146 rgw_pubsub_s3_notification() = default;
147 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
148 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
149 };
150
151 // return true if the key matches the prefix/suffix/regex rules of the key filter
152 bool match(const rgw_s3_key_filter& filter, const std::string& key);
153
154 // return true if the key matches the metadata rules of the metadata filter
155 bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv);
156
157 // return true if the key matches the tag rules of the tag filter
158 bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv);
159
160 // return true if the event type matches (equal or contained in) one of the events in the list
161 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
162
163 struct rgw_pubsub_s3_notifications {
164 std::list<rgw_pubsub_s3_notification> list;
165 bool decode_xml(XMLObj *obj);
166 void dump_xml(Formatter *f) const;
167 };
168
169 /* S3 event records structure
170 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
171 {
172 "Records":[
173 {
174 "eventVersion":""
175 "eventSource":"",
176 "awsRegion":"",
177 "eventTime":"",
178 "eventName":"",
179 "userIdentity":{
180 "principalId":""
181 },
182 "requestParameters":{
183 "sourceIPAddress":""
184 },
185 "responseElements":{
186 "x-amz-request-id":"",
187 "x-amz-id-2":""
188 },
189 "s3":{
190 "s3SchemaVersion":"1.0",
191 "configurationId":"",
192 "bucket":{
193 "name":"",
194 "ownerIdentity":{
195 "principalId":""
196 },
197 "arn":""
198 "id": ""
199 },
200 "object":{
201 "key":"",
202 "size": ,
203 "eTag":"",
204 "versionId":"",
205 "sequencer": "",
206 "metadata": ""
207 "tags": ""
208 }
209 },
210 "eventId":"",
211 }
212 ]
213 }*/
214
215 struct rgw_pubsub_s3_event {
216 constexpr static const char* const json_type_plural = "Records";
217 std::string eventVersion = "2.2";
218 // aws:s3
219 std::string eventSource = "ceph:s3";
220 // zonegroup
221 std::string awsRegion;
222 // time of the request
223 ceph::real_time eventTime;
224 // type of the event
225 std::string eventName;
226 // user that sent the request
227 std::string userIdentity;
228 // IP address of source of the request (not implemented)
229 std::string sourceIPAddress;
230 // request ID (not implemented)
231 std::string x_amz_request_id;
232 // radosgw that received the request
233 std::string x_amz_id_2;
234 std::string s3SchemaVersion = "1.0";
235 // ID received in the notification request
236 std::string configurationId;
237 // bucket name
238 std::string bucket_name;
239 // bucket owner
240 std::string bucket_ownerIdentity;
241 // bucket ARN
242 std::string bucket_arn;
243 // object key
244 std::string object_key;
245 // object size
246 uint64_t object_size = 0;
247 // object etag
248 std::string object_etag;
249 // object version id bucket is versioned
250 std::string object_versionId;
251 // hexadecimal value used to determine event order for specific key
252 std::string object_sequencer;
253 // this is an rgw extension (not S3 standard)
254 // used to store a globally unique identifier of the event
255 // that could be used for acking or any other identification of the event
256 std::string id;
257 // this is an rgw extension holding the internal bucket id
258 std::string bucket_id;
259 // meta data
260 KeyValueMap x_meta_map;
261 // tags
262 KeyMultiValueMap tags;
263 // opaque data received from the topic
264 // could be used to identify the gateway
265 std::string opaque_data;
266
267 void encode(bufferlist& bl) const {
268 ENCODE_START(4, 1, bl);
269 encode(eventVersion, bl);
270 encode(eventSource, bl);
271 encode(awsRegion, bl);
272 encode(eventTime, bl);
273 encode(eventName, bl);
274 encode(userIdentity, bl);
275 encode(sourceIPAddress, bl);
276 encode(x_amz_request_id, bl);
277 encode(x_amz_id_2, bl);
278 encode(s3SchemaVersion, bl);
279 encode(configurationId, bl);
280 encode(bucket_name, bl);
281 encode(bucket_ownerIdentity, bl);
282 encode(bucket_arn, bl);
283 encode(object_key, bl);
284 encode(object_size, bl);
285 encode(object_etag, bl);
286 encode(object_versionId, bl);
287 encode(object_sequencer, bl);
288 encode(id, bl);
289 encode(bucket_id, bl);
290 encode(x_meta_map, bl);
291 encode(tags, bl);
292 encode(opaque_data, bl);
293 ENCODE_FINISH(bl);
294 }
295
296 void decode(bufferlist::const_iterator& bl) {
297 DECODE_START(4, bl);
298 decode(eventVersion, bl);
299 decode(eventSource, bl);
300 decode(awsRegion, bl);
301 decode(eventTime, bl);
302 decode(eventName, bl);
303 decode(userIdentity, bl);
304 decode(sourceIPAddress, bl);
305 decode(x_amz_request_id, bl);
306 decode(x_amz_id_2, bl);
307 decode(s3SchemaVersion, bl);
308 decode(configurationId, bl);
309 decode(bucket_name, bl);
310 decode(bucket_ownerIdentity, bl);
311 decode(bucket_arn, bl);
312 decode(object_key, bl);
313 decode(object_size, bl);
314 decode(object_etag, bl);
315 decode(object_versionId, bl);
316 decode(object_sequencer, bl);
317 decode(id, bl);
318 if (struct_v >= 2) {
319 decode(bucket_id, bl);
320 decode(x_meta_map, bl);
321 }
322 if (struct_v >= 3) {
323 decode(tags, bl);
324 }
325 if (struct_v >= 4) {
326 decode(opaque_data, bl);
327 }
328 DECODE_FINISH(bl);
329 }
330
331 void dump(Formatter *f) const;
332 };
333 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
334
335 // setting a unique ID for an event based on object hash and timestamp
336 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
337
338 struct rgw_pubsub_dest {
339 std::string push_endpoint;
340 std::string push_endpoint_args;
341 std::string arn_topic;
342 bool stored_secret = false;
343 bool persistent = false;
344
345 void encode(bufferlist& bl) const {
346 ENCODE_START(5, 1, bl);
347 encode("", bl);
348 encode("", bl);
349 encode(push_endpoint, bl);
350 encode(push_endpoint_args, bl);
351 encode(arn_topic, bl);
352 encode(stored_secret, bl);
353 encode(persistent, bl);
354 ENCODE_FINISH(bl);
355 }
356
357 void decode(bufferlist::const_iterator& bl) {
358 DECODE_START(5, bl);
359 std::string dummy;
360 decode(dummy, bl);
361 decode(dummy, bl);
362 decode(push_endpoint, bl);
363 if (struct_v >= 2) {
364 decode(push_endpoint_args, bl);
365 }
366 if (struct_v >= 3) {
367 decode(arn_topic, bl);
368 }
369 if (struct_v >= 4) {
370 decode(stored_secret, bl);
371 }
372 if (struct_v >= 5) {
373 decode(persistent, bl);
374 }
375 DECODE_FINISH(bl);
376 }
377
378 void dump(Formatter *f) const;
379 void dump_xml(Formatter *f) const;
380 std::string to_json_str() const;
381 };
382 WRITE_CLASS_ENCODER(rgw_pubsub_dest)
383
384 struct rgw_pubsub_topic {
385 rgw_user user;
386 std::string name;
387 rgw_pubsub_dest dest;
388 std::string arn;
389 std::string opaque_data;
390
391 void encode(bufferlist& bl) const {
392 ENCODE_START(3, 1, bl);
393 encode(user, bl);
394 encode(name, bl);
395 encode(dest, bl);
396 encode(arn, bl);
397 encode(opaque_data, bl);
398 ENCODE_FINISH(bl);
399 }
400
401 void decode(bufferlist::const_iterator& bl) {
402 DECODE_START(3, bl);
403 decode(user, bl);
404 decode(name, bl);
405 if (struct_v >= 2) {
406 decode(dest, bl);
407 decode(arn, bl);
408 }
409 if (struct_v >= 3) {
410 decode(opaque_data, bl);
411 }
412 DECODE_FINISH(bl);
413 }
414
415 std::string to_str() const {
416 return user.tenant + "/" + name;
417 }
418
419 void dump(Formatter *f) const;
420 void dump_xml(Formatter *f) const;
421 void dump_xml_as_attributes(Formatter *f) const;
422
423 bool operator<(const rgw_pubsub_topic& t) const {
424 return to_str().compare(t.to_str());
425 }
426 };
427 WRITE_CLASS_ENCODER(rgw_pubsub_topic)
428
429 // this struct deprecated and remain only for backward compatibility
430 struct rgw_pubsub_topic_subs {
431 rgw_pubsub_topic topic;
432 std::set<std::string> subs;
433
434 void encode(bufferlist& bl) const {
435 ENCODE_START(1, 1, bl);
436 encode(topic, bl);
437 encode(subs, bl);
438 ENCODE_FINISH(bl);
439 }
440
441 void decode(bufferlist::const_iterator& bl) {
442 DECODE_START(1, bl);
443 decode(topic, bl);
444 decode(subs, bl);
445 DECODE_FINISH(bl);
446 }
447
448 void dump(Formatter *f) const;
449 };
450 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
451
452 struct rgw_pubsub_topic_filter {
453 rgw_pubsub_topic topic;
454 rgw::notify::EventTypeList events;
455 std::string s3_id;
456 rgw_s3_filter s3_filter;
457
458 void encode(bufferlist& bl) const {
459 ENCODE_START(3, 1, bl);
460 encode(topic, bl);
461 // events are stored as a vector of std::strings
462 std::vector<std::string> tmp_events;
463 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), rgw::notify::to_string);
464 encode(tmp_events, bl);
465 encode(s3_id, bl);
466 encode(s3_filter, bl);
467 ENCODE_FINISH(bl);
468 }
469
470 void decode(bufferlist::const_iterator& bl) {
471 DECODE_START(3, bl);
472 decode(topic, bl);
473 // events are stored as a vector of std::strings
474 events.clear();
475 std::vector<std::string> tmp_events;
476 decode(tmp_events, bl);
477 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
478 if (struct_v >= 2) {
479 decode(s3_id, bl);
480 }
481 if (struct_v >= 3) {
482 decode(s3_filter, bl);
483 }
484 DECODE_FINISH(bl);
485 }
486
487 void dump(Formatter *f) const;
488 };
489 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
490
491 struct rgw_pubsub_bucket_topics {
492 std::map<std::string, rgw_pubsub_topic_filter> topics;
493
494 void encode(bufferlist& bl) const {
495 ENCODE_START(1, 1, bl);
496 encode(topics, bl);
497 ENCODE_FINISH(bl);
498 }
499
500 void decode(bufferlist::const_iterator& bl) {
501 DECODE_START(1, bl);
502 decode(topics, bl);
503 DECODE_FINISH(bl);
504 }
505
506 void dump(Formatter *f) const;
507 };
508 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
509
510 struct rgw_pubsub_topics {
511 std::map<std::string, rgw_pubsub_topic> topics;
512
513 void encode(bufferlist& bl) const {
514 ENCODE_START(2, 2, bl);
515 encode(topics, bl);
516 ENCODE_FINISH(bl);
517 }
518
519 void decode(bufferlist::const_iterator& bl) {
520 DECODE_START(2, bl);
521 if (struct_v >= 2) {
522 decode(topics, bl);
523 } else {
524 std::map<std::string, rgw_pubsub_topic_subs> v1topics;
525 decode(v1topics, bl);
526 std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()),
527 [](const auto& entry) {
528 return std::pair<std::string, rgw_pubsub_topic>(entry.first, entry.second.topic);
529 });
530 }
531 DECODE_FINISH(bl);
532 }
533
534 void dump(Formatter *f) const;
535 void dump_xml(Formatter *f) const;
536 };
537 WRITE_CLASS_ENCODER(rgw_pubsub_topics)
538
539 class RGWPubSub
540 {
541 friend class Bucket;
542
543 rgw::sal::Driver* const driver;
544 const std::string tenant;
545
546 int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
547 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
548 int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
549 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
550
551 public:
552 RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
553
554 class Bucket {
555 friend class RGWPubSub;
556 const RGWPubSub& ps;
557 rgw::sal::Bucket* const bucket;
558
559 // read the list of topics associated with a bucket and populate into result
560 // use version tacker to enforce atomicity between read/write
561 // return 0 on success or if no topic was associated with the bucket, error code otherwise
562 int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result,
563 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
564 // set the list of topics associated with a bucket
565 // use version tacker to enforce atomicity between read/write
566 // return 0 on success, error code otherwise
567 int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
568 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
569 int remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id,
570 bool notif_id_or_topic, optional_yield y) const;
571 public:
572 Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) :
573 ps(_ps), bucket(_bucket)
574 {}
575
576 // get the list of topics associated with a bucket and populate into result
577 // return 0 on success or if no topic was associated with the bucket, error code otherwise
578 int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
579 return read_topics(dpp, result, nullptr, y);
580 }
581 // get a bucket_topic with by its name and populate it into "result"
582 // return -ENOENT if the topic does not exists
583 // return 0 on success, error code otherwise
584 int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const;
585 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
586 // assigning a notification name is optional (needed for S3 compatible notifications)
587 // if the topic already exist on the bucket, the filter event list may be updated
588 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
589 // return -ENOENT if the topic does not exists
590 // return 0 on success, error code otherwise
591 int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
592 const rgw::notify::EventTypeList& events, optional_yield y) const;
593 int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
594 const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
595 // remove a topic and filter from bucket
596 // if the topic does not exists on the bucket it is a no-op (considered success)
597 // return -ENOENT if the notification-id/topic does not exists
598 // return 0 on success, error code otherwise
599 int remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const;
600 int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const;
601 // remove all notifications (and autogenerated topics) associated with the bucket
602 // return 0 on success or if no topic was associated with the bucket, error code otherwise
603 int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
604 };
605
606 // get the list of topics
607 // return 0 on success or if no topic was associated with the bucket, error code otherwise
608 int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
609 return read_topics(dpp, result, nullptr, y);
610 }
611 // get a topic with by its name and populate it into "result"
612 // return -ENOENT if the topic does not exists
613 // return 0 on success, error code otherwise
614 int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
615 // create a topic with a name only
616 // if the topic already exists it is a no-op (considered success)
617 // return 0 on success, error code otherwise
618 int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
619 // create a topic with push destination information and ARN
620 // if the topic already exists the destination and ARN values may be updated (considered succsess)
621 // return 0 on success, error code otherwise
622 int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest,
623 const std::string& arn, const std::string& opaque_data, optional_yield y) const;
624 // remove a topic according to its name
625 // if the topic does not exists it is a no-op (considered success)
626 // return 0 on success, error code otherwise
627 int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
628 };
629