]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 | 5 | #include "rgw_b64.h" |
9f95a23c | 6 | #include "rgw_sal.h" |
11fdf7f2 TL |
7 | #include "rgw_pubsub.h" |
8 | #include "rgw_tools.h" | |
eafe8130 TL |
9 | #include "rgw_xml.h" |
10 | #include "rgw_arn.h" | |
11 | #include "rgw_pubsub_push.h" | |
12 | #include "rgw_rados.h" | |
13 | #include <regex> | |
14 | #include <algorithm> | |
11fdf7f2 TL |
15 | |
16 | #define dout_subsys ceph_subsys_rgw | |
17 | ||
92f5a8d4 TL |
18 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { |
19 | char buf[64]; | |
20 | const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); | |
21 | if (len > 0) { | |
22 | id.assign(buf, len); | |
23 | } | |
24 | } | |
25 | ||
eafe8130 TL |
26 | bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { |
27 | XMLObjIter iter = obj->find("FilterRule"); | |
28 | XMLObj *o; | |
29 | ||
30 | const auto throw_if_missing = true; | |
31 | auto prefix_not_set = true; | |
32 | auto suffix_not_set = true; | |
33 | auto regex_not_set = true; | |
34 | std::string name; | |
35 | ||
36 | while ((o = iter.get_next())) { | |
37 | RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing); | |
38 | if (name == "prefix" && prefix_not_set) { | |
39 | prefix_not_set = false; | |
40 | RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing); | |
41 | } else if (name == "suffix" && suffix_not_set) { | |
42 | suffix_not_set = false; | |
43 | RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing); | |
44 | } else if (name == "regex" && regex_not_set) { | |
45 | regex_not_set = false; | |
46 | RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing); | |
47 | } else { | |
48 | throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'"); | |
49 | } | |
50 | } | |
51 | return true; | |
52 | } | |
53 | ||
54 | void rgw_s3_key_filter::dump_xml(Formatter *f) const { | |
55 | if (!prefix_rule.empty()) { | |
56 | f->open_object_section("FilterRule"); | |
57 | ::encode_xml("Name", "prefix", f); | |
58 | ::encode_xml("Value", prefix_rule, f); | |
59 | f->close_section(); | |
60 | } | |
61 | if (!suffix_rule.empty()) { | |
62 | f->open_object_section("FilterRule"); | |
63 | ::encode_xml("Name", "suffix", f); | |
64 | ::encode_xml("Value", suffix_rule, f); | |
65 | f->close_section(); | |
66 | } | |
67 | if (!regex_rule.empty()) { | |
68 | f->open_object_section("FilterRule"); | |
69 | ::encode_xml("Name", "regex", f); | |
70 | ::encode_xml("Value", regex_rule, f); | |
71 | f->close_section(); | |
72 | } | |
73 | } | |
74 | ||
75 | bool rgw_s3_key_filter::has_content() const { | |
76 | return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); | |
77 | } | |
78 | ||
9f95a23c TL |
79 | bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { |
80 | kvl.clear(); | |
eafe8130 TL |
81 | XMLObjIter iter = obj->find("FilterRule"); |
82 | XMLObj *o; | |
83 | ||
84 | const auto throw_if_missing = true; | |
85 | ||
86 | std::string key; | |
87 | std::string value; | |
88 | ||
89 | while ((o = iter.get_next())) { | |
90 | RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); | |
91 | RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); | |
9f95a23c | 92 | kvl.emplace(key, value); |
eafe8130 TL |
93 | } |
94 | return true; | |
95 | } | |
96 | ||
9f95a23c TL |
97 | void rgw_s3_key_value_filter::dump_xml(Formatter *f) const { |
98 | for (const auto& key_value : kvl) { | |
eafe8130 TL |
99 | f->open_object_section("FilterRule"); |
100 | ::encode_xml("Name", key_value.first, f); | |
101 | ::encode_xml("Value", key_value.second, f); | |
102 | f->close_section(); | |
103 | } | |
104 | } | |
105 | ||
9f95a23c TL |
106 | bool rgw_s3_key_value_filter::has_content() const { |
107 | return !kvl.empty(); | |
eafe8130 TL |
108 | } |
109 | ||
110 | bool rgw_s3_filter::decode_xml(XMLObj* obj) { | |
111 | RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); | |
112 | RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); | |
9f95a23c | 113 | RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj); |
eafe8130 TL |
114 | return true; |
115 | } | |
116 | ||
117 | void rgw_s3_filter::dump_xml(Formatter *f) const { | |
118 | if (key_filter.has_content()) { | |
119 | ::encode_xml("S3Key", key_filter, f); | |
120 | } | |
121 | if (metadata_filter.has_content()) { | |
122 | ::encode_xml("S3Metadata", metadata_filter, f); | |
123 | } | |
9f95a23c TL |
124 | if (tag_filter.has_content()) { |
125 | ::encode_xml("S3Tags", tag_filter, f); | |
126 | } | |
eafe8130 TL |
127 | } |
128 | ||
129 | bool rgw_s3_filter::has_content() const { | |
130 | return key_filter.has_content() || | |
9f95a23c TL |
131 | metadata_filter.has_content() || |
132 | tag_filter.has_content(); | |
eafe8130 TL |
133 | } |
134 | ||
135 | bool match(const rgw_s3_key_filter& filter, const std::string& key) { | |
136 | const auto key_size = key.size(); | |
137 | const auto prefix_size = filter.prefix_rule.size(); | |
138 | if (prefix_size != 0) { | |
139 | // prefix rule exists | |
140 | if (prefix_size > key_size) { | |
141 | // if prefix is longer than key, we fail | |
142 | return false; | |
143 | } | |
144 | if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) { | |
145 | return false; | |
146 | } | |
147 | } | |
148 | const auto suffix_size = filter.suffix_rule.size(); | |
149 | if (suffix_size != 0) { | |
150 | // suffix rule exists | |
151 | if (suffix_size > key_size) { | |
152 | // if suffix is longer than key, we fail | |
153 | return false; | |
154 | } | |
155 | if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) { | |
156 | return false; | |
157 | } | |
158 | } | |
159 | if (!filter.regex_rule.empty()) { | |
160 | // TODO add regex chaching in the filter | |
161 | const std::regex base_regex(filter.regex_rule); | |
162 | if (!std::regex_match(key, base_regex)) { | |
163 | return false; | |
164 | } | |
165 | } | |
166 | return true; | |
167 | } | |
168 | ||
9f95a23c TL |
169 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl) { |
170 | // all filter pairs must exist with the same value in the object's metadata/tags | |
171 | // object metadata/tags may include items not in the filter | |
172 | return std::includes(kvl.begin(), kvl.end(), filter.kvl.begin(), filter.kvl.end()); | |
eafe8130 TL |
173 | } |
174 | ||
175 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { | |
176 | // if event list exists, and none of the events in the list matches the event type, filter the message | |
177 | if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) { | |
178 | return false; | |
179 | } | |
180 | return true; | |
181 | } | |
182 | ||
183 | void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) { | |
184 | l.clear(); | |
185 | ||
186 | XMLObjIter iter = obj->find(name); | |
187 | XMLObj *o; | |
188 | ||
189 | while ((o = iter.get_next())) { | |
190 | std::string val; | |
191 | decode_xml_obj(val, o); | |
192 | l.push_back(rgw::notify::from_string(val)); | |
193 | } | |
194 | } | |
195 | ||
196 | bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) { | |
197 | const auto throw_if_missing = true; | |
198 | RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); | |
199 | ||
200 | RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing); | |
201 | ||
202 | RGWXMLDecoder::decode_xml("Filter", filter, obj); | |
203 | ||
204 | do_decode_xml_obj(events, "Event", obj); | |
205 | if (events.empty()) { | |
206 | // if no events are provided, we assume all events | |
207 | events.push_back(rgw::notify::ObjectCreated); | |
208 | events.push_back(rgw::notify::ObjectRemoved); | |
209 | } | |
210 | return true; | |
211 | } | |
212 | ||
213 | void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const { | |
214 | ::encode_xml("Id", id, f); | |
215 | ::encode_xml("Topic", topic_arn.c_str(), f); | |
216 | if (filter.has_content()) { | |
217 | ::encode_xml("Filter", filter, f); | |
218 | } | |
219 | for (const auto& event : events) { | |
220 | ::encode_xml("Event", rgw::notify::to_string(event), f); | |
221 | } | |
222 | } | |
223 | ||
224 | bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) { | |
225 | do_decode_xml_obj(list, "TopicConfiguration", obj); | |
226 | if (list.empty()) { | |
227 | throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist"); | |
228 | } | |
229 | return true; | |
230 | } | |
231 | ||
232 | rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) : | |
233 | id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} | |
234 | ||
235 | void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { | |
236 | do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); | |
237 | } | |
238 | ||
239 | void rgw_pubsub_s3_record::dump(Formatter *f) const { | |
240 | encode_json("eventVersion", eventVersion, f); | |
241 | encode_json("eventSource", eventSource, f); | |
242 | encode_json("awsRegion", awsRegion, f); | |
243 | utime_t ut(eventTime); | |
244 | encode_json("eventTime", ut, f); | |
245 | encode_json("eventName", eventName, f); | |
246 | { | |
247 | Formatter::ObjectSection s(*f, "userIdentity"); | |
248 | encode_json("principalId", userIdentity, f); | |
249 | } | |
250 | { | |
251 | Formatter::ObjectSection s(*f, "requestParameters"); | |
252 | encode_json("sourceIPAddress", sourceIPAddress, f); | |
253 | } | |
254 | { | |
255 | Formatter::ObjectSection s(*f, "responseElements"); | |
256 | encode_json("x-amz-request-id", x_amz_request_id, f); | |
257 | encode_json("x-amz-id-2", x_amz_id_2, f); | |
258 | } | |
259 | { | |
260 | Formatter::ObjectSection s(*f, "s3"); | |
261 | encode_json("s3SchemaVersion", s3SchemaVersion, f); | |
262 | encode_json("configurationId", configurationId, f); | |
263 | { | |
264 | Formatter::ObjectSection sub_s(*f, "bucket"); | |
265 | encode_json("name", bucket_name, f); | |
266 | { | |
267 | Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); | |
268 | encode_json("principalId", bucket_ownerIdentity, f); | |
269 | } | |
270 | encode_json("arn", bucket_arn, f); | |
271 | encode_json("id", bucket_id, f); | |
272 | } | |
273 | { | |
274 | Formatter::ObjectSection sub_s(*f, "object"); | |
275 | encode_json("key", object_key, f); | |
276 | encode_json("size", object_size, f); | |
277 | encode_json("etag", object_etag, f); | |
278 | encode_json("versionId", object_versionId, f); | |
279 | encode_json("sequencer", object_sequencer, f); | |
280 | encode_json("metadata", x_meta_map, f); | |
9f95a23c | 281 | encode_json("tags", tags, f); |
eafe8130 TL |
282 | } |
283 | } | |
284 | encode_json("eventId", id, f); | |
9f95a23c | 285 | encode_json("opaqueData", opaque_data, f); |
eafe8130 | 286 | } |
11fdf7f2 TL |
287 | |
288 | void rgw_pubsub_event::dump(Formatter *f) const | |
289 | { | |
290 | encode_json("id", id, f); | |
eafe8130 | 291 | encode_json("event", event_name, f); |
11fdf7f2 TL |
292 | utime_t ut(timestamp); |
293 | encode_json("timestamp", ut, f); | |
294 | encode_json("info", info, f); | |
295 | } | |
296 | ||
297 | void rgw_pubsub_topic::dump(Formatter *f) const | |
298 | { | |
299 | encode_json("user", user, f); | |
300 | encode_json("name", name, f); | |
eafe8130 TL |
301 | encode_json("dest", dest, f); |
302 | encode_json("arn", arn, f); | |
9f95a23c | 303 | encode_json("opaqueData", opaque_data, f); |
eafe8130 TL |
304 | } |
305 | ||
306 | void rgw_pubsub_topic::dump_xml(Formatter *f) const | |
307 | { | |
308 | encode_xml("User", user, f); | |
309 | encode_xml("Name", name, f); | |
310 | encode_xml("EndPoint", dest, f); | |
311 | encode_xml("TopicArn", arn, f); | |
9f95a23c | 312 | encode_xml("OpaqueData", opaque_data, f); |
eafe8130 TL |
313 | } |
314 | ||
315 | void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) | |
316 | { | |
317 | f->open_array_section(name); | |
318 | for (auto iter = l.cbegin(); iter != l.cend(); ++iter) { | |
319 | f->dump_string("obj", rgw::notify::to_ceph_string(*iter)); | |
320 | } | |
321 | f->close_section(); | |
11fdf7f2 TL |
322 | } |
323 | ||
324 | void rgw_pubsub_topic_filter::dump(Formatter *f) const | |
325 | { | |
326 | encode_json("topic", topic, f); | |
327 | encode_json("events", events, f); | |
328 | } | |
329 | ||
330 | void rgw_pubsub_topic_subs::dump(Formatter *f) const | |
331 | { | |
332 | encode_json("topic", topic, f); | |
333 | encode_json("subs", subs, f); | |
334 | } | |
335 | ||
336 | void rgw_pubsub_bucket_topics::dump(Formatter *f) const | |
337 | { | |
338 | Formatter::ArraySection s(*f, "topics"); | |
339 | for (auto& t : topics) { | |
340 | encode_json(t.first.c_str(), t.second, f); | |
341 | } | |
342 | } | |
343 | ||
344 | void rgw_pubsub_user_topics::dump(Formatter *f) const | |
345 | { | |
346 | Formatter::ArraySection s(*f, "topics"); | |
347 | for (auto& t : topics) { | |
348 | encode_json(t.first.c_str(), t.second, f); | |
349 | } | |
350 | } | |
351 | ||
eafe8130 TL |
352 | void rgw_pubsub_user_topics::dump_xml(Formatter *f) const |
353 | { | |
354 | for (auto& t : topics) { | |
355 | encode_xml("member", t.second.topic, f); | |
356 | } | |
357 | } | |
358 | ||
11fdf7f2 TL |
359 | void rgw_pubsub_sub_dest::dump(Formatter *f) const |
360 | { | |
361 | encode_json("bucket_name", bucket_name, f); | |
362 | encode_json("oid_prefix", oid_prefix, f); | |
363 | encode_json("push_endpoint", push_endpoint, f); | |
eafe8130 TL |
364 | encode_json("push_endpoint_args", push_endpoint_args, f); |
365 | encode_json("push_endpoint_topic", arn_topic, f); | |
366 | } | |
367 | ||
368 | void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const | |
369 | { | |
370 | encode_xml("EndpointAddress", push_endpoint, f); | |
371 | encode_xml("EndpointArgs", push_endpoint_args, f); | |
372 | encode_xml("EndpointTopic", arn_topic, f); | |
11fdf7f2 TL |
373 | } |
374 | ||
375 | void rgw_pubsub_sub_config::dump(Formatter *f) const | |
376 | { | |
377 | encode_json("user", user, f); | |
378 | encode_json("name", name, f); | |
379 | encode_json("topic", topic, f); | |
380 | encode_json("dest", dest, f); | |
eafe8130 | 381 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
382 | } |
383 | ||
9f95a23c TL |
384 | RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) : |
385 | store(_store), | |
386 | user(_user), | |
387 | obj_ctx(store->svc()->sysobj->init_obj_ctx()) { | |
388 | get_user_meta_obj(&user_meta_obj); | |
389 | } | |
11fdf7f2 TL |
390 | |
391 | int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) | |
392 | { | |
9f95a23c | 393 | int ret = rgw_delete_system_obj(store->svc()->sysobj, obj.pool, obj.oid, objv_tracker); |
11fdf7f2 TL |
394 | if (ret < 0) { |
395 | return ret; | |
396 | } | |
397 | ||
398 | return 0; | |
399 | } | |
400 | ||
401 | int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker) | |
402 | { | |
403 | int ret = read(user_meta_obj, result, objv_tracker); | |
eafe8130 TL |
404 | if (ret < 0) { |
405 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
406 | return ret; |
407 | } | |
408 | return 0; | |
409 | } | |
410 | ||
411 | int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker) | |
412 | { | |
413 | int ret = write(user_meta_obj, topics, objv_tracker); | |
414 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 415 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
416 | return ret; |
417 | } | |
418 | return 0; | |
419 | } | |
420 | ||
421 | int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result) | |
422 | { | |
423 | return read_user_topics(result, nullptr); | |
424 | } | |
425 | ||
426 | int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) | |
427 | { | |
428 | int ret = ps->read(bucket_meta_obj, result, objv_tracker); | |
429 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 430 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
431 | return ret; |
432 | } | |
433 | return 0; | |
434 | } | |
435 | ||
436 | int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker) | |
437 | { | |
438 | int ret = ps->write(bucket_meta_obj, topics, objv_tracker); | |
439 | if (ret < 0) { | |
eafe8130 | 440 | ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
441 | return ret; |
442 | } | |
443 | ||
444 | return 0; | |
445 | } | |
446 | ||
447 | int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) | |
448 | { | |
449 | return read_topics(result, nullptr); | |
450 | } | |
451 | ||
452 | int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) | |
453 | { | |
454 | rgw_pubsub_user_topics topics; | |
455 | int ret = get_user_topics(&topics); | |
456 | if (ret < 0) { | |
eafe8130 | 457 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
458 | return ret; |
459 | } | |
460 | ||
461 | auto iter = topics.topics.find(name); | |
462 | if (iter == topics.topics.end()) { | |
eafe8130 | 463 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; |
11fdf7f2 TL |
464 | return -ENOENT; |
465 | } | |
466 | ||
467 | *result = iter->second; | |
468 | return 0; | |
469 | } | |
470 | ||
eafe8130 | 471 | int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result) |
11fdf7f2 | 472 | { |
eafe8130 TL |
473 | rgw_pubsub_user_topics topics; |
474 | int ret = get_user_topics(&topics); | |
475 | if (ret < 0) { | |
476 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; | |
477 | return ret; | |
478 | } | |
479 | ||
480 | auto iter = topics.topics.find(name); | |
481 | if (iter == topics.topics.end()) { | |
482 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; | |
483 | return -ENOENT; | |
484 | } | |
485 | ||
486 | *result = iter->second.topic; | |
487 | return 0; | |
488 | } | |
489 | ||
490 | int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) { | |
491 | return create_notification(topic_name, events, std::nullopt, ""); | |
492 | } | |
493 | ||
494 | int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) { | |
11fdf7f2 | 495 | rgw_pubsub_topic_subs user_topic_info; |
9f95a23c | 496 | rgw::sal::RGWRadosStore *store = ps->store; |
11fdf7f2 TL |
497 | |
498 | int ret = ps->get_topic(topic_name, &user_topic_info); | |
499 | if (ret < 0) { | |
eafe8130 | 500 | ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; |
11fdf7f2 TL |
501 | return ret; |
502 | } | |
eafe8130 | 503 | ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl; |
11fdf7f2 TL |
504 | |
505 | RGWObjVersionTracker objv_tracker; | |
506 | rgw_pubsub_bucket_topics bucket_topics; | |
507 | ||
508 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 TL |
509 | if (ret < 0) { |
510 | ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" << | |
511 | bucket.name << "': ret=" << ret << dendl; | |
11fdf7f2 TL |
512 | return ret; |
513 | } | |
eafe8130 TL |
514 | ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << |
515 | bucket.name << "'" << dendl; | |
11fdf7f2 TL |
516 | |
517 | auto& topic_filter = bucket_topics.topics[topic_name]; | |
518 | topic_filter.topic = user_topic_info.topic; | |
519 | topic_filter.events = events; | |
eafe8130 TL |
520 | topic_filter.s3_id = notif_name; |
521 | if (s3_filter) { | |
522 | topic_filter.s3_filter = *s3_filter; | |
523 | } | |
11fdf7f2 TL |
524 | |
525 | ret = write_topics(bucket_topics, &objv_tracker); | |
526 | if (ret < 0) { | |
eafe8130 | 527 | ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl; |
11fdf7f2 TL |
528 | return ret; |
529 | } | |
eafe8130 TL |
530 | |
531 | ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl; | |
11fdf7f2 TL |
532 | |
533 | return 0; | |
534 | } | |
535 | ||
536 | int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) | |
537 | { | |
538 | rgw_pubsub_topic_subs user_topic_info; | |
9f95a23c | 539 | rgw::sal::RGWRadosStore *store = ps->store; |
11fdf7f2 TL |
540 | |
541 | int ret = ps->get_topic(topic_name, &user_topic_info); | |
542 | if (ret < 0) { | |
eafe8130 | 543 | ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; |
11fdf7f2 TL |
544 | return ret; |
545 | } | |
546 | ||
547 | RGWObjVersionTracker objv_tracker; | |
548 | rgw_pubsub_bucket_topics bucket_topics; | |
549 | ||
550 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 TL |
551 | if (ret < 0) { |
552 | ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
553 | return ret; |
554 | } | |
555 | ||
556 | bucket_topics.topics.erase(topic_name); | |
557 | ||
558 | ret = write_topics(bucket_topics, &objv_tracker); | |
559 | if (ret < 0) { | |
eafe8130 | 560 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
561 | return ret; |
562 | } | |
563 | ||
564 | return 0; | |
565 | } | |
566 | ||
eafe8130 | 567 | int RGWUserPubSub::create_topic(const string& name) { |
9f95a23c | 568 | return create_topic(name, rgw_pubsub_sub_dest(), "", ""); |
eafe8130 TL |
569 | } |
570 | ||
9f95a23c | 571 | int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data) { |
11fdf7f2 TL |
572 | RGWObjVersionTracker objv_tracker; |
573 | rgw_pubsub_user_topics topics; | |
574 | ||
575 | int ret = read_user_topics(&topics, &objv_tracker); | |
576 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 TL |
577 | // its not an error if not topics exist, we create one |
578 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
579 | return ret; |
580 | } | |
eafe8130 | 581 | |
11fdf7f2 TL |
582 | rgw_pubsub_topic_subs& new_topic = topics.topics[name]; |
583 | new_topic.topic.user = user; | |
584 | new_topic.topic.name = name; | |
eafe8130 TL |
585 | new_topic.topic.dest = dest; |
586 | new_topic.topic.arn = arn; | |
9f95a23c | 587 | new_topic.topic.opaque_data = opaque_data; |
11fdf7f2 TL |
588 | |
589 | ret = write_user_topics(topics, &objv_tracker); | |
590 | if (ret < 0) { | |
eafe8130 | 591 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
592 | return ret; |
593 | } | |
594 | ||
595 | return 0; | |
596 | } | |
597 | ||
598 | int RGWUserPubSub::remove_topic(const string& name) | |
599 | { | |
600 | RGWObjVersionTracker objv_tracker; | |
601 | rgw_pubsub_user_topics topics; | |
602 | ||
603 | int ret = read_user_topics(&topics, &objv_tracker); | |
604 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 605 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 | 606 | return ret; |
eafe8130 TL |
607 | } else if (ret == -ENOENT) { |
608 | // its not an error if no topics exist, just a no-op | |
609 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl; | |
610 | return 0; | |
11fdf7f2 TL |
611 | } |
612 | ||
613 | topics.topics.erase(name); | |
614 | ||
615 | ret = write_user_topics(topics, &objv_tracker); | |
616 | if (ret < 0) { | |
eafe8130 | 617 | ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
618 | return ret; |
619 | } | |
620 | ||
621 | return 0; | |
622 | } | |
623 | ||
624 | int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker) | |
625 | { | |
626 | int ret = ps->read(sub_meta_obj, result, objv_tracker); | |
627 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 628 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
629 | return ret; |
630 | } | |
631 | return 0; | |
632 | } | |
633 | ||
634 | int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker) | |
635 | { | |
636 | int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker); | |
637 | if (ret < 0) { | |
eafe8130 | 638 | ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
639 | return ret; |
640 | } | |
641 | ||
642 | return 0; | |
643 | } | |
644 | ||
645 | int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker) | |
646 | { | |
647 | int ret = ps->remove(sub_meta_obj, objv_tracker); | |
648 | if (ret < 0) { | |
eafe8130 | 649 | ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
650 | return ret; |
651 | } | |
652 | ||
653 | return 0; | |
654 | } | |
655 | ||
656 | int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) | |
657 | { | |
658 | return read_sub(result, nullptr); | |
659 | } | |
660 | ||
eafe8130 | 661 | int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id) |
11fdf7f2 TL |
662 | { |
663 | RGWObjVersionTracker user_objv_tracker; | |
664 | rgw_pubsub_user_topics topics; | |
9f95a23c | 665 | rgw::sal::RGWRadosStore *store = ps->store; |
11fdf7f2 TL |
666 | |
667 | int ret = ps->read_user_topics(&topics, &user_objv_tracker); | |
668 | if (ret < 0) { | |
eafe8130 TL |
669 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
670 | return ret != -ENOENT ? ret : -EINVAL; | |
11fdf7f2 TL |
671 | } |
672 | ||
673 | auto iter = topics.topics.find(topic); | |
674 | if (iter == topics.topics.end()) { | |
eafe8130 TL |
675 | ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; |
676 | return -EINVAL; | |
11fdf7f2 TL |
677 | } |
678 | ||
679 | auto& t = iter->second; | |
680 | ||
681 | rgw_pubsub_sub_config sub_conf; | |
682 | ||
683 | sub_conf.user = ps->user; | |
684 | sub_conf.name = sub; | |
685 | sub_conf.topic = topic; | |
686 | sub_conf.dest = dest; | |
eafe8130 | 687 | sub_conf.s3_id = s3_id; |
11fdf7f2 TL |
688 | |
689 | t.subs.insert(sub); | |
690 | ||
691 | ret = ps->write_user_topics(topics, &user_objv_tracker); | |
692 | if (ret < 0) { | |
eafe8130 | 693 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
694 | return ret; |
695 | } | |
696 | ||
697 | ret = write_sub(sub_conf, nullptr); | |
698 | if (ret < 0) { | |
eafe8130 | 699 | ldout(store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
700 | return ret; |
701 | } | |
702 | return 0; | |
703 | } | |
704 | ||
705 | int RGWUserPubSub::Sub::unsubscribe(const string& _topic) | |
706 | { | |
707 | string topic = _topic; | |
708 | RGWObjVersionTracker sobjv_tracker; | |
9f95a23c | 709 | rgw::sal::RGWRadosStore *store = ps->store; |
11fdf7f2 TL |
710 | |
711 | if (topic.empty()) { | |
712 | rgw_pubsub_sub_config sub_conf; | |
713 | int ret = read_sub(&sub_conf, &sobjv_tracker); | |
714 | if (ret < 0) { | |
eafe8130 | 715 | ldout(store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
716 | return ret; |
717 | } | |
718 | topic = sub_conf.topic; | |
719 | } | |
720 | ||
721 | RGWObjVersionTracker objv_tracker; | |
722 | rgw_pubsub_user_topics topics; | |
723 | ||
724 | int ret = ps->read_user_topics(&topics, &objv_tracker); | |
725 | if (ret < 0) { | |
eafe8130 TL |
726 | // not an error - could be that topic was already deleted |
727 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; | |
728 | } else { | |
11fdf7f2 TL |
729 | auto iter = topics.topics.find(topic); |
730 | if (iter != topics.topics.end()) { | |
731 | auto& t = iter->second; | |
732 | ||
733 | t.subs.erase(sub); | |
734 | ||
735 | ret = ps->write_user_topics(topics, &objv_tracker); | |
736 | if (ret < 0) { | |
eafe8130 | 737 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
738 | return ret; |
739 | } | |
740 | } | |
741 | } | |
742 | ||
743 | ret = remove_sub(&sobjv_tracker); | |
744 | if (ret < 0) { | |
eafe8130 | 745 | ldout(store->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
746 | return ret; |
747 | } | |
748 | return 0; | |
749 | } | |
750 | ||
eafe8130 TL |
751 | template<typename EventType> |
752 | void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const | |
11fdf7f2 TL |
753 | { |
754 | encode_json("next_marker", next_marker, f); | |
755 | encode_json("is_truncated", is_truncated, f); | |
756 | ||
eafe8130 | 757 | Formatter::ArraySection s(*f, EventType::json_type_plural); |
11fdf7f2 | 758 | for (auto& event : events) { |
92f5a8d4 | 759 | encode_json("", event, f); |
11fdf7f2 TL |
760 | } |
761 | } | |
762 | ||
eafe8130 TL |
763 | template<typename EventType> |
764 | int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events) | |
11fdf7f2 | 765 | { |
9f95a23c | 766 | RGWRados *store = ps->store->getRados(); |
11fdf7f2 TL |
767 | rgw_pubsub_sub_config sub_conf; |
768 | int ret = get_conf(&sub_conf); | |
769 | if (ret < 0) { | |
eafe8130 | 770 | ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
771 | return ret; |
772 | } | |
773 | ||
774 | RGWBucketInfo bucket_info; | |
775 | string tenant; | |
9f95a23c | 776 | ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); |
11fdf7f2 | 777 | if (ret == -ENOENT) { |
eafe8130 | 778 | list.is_truncated = false; |
11fdf7f2 TL |
779 | return 0; |
780 | } | |
781 | if (ret < 0) { | |
eafe8130 | 782 | ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
783 | return ret; |
784 | } | |
785 | ||
786 | RGWRados::Bucket target(store, bucket_info); | |
787 | RGWRados::Bucket::List list_op(&target); | |
788 | ||
789 | list_op.params.prefix = sub_conf.dest.oid_prefix; | |
790 | list_op.params.marker = marker; | |
791 | ||
eafe8130 | 792 | std::vector<rgw_bucket_dir_entry> objs; |
11fdf7f2 | 793 | |
9f95a23c | 794 | ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated, null_yield); |
11fdf7f2 | 795 | if (ret < 0) { |
eafe8130 | 796 | ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
797 | return ret; |
798 | } | |
eafe8130 TL |
799 | if (list.is_truncated) { |
800 | list.next_marker = list_op.get_next_marker().name; | |
11fdf7f2 TL |
801 | } |
802 | ||
803 | for (auto& obj : objs) { | |
804 | bufferlist bl64; | |
805 | bufferlist bl; | |
806 | bl64.append(obj.meta.user_data); | |
807 | try { | |
808 | bl.decode_base64(bl64); | |
809 | } catch (buffer::error& err) { | |
eafe8130 | 810 | ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl; |
11fdf7f2 TL |
811 | continue; |
812 | } | |
eafe8130 | 813 | EventType event; |
11fdf7f2 TL |
814 | |
815 | auto iter = bl.cbegin(); | |
816 | try { | |
817 | decode(event, iter); | |
818 | } catch (buffer::error& err) { | |
eafe8130 | 819 | ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl; |
11fdf7f2 TL |
820 | continue; |
821 | }; | |
822 | ||
eafe8130 | 823 | list.events.push_back(event); |
11fdf7f2 TL |
824 | } |
825 | return 0; | |
826 | } | |
827 | ||
eafe8130 TL |
828 | template<typename EventType> |
829 | int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id) | |
11fdf7f2 | 830 | { |
9f95a23c | 831 | rgw::sal::RGWRadosStore *store = ps->store; |
11fdf7f2 TL |
832 | rgw_pubsub_sub_config sub_conf; |
833 | int ret = get_conf(&sub_conf); | |
834 | if (ret < 0) { | |
eafe8130 | 835 | ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
836 | return ret; |
837 | } | |
838 | ||
839 | RGWBucketInfo bucket_info; | |
840 | string tenant; | |
9f95a23c | 841 | ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); |
11fdf7f2 | 842 | if (ret < 0) { |
eafe8130 | 843 | ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
844 | return ret; |
845 | } | |
846 | ||
847 | rgw_bucket& bucket = bucket_info.bucket; | |
848 | ||
849 | RGWObjectCtx obj_ctx(store); | |
850 | rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id); | |
851 | ||
852 | obj_ctx.set_atomic(obj); | |
853 | ||
9f95a23c | 854 | RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj); |
11fdf7f2 TL |
855 | RGWRados::Object::Delete del_op(&del_target); |
856 | ||
857 | del_op.params.bucket_owner = bucket_info.owner; | |
858 | del_op.params.versioning_status = bucket_info.versioning_status(); | |
859 | ||
9f95a23c | 860 | ret = del_op.delete_obj(null_yield); |
11fdf7f2 | 861 | if (ret < 0) { |
eafe8130 | 862 | ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; |
11fdf7f2 TL |
863 | } |
864 | return 0; | |
865 | } | |
eafe8130 | 866 | |
9f95a23c TL |
867 | void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const { |
868 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid()); | |
869 | } | |
870 | ||
871 | void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { | |
872 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); | |
873 | } | |
874 | ||
875 | void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { | |
876 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name)); | |
877 | } | |
878 | ||
eafe8130 TL |
879 | template<typename EventType> |
880 | void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const { | |
881 | list.dump(f); | |
882 | } | |
883 | ||
884 | // explicit instantiation for the only two possible types | |
885 | // no need to move implementation to header | |
886 | template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>; | |
887 | template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>; | |
888 |