]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
11fdf7f2 TL |
4 | #ifndef CEPH_RGW_PUBSUB_H |
5 | #define CEPH_RGW_PUBSUB_H | |
6 | ||
9f95a23c TL |
7 | #include "rgw_sal.h" |
8 | #include "services/svc_sys_obj.h" | |
11fdf7f2 TL |
9 | #include "rgw_tools.h" |
10 | #include "rgw_zone.h" | |
eafe8130 | 11 | #include "rgw_notify_event_type.h" |
9f95a23c | 12 | #include <boost/container/flat_map.hpp> |
11fdf7f2 | 13 | |
eafe8130 TL |
14 | class XMLObj; |
15 | ||
16 | struct rgw_s3_key_filter { | |
17 | std::string prefix_rule; | |
18 | std::string suffix_rule; | |
19 | std::string regex_rule; | |
20 | ||
21 | bool has_content() const; | |
22 | ||
23 | bool decode_xml(XMLObj *obj); | |
24 | void dump_xml(Formatter *f) const; | |
25 | ||
26 | void encode(bufferlist& bl) const { | |
27 | ENCODE_START(1, 1, bl); | |
28 | encode(prefix_rule, bl); | |
29 | encode(suffix_rule, bl); | |
30 | encode(regex_rule, bl); | |
31 | ENCODE_FINISH(bl); | |
32 | } | |
33 | ||
34 | void decode(bufferlist::const_iterator& bl) { | |
35 | DECODE_START(1, bl); | |
36 | decode(prefix_rule, bl); | |
37 | decode(suffix_rule, bl); | |
38 | decode(regex_rule, bl); | |
39 | DECODE_FINISH(bl); | |
40 | } | |
41 | }; | |
42 | WRITE_CLASS_ENCODER(rgw_s3_key_filter) | |
43 | ||
f67539c2 | 44 | using KeyValueMap = boost::container::flat_map<std::string, std::string>; |
eafe8130 | 45 | |
9f95a23c | 46 | struct rgw_s3_key_value_filter { |
f67539c2 | 47 | KeyValueMap kv; |
eafe8130 TL |
48 | |
49 | bool has_content() const; | |
50 | ||
51 | bool decode_xml(XMLObj *obj); | |
52 | void dump_xml(Formatter *f) const; | |
53 | ||
54 | void encode(bufferlist& bl) const { | |
55 | ENCODE_START(1, 1, bl); | |
f67539c2 | 56 | encode(kv, bl); |
eafe8130 TL |
57 | ENCODE_FINISH(bl); |
58 | } | |
59 | void decode(bufferlist::const_iterator& bl) { | |
60 | DECODE_START(1, bl); | |
f67539c2 | 61 | decode(kv, bl); |
eafe8130 TL |
62 | DECODE_FINISH(bl); |
63 | } | |
64 | }; | |
9f95a23c | 65 | WRITE_CLASS_ENCODER(rgw_s3_key_value_filter) |
eafe8130 TL |
66 | |
67 | struct rgw_s3_filter { | |
68 | rgw_s3_key_filter key_filter; | |
9f95a23c TL |
69 | rgw_s3_key_value_filter metadata_filter; |
70 | rgw_s3_key_value_filter tag_filter; | |
eafe8130 TL |
71 | |
72 | bool has_content() const; | |
73 | ||
74 | bool decode_xml(XMLObj *obj); | |
75 | void dump_xml(Formatter *f) const; | |
76 | ||
77 | void encode(bufferlist& bl) const { | |
9f95a23c | 78 | ENCODE_START(2, 1, bl); |
eafe8130 TL |
79 | encode(key_filter, bl); |
80 | encode(metadata_filter, bl); | |
9f95a23c | 81 | encode(tag_filter, bl); |
eafe8130 TL |
82 | ENCODE_FINISH(bl); |
83 | } | |
84 | ||
85 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 86 | DECODE_START(2, bl); |
eafe8130 TL |
87 | decode(key_filter, bl); |
88 | decode(metadata_filter, bl); | |
9f95a23c TL |
89 | if (struct_v >= 2) { |
90 | decode(tag_filter, bl); | |
91 | } | |
eafe8130 TL |
92 | DECODE_FINISH(bl); |
93 | } | |
94 | }; | |
95 | WRITE_CLASS_ENCODER(rgw_s3_filter) | |
96 | ||
97 | using OptionalFilter = std::optional<rgw_s3_filter>; | |
98 | ||
9f95a23c | 99 | struct rgw_pubsub_topic_filter; |
eafe8130 TL |
100 | /* S3 notification configuration |
101 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html | |
102 | <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> | |
103 | <TopicConfiguration> | |
104 | <Filter> | |
105 | <S3Key> | |
106 | <FilterRule> | |
107 | <Name>suffix</Name> | |
108 | <Value>jpg</Value> | |
109 | </FilterRule> | |
110 | </S3Key> | |
111 | <S3Metadata> | |
112 | <FilterRule> | |
113 | <Name></Name> | |
114 | <Value></Value> | |
115 | </FilterRule> | |
92f5a8d4 | 116 | </S3Metadata> |
9f95a23c TL |
117 | <S3Tags> |
118 | <FilterRule> | |
119 | <Name></Name> | |
120 | <Value></Value> | |
121 | </FilterRule> | |
122 | </S3Tags> | |
eafe8130 TL |
123 | </Filter> |
124 | <Id>notification1</Id> | |
125 | <Topic>arn:aws:sns:<region>:<account>:<topic></Topic> | |
126 | <Event>s3:ObjectCreated:*</Event> | |
127 | <Event>s3:ObjectRemoved:*</Event> | |
128 | </TopicConfiguration> | |
129 | </NotificationConfiguration> | |
130 | */ | |
131 | struct rgw_pubsub_s3_notification { | |
132 | // notification id | |
133 | std::string id; | |
134 | // types of events | |
135 | rgw::notify::EventTypeList events; | |
136 | // topic ARN | |
137 | std::string topic_arn; | |
138 | // filter rules | |
139 | rgw_s3_filter filter; | |
140 | ||
141 | bool decode_xml(XMLObj *obj); | |
142 | void dump_xml(Formatter *f) const; | |
143 | ||
144 | rgw_pubsub_s3_notification() = default; | |
145 | // construct from rgw_pubsub_topic_filter (used by get/list notifications) | |
9f95a23c | 146 | explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter); |
eafe8130 TL |
147 | }; |
148 | ||
149 | // return true if the key matches the prefix/suffix/regex rules of the key filter | |
150 | bool match(const rgw_s3_key_filter& filter, const std::string& key); | |
9f95a23c | 151 | // return true if the key matches the metadata/tags rules of the metadata/tags filter |
f67539c2 | 152 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv); |
eafe8130 TL |
153 | // return true if the event type matches (equal or contained in) one of the events in the list |
154 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event); | |
155 | ||
156 | struct rgw_pubsub_s3_notifications { | |
157 | std::list<rgw_pubsub_s3_notification> list; | |
158 | bool decode_xml(XMLObj *obj); | |
159 | void dump_xml(Formatter *f) const; | |
160 | }; | |
161 | ||
162 | /* S3 event records structure | |
163 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html | |
164 | { | |
165 | "Records":[ | |
166 | { | |
167 | "eventVersion":"", | |
168 | "eventSource":"", | |
169 | "awsRegion":"", | |
170 | "eventTime":"", | |
171 | "eventName":"", | |
172 | "userIdentity":{ | |
173 | "principalId":"" | |
174 | }, | |
175 | "requestParameters":{ | |
176 | "sourceIPAddress":"" | |
177 | }, | |
178 | "responseElements":{ | |
179 | "x-amz-request-id":"", | |
180 | "x-amz-id-2":"" | |
181 | }, | |
182 | "s3":{ | |
183 | "s3SchemaVersion":"1.0", | |
184 | "configurationId":"", | |
185 | "bucket":{ | |
186 | "name":"", | |
187 | "ownerIdentity":{ | |
188 | "principalId":"" | |
189 | }, | |
190 | "arn":"", | |
191 | "id": "" | |
192 | }, | |
193 | "object":{ | |
194 | "key":"", | |
195 | "size": , | |
196 | "eTag":"", | |
197 | "versionId":"", | |
198 | "sequencer": "", | |
199 | "metadata": "", | |
9f95a23c | 200 | "tags": "" |
eafe8130 TL |
201 | } |
202 | }, | |
203 | "eventId":"" | |
204 | } | |
205 | ] | |
206 | }*/ | |
207 | ||
f67539c2 | 208 | struct rgw_pubsub_s3_event { |
eafe8130 | 209 | constexpr static const char* const json_type_plural = "Records"; |
92f5a8d4 | 210 | std::string eventVersion = "2.2"; |
eafe8130 | 211 | // aws:s3 |
92f5a8d4 | 212 | std::string eventSource = "ceph:s3"; |
eafe8130 TL |
213 | // zonegroup |
214 | std::string awsRegion; | |
215 | // time of the request | |
216 | ceph::real_time eventTime; | |
217 | // type of the event | |
218 | std::string eventName; | |
92f5a8d4 | 219 | // user that sent the request |
eafe8130 TL |
220 | std::string userIdentity; |
221 | // IP address of source of the request (not implemented) | |
222 | std::string sourceIPAddress; | |
223 | // request ID (not implemented) | |
224 | std::string x_amz_request_id; | |
225 | // radosgw that received the request | |
226 | std::string x_amz_id_2; | |
92f5a8d4 | 227 | std::string s3SchemaVersion = "1.0"; |
eafe8130 TL |
228 | // ID received in the notification request |
229 | std::string configurationId; | |
230 | // bucket name | |
231 | std::string bucket_name; | |
92f5a8d4 | 232 | // bucket owner |
eafe8130 TL |
233 | std::string bucket_ownerIdentity; |
234 | // bucket ARN | |
235 | std::string bucket_arn; | |
236 | // object key | |
237 | std::string object_key; | |
92f5a8d4 TL |
238 | // object size |
239 | uint64_t object_size = 0; | |
eafe8130 TL |
240 | // object etag |
241 | std::string object_etag; | |
242 | // object version id bucket is versioned | |
243 | std::string object_versionId; | |
244 | // hexadecimal value used to determine event order for specific key | |
245 | std::string object_sequencer; | |
246 | // this is an rgw extension (not S3 standard) | |
247 | // used to store a globally unique identifier of the event | |
92f5a8d4 | 248 | // that could be used for acking or any other identification of the event |
eafe8130 TL |
249 | std::string id; |
250 | // this is an rgw extension holding the internal bucket id | |
251 | std::string bucket_id; | |
252 | // meta data | |
f67539c2 | 253 | KeyValueMap x_meta_map; |
9f95a23c | 254 | // tags |
f67539c2 | 255 | KeyValueMap tags; |
9f95a23c TL |
256 | // opaque data received from the topic |
257 | // could be used to identify the gateway | |
258 | std::string opaque_data; | |
eafe8130 TL |
259 | |
260 | void encode(bufferlist& bl) const { | |
9f95a23c | 261 | ENCODE_START(4, 1, bl); |
eafe8130 TL |
262 | encode(eventVersion, bl); |
263 | encode(eventSource, bl); | |
264 | encode(awsRegion, bl); | |
265 | encode(eventTime, bl); | |
266 | encode(eventName, bl); | |
267 | encode(userIdentity, bl); | |
268 | encode(sourceIPAddress, bl); | |
269 | encode(x_amz_request_id, bl); | |
270 | encode(x_amz_id_2, bl); | |
271 | encode(s3SchemaVersion, bl); | |
272 | encode(configurationId, bl); | |
273 | encode(bucket_name, bl); | |
274 | encode(bucket_ownerIdentity, bl); | |
275 | encode(bucket_arn, bl); | |
276 | encode(object_key, bl); | |
277 | encode(object_size, bl); | |
278 | encode(object_etag, bl); | |
279 | encode(object_versionId, bl); | |
280 | encode(object_sequencer, bl); | |
281 | encode(id, bl); | |
282 | encode(bucket_id, bl); | |
283 | encode(x_meta_map, bl); | |
9f95a23c TL |
284 | encode(tags, bl); |
285 | encode(opaque_data, bl); | |
eafe8130 TL |
286 | ENCODE_FINISH(bl); |
287 | } | |
288 | ||
289 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 290 | DECODE_START(4, bl); |
eafe8130 TL |
291 | decode(eventVersion, bl); |
292 | decode(eventSource, bl); | |
293 | decode(awsRegion, bl); | |
294 | decode(eventTime, bl); | |
295 | decode(eventName, bl); | |
296 | decode(userIdentity, bl); | |
297 | decode(sourceIPAddress, bl); | |
298 | decode(x_amz_request_id, bl); | |
299 | decode(x_amz_id_2, bl); | |
300 | decode(s3SchemaVersion, bl); | |
301 | decode(configurationId, bl); | |
302 | decode(bucket_name, bl); | |
303 | decode(bucket_ownerIdentity, bl); | |
304 | decode(bucket_arn, bl); | |
305 | decode(object_key, bl); | |
306 | decode(object_size, bl); | |
307 | decode(object_etag, bl); | |
308 | decode(object_versionId, bl); | |
309 | decode(object_sequencer, bl); | |
310 | decode(id, bl); | |
311 | if (struct_v >= 2) { | |
9f95a23c TL |
312 | decode(bucket_id, bl); |
313 | decode(x_meta_map, bl); | |
314 | } | |
315 | if (struct_v >= 3) { | |
316 | decode(tags, bl); | |
317 | } | |
318 | if (struct_v >= 4) { | |
319 | decode(opaque_data, bl); | |
eafe8130 TL |
320 | } |
321 | DECODE_FINISH(bl); | |
322 | } | |
323 | ||
324 | void dump(Formatter *f) const; | |
325 | }; | |
f67539c2 | 326 | WRITE_CLASS_ENCODER(rgw_pubsub_s3_event) |
11fdf7f2 TL |
327 | |
328 | struct rgw_pubsub_event { | |
eafe8130 TL |
329 | constexpr static const char* const json_type_plural = "events"; |
330 | std::string id; | |
331 | std::string event_name; | |
332 | std::string source; | |
11fdf7f2 TL |
333 | ceph::real_time timestamp; |
334 | JSONFormattable info; | |
335 | ||
336 | void encode(bufferlist& bl) const { | |
337 | ENCODE_START(1, 1, bl); | |
338 | encode(id, bl); | |
eafe8130 | 339 | encode(event_name, bl); |
11fdf7f2 TL |
340 | encode(source, bl); |
341 | encode(timestamp, bl); | |
342 | encode(info, bl); | |
343 | ENCODE_FINISH(bl); | |
344 | } | |
345 | ||
346 | void decode(bufferlist::const_iterator& bl) { | |
347 | DECODE_START(1, bl); | |
348 | decode(id, bl); | |
eafe8130 | 349 | decode(event_name, bl); |
11fdf7f2 TL |
350 | decode(source, bl); |
351 | decode(timestamp, bl); | |
352 | decode(info, bl); | |
353 | DECODE_FINISH(bl); | |
354 | } | |
355 | ||
356 | void dump(Formatter *f) const; | |
357 | }; | |
358 | WRITE_CLASS_ENCODER(rgw_pubsub_event) | |
359 | ||
f67539c2 | 360 | // settign a unique ID for an event based on object hash and timestamp |
92f5a8d4 TL |
361 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); |
362 | ||
11fdf7f2 | 363 | struct rgw_pubsub_sub_dest { |
eafe8130 TL |
364 | std::string bucket_name; |
365 | std::string oid_prefix; | |
366 | std::string push_endpoint; | |
367 | std::string push_endpoint_args; | |
368 | std::string arn_topic; | |
9f95a23c | 369 | bool stored_secret = false; |
f67539c2 | 370 | bool persistent = false; |
11fdf7f2 TL |
371 | |
372 | void encode(bufferlist& bl) const { | |
f67539c2 | 373 | ENCODE_START(5, 1, bl); |
11fdf7f2 TL |
374 | encode(bucket_name, bl); |
375 | encode(oid_prefix, bl); | |
376 | encode(push_endpoint, bl); | |
377 | encode(push_endpoint_args, bl); | |
eafe8130 | 378 | encode(arn_topic, bl); |
9f95a23c | 379 | encode(stored_secret, bl); |
f67539c2 | 380 | encode(persistent, bl); |
11fdf7f2 TL |
381 | ENCODE_FINISH(bl); |
382 | } | |
383 | ||
384 | void decode(bufferlist::const_iterator& bl) { | |
f67539c2 | 385 | DECODE_START(5, bl); |
11fdf7f2 TL |
386 | decode(bucket_name, bl); |
387 | decode(oid_prefix, bl); | |
388 | decode(push_endpoint, bl); | |
389 | if (struct_v >= 2) { | |
390 | decode(push_endpoint_args, bl); | |
391 | } | |
eafe8130 TL |
392 | if (struct_v >= 3) { |
393 | decode(arn_topic, bl); | |
394 | } | |
9f95a23c TL |
395 | if (struct_v >= 4) { |
396 | decode(stored_secret, bl); | |
397 | } | |
f67539c2 TL |
398 | if (struct_v >= 5) { |
399 | decode(persistent, bl); | |
400 | } | |
11fdf7f2 TL |
401 | DECODE_FINISH(bl); |
402 | } | |
403 | ||
404 | void dump(Formatter *f) const; | |
eafe8130 | 405 | void dump_xml(Formatter *f) const; |
f67539c2 | 406 | std::string to_json_str() const; |
11fdf7f2 TL |
407 | }; |
408 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) | |
409 | ||
410 | struct rgw_pubsub_sub_config { | |
411 | rgw_user user; | |
eafe8130 TL |
412 | std::string name; |
413 | std::string topic; | |
11fdf7f2 | 414 | rgw_pubsub_sub_dest dest; |
eafe8130 | 415 | std::string s3_id; |
11fdf7f2 TL |
416 | |
417 | void encode(bufferlist& bl) const { | |
eafe8130 | 418 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
419 | encode(user, bl); |
420 | encode(name, bl); | |
421 | encode(topic, bl); | |
422 | encode(dest, bl); | |
eafe8130 | 423 | encode(s3_id, bl); |
11fdf7f2 TL |
424 | ENCODE_FINISH(bl); |
425 | } | |
426 | ||
427 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 428 | DECODE_START(2, bl); |
11fdf7f2 TL |
429 | decode(user, bl); |
430 | decode(name, bl); | |
431 | decode(topic, bl); | |
432 | decode(dest, bl); | |
eafe8130 TL |
433 | if (struct_v >= 2) { |
434 | decode(s3_id, bl); | |
435 | } | |
11fdf7f2 TL |
436 | DECODE_FINISH(bl); |
437 | } | |
438 | ||
439 | void dump(Formatter *f) const; | |
440 | }; | |
441 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_config) | |
442 | ||
443 | struct rgw_pubsub_topic { | |
444 | rgw_user user; | |
eafe8130 TL |
445 | std::string name; |
446 | rgw_pubsub_sub_dest dest; | |
447 | std::string arn; | |
9f95a23c | 448 | std::string opaque_data; |
11fdf7f2 TL |
449 | |
450 | void encode(bufferlist& bl) const { | |
9f95a23c | 451 | ENCODE_START(3, 1, bl); |
11fdf7f2 TL |
452 | encode(user, bl); |
453 | encode(name, bl); | |
eafe8130 TL |
454 | encode(dest, bl); |
455 | encode(arn, bl); | |
9f95a23c | 456 | encode(opaque_data, bl); |
11fdf7f2 TL |
457 | ENCODE_FINISH(bl); |
458 | } | |
459 | ||
460 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 461 | DECODE_START(3, bl); |
11fdf7f2 TL |
462 | decode(user, bl); |
463 | decode(name, bl); | |
eafe8130 TL |
464 | if (struct_v >= 2) { |
465 | decode(dest, bl); | |
466 | decode(arn, bl); | |
467 | } | |
9f95a23c TL |
468 | if (struct_v >= 3) { |
469 | decode(opaque_data, bl); | |
470 | } | |
11fdf7f2 TL |
471 | DECODE_FINISH(bl); |
472 | } | |
473 | ||
474 | string to_str() const { | |
f67539c2 | 475 | return user.tenant + "/" + name; |
11fdf7f2 TL |
476 | } |
477 | ||
478 | void dump(Formatter *f) const; | |
eafe8130 | 479 | void dump_xml(Formatter *f) const; |
f67539c2 | 480 | void dump_xml_as_attributes(Formatter *f) const; |
11fdf7f2 TL |
481 | |
482 | bool operator<(const rgw_pubsub_topic& t) const { | |
483 | return to_str().compare(t.to_str()); | |
484 | } | |
485 | }; | |
486 | WRITE_CLASS_ENCODER(rgw_pubsub_topic) | |
487 | ||
488 | struct rgw_pubsub_topic_subs { | |
489 | rgw_pubsub_topic topic; | |
eafe8130 | 490 | std::set<std::string> subs; |
11fdf7f2 TL |
491 | |
492 | void encode(bufferlist& bl) const { | |
493 | ENCODE_START(1, 1, bl); | |
494 | encode(topic, bl); | |
495 | encode(subs, bl); | |
496 | ENCODE_FINISH(bl); | |
497 | } | |
498 | ||
499 | void decode(bufferlist::const_iterator& bl) { | |
500 | DECODE_START(1, bl); | |
501 | decode(topic, bl); | |
502 | decode(subs, bl); | |
503 | DECODE_FINISH(bl); | |
504 | } | |
505 | ||
506 | void dump(Formatter *f) const; | |
507 | }; | |
508 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) | |
509 | ||
510 | struct rgw_pubsub_topic_filter { | |
511 | rgw_pubsub_topic topic; | |
eafe8130 TL |
512 | rgw::notify::EventTypeList events; |
513 | std::string s3_id; | |
514 | rgw_s3_filter s3_filter; | |
11fdf7f2 TL |
515 | |
516 | void encode(bufferlist& bl) const { | |
eafe8130 | 517 | ENCODE_START(3, 1, bl); |
11fdf7f2 | 518 | encode(topic, bl); |
eafe8130 TL |
519 | // events are stored as a vector of strings |
520 | std::vector<std::string> tmp_events; | |
521 | const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string; | |
522 | std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter); | |
523 | encode(tmp_events, bl); | |
524 | encode(s3_id, bl); | |
525 | encode(s3_filter, bl); | |
11fdf7f2 TL |
526 | ENCODE_FINISH(bl); |
527 | } | |
528 | ||
529 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 530 | DECODE_START(3, bl); |
11fdf7f2 | 531 | decode(topic, bl); |
eafe8130 TL |
532 | // events are stored as a vector of strings |
533 | events.clear(); | |
534 | std::vector<std::string> tmp_events; | |
535 | decode(tmp_events, bl); | |
536 | std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string); | |
537 | if (struct_v >= 2) { | |
538 | decode(s3_id, bl); | |
539 | } | |
540 | if (struct_v >= 3) { | |
541 | decode(s3_filter, bl); | |
542 | } | |
11fdf7f2 TL |
543 | DECODE_FINISH(bl); |
544 | } | |
545 | ||
546 | void dump(Formatter *f) const; | |
547 | }; | |
548 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter) | |
549 | ||
550 | struct rgw_pubsub_bucket_topics { | |
eafe8130 | 551 | std::map<std::string, rgw_pubsub_topic_filter> topics; |
11fdf7f2 TL |
552 | |
553 | void encode(bufferlist& bl) const { | |
554 | ENCODE_START(1, 1, bl); | |
555 | encode(topics, bl); | |
556 | ENCODE_FINISH(bl); | |
557 | } | |
558 | ||
559 | void decode(bufferlist::const_iterator& bl) { | |
560 | DECODE_START(1, bl); | |
561 | decode(topics, bl); | |
562 | DECODE_FINISH(bl); | |
563 | } | |
564 | ||
565 | void dump(Formatter *f) const; | |
566 | }; | |
567 | WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) | |
568 | ||
f67539c2 | 569 | struct rgw_pubsub_topics { |
eafe8130 | 570 | std::map<std::string, rgw_pubsub_topic_subs> topics; |
11fdf7f2 TL |
571 | |
572 | void encode(bufferlist& bl) const { | |
573 | ENCODE_START(1, 1, bl); | |
574 | encode(topics, bl); | |
575 | ENCODE_FINISH(bl); | |
576 | } | |
577 | ||
578 | void decode(bufferlist::const_iterator& bl) { | |
579 | DECODE_START(1, bl); | |
580 | decode(topics, bl); | |
581 | DECODE_FINISH(bl); | |
582 | } | |
583 | ||
584 | void dump(Formatter *f) const; | |
eafe8130 | 585 | void dump_xml(Formatter *f) const; |
11fdf7f2 | 586 | }; |
f67539c2 | 587 | WRITE_CLASS_ENCODER(rgw_pubsub_topics) |
11fdf7f2 | 588 | |
// common prefix of the pubsub object names built in this header.
// const: this is a header-level variable with internal linkage, so every
// translation unit has its own copy and it must never be modified
static const std::string pubsub_oid_prefix = "pubsub.";
11fdf7f2 | 590 | |
f67539c2 | 591 | class RGWPubSub |
11fdf7f2 TL |
592 | { |
593 | friend class Bucket; | |
594 | ||
9f95a23c | 595 | rgw::sal::RGWRadosStore *store; |
f67539c2 | 596 | const std::string tenant; |
11fdf7f2 TL |
597 | RGWSysObjectCtx obj_ctx; |
598 | ||
f67539c2 | 599 | rgw_raw_obj meta_obj; |
11fdf7f2 | 600 | |
f67539c2 TL |
601 | std::string meta_oid() const { |
602 | return pubsub_oid_prefix + tenant; | |
11fdf7f2 TL |
603 | } |
604 | ||
eafe8130 | 605 | std::string bucket_meta_oid(const rgw_bucket& bucket) const { |
f67539c2 | 606 | return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.bucket_id; |
11fdf7f2 TL |
607 | } |
608 | ||
eafe8130 | 609 | std::string sub_meta_oid(const string& name) const { |
f67539c2 | 610 | return pubsub_oid_prefix + tenant + ".sub." + name; |
11fdf7f2 TL |
611 | } |
612 | ||
613 | template <class T> | |
f67539c2 | 614 | int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker); |
11fdf7f2 TL |
615 | |
616 | template <class T> | |
b3b6e05e | 617 | int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info, |
f67539c2 | 618 | RGWObjVersionTracker* obj_tracker, optional_yield y); |
11fdf7f2 | 619 | |
b3b6e05e | 620 | int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker, |
f67539c2 | 621 | optional_yield y); |
11fdf7f2 | 622 | |
f67539c2 | 623 | int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker); |
b3b6e05e | 624 | int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, |
f67539c2 | 625 | RGWObjVersionTracker* objv_tracker, optional_yield y); |
eafe8130 | 626 | |
11fdf7f2 | 627 | public: |
f67539c2 | 628 | RGWPubSub(rgw::sal::RGWRadosStore *_store, const std::string& tenant); |
11fdf7f2 TL |
629 | |
630 | class Bucket { | |
f67539c2 TL |
631 | friend class RGWPubSub; |
632 | RGWPubSub *ps; | |
11fdf7f2 TL |
633 | rgw_bucket bucket; |
634 | rgw_raw_obj bucket_meta_obj; | |
635 | ||
eafe8130 TL |
636 | // read the list of topics associated with a bucket and populate into result |
637 | // use version tacker to enforce atomicity between read/write | |
638 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
f67539c2 | 639 | int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker); |
eafe8130 TL |
640 | // set the list of topics associated with a bucket |
641 | // use version tacker to enforce atomicity between read/write | |
642 | // return 0 on success, error code otherwise | |
b3b6e05e | 643 | int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, |
f67539c2 | 644 | RGWObjVersionTracker* objv_tracker, optional_yield y); |
11fdf7f2 | 645 | public: |
f67539c2 | 646 | Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { |
11fdf7f2 TL |
647 | ps->get_bucket_meta_obj(bucket, &bucket_meta_obj); |
648 | } | |
649 | ||
eafe8130 TL |
650 | // read the list of topics associated with a bucket and populate into result |
651 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 652 | int get_topics(rgw_pubsub_bucket_topics *result); |
9f95a23c | 653 | // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket |
eafe8130 TL |
654 | // assigning a notification name is optional (needed for S3 compatible notifications) |
655 | // if the topic already exist on the bucket, the filter event list may be updated | |
656 | // for S3 compliant notifications the version with: s3_filter and notif_name should be used | |
657 | // return -ENOENT if the topic does not exists | |
658 | // return 0 on success, error code otherwise | |
b3b6e05e TL |
659 | int create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y); |
660 | int create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y); | |
eafe8130 TL |
661 | // remove a topic and filter from bucket |
662 | // if the topic does not exists on the bucket it is a no-op (considered success) | |
663 | // return -ENOENT if the topic does not exists | |
664 | // return 0 on success, error code otherwise | |
b3b6e05e | 665 | int remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y); |
f67539c2 TL |
666 | // remove all notifications (and autogenerated topics) associated with the bucket |
667 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
b3b6e05e | 668 | int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y); |
11fdf7f2 TL |
669 | }; |
670 | ||
eafe8130 | 671 | // base class for subscription |
11fdf7f2 | 672 | class Sub { |
f67539c2 | 673 | friend class RGWPubSub; |
eafe8130 | 674 | protected: |
f67539c2 | 675 | RGWPubSub* const ps; |
eafe8130 | 676 | const std::string sub; |
11fdf7f2 TL |
677 | rgw_raw_obj sub_meta_obj; |
678 | ||
f67539c2 | 679 | int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker); |
b3b6e05e | 680 | int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf, |
f67539c2 | 681 | RGWObjVersionTracker* objv_tracker, optional_yield y); |
b3b6e05e | 682 | int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y); |
11fdf7f2 | 683 | public: |
f67539c2 | 684 | Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) { |
11fdf7f2 TL |
685 | ps->get_sub_meta_obj(sub, &sub_meta_obj); |
686 | } | |
687 | ||
eafe8130 TL |
688 | virtual ~Sub() = default; |
689 | ||
b3b6e05e | 690 | int subscribe(const DoutPrefixProvider *dpp, const string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y, |
f67539c2 | 691 | const std::string& s3_id=""); |
b3b6e05e | 692 | int unsubscribe(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y); |
eafe8130 TL |
693 | int get_conf(rgw_pubsub_sub_config* result); |
694 | ||
695 | static const int DEFAULT_MAX_EVENTS = 100; | |
696 | // followint virtual methods should only be called in derived | |
b3b6e05e TL |
697 | virtual int list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) {ceph_assert(false);} |
698 | virtual int remove_event(const DoutPrefixProvider *dpp, const string& event_id) {ceph_assert(false);} | |
eafe8130 TL |
699 | virtual void dump(Formatter* f) const {ceph_assert(false);} |
700 | }; | |
11fdf7f2 | 701 | |
eafe8130 TL |
702 | // subscription with templated list of events to support both S3 compliant and Ceph specific events |
703 | template<typename EventType> | |
704 | class SubWithEvents : public Sub { | |
705 | private: | |
11fdf7f2 | 706 | struct list_events_result { |
eafe8130 | 707 | std::string next_marker; |
11fdf7f2 | 708 | bool is_truncated{false}; |
11fdf7f2 | 709 | void dump(Formatter *f) const; |
eafe8130 TL |
710 | std::vector<EventType> events; |
711 | } list; | |
11fdf7f2 | 712 | |
eafe8130 | 713 | public: |
f67539c2 | 714 | SubWithEvents(RGWPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {} |
eafe8130 TL |
715 | |
716 | virtual ~SubWithEvents() = default; | |
717 | ||
b3b6e05e TL |
718 | int list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) override; |
719 | int remove_event(const DoutPrefixProvider *dpp, const string& event_id) override; | |
eafe8130 | 720 | void dump(Formatter* f) const override; |
11fdf7f2 TL |
721 | }; |
722 | ||
723 | using BucketRef = std::shared_ptr<Bucket>; | |
724 | using SubRef = std::shared_ptr<Sub>; | |
725 | ||
726 | BucketRef get_bucket(const rgw_bucket& bucket) { | |
727 | return std::make_shared<Bucket>(this, bucket); | |
728 | } | |
729 | ||
730 | SubRef get_sub(const string& sub) { | |
731 | return std::make_shared<Sub>(this, sub); | |
732 | } | |
eafe8130 TL |
733 | |
734 | SubRef get_sub_with_events(const string& sub) { | |
735 | auto tmpsub = Sub(this, sub); | |
736 | rgw_pubsub_sub_config conf; | |
737 | if (tmpsub.get_conf(&conf) < 0) { | |
738 | return nullptr; | |
739 | } | |
740 | if (conf.s3_id.empty()) { | |
741 | return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub); | |
742 | } | |
f67539c2 | 743 | return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub); |
eafe8130 | 744 | } |
11fdf7f2 | 745 | |
f67539c2 | 746 | void get_meta_obj(rgw_raw_obj *obj) const; |
9f95a23c | 747 | void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const; |
11fdf7f2 | 748 | |
9f95a23c | 749 | void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const; |
11fdf7f2 | 750 | |
f67539c2 | 751 | // get all topics (per tenant, if used)) and populate them into "result" |
eafe8130 | 752 | // return 0 on success or if no topics exist, error code otherwise |
f67539c2 | 753 | int get_topics(rgw_pubsub_topics *result); |
eafe8130 TL |
754 | // get a topic with its subscriptions by its name and populate it into "result" |
755 | // return -ENOENT if the topic does not exists | |
756 | // return 0 on success, error code otherwise | |
11fdf7f2 | 757 | int get_topic(const string& name, rgw_pubsub_topic_subs *result); |
eafe8130 TL |
758 | // get a topic with by its name and populate it into "result" |
759 | // return -ENOENT if the topic does not exists | |
760 | // return 0 on success, error code otherwise | |
761 | int get_topic(const string& name, rgw_pubsub_topic *result); | |
762 | // create a topic with a name only | |
763 | // if the topic already exists it is a no-op (considered success) | |
764 | // return 0 on success, error code otherwise | |
b3b6e05e | 765 | int create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y); |
eafe8130 TL |
766 | // create a topic with push destination information and ARN |
767 | // if the topic already exists the destination and ARN values may be updated (considered succsess) | |
768 | // return 0 on success, error code otherwise | |
b3b6e05e | 769 | int create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y); |
eafe8130 TL |
770 | // remove a topic according to its name |
771 | // if the topic does not exists it is a no-op (considered success) | |
772 | // return 0 on success, error code otherwise | |
b3b6e05e | 773 | int remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y); |
11fdf7f2 TL |
774 | }; |
775 | ||
9f95a23c | 776 | |
11fdf7f2 | 777 | template <class T> |
f67539c2 | 778 | int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker) |
11fdf7f2 TL |
779 | { |
780 | bufferlist bl; | |
9f95a23c | 781 | int ret = rgw_get_system_obj(obj_ctx, |
11fdf7f2 TL |
782 | obj.pool, obj.oid, |
783 | bl, | |
784 | objv_tracker, | |
9f95a23c | 785 | nullptr, null_yield, nullptr, nullptr); |
11fdf7f2 TL |
786 | if (ret < 0) { |
787 | return ret; | |
788 | } | |
789 | ||
790 | auto iter = bl.cbegin(); | |
791 | try { | |
792 | decode(*result, iter); | |
793 | } catch (buffer::error& err) { | |
794 | return -EIO; | |
795 | } | |
796 | ||
797 | return 0; | |
798 | } | |
799 | ||
800 | template <class T> | |
b3b6e05e | 801 | int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info, |
f67539c2 | 802 | RGWObjVersionTracker* objv_tracker, optional_yield y) |
11fdf7f2 TL |
803 | { |
804 | bufferlist bl; | |
805 | encode(info, bl); | |
806 | ||
b3b6e05e | 807 | int ret = rgw_put_system_obj(dpp, obj_ctx, obj.pool, obj.oid, |
f67539c2 TL |
808 | bl, false, objv_tracker, |
809 | real_time(), y); | |
11fdf7f2 TL |
810 | if (ret < 0) { |
811 | return ret; | |
812 | } | |
813 | ||
9f95a23c | 814 | obj_ctx.invalidate(obj); |
11fdf7f2 TL |
815 | return 0; |
816 | } | |
817 | ||
818 | #endif |