]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
11fdf7f2 TL |
4 | #ifndef CEPH_RGW_PUBSUB_H |
5 | #define CEPH_RGW_PUBSUB_H | |
6 | ||
7 | #include "rgw_common.h" | |
8 | #include "rgw_tools.h" | |
9 | #include "rgw_zone.h" | |
eafe8130 TL |
10 | #include "rgw_rados.h" |
11 | #include "rgw_notify_event_type.h" | |
11fdf7f2 TL |
12 | #include "services/svc_sys_obj.h" |
13 | ||
eafe8130 TL |
14 | class XMLObj; |
15 | ||
16 | struct rgw_s3_key_filter { | |
17 | std::string prefix_rule; | |
18 | std::string suffix_rule; | |
19 | std::string regex_rule; | |
20 | ||
21 | bool has_content() const; | |
22 | ||
23 | bool decode_xml(XMLObj *obj); | |
24 | void dump_xml(Formatter *f) const; | |
25 | ||
26 | void encode(bufferlist& bl) const { | |
27 | ENCODE_START(1, 1, bl); | |
28 | encode(prefix_rule, bl); | |
29 | encode(suffix_rule, bl); | |
30 | encode(regex_rule, bl); | |
31 | ENCODE_FINISH(bl); | |
32 | } | |
33 | ||
34 | void decode(bufferlist::const_iterator& bl) { | |
35 | DECODE_START(1, bl); | |
36 | decode(prefix_rule, bl); | |
37 | decode(suffix_rule, bl); | |
38 | decode(regex_rule, bl); | |
39 | DECODE_FINISH(bl); | |
40 | } | |
41 | }; | |
42 | WRITE_CLASS_ENCODER(rgw_s3_key_filter) | |
43 | ||
44 | using Metadata = std::map<std::string, std::string>; | |
45 | ||
46 | struct rgw_s3_metadata_filter { | |
47 | Metadata metadata; | |
48 | ||
49 | bool has_content() const; | |
50 | ||
51 | bool decode_xml(XMLObj *obj); | |
52 | void dump_xml(Formatter *f) const; | |
53 | ||
54 | void encode(bufferlist& bl) const { | |
55 | ENCODE_START(1, 1, bl); | |
56 | encode(metadata, bl); | |
57 | ENCODE_FINISH(bl); | |
58 | } | |
59 | void decode(bufferlist::const_iterator& bl) { | |
60 | DECODE_START(1, bl); | |
61 | decode(metadata, bl); | |
62 | DECODE_FINISH(bl); | |
63 | } | |
64 | }; | |
65 | WRITE_CLASS_ENCODER(rgw_s3_metadata_filter) | |
66 | ||
67 | struct rgw_s3_filter { | |
68 | rgw_s3_key_filter key_filter; | |
69 | rgw_s3_metadata_filter metadata_filter; | |
70 | ||
71 | bool has_content() const; | |
72 | ||
73 | bool decode_xml(XMLObj *obj); | |
74 | void dump_xml(Formatter *f) const; | |
75 | ||
76 | void encode(bufferlist& bl) const { | |
77 | ENCODE_START(1, 1, bl); | |
78 | encode(key_filter, bl); | |
79 | encode(metadata_filter, bl); | |
80 | ENCODE_FINISH(bl); | |
81 | } | |
82 | ||
83 | void decode(bufferlist::const_iterator& bl) { | |
84 | DECODE_START(1, bl); | |
85 | decode(key_filter, bl); | |
86 | decode(metadata_filter, bl); | |
87 | DECODE_FINISH(bl); | |
88 | } | |
89 | }; | |
90 | WRITE_CLASS_ENCODER(rgw_s3_filter) | |
91 | ||
92 | using OptionalFilter = std::optional<rgw_s3_filter>; | |
93 | ||
94 | class rgw_pubsub_topic_filter; | |
95 | /* S3 notification configuration | |
96 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html | |
97 | <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> | |
98 | <TopicConfiguration> | |
99 | <Filter> | |
100 | <S3Key> | |
101 | <FilterRule> | |
102 | <Name>suffix</Name> | |
103 | <Value>jpg</Value> | |
104 | </FilterRule> | |
105 | </S3Key> | |
106 | <S3Metadata> | |
107 | <FilterRule> | |
108 | <Name></Name> | |
109 | <Value></Value> | |
110 | </FilterRule> | |
92f5a8d4 | 111 | </S3Metadata> |
eafe8130 TL |
112 | </Filter> |
113 | <Id>notification1</Id> | |
114 | <Topic>arn:aws:sns:<region>:<account>:<topic></Topic> | |
115 | <Event>s3:ObjectCreated:*</Event> | |
116 | <Event>s3:ObjectRemoved:*</Event> | |
117 | </TopicConfiguration> | |
118 | </NotificationConfiguration> | |
119 | */ | |
120 | struct rgw_pubsub_s3_notification { | |
121 | // notification id | |
122 | std::string id; | |
123 | // types of events | |
124 | rgw::notify::EventTypeList events; | |
125 | // topic ARN | |
126 | std::string topic_arn; | |
127 | // filter rules | |
128 | rgw_s3_filter filter; | |
129 | ||
130 | bool decode_xml(XMLObj *obj); | |
131 | void dump_xml(Formatter *f) const; | |
132 | ||
133 | rgw_pubsub_s3_notification() = default; | |
134 | // construct from rgw_pubsub_topic_filter (used by get/list notifications) | |
135 | rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter); | |
136 | }; | |
137 | ||
138 | // return true if the key matches the prefix/suffix/regex rules of the key filter | |
139 | bool match(const rgw_s3_key_filter& filter, const std::string& key); | |
140 | // return true if the metadata matches the metadata rules of the metadata filter | |
141 | bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata); | |
142 | // return true if the event type matches (equal or contained in) one of the events in the list | |
143 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event); | |
144 | ||
145 | struct rgw_pubsub_s3_notifications { | |
146 | std::list<rgw_pubsub_s3_notification> list; | |
147 | bool decode_xml(XMLObj *obj); | |
148 | void dump_xml(Formatter *f) const; | |
149 | }; | |
150 | ||
151 | /* S3 event records structure | |
152 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html | |
153 | { | |
154 | "Records":[ | |
155 | { | |
156 | "eventVersion":"", | |
157 | "eventSource":"", | |
158 | "awsRegion":"", | |
159 | "eventTime":"", | |
160 | "eventName":"", | |
161 | "userIdentity":{ | |
162 | "principalId":"" | |
163 | }, | |
164 | "requestParameters":{ | |
165 | "sourceIPAddress":"" | |
166 | }, | |
167 | "responseElements":{ | |
168 | "x-amz-request-id":"", | |
169 | "x-amz-id-2":"" | |
170 | }, | |
171 | "s3":{ | |
172 | "s3SchemaVersion":"1.0", | |
173 | "configurationId":"", | |
174 | "bucket":{ | |
175 | "name":"", | |
176 | "ownerIdentity":{ | |
177 | "principalId":"" | |
178 | }, | |
179 | "arn":"", | |
180 | "id": "" | |
181 | }, | |
182 | "object":{ | |
183 | "key":"", | |
184 | "size": , | |
185 | "eTag":"", | |
186 | "versionId":"", | |
187 | "sequencer": "", | |
188 | "metadata": "" | |
189 | } | |
190 | }, | |
191 | "eventId":"" | |
192 | } | |
193 | ] | |
194 | }*/ | |
195 | ||
196 | struct rgw_pubsub_s3_record { | |
eafe8130 | 197 | constexpr static const char* const json_type_plural = "Records"; |
92f5a8d4 | 198 | std::string eventVersion = "2.2"; |
eafe8130 | 199 | // aws:s3 |
92f5a8d4 | 200 | std::string eventSource = "ceph:s3"; |
eafe8130 TL |
201 | // zonegroup |
202 | std::string awsRegion; | |
203 | // time of the request | |
204 | ceph::real_time eventTime; | |
205 | // type of the event | |
206 | std::string eventName; | |
92f5a8d4 | 207 | // user that sent the request |
eafe8130 TL |
208 | std::string userIdentity; |
209 | // IP address of source of the request (not implemented) | |
210 | std::string sourceIPAddress; | |
211 | // request ID (not implemented) | |
212 | std::string x_amz_request_id; | |
213 | // radosgw that received the request | |
214 | std::string x_amz_id_2; | |
92f5a8d4 | 215 | std::string s3SchemaVersion = "1.0"; |
eafe8130 TL |
216 | // ID received in the notification request |
217 | std::string configurationId; | |
218 | // bucket name | |
219 | std::string bucket_name; | |
92f5a8d4 | 220 | // bucket owner |
eafe8130 TL |
221 | std::string bucket_ownerIdentity; |
222 | // bucket ARN | |
223 | std::string bucket_arn; | |
224 | // object key | |
225 | std::string object_key; | |
92f5a8d4 TL |
226 | // object size |
227 | uint64_t object_size = 0; | |
eafe8130 TL |
228 | // object etag |
229 | std::string object_etag; | |
230 | // object version id bucket is versioned | |
231 | std::string object_versionId; | |
232 | // hexadecimal value used to determine event order for specific key | |
233 | std::string object_sequencer; | |
234 | // this is an rgw extension (not S3 standard) | |
235 | // used to store a globally unique identifier of the event | |
92f5a8d4 | 236 | // that could be used for acking or any other identification of the event |
eafe8130 TL |
237 | std::string id; |
238 | // this is an rgw extension holding the internal bucket id | |
239 | std::string bucket_id; | |
240 | // meta data | |
241 | std::map<std::string, std::string> x_meta_map; | |
242 | ||
243 | void encode(bufferlist& bl) const { | |
244 | ENCODE_START(2, 1, bl); | |
245 | encode(eventVersion, bl); | |
246 | encode(eventSource, bl); | |
247 | encode(awsRegion, bl); | |
248 | encode(eventTime, bl); | |
249 | encode(eventName, bl); | |
250 | encode(userIdentity, bl); | |
251 | encode(sourceIPAddress, bl); | |
252 | encode(x_amz_request_id, bl); | |
253 | encode(x_amz_id_2, bl); | |
254 | encode(s3SchemaVersion, bl); | |
255 | encode(configurationId, bl); | |
256 | encode(bucket_name, bl); | |
257 | encode(bucket_ownerIdentity, bl); | |
258 | encode(bucket_arn, bl); | |
259 | encode(object_key, bl); | |
260 | encode(object_size, bl); | |
261 | encode(object_etag, bl); | |
262 | encode(object_versionId, bl); | |
263 | encode(object_sequencer, bl); | |
264 | encode(id, bl); | |
265 | encode(bucket_id, bl); | |
266 | encode(x_meta_map, bl); | |
267 | ENCODE_FINISH(bl); | |
268 | } | |
269 | ||
270 | void decode(bufferlist::const_iterator& bl) { | |
271 | DECODE_START(2, bl); | |
272 | decode(eventVersion, bl); | |
273 | decode(eventSource, bl); | |
274 | decode(awsRegion, bl); | |
275 | decode(eventTime, bl); | |
276 | decode(eventName, bl); | |
277 | decode(userIdentity, bl); | |
278 | decode(sourceIPAddress, bl); | |
279 | decode(x_amz_request_id, bl); | |
280 | decode(x_amz_id_2, bl); | |
281 | decode(s3SchemaVersion, bl); | |
282 | decode(configurationId, bl); | |
283 | decode(bucket_name, bl); | |
284 | decode(bucket_ownerIdentity, bl); | |
285 | decode(bucket_arn, bl); | |
286 | decode(object_key, bl); | |
287 | decode(object_size, bl); | |
288 | decode(object_etag, bl); | |
289 | decode(object_versionId, bl); | |
290 | decode(object_sequencer, bl); | |
291 | decode(id, bl); | |
292 | if (struct_v >= 2) { | |
293 | decode(bucket_id, bl); | |
294 | decode(x_meta_map, bl); | |
295 | } | |
296 | DECODE_FINISH(bl); | |
297 | } | |
298 | ||
299 | void dump(Formatter *f) const; | |
300 | }; | |
301 | WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) | |
11fdf7f2 TL |
302 | |
303 | struct rgw_pubsub_event { | |
eafe8130 TL |
304 | constexpr static const char* const json_type_plural = "events"; |
305 | std::string id; | |
306 | std::string event_name; | |
307 | std::string source; | |
11fdf7f2 TL |
308 | ceph::real_time timestamp; |
309 | JSONFormattable info; | |
310 | ||
311 | void encode(bufferlist& bl) const { | |
312 | ENCODE_START(1, 1, bl); | |
313 | encode(id, bl); | |
eafe8130 | 314 | encode(event_name, bl); |
11fdf7f2 TL |
315 | encode(source, bl); |
316 | encode(timestamp, bl); | |
317 | encode(info, bl); | |
318 | ENCODE_FINISH(bl); | |
319 | } | |
320 | ||
321 | void decode(bufferlist::const_iterator& bl) { | |
322 | DECODE_START(1, bl); | |
323 | decode(id, bl); | |
eafe8130 | 324 | decode(event_name, bl); |
11fdf7f2 TL |
325 | decode(source, bl); |
326 | decode(timestamp, bl); | |
327 | decode(info, bl); | |
328 | DECODE_FINISH(bl); | |
329 | } | |
330 | ||
331 | void dump(Formatter *f) const; | |
332 | }; | |
333 | WRITE_CLASS_ENCODER(rgw_pubsub_event) | |
334 | ||
92f5a8d4 TL |
335 | // setting a unique ID for an event/record based on object hash and timestamp |
336 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); | |
337 | ||
11fdf7f2 | 338 | struct rgw_pubsub_sub_dest { |
eafe8130 TL |
339 | std::string bucket_name; |
340 | std::string oid_prefix; | |
341 | std::string push_endpoint; | |
342 | std::string push_endpoint_args; | |
343 | std::string arn_topic; | |
11fdf7f2 TL |
344 | |
345 | void encode(bufferlist& bl) const { | |
eafe8130 | 346 | ENCODE_START(3, 1, bl); |
11fdf7f2 TL |
347 | encode(bucket_name, bl); |
348 | encode(oid_prefix, bl); | |
349 | encode(push_endpoint, bl); | |
350 | encode(push_endpoint_args, bl); | |
eafe8130 | 351 | encode(arn_topic, bl); |
11fdf7f2 TL |
352 | ENCODE_FINISH(bl); |
353 | } | |
354 | ||
355 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 356 | DECODE_START(3, bl); |
11fdf7f2 TL |
357 | decode(bucket_name, bl); |
358 | decode(oid_prefix, bl); | |
359 | decode(push_endpoint, bl); | |
360 | if (struct_v >= 2) { | |
361 | decode(push_endpoint_args, bl); | |
362 | } | |
eafe8130 TL |
363 | if (struct_v >= 3) { |
364 | decode(arn_topic, bl); | |
365 | } | |
11fdf7f2 TL |
366 | DECODE_FINISH(bl); |
367 | } | |
368 | ||
369 | void dump(Formatter *f) const; | |
eafe8130 | 370 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
371 | }; |
372 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) | |
373 | ||
374 | struct rgw_pubsub_sub_config { | |
375 | rgw_user user; | |
eafe8130 TL |
376 | std::string name; |
377 | std::string topic; | |
11fdf7f2 | 378 | rgw_pubsub_sub_dest dest; |
eafe8130 | 379 | std::string s3_id; |
11fdf7f2 TL |
380 | |
381 | void encode(bufferlist& bl) const { | |
eafe8130 | 382 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
383 | encode(user, bl); |
384 | encode(name, bl); | |
385 | encode(topic, bl); | |
386 | encode(dest, bl); | |
eafe8130 | 387 | encode(s3_id, bl); |
11fdf7f2 TL |
388 | ENCODE_FINISH(bl); |
389 | } | |
390 | ||
391 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 392 | DECODE_START(2, bl); |
11fdf7f2 TL |
393 | decode(user, bl); |
394 | decode(name, bl); | |
395 | decode(topic, bl); | |
396 | decode(dest, bl); | |
eafe8130 TL |
397 | if (struct_v >= 2) { |
398 | decode(s3_id, bl); | |
399 | } | |
11fdf7f2 TL |
400 | DECODE_FINISH(bl); |
401 | } | |
402 | ||
403 | void dump(Formatter *f) const; | |
404 | }; | |
405 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_config) | |
406 | ||
407 | struct rgw_pubsub_topic { | |
408 | rgw_user user; | |
eafe8130 TL |
409 | std::string name; |
410 | rgw_pubsub_sub_dest dest; | |
411 | std::string arn; | |
11fdf7f2 TL |
412 | |
413 | void encode(bufferlist& bl) const { | |
eafe8130 | 414 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
415 | encode(user, bl); |
416 | encode(name, bl); | |
eafe8130 TL |
417 | encode(dest, bl); |
418 | encode(arn, bl); | |
11fdf7f2 TL |
419 | ENCODE_FINISH(bl); |
420 | } | |
421 | ||
422 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 423 | DECODE_START(2, bl); |
11fdf7f2 TL |
424 | decode(user, bl); |
425 | decode(name, bl); | |
eafe8130 TL |
426 | if (struct_v >= 2) { |
427 | decode(dest, bl); | |
428 | decode(arn, bl); | |
429 | } | |
11fdf7f2 TL |
430 | DECODE_FINISH(bl); |
431 | } | |
432 | ||
433 | string to_str() const { | |
434 | return user.to_str() + "/" + name; | |
435 | } | |
436 | ||
437 | void dump(Formatter *f) const; | |
eafe8130 | 438 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
439 | |
440 | bool operator<(const rgw_pubsub_topic& t) const { | |
441 | return to_str().compare(t.to_str()); | |
442 | } | |
443 | }; | |
444 | WRITE_CLASS_ENCODER(rgw_pubsub_topic) | |
445 | ||
446 | struct rgw_pubsub_topic_subs { | |
447 | rgw_pubsub_topic topic; | |
eafe8130 | 448 | std::set<std::string> subs; |
11fdf7f2 TL |
449 | |
450 | void encode(bufferlist& bl) const { | |
451 | ENCODE_START(1, 1, bl); | |
452 | encode(topic, bl); | |
453 | encode(subs, bl); | |
454 | ENCODE_FINISH(bl); | |
455 | } | |
456 | ||
457 | void decode(bufferlist::const_iterator& bl) { | |
458 | DECODE_START(1, bl); | |
459 | decode(topic, bl); | |
460 | decode(subs, bl); | |
461 | DECODE_FINISH(bl); | |
462 | } | |
463 | ||
464 | void dump(Formatter *f) const; | |
465 | }; | |
466 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) | |
467 | ||
468 | struct rgw_pubsub_topic_filter { | |
469 | rgw_pubsub_topic topic; | |
eafe8130 TL |
470 | rgw::notify::EventTypeList events; |
471 | std::string s3_id; | |
472 | rgw_s3_filter s3_filter; | |
11fdf7f2 TL |
473 | |
474 | void encode(bufferlist& bl) const { | |
eafe8130 | 475 | ENCODE_START(3, 1, bl); |
11fdf7f2 | 476 | encode(topic, bl); |
eafe8130 TL |
477 | // events are stored as a vector of strings |
478 | std::vector<std::string> tmp_events; | |
479 | const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string; | |
480 | std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter); | |
481 | encode(tmp_events, bl); | |
482 | encode(s3_id, bl); | |
483 | encode(s3_filter, bl); | |
11fdf7f2 TL |
484 | ENCODE_FINISH(bl); |
485 | } | |
486 | ||
487 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 488 | DECODE_START(3, bl); |
11fdf7f2 | 489 | decode(topic, bl); |
eafe8130 TL |
490 | // events are stored as a vector of strings |
491 | events.clear(); | |
492 | std::vector<std::string> tmp_events; | |
493 | decode(tmp_events, bl); | |
494 | std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string); | |
495 | if (struct_v >= 2) { | |
496 | decode(s3_id, bl); | |
497 | } | |
498 | if (struct_v >= 3) { | |
499 | decode(s3_filter, bl); | |
500 | } | |
11fdf7f2 TL |
501 | DECODE_FINISH(bl); |
502 | } | |
503 | ||
504 | void dump(Formatter *f) const; | |
505 | }; | |
506 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter) | |
507 | ||
508 | struct rgw_pubsub_bucket_topics { | |
eafe8130 | 509 | std::map<std::string, rgw_pubsub_topic_filter> topics; |
11fdf7f2 TL |
510 | |
511 | void encode(bufferlist& bl) const { | |
512 | ENCODE_START(1, 1, bl); | |
513 | encode(topics, bl); | |
514 | ENCODE_FINISH(bl); | |
515 | } | |
516 | ||
517 | void decode(bufferlist::const_iterator& bl) { | |
518 | DECODE_START(1, bl); | |
519 | decode(topics, bl); | |
520 | DECODE_FINISH(bl); | |
521 | } | |
522 | ||
523 | void dump(Formatter *f) const; | |
524 | }; | |
525 | WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) | |
526 | ||
527 | struct rgw_pubsub_user_topics { | |
eafe8130 | 528 | std::map<std::string, rgw_pubsub_topic_subs> topics; |
11fdf7f2 TL |
529 | |
530 | void encode(bufferlist& bl) const { | |
531 | ENCODE_START(1, 1, bl); | |
532 | encode(topics, bl); | |
533 | ENCODE_FINISH(bl); | |
534 | } | |
535 | ||
536 | void decode(bufferlist::const_iterator& bl) { | |
537 | DECODE_START(1, bl); | |
538 | decode(topics, bl); | |
539 | DECODE_FINISH(bl); | |
540 | } | |
541 | ||
542 | void dump(Formatter *f) const; | |
eafe8130 | 543 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
544 | }; |
545 | WRITE_CLASS_ENCODER(rgw_pubsub_user_topics) | |
546 | ||
// common prefix of the oids built by RGWUserPubSub for its metadata objects
static std::string pubsub_user_oid_prefix{"pubsub.user."};
11fdf7f2 TL |
548 | |
549 | class RGWUserPubSub | |
550 | { | |
551 | friend class Bucket; | |
552 | ||
553 | RGWRados *store; | |
554 | rgw_user user; | |
555 | RGWSysObjectCtx obj_ctx; | |
556 | ||
557 | rgw_raw_obj user_meta_obj; | |
558 | ||
eafe8130 | 559 | std::string user_meta_oid() const { |
11fdf7f2 TL |
560 | return pubsub_user_oid_prefix + user.to_str(); |
561 | } | |
562 | ||
eafe8130 | 563 | std::string bucket_meta_oid(const rgw_bucket& bucket) const { |
11fdf7f2 TL |
564 | return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; |
565 | } | |
566 | ||
eafe8130 | 567 | std::string sub_meta_oid(const string& name) const { |
11fdf7f2 TL |
568 | return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; |
569 | } | |
570 | ||
571 | template <class T> | |
572 | int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); | |
573 | ||
574 | template <class T> | |
575 | int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); | |
576 | ||
577 | int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); | |
578 | ||
579 | int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker); | |
580 | int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker); | |
eafe8130 | 581 | |
11fdf7f2 TL |
582 | public: |
583 | RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), | |
584 | user(_user), | |
585 | obj_ctx(store->svc.sysobj->init_obj_ctx()) { | |
586 | get_user_meta_obj(&user_meta_obj); | |
587 | } | |
588 | ||
589 | class Bucket { | |
590 | friend class RGWUserPubSub; | |
591 | RGWUserPubSub *ps; | |
592 | rgw_bucket bucket; | |
593 | rgw_raw_obj bucket_meta_obj; | |
594 | ||
eafe8130 TL |
595 | // read the list of topics associated with a bucket and populate into result |
596 | // use version tacker to enforce atomicity between read/write | |
597 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 598 | int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker); |
eafe8130 TL |
599 | // set the list of topics associated with a bucket |
600 | // use version tacker to enforce atomicity between read/write | |
601 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
602 | int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker); |
603 | public: | |
604 | Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { | |
605 | ps->get_bucket_meta_obj(bucket, &bucket_meta_obj); | |
606 | } | |
607 | ||
eafe8130 TL |
608 | // read the list of topics associated with a bucket and populate into result |
609 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 610 | int get_topics(rgw_pubsub_bucket_topics *result); |
eafe8130 TL |
611 | // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket |
612 | // assigning a notification name is optional (needed for S3 compatible notifications) | |
613 | // if the topic already exist on the bucket, the filter event list may be updated | |
614 | // for S3 compliant notifications the version with: s3_filter and notif_name should be used | |
615 | // return -ENOENT if the topic does not exists | |
616 | // return 0 on success, error code otherwise | |
617 | int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events); | |
618 | int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name); | |
619 | // remove a topic and filter from bucket | |
620 | // if the topic does not exists on the bucket it is a no-op (considered success) | |
621 | // return -ENOENT if the topic does not exists | |
622 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
623 | int remove_notification(const string& topic_name); |
624 | }; | |
625 | ||
eafe8130 | 626 | // base class for subscription |
11fdf7f2 TL |
627 | class Sub { |
628 | friend class RGWUserPubSub; | |
eafe8130 TL |
629 | protected: |
630 | RGWUserPubSub* const ps; | |
631 | const std::string sub; | |
11fdf7f2 TL |
632 | rgw_raw_obj sub_meta_obj; |
633 | ||
634 | int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker); | |
635 | int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker); | |
636 | int remove_sub(RGWObjVersionTracker *objv_tracker); | |
637 | public: | |
eafe8130 | 638 | Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) { |
11fdf7f2 TL |
639 | ps->get_sub_meta_obj(sub, &sub_meta_obj); |
640 | } | |
641 | ||
eafe8130 TL |
642 | virtual ~Sub() = default; |
643 | ||
644 | int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id=""); | |
11fdf7f2 | 645 | int unsubscribe(const string& topic_name); |
eafe8130 TL |
646 | int get_conf(rgw_pubsub_sub_config* result); |
647 | ||
648 | static const int DEFAULT_MAX_EVENTS = 100; | |
649 | // followint virtual methods should only be called in derived | |
650 | virtual int list_events(const string& marker, int max_events) {ceph_assert(false);} | |
651 | virtual int remove_event(const string& event_id) {ceph_assert(false);} | |
652 | virtual void dump(Formatter* f) const {ceph_assert(false);} | |
653 | }; | |
11fdf7f2 | 654 | |
eafe8130 TL |
655 | // subscription with templated list of events to support both S3 compliant and Ceph specific events |
656 | template<typename EventType> | |
657 | class SubWithEvents : public Sub { | |
658 | private: | |
11fdf7f2 | 659 | struct list_events_result { |
eafe8130 | 660 | std::string next_marker; |
11fdf7f2 | 661 | bool is_truncated{false}; |
11fdf7f2 | 662 | void dump(Formatter *f) const; |
eafe8130 TL |
663 | std::vector<EventType> events; |
664 | } list; | |
11fdf7f2 | 665 | |
eafe8130 TL |
666 | public: |
667 | SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {} | |
668 | ||
669 | virtual ~SubWithEvents() = default; | |
670 | ||
671 | int list_events(const string& marker, int max_events) override; | |
672 | int remove_event(const string& event_id) override; | |
673 | void dump(Formatter* f) const override; | |
11fdf7f2 TL |
674 | }; |
675 | ||
676 | using BucketRef = std::shared_ptr<Bucket>; | |
677 | using SubRef = std::shared_ptr<Sub>; | |
678 | ||
679 | BucketRef get_bucket(const rgw_bucket& bucket) { | |
680 | return std::make_shared<Bucket>(this, bucket); | |
681 | } | |
682 | ||
683 | SubRef get_sub(const string& sub) { | |
684 | return std::make_shared<Sub>(this, sub); | |
685 | } | |
eafe8130 TL |
686 | |
687 | SubRef get_sub_with_events(const string& sub) { | |
688 | auto tmpsub = Sub(this, sub); | |
689 | rgw_pubsub_sub_config conf; | |
690 | if (tmpsub.get_conf(&conf) < 0) { | |
691 | return nullptr; | |
692 | } | |
693 | if (conf.s3_id.empty()) { | |
694 | return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub); | |
695 | } | |
696 | return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub); | |
697 | } | |
698 | ||
11fdf7f2 TL |
699 | void get_user_meta_obj(rgw_raw_obj *obj) const { |
700 | *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid()); | |
701 | } | |
702 | ||
703 | void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { | |
704 | *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); | |
705 | } | |
706 | ||
707 | void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { | |
708 | *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name)); | |
709 | } | |
710 | ||
eafe8130 TL |
711 | // get all topics defined for the user and populate them into "result" |
712 | // return 0 on success or if no topics exist, error code otherwise | |
11fdf7f2 | 713 | int get_user_topics(rgw_pubsub_user_topics *result); |
eafe8130 TL |
714 | // get a topic with its subscriptions by its name and populate it into "result" |
715 | // return -ENOENT if the topic does not exists | |
716 | // return 0 on success, error code otherwise | |
11fdf7f2 | 717 | int get_topic(const string& name, rgw_pubsub_topic_subs *result); |
eafe8130 TL |
718 | // get a topic with by its name and populate it into "result" |
719 | // return -ENOENT if the topic does not exists | |
720 | // return 0 on success, error code otherwise | |
721 | int get_topic(const string& name, rgw_pubsub_topic *result); | |
722 | // create a topic with a name only | |
723 | // if the topic already exists it is a no-op (considered success) | |
724 | // return 0 on success, error code otherwise | |
11fdf7f2 | 725 | int create_topic(const string& name); |
eafe8130 TL |
726 | // create a topic with push destination information and ARN |
727 | // if the topic already exists the destination and ARN values may be updated (considered succsess) | |
728 | // return 0 on success, error code otherwise | |
729 | int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn); | |
730 | // remove a topic according to its name | |
731 | // if the topic does not exists it is a no-op (considered success) | |
732 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
733 | int remove_topic(const string& name); |
734 | }; | |
735 | ||
736 | template <class T> | |
737 | int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) | |
738 | { | |
739 | bufferlist bl; | |
740 | int ret = rgw_get_system_obj(store, obj_ctx, | |
741 | obj.pool, obj.oid, | |
742 | bl, | |
743 | objv_tracker, | |
744 | nullptr, nullptr, nullptr); | |
745 | if (ret < 0) { | |
746 | return ret; | |
747 | } | |
748 | ||
749 | auto iter = bl.cbegin(); | |
750 | try { | |
751 | decode(*result, iter); | |
752 | } catch (buffer::error& err) { | |
753 | return -EIO; | |
754 | } | |
755 | ||
756 | return 0; | |
757 | } | |
758 | ||
759 | template <class T> | |
760 | int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker) | |
761 | { | |
762 | bufferlist bl; | |
763 | encode(info, bl); | |
764 | ||
765 | int ret = rgw_put_system_obj(store, obj.pool, obj.oid, | |
766 | bl, false, objv_tracker, | |
767 | real_time()); | |
768 | if (ret < 0) { | |
769 | return ret; | |
770 | } | |
771 | ||
eafe8130 | 772 | obj_ctx.invalidate(const_cast<rgw_raw_obj&>(obj)); |
11fdf7f2 TL |
773 | return 0; |
774 | } | |
775 | ||
776 | #endif |