]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
11fdf7f2 TL |
4 | #ifndef CEPH_RGW_PUBSUB_H |
5 | #define CEPH_RGW_PUBSUB_H | |
6 | ||
7 | #include "rgw_common.h" | |
8 | #include "rgw_tools.h" | |
9 | #include "rgw_zone.h" | |
eafe8130 TL |
10 | #include "rgw_rados.h" |
11 | #include "rgw_notify_event_type.h" | |
11fdf7f2 TL |
12 | #include "services/svc_sys_obj.h" |
13 | ||
eafe8130 TL |
14 | class XMLObj; |
15 | ||
16 | struct rgw_s3_key_filter { | |
17 | std::string prefix_rule; | |
18 | std::string suffix_rule; | |
19 | std::string regex_rule; | |
20 | ||
21 | bool has_content() const; | |
22 | ||
23 | bool decode_xml(XMLObj *obj); | |
24 | void dump_xml(Formatter *f) const; | |
25 | ||
26 | void encode(bufferlist& bl) const { | |
27 | ENCODE_START(1, 1, bl); | |
28 | encode(prefix_rule, bl); | |
29 | encode(suffix_rule, bl); | |
30 | encode(regex_rule, bl); | |
31 | ENCODE_FINISH(bl); | |
32 | } | |
33 | ||
34 | void decode(bufferlist::const_iterator& bl) { | |
35 | DECODE_START(1, bl); | |
36 | decode(prefix_rule, bl); | |
37 | decode(suffix_rule, bl); | |
38 | decode(regex_rule, bl); | |
39 | DECODE_FINISH(bl); | |
40 | } | |
41 | }; | |
42 | WRITE_CLASS_ENCODER(rgw_s3_key_filter) | |
43 | ||
44 | using Metadata = std::map<std::string, std::string>; | |
45 | ||
46 | struct rgw_s3_metadata_filter { | |
47 | Metadata metadata; | |
48 | ||
49 | bool has_content() const; | |
50 | ||
51 | bool decode_xml(XMLObj *obj); | |
52 | void dump_xml(Formatter *f) const; | |
53 | ||
54 | void encode(bufferlist& bl) const { | |
55 | ENCODE_START(1, 1, bl); | |
56 | encode(metadata, bl); | |
57 | ENCODE_FINISH(bl); | |
58 | } | |
59 | void decode(bufferlist::const_iterator& bl) { | |
60 | DECODE_START(1, bl); | |
61 | decode(metadata, bl); | |
62 | DECODE_FINISH(bl); | |
63 | } | |
64 | }; | |
65 | WRITE_CLASS_ENCODER(rgw_s3_metadata_filter) | |
66 | ||
67 | struct rgw_s3_filter { | |
68 | rgw_s3_key_filter key_filter; | |
69 | rgw_s3_metadata_filter metadata_filter; | |
70 | ||
71 | bool has_content() const; | |
72 | ||
73 | bool decode_xml(XMLObj *obj); | |
74 | void dump_xml(Formatter *f) const; | |
75 | ||
76 | void encode(bufferlist& bl) const { | |
77 | ENCODE_START(1, 1, bl); | |
78 | encode(key_filter, bl); | |
79 | encode(metadata_filter, bl); | |
80 | ENCODE_FINISH(bl); | |
81 | } | |
82 | ||
83 | void decode(bufferlist::const_iterator& bl) { | |
84 | DECODE_START(1, bl); | |
85 | decode(key_filter, bl); | |
86 | decode(metadata_filter, bl); | |
87 | DECODE_FINISH(bl); | |
88 | } | |
89 | }; | |
90 | WRITE_CLASS_ENCODER(rgw_s3_filter) | |
91 | ||
92 | using OptionalFilter = std::optional<rgw_s3_filter>; | |
93 | ||
94 | class rgw_pubsub_topic_filter; | |
95 | /* S3 notification configuration | |
96 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html | |
97 | <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> | |
98 | <TopicConfiguration> | |
99 | <Filter> | |
100 | <S3Key> | |
101 | <FilterRule> | |
102 | <Name>suffix</Name> | |
103 | <Value>jpg</Value> | |
104 | </FilterRule> | |
105 | </S3Key> | |
106 | <S3Metadata> | |
107 | <FilterRule> | |
108 | <Name></Name> | |
109 | <Value></Value> | |
110 | </FilterRule> | |
111 | </S3Metadata> | |
112 | </Filter> | |
113 | <Id>notification1</Id> | |
114 | <Topic>arn:aws:sns:<region>:<account>:<topic></Topic> | |
115 | <Event>s3:ObjectCreated:*</Event> | |
116 | <Event>s3:ObjectRemoved:*</Event> | |
117 | </TopicConfiguration> | |
118 | </NotificationConfiguration> | |
119 | */ | |
120 | struct rgw_pubsub_s3_notification { | |
121 | // notification id | |
122 | std::string id; | |
123 | // types of events | |
124 | rgw::notify::EventTypeList events; | |
125 | // topic ARN | |
126 | std::string topic_arn; | |
127 | // filter rules | |
128 | rgw_s3_filter filter; | |
129 | ||
130 | bool decode_xml(XMLObj *obj); | |
131 | void dump_xml(Formatter *f) const; | |
132 | ||
133 | rgw_pubsub_s3_notification() = default; | |
134 | // construct from rgw_pubsub_topic_filter (used by get/list notifications) | |
135 | rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter); | |
136 | }; | |
137 | ||
138 | // return true if the key matches the prefix/suffix/regex rules of the key filter | |
139 | bool match(const rgw_s3_key_filter& filter, const std::string& key); | |
140 | // return true if the metadata matches the rules of the metadata filter | |
141 | bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata); | |
142 | // return true if the event type matches (equal or contained in) one of the events in the list | |
143 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event); | |
144 | ||
145 | struct rgw_pubsub_s3_notifications { | |
146 | std::list<rgw_pubsub_s3_notification> list; | |
147 | bool decode_xml(XMLObj *obj); | |
148 | void dump_xml(Formatter *f) const; | |
149 | }; | |
150 | ||
151 | /* S3 event records structure | |
152 | * based on: https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html | |
153 | { | |
154 | "Records":[ | |
155 | { | |
156 | "eventVersion":"", | |
157 | "eventSource":"", | |
158 | "awsRegion":"", | |
159 | "eventTime":"", | |
160 | "eventName":"", | |
161 | "userIdentity":{ | |
162 | "principalId":"" | |
163 | }, | |
164 | "requestParameters":{ | |
165 | "sourceIPAddress":"" | |
166 | }, | |
167 | "responseElements":{ | |
168 | "x-amz-request-id":"", | |
169 | "x-amz-id-2":"" | |
170 | }, | |
171 | "s3":{ | |
172 | "s3SchemaVersion":"1.0", | |
173 | "configurationId":"", | |
174 | "bucket":{ | |
175 | "name":"", | |
176 | "ownerIdentity":{ | |
177 | "principalId":"" | |
178 | }, | |
179 | "arn":"", | |
180 | "id": "" | |
181 | }, | |
182 | "object":{ | |
183 | "key":"", | |
184 | "size": , | |
185 | "eTag":"", | |
186 | "versionId":"", | |
187 | "sequencer": "", | |
188 | "metadata": "" | |
189 | } | |
190 | }, | |
191 | "eventId":"", | |
192 | } | |
193 | ] | |
194 | }*/ | |
195 | ||
196 | struct rgw_pubsub_s3_record { | |
197 | constexpr static const char* const json_type_single = "Record"; | |
198 | constexpr static const char* const json_type_plural = "Records"; | |
199 | // 2.1 | |
200 | std::string eventVersion; | |
201 | // aws:s3 | |
202 | std::string eventSource; | |
203 | // zonegroup | |
204 | std::string awsRegion; | |
205 | // time of the request | |
206 | ceph::real_time eventTime; | |
207 | // type of the event | |
208 | std::string eventName; | |
209 | // user that sent the requet (not implemented) | |
210 | std::string userIdentity; | |
211 | // IP address of source of the request (not implemented) | |
212 | std::string sourceIPAddress; | |
213 | // request ID (not implemented) | |
214 | std::string x_amz_request_id; | |
215 | // radosgw that received the request | |
216 | std::string x_amz_id_2; | |
217 | // 1.0 | |
218 | std::string s3SchemaVersion; | |
219 | // ID received in the notification request | |
220 | std::string configurationId; | |
221 | // bucket name | |
222 | std::string bucket_name; | |
223 | // bucket owner (not implemented) | |
224 | std::string bucket_ownerIdentity; | |
225 | // bucket ARN | |
226 | std::string bucket_arn; | |
227 | // object key | |
228 | std::string object_key; | |
229 | // object size (not implemented) | |
230 | uint64_t object_size; | |
231 | // object etag | |
232 | std::string object_etag; | |
233 | // object version id bucket is versioned | |
234 | std::string object_versionId; | |
235 | // hexadecimal value used to determine event order for specific key | |
236 | std::string object_sequencer; | |
237 | // this is an rgw extension (not S3 standard) | |
238 | // used to store a globally unique identifier of the event | |
239 | // that could be used for acking | |
240 | std::string id; | |
241 | // this is an rgw extension holding the internal bucket id | |
242 | std::string bucket_id; | |
243 | // meta data | |
244 | std::map<std::string, std::string> x_meta_map; | |
245 | ||
246 | void encode(bufferlist& bl) const { | |
247 | ENCODE_START(2, 1, bl); | |
248 | encode(eventVersion, bl); | |
249 | encode(eventSource, bl); | |
250 | encode(awsRegion, bl); | |
251 | encode(eventTime, bl); | |
252 | encode(eventName, bl); | |
253 | encode(userIdentity, bl); | |
254 | encode(sourceIPAddress, bl); | |
255 | encode(x_amz_request_id, bl); | |
256 | encode(x_amz_id_2, bl); | |
257 | encode(s3SchemaVersion, bl); | |
258 | encode(configurationId, bl); | |
259 | encode(bucket_name, bl); | |
260 | encode(bucket_ownerIdentity, bl); | |
261 | encode(bucket_arn, bl); | |
262 | encode(object_key, bl); | |
263 | encode(object_size, bl); | |
264 | encode(object_etag, bl); | |
265 | encode(object_versionId, bl); | |
266 | encode(object_sequencer, bl); | |
267 | encode(id, bl); | |
268 | encode(bucket_id, bl); | |
269 | encode(x_meta_map, bl); | |
270 | ENCODE_FINISH(bl); | |
271 | } | |
272 | ||
273 | void decode(bufferlist::const_iterator& bl) { | |
274 | DECODE_START(2, bl); | |
275 | decode(eventVersion, bl); | |
276 | decode(eventSource, bl); | |
277 | decode(awsRegion, bl); | |
278 | decode(eventTime, bl); | |
279 | decode(eventName, bl); | |
280 | decode(userIdentity, bl); | |
281 | decode(sourceIPAddress, bl); | |
282 | decode(x_amz_request_id, bl); | |
283 | decode(x_amz_id_2, bl); | |
284 | decode(s3SchemaVersion, bl); | |
285 | decode(configurationId, bl); | |
286 | decode(bucket_name, bl); | |
287 | decode(bucket_ownerIdentity, bl); | |
288 | decode(bucket_arn, bl); | |
289 | decode(object_key, bl); | |
290 | decode(object_size, bl); | |
291 | decode(object_etag, bl); | |
292 | decode(object_versionId, bl); | |
293 | decode(object_sequencer, bl); | |
294 | decode(id, bl); | |
295 | if (struct_v >= 2) { | |
296 | decode(bucket_id, bl); | |
297 | decode(x_meta_map, bl); | |
298 | } | |
299 | DECODE_FINISH(bl); | |
300 | } | |
301 | ||
302 | void dump(Formatter *f) const; | |
303 | }; | |
304 | WRITE_CLASS_ENCODER(rgw_pubsub_s3_record) | |
11fdf7f2 TL |
305 | |
306 | struct rgw_pubsub_event { | |
eafe8130 TL |
307 | constexpr static const char* const json_type_single = "event"; |
308 | constexpr static const char* const json_type_plural = "events"; | |
309 | std::string id; | |
310 | std::string event_name; | |
311 | std::string source; | |
11fdf7f2 TL |
312 | ceph::real_time timestamp; |
313 | JSONFormattable info; | |
314 | ||
315 | void encode(bufferlist& bl) const { | |
316 | ENCODE_START(1, 1, bl); | |
317 | encode(id, bl); | |
eafe8130 | 318 | encode(event_name, bl); |
11fdf7f2 TL |
319 | encode(source, bl); |
320 | encode(timestamp, bl); | |
321 | encode(info, bl); | |
322 | ENCODE_FINISH(bl); | |
323 | } | |
324 | ||
325 | void decode(bufferlist::const_iterator& bl) { | |
326 | DECODE_START(1, bl); | |
327 | decode(id, bl); | |
eafe8130 | 328 | decode(event_name, bl); |
11fdf7f2 TL |
329 | decode(source, bl); |
330 | decode(timestamp, bl); | |
331 | decode(info, bl); | |
332 | DECODE_FINISH(bl); | |
333 | } | |
334 | ||
335 | void dump(Formatter *f) const; | |
336 | }; | |
337 | WRITE_CLASS_ENCODER(rgw_pubsub_event) | |
338 | ||
339 | struct rgw_pubsub_sub_dest { | |
eafe8130 TL |
340 | std::string bucket_name; |
341 | std::string oid_prefix; | |
342 | std::string push_endpoint; | |
343 | std::string push_endpoint_args; | |
344 | std::string arn_topic; | |
11fdf7f2 TL |
345 | |
346 | void encode(bufferlist& bl) const { | |
eafe8130 | 347 | ENCODE_START(3, 1, bl); |
11fdf7f2 TL |
348 | encode(bucket_name, bl); |
349 | encode(oid_prefix, bl); | |
350 | encode(push_endpoint, bl); | |
351 | encode(push_endpoint_args, bl); | |
eafe8130 | 352 | encode(arn_topic, bl); |
11fdf7f2 TL |
353 | ENCODE_FINISH(bl); |
354 | } | |
355 | ||
356 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 357 | DECODE_START(3, bl); |
11fdf7f2 TL |
358 | decode(bucket_name, bl); |
359 | decode(oid_prefix, bl); | |
360 | decode(push_endpoint, bl); | |
361 | if (struct_v >= 2) { | |
362 | decode(push_endpoint_args, bl); | |
363 | } | |
eafe8130 TL |
364 | if (struct_v >= 3) { |
365 | decode(arn_topic, bl); | |
366 | } | |
11fdf7f2 TL |
367 | DECODE_FINISH(bl); |
368 | } | |
369 | ||
370 | void dump(Formatter *f) const; | |
eafe8130 | 371 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
372 | }; |
373 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) | |
374 | ||
375 | struct rgw_pubsub_sub_config { | |
376 | rgw_user user; | |
eafe8130 TL |
377 | std::string name; |
378 | std::string topic; | |
11fdf7f2 | 379 | rgw_pubsub_sub_dest dest; |
eafe8130 | 380 | std::string s3_id; |
11fdf7f2 TL |
381 | |
382 | void encode(bufferlist& bl) const { | |
eafe8130 | 383 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
384 | encode(user, bl); |
385 | encode(name, bl); | |
386 | encode(topic, bl); | |
387 | encode(dest, bl); | |
eafe8130 | 388 | encode(s3_id, bl); |
11fdf7f2 TL |
389 | ENCODE_FINISH(bl); |
390 | } | |
391 | ||
392 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 393 | DECODE_START(2, bl); |
11fdf7f2 TL |
394 | decode(user, bl); |
395 | decode(name, bl); | |
396 | decode(topic, bl); | |
397 | decode(dest, bl); | |
eafe8130 TL |
398 | if (struct_v >= 2) { |
399 | decode(s3_id, bl); | |
400 | } | |
11fdf7f2 TL |
401 | DECODE_FINISH(bl); |
402 | } | |
403 | ||
404 | void dump(Formatter *f) const; | |
405 | }; | |
406 | WRITE_CLASS_ENCODER(rgw_pubsub_sub_config) | |
407 | ||
408 | struct rgw_pubsub_topic { | |
409 | rgw_user user; | |
eafe8130 TL |
410 | std::string name; |
411 | rgw_pubsub_sub_dest dest; | |
412 | std::string arn; | |
11fdf7f2 TL |
413 | |
414 | void encode(bufferlist& bl) const { | |
eafe8130 | 415 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
416 | encode(user, bl); |
417 | encode(name, bl); | |
eafe8130 TL |
418 | encode(dest, bl); |
419 | encode(arn, bl); | |
11fdf7f2 TL |
420 | ENCODE_FINISH(bl); |
421 | } | |
422 | ||
423 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 424 | DECODE_START(2, bl); |
11fdf7f2 TL |
425 | decode(user, bl); |
426 | decode(name, bl); | |
eafe8130 TL |
427 | if (struct_v >= 2) { |
428 | decode(dest, bl); | |
429 | decode(arn, bl); | |
430 | } | |
11fdf7f2 TL |
431 | DECODE_FINISH(bl); |
432 | } | |
433 | ||
434 | string to_str() const { | |
435 | return user.to_str() + "/" + name; | |
436 | } | |
437 | ||
438 | void dump(Formatter *f) const; | |
eafe8130 | 439 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
440 | |
441 | bool operator<(const rgw_pubsub_topic& t) const { | |
442 | return to_str().compare(t.to_str()); | |
443 | } | |
444 | }; | |
445 | WRITE_CLASS_ENCODER(rgw_pubsub_topic) | |
446 | ||
447 | struct rgw_pubsub_topic_subs { | |
448 | rgw_pubsub_topic topic; | |
eafe8130 | 449 | std::set<std::string> subs; |
11fdf7f2 TL |
450 | |
451 | void encode(bufferlist& bl) const { | |
452 | ENCODE_START(1, 1, bl); | |
453 | encode(topic, bl); | |
454 | encode(subs, bl); | |
455 | ENCODE_FINISH(bl); | |
456 | } | |
457 | ||
458 | void decode(bufferlist::const_iterator& bl) { | |
459 | DECODE_START(1, bl); | |
460 | decode(topic, bl); | |
461 | decode(subs, bl); | |
462 | DECODE_FINISH(bl); | |
463 | } | |
464 | ||
465 | void dump(Formatter *f) const; | |
466 | }; | |
467 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) | |
468 | ||
469 | struct rgw_pubsub_topic_filter { | |
470 | rgw_pubsub_topic topic; | |
eafe8130 TL |
471 | rgw::notify::EventTypeList events; |
472 | std::string s3_id; | |
473 | rgw_s3_filter s3_filter; | |
11fdf7f2 TL |
474 | |
475 | void encode(bufferlist& bl) const { | |
eafe8130 | 476 | ENCODE_START(3, 1, bl); |
11fdf7f2 | 477 | encode(topic, bl); |
eafe8130 TL |
478 | // events are stored as a vector of strings |
479 | std::vector<std::string> tmp_events; | |
480 | const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string; | |
481 | std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter); | |
482 | encode(tmp_events, bl); | |
483 | encode(s3_id, bl); | |
484 | encode(s3_filter, bl); | |
11fdf7f2 TL |
485 | ENCODE_FINISH(bl); |
486 | } | |
487 | ||
488 | void decode(bufferlist::const_iterator& bl) { | |
eafe8130 | 489 | DECODE_START(3, bl); |
11fdf7f2 | 490 | decode(topic, bl); |
eafe8130 TL |
491 | // events are stored as a vector of strings |
492 | events.clear(); | |
493 | std::vector<std::string> tmp_events; | |
494 | decode(tmp_events, bl); | |
495 | std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string); | |
496 | if (struct_v >= 2) { | |
497 | decode(s3_id, bl); | |
498 | } | |
499 | if (struct_v >= 3) { | |
500 | decode(s3_filter, bl); | |
501 | } | |
11fdf7f2 TL |
502 | DECODE_FINISH(bl); |
503 | } | |
504 | ||
505 | void dump(Formatter *f) const; | |
506 | }; | |
507 | WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter) | |
508 | ||
509 | struct rgw_pubsub_bucket_topics { | |
eafe8130 | 510 | std::map<std::string, rgw_pubsub_topic_filter> topics; |
11fdf7f2 TL |
511 | |
512 | void encode(bufferlist& bl) const { | |
513 | ENCODE_START(1, 1, bl); | |
514 | encode(topics, bl); | |
515 | ENCODE_FINISH(bl); | |
516 | } | |
517 | ||
518 | void decode(bufferlist::const_iterator& bl) { | |
519 | DECODE_START(1, bl); | |
520 | decode(topics, bl); | |
521 | DECODE_FINISH(bl); | |
522 | } | |
523 | ||
524 | void dump(Formatter *f) const; | |
525 | }; | |
526 | WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) | |
527 | ||
528 | struct rgw_pubsub_user_topics { | |
eafe8130 | 529 | std::map<std::string, rgw_pubsub_topic_subs> topics; |
11fdf7f2 TL |
530 | |
531 | void encode(bufferlist& bl) const { | |
532 | ENCODE_START(1, 1, bl); | |
533 | encode(topics, bl); | |
534 | ENCODE_FINISH(bl); | |
535 | } | |
536 | ||
537 | void decode(bufferlist::const_iterator& bl) { | |
538 | DECODE_START(1, bl); | |
539 | decode(topics, bl); | |
540 | DECODE_FINISH(bl); | |
541 | } | |
542 | ||
543 | void dump(Formatter *f) const; | |
eafe8130 | 544 | void dump_xml(Formatter *f) const; |
11fdf7f2 TL |
545 | }; |
546 | WRITE_CLASS_ENCODER(rgw_pubsub_user_topics) | |
547 | ||
// prefix of every pubsub metadata object id; const because it is defined in a
// header and must never be modified
static const std::string pubsub_user_oid_prefix = "pubsub.user.";
11fdf7f2 TL |
549 | |
550 | class RGWUserPubSub | |
551 | { | |
552 | friend class Bucket; | |
553 | ||
554 | RGWRados *store; | |
555 | rgw_user user; | |
556 | RGWSysObjectCtx obj_ctx; | |
557 | ||
558 | rgw_raw_obj user_meta_obj; | |
559 | ||
eafe8130 | 560 | std::string user_meta_oid() const { |
11fdf7f2 TL |
561 | return pubsub_user_oid_prefix + user.to_str(); |
562 | } | |
563 | ||
eafe8130 | 564 | std::string bucket_meta_oid(const rgw_bucket& bucket) const { |
11fdf7f2 TL |
565 | return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; |
566 | } | |
567 | ||
eafe8130 | 568 | std::string sub_meta_oid(const string& name) const { |
11fdf7f2 TL |
569 | return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; |
570 | } | |
571 | ||
572 | template <class T> | |
573 | int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); | |
574 | ||
575 | template <class T> | |
576 | int write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *obj_tracker); | |
577 | ||
578 | int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); | |
579 | ||
580 | int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker); | |
581 | int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker); | |
eafe8130 | 582 | |
11fdf7f2 TL |
583 | public: |
584 | RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), | |
585 | user(_user), | |
586 | obj_ctx(store->svc.sysobj->init_obj_ctx()) { | |
587 | get_user_meta_obj(&user_meta_obj); | |
588 | } | |
589 | ||
590 | class Bucket { | |
591 | friend class RGWUserPubSub; | |
592 | RGWUserPubSub *ps; | |
593 | rgw_bucket bucket; | |
594 | rgw_raw_obj bucket_meta_obj; | |
595 | ||
eafe8130 TL |
596 | // read the list of topics associated with a bucket and populate into result |
597 | // use version tacker to enforce atomicity between read/write | |
598 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 599 | int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker); |
eafe8130 TL |
600 | // set the list of topics associated with a bucket |
601 | // use version tacker to enforce atomicity between read/write | |
602 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
603 | int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker); |
604 | public: | |
605 | Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { | |
606 | ps->get_bucket_meta_obj(bucket, &bucket_meta_obj); | |
607 | } | |
608 | ||
eafe8130 TL |
609 | // read the list of topics associated with a bucket and populate into result |
610 | // return 0 on success or if no topic was associated with the bucket, error code otherwise | |
11fdf7f2 | 611 | int get_topics(rgw_pubsub_bucket_topics *result); |
eafe8130 TL |
612 | // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket |
613 | // assigning a notification name is optional (needed for S3 compatible notifications) | |
614 | // if the topic already exist on the bucket, the filter event list may be updated | |
615 | // for S3 compliant notifications the version with: s3_filter and notif_name should be used | |
616 | // return -ENOENT if the topic does not exists | |
617 | // return 0 on success, error code otherwise | |
618 | int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events); | |
619 | int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name); | |
620 | // remove a topic and filter from bucket | |
621 | // if the topic does not exists on the bucket it is a no-op (considered success) | |
622 | // return -ENOENT if the topic does not exists | |
623 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
624 | int remove_notification(const string& topic_name); |
625 | }; | |
626 | ||
eafe8130 | 627 | // base class for subscription |
11fdf7f2 TL |
628 | class Sub { |
629 | friend class RGWUserPubSub; | |
eafe8130 TL |
630 | protected: |
631 | RGWUserPubSub* const ps; | |
632 | const std::string sub; | |
11fdf7f2 TL |
633 | rgw_raw_obj sub_meta_obj; |
634 | ||
635 | int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker); | |
636 | int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker); | |
637 | int remove_sub(RGWObjVersionTracker *objv_tracker); | |
638 | public: | |
eafe8130 | 639 | Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) { |
11fdf7f2 TL |
640 | ps->get_sub_meta_obj(sub, &sub_meta_obj); |
641 | } | |
642 | ||
eafe8130 TL |
643 | virtual ~Sub() = default; |
644 | ||
645 | int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest, const std::string& s3_id=""); | |
11fdf7f2 | 646 | int unsubscribe(const string& topic_name); |
eafe8130 TL |
647 | int get_conf(rgw_pubsub_sub_config* result); |
648 | ||
649 | static const int DEFAULT_MAX_EVENTS = 100; | |
650 | // followint virtual methods should only be called in derived | |
651 | virtual int list_events(const string& marker, int max_events) {ceph_assert(false);} | |
652 | virtual int remove_event(const string& event_id) {ceph_assert(false);} | |
653 | virtual void dump(Formatter* f) const {ceph_assert(false);} | |
654 | }; | |
11fdf7f2 | 655 | |
eafe8130 TL |
656 | // subscription with templated list of events to support both S3 compliant and Ceph specific events |
657 | template<typename EventType> | |
658 | class SubWithEvents : public Sub { | |
659 | private: | |
11fdf7f2 | 660 | struct list_events_result { |
eafe8130 | 661 | std::string next_marker; |
11fdf7f2 | 662 | bool is_truncated{false}; |
11fdf7f2 | 663 | void dump(Formatter *f) const; |
eafe8130 TL |
664 | std::vector<EventType> events; |
665 | } list; | |
11fdf7f2 | 666 | |
eafe8130 TL |
667 | public: |
668 | SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {} | |
669 | ||
670 | virtual ~SubWithEvents() = default; | |
671 | ||
672 | int list_events(const string& marker, int max_events) override; | |
673 | int remove_event(const string& event_id) override; | |
674 | void dump(Formatter* f) const override; | |
11fdf7f2 TL |
675 | }; |
676 | ||
677 | using BucketRef = std::shared_ptr<Bucket>; | |
678 | using SubRef = std::shared_ptr<Sub>; | |
679 | ||
680 | BucketRef get_bucket(const rgw_bucket& bucket) { | |
681 | return std::make_shared<Bucket>(this, bucket); | |
682 | } | |
683 | ||
684 | SubRef get_sub(const string& sub) { | |
685 | return std::make_shared<Sub>(this, sub); | |
686 | } | |
eafe8130 TL |
687 | |
688 | SubRef get_sub_with_events(const string& sub) { | |
689 | auto tmpsub = Sub(this, sub); | |
690 | rgw_pubsub_sub_config conf; | |
691 | if (tmpsub.get_conf(&conf) < 0) { | |
692 | return nullptr; | |
693 | } | |
694 | if (conf.s3_id.empty()) { | |
695 | return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub); | |
696 | } | |
697 | return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub); | |
698 | } | |
699 | ||
11fdf7f2 TL |
700 | void get_user_meta_obj(rgw_raw_obj *obj) const { |
701 | *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, user_meta_oid()); | |
702 | } | |
703 | ||
704 | void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { | |
705 | *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); | |
706 | } | |
707 | ||
708 | void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { | |
709 | *obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name)); | |
710 | } | |
711 | ||
eafe8130 TL |
712 | // get all topics defined for the user and populate them into "result" |
713 | // return 0 on success or if no topics exist, error code otherwise | |
11fdf7f2 | 714 | int get_user_topics(rgw_pubsub_user_topics *result); |
eafe8130 TL |
715 | // get a topic with its subscriptions by its name and populate it into "result" |
716 | // return -ENOENT if the topic does not exists | |
717 | // return 0 on success, error code otherwise | |
11fdf7f2 | 718 | int get_topic(const string& name, rgw_pubsub_topic_subs *result); |
eafe8130 TL |
719 | // get a topic with by its name and populate it into "result" |
720 | // return -ENOENT if the topic does not exists | |
721 | // return 0 on success, error code otherwise | |
722 | int get_topic(const string& name, rgw_pubsub_topic *result); | |
723 | // create a topic with a name only | |
724 | // if the topic already exists it is a no-op (considered success) | |
725 | // return 0 on success, error code otherwise | |
11fdf7f2 | 726 | int create_topic(const string& name); |
eafe8130 TL |
727 | // create a topic with push destination information and ARN |
728 | // if the topic already exists the destination and ARN values may be updated (considered succsess) | |
729 | // return 0 on success, error code otherwise | |
730 | int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn); | |
731 | // remove a topic according to its name | |
732 | // if the topic does not exists it is a no-op (considered success) | |
733 | // return 0 on success, error code otherwise | |
11fdf7f2 TL |
734 | int remove_topic(const string& name); |
735 | }; | |
736 | ||
737 | template <class T> | |
738 | int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker) | |
739 | { | |
740 | bufferlist bl; | |
741 | int ret = rgw_get_system_obj(store, obj_ctx, | |
742 | obj.pool, obj.oid, | |
743 | bl, | |
744 | objv_tracker, | |
745 | nullptr, nullptr, nullptr); | |
746 | if (ret < 0) { | |
747 | return ret; | |
748 | } | |
749 | ||
750 | auto iter = bl.cbegin(); | |
751 | try { | |
752 | decode(*result, iter); | |
753 | } catch (buffer::error& err) { | |
754 | return -EIO; | |
755 | } | |
756 | ||
757 | return 0; | |
758 | } | |
759 | ||
760 | template <class T> | |
761 | int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTracker *objv_tracker) | |
762 | { | |
763 | bufferlist bl; | |
764 | encode(info, bl); | |
765 | ||
766 | int ret = rgw_put_system_obj(store, obj.pool, obj.oid, | |
767 | bl, false, objv_tracker, | |
768 | real_time()); | |
769 | if (ret < 0) { | |
770 | return ret; | |
771 | } | |
772 | ||
eafe8130 | 773 | obj_ctx.invalidate(const_cast<rgw_raw_obj&>(obj)); |
11fdf7f2 TL |
774 | return 0; |
775 | } | |
776 | ||
777 | #endif |