]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_pubsub.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_pubsub.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #include "rgw_sal.h"
7 #include "rgw_tools.h"
8 #include "rgw_zone.h"
9 #include "rgw_notify_event_type.h"
10 #include <boost/container/flat_map.hpp>
11
12 class XMLObj;
13
14 struct rgw_s3_key_filter {
15 std::string prefix_rule;
16 std::string suffix_rule;
17 std::string regex_rule;
18
19 bool has_content() const;
20
21 bool decode_xml(XMLObj *obj);
22 void dump_xml(Formatter *f) const;
23
24 void encode(bufferlist& bl) const {
25 ENCODE_START(1, 1, bl);
26 encode(prefix_rule, bl);
27 encode(suffix_rule, bl);
28 encode(regex_rule, bl);
29 ENCODE_FINISH(bl);
30 }
31
32 void decode(bufferlist::const_iterator& bl) {
33 DECODE_START(1, bl);
34 decode(prefix_rule, bl);
35 decode(suffix_rule, bl);
36 decode(regex_rule, bl);
37 DECODE_FINISH(bl);
38 }
39 };
40 WRITE_CLASS_ENCODER(rgw_s3_key_filter)
41
42 using KeyValueMap = boost::container::flat_map<std::string, std::string>;
43 using KeyMultiValueMap = std::multimap<std::string, std::string>;
44
45 struct rgw_s3_key_value_filter {
46 KeyValueMap kv;
47
48 bool has_content() const;
49
50 bool decode_xml(XMLObj *obj);
51 void dump_xml(Formatter *f) const;
52
53 void encode(bufferlist& bl) const {
54 ENCODE_START(1, 1, bl);
55 encode(kv, bl);
56 ENCODE_FINISH(bl);
57 }
58 void decode(bufferlist::const_iterator& bl) {
59 DECODE_START(1, bl);
60 decode(kv, bl);
61 DECODE_FINISH(bl);
62 }
63 };
64 WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
65
66 struct rgw_s3_filter {
67 rgw_s3_key_filter key_filter;
68 rgw_s3_key_value_filter metadata_filter;
69 rgw_s3_key_value_filter tag_filter;
70
71 bool has_content() const;
72
73 bool decode_xml(XMLObj *obj);
74 void dump_xml(Formatter *f) const;
75
76 void encode(bufferlist& bl) const {
77 ENCODE_START(2, 1, bl);
78 encode(key_filter, bl);
79 encode(metadata_filter, bl);
80 encode(tag_filter, bl);
81 ENCODE_FINISH(bl);
82 }
83
84 void decode(bufferlist::const_iterator& bl) {
85 DECODE_START(2, bl);
86 decode(key_filter, bl);
87 decode(metadata_filter, bl);
88 if (struct_v >= 2) {
89 decode(tag_filter, bl);
90 }
91 DECODE_FINISH(bl);
92 }
93 };
94 WRITE_CLASS_ENCODER(rgw_s3_filter)
95
96 using OptionalFilter = std::optional<rgw_s3_filter>;
97
98 struct rgw_pubsub_topic_filter;
99 /* S3 notification configuration
100 * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
101 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
102 <TopicConfiguration>
103 <Filter>
104 <S3Key>
105 <FilterRule>
106 <Name>suffix</Name>
107 <Value>jpg</Value>
108 </FilterRule>
109 </S3Key>
110 <S3Metadata>
111 <FilterRule>
112 <Name></Name>
113 <Value></Value>
114 </FilterRule>
115 </S3Metadata>
116 <S3Tags>
117 <FilterRule>
118 <Name></Name>
119 <Value></Value>
120 </FilterRule>
121 </S3Tags>
122 </Filter>
123 <Id>notification1</Id>
124 <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
125 <Event>s3:ObjectCreated:*</Event>
126 <Event>s3:ObjectRemoved:*</Event>
127 </TopicConfiguration>
128 </NotificationConfiguration>
129 */
130 struct rgw_pubsub_s3_notification {
131 // notification id
132 std::string id;
133 // types of events
134 rgw::notify::EventTypeList events;
135 // topic ARN
136 std::string topic_arn;
137 // filter rules
138 rgw_s3_filter filter;
139
140 bool decode_xml(XMLObj *obj);
141 void dump_xml(Formatter *f) const;
142
143 rgw_pubsub_s3_notification() = default;
144 // construct from rgw_pubsub_topic_filter (used by get/list notifications)
145 explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
146 };
147
148 // return true if the key matches the prefix/suffix/regex rules of the key filter
149 bool match(const rgw_s3_key_filter& filter, const std::string& key);
150
151 // return true if the key matches the metadata rules of the metadata filter
152 bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv);
153
154 // return true if the key matches the tag rules of the tag filter
155 bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv);
156
157 // return true if the event type matches (equal or contained in) one of the events in the list
158 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
159
160 struct rgw_pubsub_s3_notifications {
161 std::list<rgw_pubsub_s3_notification> list;
162 bool decode_xml(XMLObj *obj);
163 void dump_xml(Formatter *f) const;
164 };
165
/* S3 event records structure
 * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
{
  "Records":[
    {
      "eventVersion":"",
      "eventSource":"",
      "awsRegion":"",
      "eventTime":"",
      "eventName":"",
      "userIdentity":{
        "principalId":""
      },
      "requestParameters":{
        "sourceIPAddress":""
      },
      "responseElements":{
        "x-amz-request-id":"",
        "x-amz-id-2":""
      },
      "s3":{
        "s3SchemaVersion":"1.0",
        "configurationId":"",
        "bucket":{
          "name":"",
          "ownerIdentity":{
            "principalId":""
          },
          "arn":"",
          "id": ""
        },
        "object":{
          "key":"",
          "size": ,
          "eTag":"",
          "versionId":"",
          "sequencer": "",
          "metadata": "",
          "tags": ""
        }
      },
      "eventId":""
    }
  ]
}*/
211
212 struct rgw_pubsub_s3_event {
213 constexpr static const char* const json_type_plural = "Records";
214 std::string eventVersion = "2.2";
215 // aws:s3
216 std::string eventSource = "ceph:s3";
217 // zonegroup
218 std::string awsRegion;
219 // time of the request
220 ceph::real_time eventTime;
221 // type of the event
222 std::string eventName;
223 // user that sent the request
224 std::string userIdentity;
225 // IP address of source of the request (not implemented)
226 std::string sourceIPAddress;
227 // request ID (not implemented)
228 std::string x_amz_request_id;
229 // radosgw that received the request
230 std::string x_amz_id_2;
231 std::string s3SchemaVersion = "1.0";
232 // ID received in the notification request
233 std::string configurationId;
234 // bucket name
235 std::string bucket_name;
236 // bucket owner
237 std::string bucket_ownerIdentity;
238 // bucket ARN
239 std::string bucket_arn;
240 // object key
241 std::string object_key;
242 // object size
243 uint64_t object_size = 0;
244 // object etag
245 std::string object_etag;
246 // object version id bucket is versioned
247 std::string object_versionId;
248 // hexadecimal value used to determine event order for specific key
249 std::string object_sequencer;
250 // this is an rgw extension (not S3 standard)
251 // used to store a globally unique identifier of the event
252 // that could be used for acking or any other identification of the event
253 std::string id;
254 // this is an rgw extension holding the internal bucket id
255 std::string bucket_id;
256 // meta data
257 KeyValueMap x_meta_map;
258 // tags
259 KeyMultiValueMap tags;
260 // opaque data received from the topic
261 // could be used to identify the gateway
262 std::string opaque_data;
263
264 void encode(bufferlist& bl) const {
265 ENCODE_START(4, 1, bl);
266 encode(eventVersion, bl);
267 encode(eventSource, bl);
268 encode(awsRegion, bl);
269 encode(eventTime, bl);
270 encode(eventName, bl);
271 encode(userIdentity, bl);
272 encode(sourceIPAddress, bl);
273 encode(x_amz_request_id, bl);
274 encode(x_amz_id_2, bl);
275 encode(s3SchemaVersion, bl);
276 encode(configurationId, bl);
277 encode(bucket_name, bl);
278 encode(bucket_ownerIdentity, bl);
279 encode(bucket_arn, bl);
280 encode(object_key, bl);
281 encode(object_size, bl);
282 encode(object_etag, bl);
283 encode(object_versionId, bl);
284 encode(object_sequencer, bl);
285 encode(id, bl);
286 encode(bucket_id, bl);
287 encode(x_meta_map, bl);
288 encode(tags, bl);
289 encode(opaque_data, bl);
290 ENCODE_FINISH(bl);
291 }
292
293 void decode(bufferlist::const_iterator& bl) {
294 DECODE_START(4, bl);
295 decode(eventVersion, bl);
296 decode(eventSource, bl);
297 decode(awsRegion, bl);
298 decode(eventTime, bl);
299 decode(eventName, bl);
300 decode(userIdentity, bl);
301 decode(sourceIPAddress, bl);
302 decode(x_amz_request_id, bl);
303 decode(x_amz_id_2, bl);
304 decode(s3SchemaVersion, bl);
305 decode(configurationId, bl);
306 decode(bucket_name, bl);
307 decode(bucket_ownerIdentity, bl);
308 decode(bucket_arn, bl);
309 decode(object_key, bl);
310 decode(object_size, bl);
311 decode(object_etag, bl);
312 decode(object_versionId, bl);
313 decode(object_sequencer, bl);
314 decode(id, bl);
315 if (struct_v >= 2) {
316 decode(bucket_id, bl);
317 decode(x_meta_map, bl);
318 }
319 if (struct_v >= 3) {
320 decode(tags, bl);
321 }
322 if (struct_v >= 4) {
323 decode(opaque_data, bl);
324 }
325 DECODE_FINISH(bl);
326 }
327
328 void dump(Formatter *f) const;
329 };
330 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
331
332 // setting a unique ID for an event based on object hash and timestamp
333 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
334
335 struct rgw_pubsub_dest {
336 std::string push_endpoint;
337 std::string push_endpoint_args;
338 std::string arn_topic;
339 bool stored_secret = false;
340 bool persistent = false;
341
342 void encode(bufferlist& bl) const {
343 ENCODE_START(5, 1, bl);
344 encode("", bl);
345 encode("", bl);
346 encode(push_endpoint, bl);
347 encode(push_endpoint_args, bl);
348 encode(arn_topic, bl);
349 encode(stored_secret, bl);
350 encode(persistent, bl);
351 ENCODE_FINISH(bl);
352 }
353
354 void decode(bufferlist::const_iterator& bl) {
355 DECODE_START(5, bl);
356 std::string dummy;
357 decode(dummy, bl);
358 decode(dummy, bl);
359 decode(push_endpoint, bl);
360 if (struct_v >= 2) {
361 decode(push_endpoint_args, bl);
362 }
363 if (struct_v >= 3) {
364 decode(arn_topic, bl);
365 }
366 if (struct_v >= 4) {
367 decode(stored_secret, bl);
368 }
369 if (struct_v >= 5) {
370 decode(persistent, bl);
371 }
372 DECODE_FINISH(bl);
373 }
374
375 void dump(Formatter *f) const;
376 void dump_xml(Formatter *f) const;
377 std::string to_json_str() const;
378 };
379 WRITE_CLASS_ENCODER(rgw_pubsub_dest)
380
381 struct rgw_pubsub_topic {
382 rgw_user user;
383 std::string name;
384 rgw_pubsub_dest dest;
385 std::string arn;
386 std::string opaque_data;
387
388 void encode(bufferlist& bl) const {
389 ENCODE_START(3, 1, bl);
390 encode(user, bl);
391 encode(name, bl);
392 encode(dest, bl);
393 encode(arn, bl);
394 encode(opaque_data, bl);
395 ENCODE_FINISH(bl);
396 }
397
398 void decode(bufferlist::const_iterator& bl) {
399 DECODE_START(3, bl);
400 decode(user, bl);
401 decode(name, bl);
402 if (struct_v >= 2) {
403 decode(dest, bl);
404 decode(arn, bl);
405 }
406 if (struct_v >= 3) {
407 decode(opaque_data, bl);
408 }
409 DECODE_FINISH(bl);
410 }
411
412 std::string to_str() const {
413 return user.tenant + "/" + name;
414 }
415
416 void dump(Formatter *f) const;
417 void dump_xml(Formatter *f) const;
418 void dump_xml_as_attributes(Formatter *f) const;
419
420 bool operator<(const rgw_pubsub_topic& t) const {
421 return to_str().compare(t.to_str());
422 }
423 };
424 WRITE_CLASS_ENCODER(rgw_pubsub_topic)
425
426 // this struct deprecated and remain only for backward compatibility
427 struct rgw_pubsub_topic_subs {
428 rgw_pubsub_topic topic;
429 std::set<std::string> subs;
430
431 void encode(bufferlist& bl) const {
432 ENCODE_START(1, 1, bl);
433 encode(topic, bl);
434 encode(subs, bl);
435 ENCODE_FINISH(bl);
436 }
437
438 void decode(bufferlist::const_iterator& bl) {
439 DECODE_START(1, bl);
440 decode(topic, bl);
441 decode(subs, bl);
442 DECODE_FINISH(bl);
443 }
444
445 void dump(Formatter *f) const;
446 };
447 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
448
449 struct rgw_pubsub_topic_filter {
450 rgw_pubsub_topic topic;
451 rgw::notify::EventTypeList events;
452 std::string s3_id;
453 rgw_s3_filter s3_filter;
454
455 void encode(bufferlist& bl) const {
456 ENCODE_START(3, 1, bl);
457 encode(topic, bl);
458 // events are stored as a vector of std::strings
459 std::vector<std::string> tmp_events;
460 std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), rgw::notify::to_string);
461 encode(tmp_events, bl);
462 encode(s3_id, bl);
463 encode(s3_filter, bl);
464 ENCODE_FINISH(bl);
465 }
466
467 void decode(bufferlist::const_iterator& bl) {
468 DECODE_START(3, bl);
469 decode(topic, bl);
470 // events are stored as a vector of std::strings
471 events.clear();
472 std::vector<std::string> tmp_events;
473 decode(tmp_events, bl);
474 std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
475 if (struct_v >= 2) {
476 decode(s3_id, bl);
477 }
478 if (struct_v >= 3) {
479 decode(s3_filter, bl);
480 }
481 DECODE_FINISH(bl);
482 }
483
484 void dump(Formatter *f) const;
485 };
486 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
487
488 struct rgw_pubsub_bucket_topics {
489 std::map<std::string, rgw_pubsub_topic_filter> topics;
490
491 void encode(bufferlist& bl) const {
492 ENCODE_START(1, 1, bl);
493 encode(topics, bl);
494 ENCODE_FINISH(bl);
495 }
496
497 void decode(bufferlist::const_iterator& bl) {
498 DECODE_START(1, bl);
499 decode(topics, bl);
500 DECODE_FINISH(bl);
501 }
502
503 void dump(Formatter *f) const;
504 };
505 WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
506
507 struct rgw_pubsub_topics {
508 std::map<std::string, rgw_pubsub_topic> topics;
509
510 void encode(bufferlist& bl) const {
511 ENCODE_START(2, 2, bl);
512 encode(topics, bl);
513 ENCODE_FINISH(bl);
514 }
515
516 void decode(bufferlist::const_iterator& bl) {
517 DECODE_START(2, bl);
518 if (struct_v >= 2) {
519 decode(topics, bl);
520 } else {
521 std::map<std::string, rgw_pubsub_topic_subs> v1topics;
522 decode(v1topics, bl);
523 std::transform(v1topics.begin(), v1topics.end(), std::inserter(topics, topics.end()),
524 [](const auto& entry) {
525 return std::pair<std::string, rgw_pubsub_topic>(entry.first, entry.second.topic);
526 });
527 }
528 DECODE_FINISH(bl);
529 }
530
531 void dump(Formatter *f) const;
532 void dump_xml(Formatter *f) const;
533 };
534 WRITE_CLASS_ENCODER(rgw_pubsub_topics)
535
536 class RGWPubSub
537 {
538 friend class Bucket;
539
540 rgw::sal::Driver* const driver;
541 const std::string tenant;
542
543 int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
544 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
545 int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
546 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
547
548 public:
549 RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
550
551 class Bucket {
552 friend class RGWPubSub;
553 const RGWPubSub& ps;
554 rgw::sal::Bucket* const bucket;
555
556 // read the list of topics associated with a bucket and populate into result
557 // use version tacker to enforce atomicity between read/write
558 // return 0 on success or if no topic was associated with the bucket, error code otherwise
559 int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result,
560 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
561 // set the list of topics associated with a bucket
562 // use version tacker to enforce atomicity between read/write
563 // return 0 on success, error code otherwise
564 int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
565 RGWObjVersionTracker* objv_tracker, optional_yield y) const;
566 public:
567 Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) :
568 ps(_ps), bucket(_bucket)
569 {}
570
571 // get the list of topics associated with a bucket and populate into result
572 // return 0 on success or if no topic was associated with the bucket, error code otherwise
573 int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
574 return read_topics(dpp, result, nullptr, y);
575 }
576 // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
577 // assigning a notification name is optional (needed for S3 compatible notifications)
578 // if the topic already exist on the bucket, the filter event list may be updated
579 // for S3 compliant notifications the version with: s3_filter and notif_name should be used
580 // return -ENOENT if the topic does not exists
581 // return 0 on success, error code otherwise
582 int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
583 const rgw::notify::EventTypeList& events, optional_yield y) const;
584 int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
585 const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
586 // remove a topic and filter from bucket
587 // if the topic does not exists on the bucket it is a no-op (considered success)
588 // return -ENOENT if the topic does not exists
589 // return 0 on success, error code otherwise
590 int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const;
591 // remove all notifications (and autogenerated topics) associated with the bucket
592 // return 0 on success or if no topic was associated with the bucket, error code otherwise
593 int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
594 };
595
596 // get the list of topics
597 // return 0 on success or if no topic was associated with the bucket, error code otherwise
598 int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
599 return read_topics(dpp, result, nullptr, y);
600 }
601 // get a topic with by its name and populate it into "result"
602 // return -ENOENT if the topic does not exists
603 // return 0 on success, error code otherwise
604 int get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const;
605 // create a topic with a name only
606 // if the topic already exists it is a no-op (considered success)
607 // return 0 on success, error code otherwise
608 int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
609 // create a topic with push destination information and ARN
610 // if the topic already exists the destination and ARN values may be updated (considered succsess)
611 // return 0 on success, error code otherwise
612 int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest,
613 const std::string& arn, const std::string& opaque_data, optional_yield y) const;
614 // remove a topic according to its name
615 // if the topic does not exists it is a no-op (considered success)
616 // return 0 on success, error code otherwise
617 int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
618 };
619