]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
11fdf7f2 TL |
4 | #ifndef CEPH_RGW_PUBSUB_H |
5 | #define CEPH_RGW_PUBSUB_H | |
6 | ||
9f95a23c TL |
7 | #include "rgw_sal.h" |
8 | #include "services/svc_sys_obj.h" | |
11fdf7f2 TL |
9 | #include "rgw_tools.h" |
10 | #include "rgw_zone.h" | |
eafe8130 TL |
11 | #include "rgw_rados.h" |
12 | #include "rgw_notify_event_type.h" | |
9f95a23c | 13 | #include <boost/container/flat_map.hpp> |
11fdf7f2 | 14 | |
eafe8130 TL |
15 | class XMLObj; |
16 | ||
17 | struct rgw_s3_key_filter { | |
18 | std::string prefix_rule; | |
19 | std::string suffix_rule; | |
20 | std::string regex_rule; | |
21 | ||
22 | bool has_content() const; | |
23 | ||
24 | bool decode_xml(XMLObj *obj); | |
25 | void dump_xml(Formatter *f) const; | |
26 | ||
27 | void encode(bufferlist& bl) const { | |
28 | ENCODE_START(1, 1, bl); | |
29 | encode(prefix_rule, bl); | |
30 | encode(suffix_rule, bl); | |
31 | encode(regex_rule, bl); | |
32 | ENCODE_FINISH(bl); | |
33 | } | |
34 | ||
35 | void decode(bufferlist::const_iterator& bl) { | |
36 | DECODE_START(1, bl); | |
37 | decode(prefix_rule, bl); | |
38 | decode(suffix_rule, bl); | |
39 | decode(regex_rule, bl); | |
40 | DECODE_FINISH(bl); | |
41 | } | |
42 | }; | |
43 | WRITE_CLASS_ENCODER(rgw_s3_key_filter) | |
44 | ||
9f95a23c | 45 | using KeyValueList = boost::container::flat_map<std::string, std::string>; |
eafe8130 | 46 | |
9f95a23c TL |
47 | struct rgw_s3_key_value_filter { |
48 | KeyValueList kvl; | |
eafe8130 TL |
49 | |
50 | bool has_content() const; | |
51 | ||
52 | bool decode_xml(XMLObj *obj); | |
53 | void dump_xml(Formatter *f) const; | |
54 | ||
55 | void encode(bufferlist& bl) const { | |
56 | ENCODE_START(1, 1, bl); | |
9f95a23c | 57 | encode(kvl, bl); |
eafe8130 TL |
58 | ENCODE_FINISH(bl); |
59 | } | |
60 | void decode(bufferlist::const_iterator& bl) { | |
61 | DECODE_START(1, bl); | |
9f95a23c | 62 | decode(kvl, bl); |
eafe8130 TL |
63 | DECODE_FINISH(bl); |
64 | } | |
65 | }; | |
9f95a23c | 66 | WRITE_CLASS_ENCODER(rgw_s3_key_value_filter) |
eafe8130 TL |
67 | |
68 | struct rgw_s3_filter { | |
69 | rgw_s3_key_filter key_filter; | |
9f95a23c TL |
70 | rgw_s3_key_value_filter metadata_filter; |
71 | rgw_s3_key_value_filter tag_filter; | |
eafe8130 TL |
72 | |
73 | bool has_content() const; | |
74 | ||
75 | bool decode_xml(XMLObj *obj); | |
76 | void dump_xml(Formatter *f) const; | |
77 | ||
78 | void encode(bufferlist& bl) const { | |
9f95a23c | 79 | ENCODE_START(2, 1, bl); |
eafe8130 TL |
80 | encode(key_filter, bl); |
81 | encode(metadata_filter, bl); | |
9f95a23c | 82 | encode(tag_filter, bl); |
eafe8130 TL |
83 | ENCODE_FINISH(bl); |
84 | } | |
85 | ||
86 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 87 | DECODE_START(2, bl); |
eafe8130 TL |
88 | decode(key_filter, bl); |
89 | decode(metadata_filter, bl); | |
9f95a23c TL |
90 | if (struct_v >= 2) { |
91 | decode(tag_filter, bl); | |
92 | } | |
eafe8130 TL |
93 | DECODE_FINISH(bl); |
94 | } | |
95 | }; | |
96 | WRITE_CLASS_ENCODER(rgw_s3_filter) | |
97 | ||
98 | using OptionalFilter = std::optional<rgw_s3_filter>; | |
99 | ||
9f95a23c | 100 | struct rgw_pubsub_topic_filter; |
eafe8130 TL |
101 | /* S3 notification configuration |
102 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html | |
103 | <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> | |
104 | <TopicConfiguration> | |
105 | <Filter> | |
106 | <S3Key> | |
107 | <FilterRule> | |
108 | <Name>suffix</Name> | |
109 | <Value>jpg</Value> | |
110 | </FilterRule> | |
111 | </S3Key> | |
112 | <S3Metadata> | |
113 | <FilterRule> | |
114 | <Name></Name> | |
115 | <Value></Value> | |
116 | </FilterRule> | |
92f5a8d4 | 117 | </S3Metadata> |
9f95a23c TL |
118 | <S3Tags> |
119 | <FilterRule> | |
120 | <Name></Name> | |
121 | <Value></Value> | |
122 | </FilterRule> | |
123 | </S3Tags> | |
eafe8130 TL |
124 | </Filter> |
125 | <Id>notification1</Id> | |
126 | <Topic>arn:aws:sns:<region>:<account>:<topic></Topic> | |
127 | <Event>s3:ObjectCreated:*</Event> | |
128 | <Event>s3:ObjectRemoved:*</Event> | |
129 | </TopicConfiguration> | |
130 | </NotificationConfiguration> | |
131 | */ | |
132 | struct rgw_pubsub_s3_notification { | |
133 | // notification id | |
134 | std::string id; | |
135 | // types of events | |
136 | rgw::notify::EventTypeList events; | |
137 | // topic ARN | |
138 | std::string topic_arn; | |
139 | // filter rules | |
140 | rgw_s3_filter filter; | |
141 | ||
142 | bool decode_xml(XMLObj *obj); | |
143 | void dump_xml(Formatter *f) const; | |
144 | ||
145 | rgw_pubsub_s3_notification() = default; | |
146 | // construct from rgw_pubsub_topic_filter (used by get/list notifications) | |
9f95a23c | 147 | explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter); |
eafe8130 TL |
148 | }; |
149 | ||
150 | // return true if the key matches the prefix/suffix/regex rules of the key filter | |
151 | bool match(const rgw_s3_key_filter& filter, const std::string& key); | |
9f95a23c TL |
152 | // return true if the key/value list matches the rules of the metadata/tags filter |
153 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl); | |
eafe8130 TL |
154 | // return true if the event type matches (equal or contained in) one of the events in the list |
155 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event); | |
156 | ||
157 | struct rgw_pubsub_s3_notifications { | |
158 | std::list<rgw_pubsub_s3_notification> list; | |
159 | bool decode_xml(XMLObj *obj); | |
160 | void dump_xml(Formatter *f) const; | |
161 | }; | |
162 | ||
163 | /* S3 event records structure | |
164 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html | |
165 | { | |
166 | "Records":[ | |
167 | { | |
168 | "eventVersion":"", | |
169 | "eventSource":"", | |
170 | "awsRegion":"", | |
171 | "eventTime":"", | |
172 | "eventName":"", | |
173 | "userIdentity":{ | |
174 | "principalId":"" | |
175 | }, | |
176 | "requestParameters":{ | |
177 | "sourceIPAddress":"" | |
178 | }, | |
179 | "responseElements":{ | |
180 | "x-amz-request-id":"", | |
181 | "x-amz-id-2":"" | |
182 | }, | |
183 | "s3":{ | |
184 | "s3SchemaVersion":"1.0", | |
185 | "configurationId":"", | |
186 | "bucket":{ | |
187 | "name":"", | |
188 | "ownerIdentity":{ | |
189 | "principalId":"" | |
190 | }, | |
191 | "arn":"", | |
192 | "id": "" | |
193 | }, | |
194 | "object":{ | |
195 | "key":"", | |
196 | "size": , | |
197 | "eTag":"", | |
198 | "versionId":"", | |
199 | "sequencer": "", | |
200 | "metadata": "", | |
9f95a23c | 201 | "tags": "" |
eafe8130 TL |
202 | } |
203 | }, | |
204 | "eventId":"" | |
205 | } | |
206 | ] | |
207 | }*/ | |
208 | ||
209 | struct rgw_pubsub_s3_record { | |
eafe8130 | 210 | constexpr static const char* const json_type_plural = "Records"; |
92f5a8d4 | 211 | std::string eventVersion = "2.2"; |
eafe8130 | 212 | // aws:s3 |
92f5a8d4 | 213 | std::string eventSource = "ceph:s3"; |
eafe8130 TL |
214 | // zonegroup |
215 | std::string awsRegion; | |
216 | // time of the request | |
217 | ceph::real_time eventTime; | |
218 | // type of the event | |
219 | std::string eventName; | |
92f5a8d4 | 220 | // user that sent the request |
eafe8130 TL |
221 | std::string userIdentity; |
222 | // IP address of source of the request (not implemented) | |
223 | std::string sourceIPAddress; | |
224 | // request ID (not implemented) | |
225 | std::string x_amz_request_id; | |
226 | // radosgw that received the request | |
227 | std::string x_amz_id_2; | |
92f5a8d4 | 228 | std::string s3SchemaVersion = "1.0"; |
eafe8130 TL |
229 | // ID received in the notification request |
230 | std::string configurationId; | |
231 | // bucket name | |
232 | std::string bucket_name; | |
92f5a8d4 | 233 | // bucket owner |
eafe8130 TL |
234 | std::string bucket_ownerIdentity; |
235 | // bucket ARN | |
236 | std::string bucket_arn; | |
237 | // object key | |
238 | std::string object_key; | |
92f5a8d4 TL |
239 | // object size |
240 | uint64_t object_size = 0; | |
eafe8130 TL |
241 | // object etag |
242 | std::string object_etag; | |
243 | // object version id bucket is versioned | |
244 | std::string object_versionId; | |
245 | // hexadecimal value used to determine event order for specific key | |
246 | std::string object_sequencer; | |
247 | // this is an rgw extension (not S3 standard) | |
248 | // used to store a globally unique identifier of the event | |
92f5a8d4 | 249 | // that could be used for acking or any other identification of the event |
eafe8130 TL |
250 | std::string id; |
251 | // this is an rgw extension holding the internal bucket id | |
252 | std::string bucket_id; | |
253 | // meta data | |
9f95a23c TL |
254 | KeyValueList x_meta_map; |
255 | // tags | |
256 | KeyValueList tags; | |
257 | // opaque data received from the topic | |
258 | // could be used to identify the gateway | |
259 | std::string opaque_data; | |
eafe8130 TL |
260 | |
261 | void encode(bufferlist& bl) const { | |
9f95a23c | 262 | ENCODE_START(4, 1, bl); |
eafe8130 TL |
263 | encode(eventVersion, bl); |
264 | encode(eventSource, bl); | |
265 | encode(awsRegion, bl); | |
266 | encode(eventTime, bl); | |
267 | encode(eventName, bl); | |
268 | encode(userIdentity, bl); | |
269 | encode(sourceIPAddress, bl); | |
270 | encode(x_amz_request_id, bl); | |
271 | encode(x_amz_id_2, bl); | |
272 | encode(s3SchemaVersion, bl); | |
273 | encode(configurationId, bl); | |
274 | encode(bucket_name, bl); | |
275 | encode(bucket_ownerIdentity, bl); | |
276 | encode(bucket_arn, bl); | |
277 | encode(object_key, bl); | |
278 | encode(object_size, bl); | |
279 | encode(object_etag, bl); | |
280 | encode(object_versionId, bl); | |
281 | encode(object_sequencer, bl); | |
282 | encode(id, bl); | |
283 | encode(bucket_id, bl); | |
284 | encode(x_meta_map, bl); | |
9f95a23c TL |
285 | encode(tags, bl); |
286 | encode(opaque_data, bl); | |
eafe8130 TL |
287 | ENCODE_FINISH(bl); |
288 | } | |
289 | ||
290 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 291 | DECODE_START(4, bl); |
eafe8130 TL |
292 | decode(eventVersion, bl); |
293 | decode(eventSource, bl); | |
294 | decode(awsRegion, bl); | |
295 | decode(eventTime, bl); | |
296 | decode(eventName, bl); | |
297 | decode(userIdentity, bl); | |
298 | decode(sourceIPAddress, bl); | |
299 | decode(x_amz_request_id, bl); | |
300 | decode(x_amz_id_2, bl); | |
301 | decode(s3SchemaVersion, bl); | |
302 | decode(configurationId, bl); | |
303 | decode(bucket_name, bl); | |
304 | decode(bucket_ownerIdentity, bl); | |
305 | decode(bucket_arn, bl); | |
306 | decode(object_key, bl); | |
307 | decode(object_size, bl); | |
308 | decode(object_etag, bl); | |
309 | decode(object_versionId, bl); | |
310 | decode(object_sequencer, bl); | |
311 | decode(id, bl); | |
312 | if (struct_v >= 2) { | |
9f95a23c TL |
313 | decode(bucket_id, bl); |
314 | decode(x_meta_map, bl); | |
315 | } | |
316 | if (struct_v >= 3) { | |
317 | decode(tags, bl); | |
318 | } | |
319 | if (struct_v >= 4) { | |
320 | decode(opaque_data, bl); | |
eafe8130 TL |
321 | } |
322 | DECODE_FINISH(bl); | |
323 | } | |
324 | ||
325 | void dump(Formatter *f) const; | |
326 | }; | |
327 | WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) | |
11fdf7f2 TL |
328 | |
329 | struct rgw_pubsub_event { | |
eafe8130 TL |
330 | constexpr static const char* const json_type_plural = "events"; |
331 | std::string id; | |
332 | std::string event_name; | |
333 | std::string source; | |
11fdf7f2 TL |
334 | ceph::real_time timestamp; |
335 | JSONFormattable info; | |
336 | ||
337 | void encode(bufferlist& bl) const { | |
338 | ENCODE_START(1, 1, bl); | |
339 | encode(id, bl); | |
eafe8130 | 340 | encode(event_name, bl); |
11fdf7f2 TL |
341 | encode(source, bl); |
342 | encode(timestamp, bl); | |
343 | encode(info, bl); | |
344 | ENCODE_FINISH(bl); | |
345 | } | |
346 | ||
347 | void decode(bufferlist::const_iterator& bl) { | |
348 | DECODE_START(1, bl); | |
349 | decode(id, bl); | |
eafe8130 | 350 | decode(event_name, bl); |
11fdf7f2 TL |
351 | decode(source, bl); |
352 | decode(timestamp, bl); | |
353 | decode(info, bl); | |
354 | DECODE_FINISH(bl); | |
355 | } | |
356 | ||
357 | void dump(Formatter *f) const; | |
358 | }; | |
359 | WRITE_CLASS_ENCODER(rgw_pubsub_event) | |
360 | ||
92f5a8d4 TL |
361 | // setting a unique ID for an event/record based on object hash and timestamp | |
362 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); | |
363 | ||
11fdf7f2 | 364 | struct rgw_pubsub_sub_dest { |
eafe8130 TL |
365 | std::string bucket_name; |
366 | std::string oid_prefix; | |
367 | std::string push_endpoint; | |
368 | std::string push_endpoint_args; | |
369 | std::string arn_topic; | |
9f95a23c | 370 | bool stored_secret = false; |
11fdf7f2 TL |
371 | |
372 | void encode(bufferlist& bl) const { | |
9f95a23c | 373 | ENCODE_START(4, 1, bl); |
11fdf7f2 TL |
374 | encode(bucket_name, bl); |
375 | encode(oid_prefix, bl); | |
376 | encode(push_endpoint, bl); | |
377 | encode(push_endpoint_args, bl); | |
eafe8130 | 378 | encode(arn_topic, bl); |
9f95a23c | 379 | encode(stored_secret, bl); |
11fdf7f2 TL |
380 | ENCODE_FINISH(bl); |
381 | } | |
382 | ||
383 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 384 | DECODE_START(4, bl); |
11fdf7f2 TL |
385 | decode(bucket_name, bl); |
386 | decode(oid_prefix, bl); | |
387 | decode(push_endpoint, bl); | |
388 | if (struct_v >= 2) { | |
389 | decode(push_endpoint_args, bl); | |
390 | } | |
eafe8130 TL |
391 | if (struct_v >= 3) { |
392 | decode(arn_topic, bl); | |
393 | } | |
9f95a23c TL |
394 | if (struct_v >= 4) { |
395 | decode(stored_secret, bl); | |
396 | } | |
11fdf7f2 TL |
397 | DECODE_FINISH(bl); |
398 | } | |
399 | ||
400 | void dump(Formatter *f) const; | |
eafe8130 | 401 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
402 | }; |
403 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) | |
404 | ||
405 | struct rgw_pubsub_sub_config { | |
406 | rgw_user user; | |
eafe8130 TL |
407 | std::string name; |
408 | std::string topic; | |
11fdf7f2 | 409 | rgw_pubsub_sub_dest dest; |
eafe8130 | 410 | std::string s3_id; |
11fdf7f2 TL |
411 | |
412 | void encode(bufferlist& bl) const { | |
eafe8130 | 413 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
414 | encode(user, bl); |
415 | encode(name, bl); | |
416 | encode(topic, bl); | |
417 | encode(dest, bl); | |
eafe8130 | 418 | encode(s3_id, bl); |
11fdf7f2 TL |
419 | ENCODE_FINISH(bl); |
420 | } | |
421 | ||
422 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 423 | DECODE_START(2, bl); |
11fdf7f2 TL |
424 | decode(user, bl); |
425 | decode(name, bl); | |
426 | decode(topic, bl); | |
427 | decode(dest, bl); | |
eafe8130 TL |
428 | if (struct_v >= 2) { |
429 | decode(s3_id, bl); | |
430 | } | |
11fdf7f2 TL |
431 | DECODE_FINISH(bl); |
432 | } | |
433 | ||
434 | void dump(Formatter *f) const; | |
435 | }; | |
436 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_config) | |
437 | ||
438 | struct rgw_pubsub_topic { | |
439 | rgw_user user; | |
eafe8130 TL |
440 | std::string name; |
441 | rgw_pubsub_sub_dest dest; | |
442 | std::string arn; | |
9f95a23c | 443 | std::string opaque_data; |
11fdf7f2 TL |
444 | |
445 | void encode(bufferlist& bl) const { | |
9f95a23c | 446 | ENCODE_START(3, 1, bl); |
11fdf7f2 TL |
447 | encode(user, bl); |
448 | encode(name, bl); | |
eafe8130 TL |
449 | encode(dest, bl); |
450 | encode(arn, bl); | |
9f95a23c | 451 | encode(opaque_data, bl); |
11fdf7f2 TL |
452 | ENCODE_FINISH(bl); |
453 | } | |
454 | ||
455 | void decode(bufferlist::const_iterator& bl) { | |
9f95a23c | 456 | DECODE_START(3, bl); |
11fdf7f2 TL |
457 | decode(user, bl); |
458 | decode(name, bl); | |
eafe8130 TL |
459 | if (struct_v >= 2) { |
460 | decode(dest, bl); | |
461 | decode(arn, bl); | |
462 | } | |
9f95a23c TL |
463 | if (struct_v >= 3) { |
464 | decode(opaque_data, bl); | |
465 | } | |
11fdf7f2 TL |
466 | DECODE_FINISH(bl); |
467 | } | |
468 | ||
469 | string to_str() const { | |
470 | return user.to_str() + "/" + name; | |
471 | } | |
472 | ||
473 | void dump(Formatter *f) const; | |
eafe8130 | 474 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
475 | |
476 | bool operator<(const rgw_pubsub_topic& t) const { | |
477 | return to_str().compare(t.to_str()); | |
478 | } | |
479 | }; | |
480 | WRITE_CLASS_ENCODER(rgw_pubsub_topic) | |
481 | ||
482 | struct rgw_pubsub_topic_subs { | |
483 | rgw_pubsub_topic topic; | |
eafe8130 | 484 | std::set<std::string> subs; |
11fdf7f2 TL |
485 | |
486 | void encode(bufferlist& bl) const { | |
487 | ENCODE_START(1, 1, bl); | |
488 | encode(topic, bl); | |
489 | encode(subs, bl); | |
490 | ENCODE_FINISH(bl); | |
491 | } | |
492 | ||
493 | void decode(bufferlist::const_iterator& bl) { | |
494 | DECODE_START(1, bl); | |
495 | decode(topic, bl); | |
496 | decode(subs, bl); | |
497 | DECODE_FINISH(bl); | |
498 | } | |
499 | ||
500 | void dump(Formatter *f) const; | |
501 | }; | |
502 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) | |
503 | ||
504 | struct rgw_pubsub_topic_filter { | |
505 | rgw_pubsub_topic topic; | |
eafe8130 TL |
506 | rgw::notify::EventTypeList events; |
507 | std::string s3_id; | |
508 | rgw_s3_filter s3_filter; | |
11fdf7f2 TL |
509 | |
510 | void encode(bufferlist& bl) const { | |
eafe8130 | 511 | ENCODE_START(3, 1, bl); |
11fdf7f2 | 512 | encode(topic, bl); |
eafe8130 TL |
513 | // events are stored as a vector of strings |
514 | std::vector<std::string> tmp_events; | |
515 | const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string; | |
516 | std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter); | |
517 | encode(tmp_events, bl); | |
518 | encode(s3_id, bl); | |
519 | encode(s3_filter, bl); | |
11fdf7f2 TL |
520 | ENCODE_FINISH(bl); |
521 | } | |
522 | ||
523 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 524 | DECODE_START(3, bl); |
11fdf7f2 | 525 | decode(topic, bl); |
eafe8130 TL |
526 | // events are stored as a vector of strings |
527 | events.clear(); | |
528 | std::vector<std::string> tmp_events; | |
529 | decode(tmp_events, bl); | |
530 | std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string); | |
531 | if (struct_v >= 2) { | |
532 | decode(s3_id, bl); | |
533 | } | |
534 | if (struct_v >= 3) { | |
535 | decode(s3_filter, bl); | |
536 | } | |
11fdf7f2 TL |
537 | DECODE_FINISH(bl); |
538 | } | |
539 | ||
540 | void dump(Formatter *f) const; | |
541 | }; | |
542 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter) | |
543 | ||
544 | struct rgw_pubsub_bucket_topics { | |
eafe8130 | 545 | std::map<std::string, rgw_pubsub_topic_filter> topics; |
11fdf7f2 TL |
546 | |
547 | void encode(bufferlist& bl) const { | |
548 | ENCODE_START(1, 1, bl); | |
549 | encode(topics, bl); | |
550 | ENCODE_FINISH(bl); | |
551 | } | |
552 | ||
553 | void decode(bufferlist::const_iterator& bl) { | |
554 | DECODE_START(1, bl); | |
555 | decode(topics, bl); | |
556 | DECODE_FINISH(bl); | |
557 | } | |
558 | ||
559 | void dump(Formatter *f) const; | |
560 | }; | |
561 | WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) | |
562 | ||
563 | struct rgw_pubsub_user_topics { | |
eafe8130 | 564 | std::map<std::string, rgw_pubsub_topic_subs> topics; |
11fdf7f2 TL |
565 | |
566 | void encode(bufferlist& bl) const { | |
567 | ENCODE_START(1, 1, bl); | |
568 | encode(topics, bl); | |
569 | ENCODE_FINISH(bl); | |
570 | } | |
571 | ||
572 | void decode(bufferlist::const_iterator& bl) { | |
573 | DECODE_START(1, bl); | |
574 | decode(topics, bl); | |
575 | DECODE_FINISH(bl); | |
576 | } | |
577 | ||
578 | void dump(Formatter *f) const; | |
eafe8130 | 579 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
580 | }; |
581 | WRITE_CLASS_ENCODER(rgw_pubsub_user_topics) | |
582 | ||
// prefix of the oid of every pubsub metadata object (user, bucket and subscription).
// const: this lives in a header, so every translation unit gets its own copy -
// a mutable copy could silently diverge between translation units
static const std::string pubsub_user_oid_prefix = "pubsub.user.";
11fdf7f2 TL |
584 | |
585 | class RGWUserPubSub | |
586 | { | |
587 | friend class Bucket; | |
588 | ||
9f95a23c | 589 | rgw::sal::RGWRadosStore *store; |
11fdf7f2 TL |
590 | rgw_user user; |
591 | RGWSysObjectCtx obj_ctx; | |
592 | ||
593 | rgw_raw_obj user_meta_obj; | |
594 | ||
eafe8130 | 595 | std::string user_meta_oid() const { |
11fdf7f2 TL |
596 | return pubsub_user_oid_prefix + user.to_str(); |
597 | } | |
598 | ||
eafe8130 | 599 | std::string bucket_meta_oid(const rgw_bucket& bucket) const { |
11fdf7f2 TL |
600 | return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; |
601 | } | |
602 | ||
eafe8130 | 603 | std::string sub_meta_oid(const string& name) const { |
11fdf7f2 TL |
604 | return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; |
605 | } | |
606 | ||
607 | template <class T> | |
608 | int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); | |
609 | ||
610 | template <class T> | |
611 | int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); | |
612 | ||
613 | int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); | |
614 | ||
615 | int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker); | |
616 | int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker); | |
eafe8130 | 617 | |
11fdf7f2 | 618 | public: |
9f95a23c | 619 | RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user); |
11fdf7f2 TL |
620 | |
621 | class Bucket { | |
622 | friend class RGWUserPubSub; | |
623 | RGWUserPubSub *ps; | |
624 | rgw_bucket bucket; | |
625 | rgw_raw_obj bucket_meta_obj; | |
626 | ||
eafe8130 TL |
627 | // read the list of topics associated with a bucket and populate into result |
628 | // use version tacker to enforce atomicity between read/write | |
629 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 630 | int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker); |
eafe8130 TL |
631 | // set the list of topics associated with a bucket |
632 | // use version tacker to enforce atomicity between read/write | |
633 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
634 | int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker); |
635 | public: | |
636 | Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { | |
637 | ps->get_bucket_meta_obj(bucket, &bucket_meta_obj); | |
638 | } | |
639 | ||
eafe8130 TL |
640 | // read the list of topics associated with a bucket and populate into result |
641 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 642 | int get_topics(rgw_pubsub_bucket_topics *result); |
9f95a23c | 643 | // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket |
eafe8130 TL |
644 | // assigning a notification name is optional (needed for S3 compatible notifications) |
645 | // if the topic already exist on the bucket, the filter event list may be updated | |
646 | // for S3 compliant notifications the version with: s3_filter and notif_name should be used | |
647 | // return -ENOENT if the topic does not exists | |
648 | // return 0 on success, error code otherwise | |
649 | int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events); | |
650 | int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name); | |
651 | // remove a topic and filter from bucket | |
652 | // if the topic does not exists on the bucket it is a no-op (considered success) | |
653 | // return -ENOENT if the topic does not exists | |
654 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
655 | int remove_notification(const string& topic_name); |
656 | }; | |
657 | ||
eafe8130 | 658 | // base class for subscription |
11fdf7f2 TL |
659 | class Sub { |
660 | friend class RGWUserPubSub; | |
eafe8130 TL |
661 | protected: |
662 | RGWUserPubSub* const ps; | |
663 | const std::string sub; | |
11fdf7f2 TL |
664 | rgw_raw_obj sub_meta_obj; |
665 | ||
666 | int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker); | |
667 | int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker); | |
668 | int remove_sub(RGWObjVersionTracker *objv_tracker); | |
669 | public: | |
eafe8130 | 670 | Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) { |
11fdf7f2 TL |
671 | ps->get_sub_meta_obj(sub, &sub_meta_obj); |
672 | } | |
673 | ||
eafe8130 TL |
674 | virtual ~Sub() = default; |
675 | ||
676 | int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id=""); | |
11fdf7f2 | 677 | int unsubscribe(const string& topic_name); |
eafe8130 TL |
678 | int get_conf(rgw_pubsub_sub_config* result); |
679 | ||
680 | static const int DEFAULT_MAX_EVENTS = 100; | |
681 | // followint virtual methods should only be called in derived | |
682 | virtual int list_events(const string& marker, int max_events) {ceph_assert(false);} | |
683 | virtual int remove_event(const string& event_id) {ceph_assert(false);} | |
684 | virtual void dump(Formatter* f) const {ceph_assert(false);} | |
685 | }; | |
11fdf7f2 | 686 | |
eafe8130 TL |
687 | // subscription with templated list of events to support both S3 compliant and Ceph specific events |
688 | template<typename EventType> | |
689 | class SubWithEvents : public Sub { | |
690 | private: | |
11fdf7f2 | 691 | struct list_events_result { |
eafe8130 | 692 | std::string next_marker; |
11fdf7f2 | 693 | bool is_truncated{false}; |
11fdf7f2 | 694 | void dump(Formatter *f) const; |
eafe8130 TL |
695 | std::vector<EventType> events; |
696 | } list; | |
11fdf7f2 | 697 | |
eafe8130 TL |
698 | public: |
699 | SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {} | |
700 | ||
701 | virtual ~SubWithEvents() = default; | |
702 | ||
703 | int list_events(const string& marker, int max_events) override; | |
704 | int remove_event(const string& event_id) override; | |
705 | void dump(Formatter* f) const override; | |
11fdf7f2 TL |
706 | }; |
707 | ||
708 | using BucketRef = std::shared_ptr<Bucket>; | |
709 | using SubRef = std::shared_ptr<Sub>; | |
710 | ||
711 | BucketRef get_bucket(const rgw_bucket& bucket) { | |
712 | return std::make_shared<Bucket>(this, bucket); | |
713 | } | |
714 | ||
715 | SubRef get_sub(const string& sub) { | |
716 | return std::make_shared<Sub>(this, sub); | |
717 | } | |
eafe8130 TL |
718 | |
719 | SubRef get_sub_with_events(const string& sub) { | |
720 | auto tmpsub = Sub(this, sub); | |
721 | rgw_pubsub_sub_config conf; | |
722 | if (tmpsub.get_conf(&conf) < 0) { | |
723 | return nullptr; | |
724 | } | |
725 | if (conf.s3_id.empty()) { | |
726 | return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub); | |
727 | } | |
728 | return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub); | |
729 | } | |
11fdf7f2 | 730 | |
9f95a23c TL |
731 | void get_user_meta_obj(rgw_raw_obj *obj) const; |
732 | void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const; | |
11fdf7f2 | 733 | |
9f95a23c | 734 | void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const; |
11fdf7f2 | 735 | |
eafe8130 TL |
736 | // get all topics defined for the user and populate them into "result" |
737 | // return 0 on success or if no topics exist, error code otherwise | |
11fdf7f2 | 738 | int get_user_topics(rgw_pubsub_user_topics *result); |
eafe8130 TL |
739 | // get a topic with its subscriptions by its name and populate it into "result" |
740 | // return -ENOENT if the topic does not exists | |
741 | // return 0 on success, error code otherwise | |
11fdf7f2 | 742 | int get_topic(const string& name, rgw_pubsub_topic_subs *result); |
eafe8130 TL |
743 | // get a topic with by its name and populate it into "result" |
744 | // return -ENOENT if the topic does not exists | |
745 | // return 0 on success, error code otherwise | |
746 | int get_topic(const string& name, rgw_pubsub_topic *result); | |
747 | // create a topic with a name only | |
748 | // if the topic already exists it is a no-op (considered success) | |
749 | // return 0 on success, error code otherwise | |
11fdf7f2 | 750 | int create_topic(const string& name); |
eafe8130 TL |
751 | // create a topic with push destination information and ARN |
752 | // if the topic already exists the destination and ARN values may be updated (considered succsess) | |
753 | // return 0 on success, error code otherwise | |
9f95a23c | 754 | int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data); |
eafe8130 TL |
755 | // remove a topic according to its name |
756 | // if the topic does not exists it is a no-op (considered success) | |
757 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
758 | int remove_topic(const string& name); |
759 | }; | |
760 | ||
9f95a23c | 761 | |
11fdf7f2 TL |
762 | template <class T> |
763 | int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) | |
764 | { | |
765 | bufferlist bl; | |
9f95a23c | 766 | int ret = rgw_get_system_obj(obj_ctx, |
11fdf7f2 TL |
767 | obj.pool, obj.oid, |
768 | bl, | |
769 | objv_tracker, | |
9f95a23c | 770 | nullptr, null_yield, nullptr, nullptr); |
11fdf7f2 TL |
771 | if (ret < 0) { |
772 | return ret; | |
773 | } | |
774 | ||
775 | auto iter = bl.cbegin(); | |
776 | try { | |
777 | decode(*result, iter); | |
778 | } catch (buffer::error& err) { | |
779 | return -EIO; | |
780 | } | |
781 | ||
782 | return 0; | |
783 | } | |
784 | ||
785 | template <class T> | |
786 | int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker) | |
787 | { | |
788 | bufferlist bl; | |
789 | encode(info, bl); | |
790 | ||
9f95a23c | 791 | int ret = rgw_put_system_obj(obj_ctx, obj.pool, obj.oid, |
11fdf7f2 TL |
792 | bl, false, objv_tracker, |
793 | real_time()); | |
794 | if (ret < 0) { | |
795 | return ret; | |
796 | } | |
797 | ||
9f95a23c | 798 | obj_ctx.invalidate(obj); |
11fdf7f2 TL |
799 | return 0; |
800 | } | |
801 | ||
802 | #endif |