]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 | 5 | #include "rgw_b64.h" |
9f95a23c | 6 | #include "rgw_sal.h" |
11fdf7f2 TL |
7 | #include "rgw_pubsub.h" |
8 | #include "rgw_tools.h" | |
eafe8130 TL |
9 | #include "rgw_xml.h" |
10 | #include "rgw_arn.h" | |
11 | #include "rgw_pubsub_push.h" | |
eafe8130 TL |
12 | #include <regex> |
13 | #include <algorithm> | |
11fdf7f2 TL |
14 | |
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
92f5a8d4 TL |
17 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { |
18 | char buf[64]; | |
19 | const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); | |
20 | if (len > 0) { | |
21 | id.assign(buf, len); | |
22 | } | |
23 | } | |
24 | ||
aee94f69 TL |
25 | void rgw_s3_key_filter::dump(Formatter *f) const { |
26 | if (!prefix_rule.empty()) { | |
27 | f->open_object_section("FilterRule"); | |
28 | ::encode_json("Name", "prefix", f); | |
29 | ::encode_json("Value", prefix_rule, f); | |
30 | f->close_section(); | |
31 | } | |
32 | if (!suffix_rule.empty()) { | |
33 | f->open_object_section("FilterRule"); | |
34 | ::encode_json("Name", "suffix", f); | |
35 | ::encode_json("Value", suffix_rule, f); | |
36 | f->close_section(); | |
37 | } | |
38 | if (!regex_rule.empty()) { | |
39 | f->open_object_section("FilterRule"); | |
40 | ::encode_json("Name", "regex", f); | |
41 | ::encode_json("Value", regex_rule, f); | |
42 | f->close_section(); | |
43 | } | |
44 | } | |
45 | ||
eafe8130 TL |
46 | bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { |
47 | XMLObjIter iter = obj->find("FilterRule"); | |
48 | XMLObj *o; | |
49 | ||
50 | const auto throw_if_missing = true; | |
51 | auto prefix_not_set = true; | |
52 | auto suffix_not_set = true; | |
53 | auto regex_not_set = true; | |
54 | std::string name; | |
55 | ||
56 | while ((o = iter.get_next())) { | |
57 | RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing); | |
58 | if (name == "prefix" && prefix_not_set) { | |
59 | prefix_not_set = false; | |
60 | RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing); | |
61 | } else if (name == "suffix" && suffix_not_set) { | |
62 | suffix_not_set = false; | |
63 | RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing); | |
64 | } else if (name == "regex" && regex_not_set) { | |
65 | regex_not_set = false; | |
66 | RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing); | |
67 | } else { | |
68 | throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'"); | |
69 | } | |
70 | } | |
71 | return true; | |
72 | } | |
73 | ||
74 | void rgw_s3_key_filter::dump_xml(Formatter *f) const { | |
75 | if (!prefix_rule.empty()) { | |
76 | f->open_object_section("FilterRule"); | |
77 | ::encode_xml("Name", "prefix", f); | |
78 | ::encode_xml("Value", prefix_rule, f); | |
79 | f->close_section(); | |
80 | } | |
81 | if (!suffix_rule.empty()) { | |
82 | f->open_object_section("FilterRule"); | |
83 | ::encode_xml("Name", "suffix", f); | |
84 | ::encode_xml("Value", suffix_rule, f); | |
85 | f->close_section(); | |
86 | } | |
87 | if (!regex_rule.empty()) { | |
88 | f->open_object_section("FilterRule"); | |
89 | ::encode_xml("Name", "regex", f); | |
90 | ::encode_xml("Value", regex_rule, f); | |
91 | f->close_section(); | |
92 | } | |
93 | } | |
94 | ||
95 | bool rgw_s3_key_filter::has_content() const { | |
96 | return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); | |
97 | } | |
98 | ||
aee94f69 TL |
99 | void rgw_s3_key_value_filter::dump(Formatter *f) const { |
100 | for (const auto& key_value : kv) { | |
101 | f->open_object_section("FilterRule"); | |
102 | ::encode_json("Name", key_value.first, f); | |
103 | ::encode_json("Value", key_value.second, f); | |
104 | f->close_section(); | |
105 | } | |
106 | } | |
107 | ||
9f95a23c | 108 | bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { |
f67539c2 | 109 | kv.clear(); |
eafe8130 TL |
110 | XMLObjIter iter = obj->find("FilterRule"); |
111 | XMLObj *o; | |
112 | ||
113 | const auto throw_if_missing = true; | |
114 | ||
115 | std::string key; | |
116 | std::string value; | |
117 | ||
118 | while ((o = iter.get_next())) { | |
119 | RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); | |
120 | RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); | |
f67539c2 | 121 | kv.emplace(key, value); |
eafe8130 TL |
122 | } |
123 | return true; | |
124 | } | |
125 | ||
9f95a23c | 126 | void rgw_s3_key_value_filter::dump_xml(Formatter *f) const { |
f67539c2 | 127 | for (const auto& key_value : kv) { |
eafe8130 TL |
128 | f->open_object_section("FilterRule"); |
129 | ::encode_xml("Name", key_value.first, f); | |
130 | ::encode_xml("Value", key_value.second, f); | |
131 | f->close_section(); | |
132 | } | |
133 | } | |
134 | ||
9f95a23c | 135 | bool rgw_s3_key_value_filter::has_content() const { |
f67539c2 | 136 | return !kv.empty(); |
eafe8130 TL |
137 | } |
138 | ||
aee94f69 TL |
139 | void rgw_s3_filter::dump(Formatter *f) const { |
140 | encode_json("S3Key", key_filter, f); | |
141 | encode_json("S3Metadata", metadata_filter, f); | |
142 | encode_json("S3Tags", tag_filter, f); | |
143 | } | |
144 | ||
eafe8130 TL |
145 | bool rgw_s3_filter::decode_xml(XMLObj* obj) { |
146 | RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); | |
147 | RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); | |
9f95a23c | 148 | RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj); |
eafe8130 TL |
149 | return true; |
150 | } | |
151 | ||
152 | void rgw_s3_filter::dump_xml(Formatter *f) const { | |
153 | if (key_filter.has_content()) { | |
154 | ::encode_xml("S3Key", key_filter, f); | |
155 | } | |
156 | if (metadata_filter.has_content()) { | |
157 | ::encode_xml("S3Metadata", metadata_filter, f); | |
158 | } | |
9f95a23c TL |
159 | if (tag_filter.has_content()) { |
160 | ::encode_xml("S3Tags", tag_filter, f); | |
161 | } | |
eafe8130 TL |
162 | } |
163 | ||
164 | bool rgw_s3_filter::has_content() const { | |
165 | return key_filter.has_content() || | |
9f95a23c TL |
166 | metadata_filter.has_content() || |
167 | tag_filter.has_content(); | |
eafe8130 TL |
168 | } |
169 | ||
170 | bool match(const rgw_s3_key_filter& filter, const std::string& key) { | |
171 | const auto key_size = key.size(); | |
172 | const auto prefix_size = filter.prefix_rule.size(); | |
173 | if (prefix_size != 0) { | |
174 | // prefix rule exists | |
175 | if (prefix_size > key_size) { | |
176 | // if prefix is longer than key, we fail | |
177 | return false; | |
178 | } | |
179 | if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) { | |
180 | return false; | |
181 | } | |
182 | } | |
183 | const auto suffix_size = filter.suffix_rule.size(); | |
184 | if (suffix_size != 0) { | |
185 | // suffix rule exists | |
186 | if (suffix_size > key_size) { | |
187 | // if suffix is longer than key, we fail | |
188 | return false; | |
189 | } | |
190 | if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) { | |
191 | return false; | |
192 | } | |
193 | } | |
194 | if (!filter.regex_rule.empty()) { | |
195 | // TODO add regex chaching in the filter | |
196 | const std::regex base_regex(filter.regex_rule); | |
197 | if (!std::regex_match(key, base_regex)) { | |
198 | return false; | |
199 | } | |
200 | } | |
201 | return true; | |
202 | } | |
203 | ||
f67539c2 | 204 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) { |
9f95a23c TL |
205 | // all filter pairs must exist with the same value in the object's metadata/tags |
206 | // object metadata/tags may include items not in the filter | |
f67539c2 | 207 | return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end()); |
eafe8130 TL |
208 | } |
209 | ||
20effc67 TL |
210 | bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv) { |
211 | // all filter pairs must exist with the same value in the object's metadata/tags | |
212 | // object metadata/tags may include items not in the filter | |
213 | for (auto& filter : filter.kv) { | |
214 | auto result = kv.equal_range(filter.first); | |
1e59de90 | 215 | if (std::any_of(result.first, result.second, [&filter](const std::pair<std::string, std::string>& p) { return p.second == filter.second;})) |
20effc67 TL |
216 | continue; |
217 | else | |
218 | return false; | |
219 | } | |
220 | return true; | |
221 | } | |
222 | ||
eafe8130 TL |
223 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { |
224 | // if event list exists, and none of the events in the list matches the event type, filter the message | |
225 | if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) { | |
226 | return false; | |
227 | } | |
228 | return true; | |
229 | } | |
230 | ||
1e59de90 | 231 | void do_decode_xml_obj(rgw::notify::EventTypeList& l, const std::string& name, XMLObj *obj) { |
eafe8130 TL |
232 | l.clear(); |
233 | ||
234 | XMLObjIter iter = obj->find(name); | |
235 | XMLObj *o; | |
236 | ||
237 | while ((o = iter.get_next())) { | |
238 | std::string val; | |
239 | decode_xml_obj(val, o); | |
240 | l.push_back(rgw::notify::from_string(val)); | |
241 | } | |
242 | } | |
243 | ||
244 | bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) { | |
245 | const auto throw_if_missing = true; | |
246 | RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); | |
247 | ||
248 | RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing); | |
249 | ||
250 | RGWXMLDecoder::decode_xml("Filter", filter, obj); | |
251 | ||
252 | do_decode_xml_obj(events, "Event", obj); | |
253 | if (events.empty()) { | |
254 | // if no events are provided, we assume all events | |
255 | events.push_back(rgw::notify::ObjectCreated); | |
256 | events.push_back(rgw::notify::ObjectRemoved); | |
257 | } | |
258 | return true; | |
259 | } | |
260 | ||
261 | void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const { | |
262 | ::encode_xml("Id", id, f); | |
263 | ::encode_xml("Topic", topic_arn.c_str(), f); | |
264 | if (filter.has_content()) { | |
265 | ::encode_xml("Filter", filter, f); | |
266 | } | |
267 | for (const auto& event : events) { | |
268 | ::encode_xml("Event", rgw::notify::to_string(event), f); | |
269 | } | |
270 | } | |
271 | ||
272 | bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) { | |
273 | do_decode_xml_obj(list, "TopicConfiguration", obj); | |
eafe8130 TL |
274 | return true; |
275 | } | |
276 | ||
277 | rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) : | |
278 | id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} | |
279 | ||
280 | void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { | |
281 | do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); | |
282 | } | |
283 | ||
f67539c2 | 284 | void rgw_pubsub_s3_event::dump(Formatter *f) const { |
eafe8130 TL |
285 | encode_json("eventVersion", eventVersion, f); |
286 | encode_json("eventSource", eventSource, f); | |
287 | encode_json("awsRegion", awsRegion, f); | |
288 | utime_t ut(eventTime); | |
289 | encode_json("eventTime", ut, f); | |
290 | encode_json("eventName", eventName, f); | |
291 | { | |
292 | Formatter::ObjectSection s(*f, "userIdentity"); | |
293 | encode_json("principalId", userIdentity, f); | |
294 | } | |
295 | { | |
296 | Formatter::ObjectSection s(*f, "requestParameters"); | |
297 | encode_json("sourceIPAddress", sourceIPAddress, f); | |
298 | } | |
299 | { | |
300 | Formatter::ObjectSection s(*f, "responseElements"); | |
301 | encode_json("x-amz-request-id", x_amz_request_id, f); | |
302 | encode_json("x-amz-id-2", x_amz_id_2, f); | |
303 | } | |
304 | { | |
305 | Formatter::ObjectSection s(*f, "s3"); | |
306 | encode_json("s3SchemaVersion", s3SchemaVersion, f); | |
307 | encode_json("configurationId", configurationId, f); | |
308 | { | |
309 | Formatter::ObjectSection sub_s(*f, "bucket"); | |
310 | encode_json("name", bucket_name, f); | |
311 | { | |
312 | Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); | |
313 | encode_json("principalId", bucket_ownerIdentity, f); | |
314 | } | |
315 | encode_json("arn", bucket_arn, f); | |
316 | encode_json("id", bucket_id, f); | |
317 | } | |
318 | { | |
319 | Formatter::ObjectSection sub_s(*f, "object"); | |
320 | encode_json("key", object_key, f); | |
321 | encode_json("size", object_size, f); | |
a4b75251 | 322 | encode_json("eTag", object_etag, f); |
eafe8130 TL |
323 | encode_json("versionId", object_versionId, f); |
324 | encode_json("sequencer", object_sequencer, f); | |
325 | encode_json("metadata", x_meta_map, f); | |
9f95a23c | 326 | encode_json("tags", tags, f); |
eafe8130 TL |
327 | } |
328 | } | |
329 | encode_json("eventId", id, f); | |
9f95a23c | 330 | encode_json("opaqueData", opaque_data, f); |
eafe8130 | 331 | } |
11fdf7f2 | 332 | |
11fdf7f2 TL |
333 | void rgw_pubsub_topic::dump(Formatter *f) const |
334 | { | |
335 | encode_json("user", user, f); | |
336 | encode_json("name", name, f); | |
eafe8130 TL |
337 | encode_json("dest", dest, f); |
338 | encode_json("arn", arn, f); | |
9f95a23c | 339 | encode_json("opaqueData", opaque_data, f); |
eafe8130 TL |
340 | } |
341 | ||
342 | void rgw_pubsub_topic::dump_xml(Formatter *f) const | |
343 | { | |
344 | encode_xml("User", user, f); | |
345 | encode_xml("Name", name, f); | |
346 | encode_xml("EndPoint", dest, f); | |
347 | encode_xml("TopicArn", arn, f); | |
9f95a23c | 348 | encode_xml("OpaqueData", opaque_data, f); |
eafe8130 TL |
349 | } |
350 | ||
f67539c2 TL |
351 | void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) { |
352 | f->open_object_section("entry"); | |
353 | encode_xml("key", key, f); | |
354 | encode_xml("value", value, f); | |
355 | f->close_section(); // entry | |
356 | } | |
357 | ||
358 | void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const | |
359 | { | |
360 | f->open_array_section("Attributes"); | |
361 | std::string str_user; | |
362 | user.to_str(str_user); | |
363 | encode_xml_key_value_entry("User", str_user, f); | |
364 | encode_xml_key_value_entry("Name", name, f); | |
365 | encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f); | |
366 | encode_xml_key_value_entry("TopicArn", arn, f); | |
367 | encode_xml_key_value_entry("OpaqueData", opaque_data, f); | |
368 | f->close_section(); // Attributes | |
369 | } | |
370 | ||
eafe8130 TL |
371 | void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) |
372 | { | |
373 | f->open_array_section(name); | |
374 | for (auto iter = l.cbegin(); iter != l.cend(); ++iter) { | |
1e59de90 | 375 | f->dump_string("obj", rgw::notify::to_string(*iter)); |
eafe8130 TL |
376 | } |
377 | f->close_section(); | |
11fdf7f2 TL |
378 | } |
379 | ||
380 | void rgw_pubsub_topic_filter::dump(Formatter *f) const | |
381 | { | |
aee94f69 TL |
382 | encode_json("TopicArn", topic.arn, f); |
383 | encode_json("Id", s3_id, f); | |
384 | encode_json("Events", events, f); | |
385 | encode_json("Filter", s3_filter, f); | |
11fdf7f2 TL |
386 | } |
387 | ||
11fdf7f2 TL |
388 | void rgw_pubsub_bucket_topics::dump(Formatter *f) const |
389 | { | |
aee94f69 | 390 | Formatter::ArraySection s(*f, "notifications"); |
11fdf7f2 TL |
391 | for (auto& t : topics) { |
392 | encode_json(t.first.c_str(), t.second, f); | |
393 | } | |
394 | } | |
395 | ||
f67539c2 | 396 | void rgw_pubsub_topics::dump(Formatter *f) const |
11fdf7f2 TL |
397 | { |
398 | Formatter::ArraySection s(*f, "topics"); | |
399 | for (auto& t : topics) { | |
aee94f69 TL |
400 | auto& topic = t.second; |
401 | if (topic.name == topic.dest.arn_topic) { | |
402 | encode_json(t.first.c_str(), topic, f); | |
403 | } | |
11fdf7f2 TL |
404 | } |
405 | } | |
406 | ||
f67539c2 | 407 | void rgw_pubsub_topics::dump_xml(Formatter *f) const |
eafe8130 TL |
408 | { |
409 | for (auto& t : topics) { | |
1e59de90 | 410 | encode_xml("member", t.second, f); |
eafe8130 TL |
411 | } |
412 | } | |
413 | ||
1e59de90 | 414 | void rgw_pubsub_dest::dump(Formatter *f) const |
11fdf7f2 | 415 | { |
11fdf7f2 | 416 | encode_json("push_endpoint", push_endpoint, f); |
eafe8130 TL |
417 | encode_json("push_endpoint_args", push_endpoint_args, f); |
418 | encode_json("push_endpoint_topic", arn_topic, f); | |
f67539c2 TL |
419 | encode_json("stored_secret", stored_secret, f); |
420 | encode_json("persistent", persistent, f); | |
eafe8130 TL |
421 | } |
422 | ||
1e59de90 | 423 | void rgw_pubsub_dest::dump_xml(Formatter *f) const |
eafe8130 TL |
424 | { |
425 | encode_xml("EndpointAddress", push_endpoint, f); | |
426 | encode_xml("EndpointArgs", push_endpoint_args, f); | |
427 | encode_xml("EndpointTopic", arn_topic, f); | |
f67539c2 TL |
428 | encode_xml("HasStoredSecret", stored_secret, f); |
429 | encode_xml("Persistent", persistent, f); | |
430 | } | |
431 | ||
1e59de90 | 432 | std::string rgw_pubsub_dest::to_json_str() const |
f67539c2 | 433 | { |
f67539c2 TL |
434 | JSONFormatter f; |
435 | f.open_object_section(""); | |
436 | encode_json("EndpointAddress", push_endpoint, &f); | |
437 | encode_json("EndpointArgs", push_endpoint_args, &f); | |
438 | encode_json("EndpointTopic", arn_topic, &f); | |
439 | encode_json("HasStoredSecret", stored_secret, &f); | |
440 | encode_json("Persistent", persistent, &f); | |
441 | f.close_section(); | |
442 | std::stringstream ss; | |
443 | f.flush(ss); | |
444 | return ss.str(); | |
11fdf7f2 TL |
445 | } |
446 | ||
1e59de90 TL |
447 | RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant) |
448 | : driver(_driver), tenant(_tenant) | |
449 | {} | |
11fdf7f2 | 450 | |
1e59de90 TL |
451 | int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, |
452 | RGWObjVersionTracker *objv_tracker, optional_yield y) const | |
11fdf7f2 | 453 | { |
1e59de90 | 454 | const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp); |
eafe8130 | 455 | if (ret < 0) { |
1e59de90 | 456 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
457 | return ret; |
458 | } | |
459 | return 0; | |
460 | } | |
461 | ||
b3b6e05e | 462 | int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, |
1e59de90 | 463 | RGWObjVersionTracker *objv_tracker, optional_yield y) const |
11fdf7f2 | 464 | { |
1e59de90 | 465 | const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp); |
11fdf7f2 | 466 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 467 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
468 | return ret; |
469 | } | |
470 | return 0; | |
471 | } | |
472 | ||
1e59de90 TL |
473 | int RGWPubSub::Bucket::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, |
474 | RGWObjVersionTracker *objv_tracker, optional_yield y) const | |
11fdf7f2 | 475 | { |
1e59de90 | 476 | const int ret = bucket->read_topics(result, objv_tracker, y, dpp); |
11fdf7f2 | 477 | if (ret < 0 && ret != -ENOENT) { |
1e59de90 | 478 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
479 | return ret; |
480 | } | |
481 | return 0; | |
482 | } | |
483 | ||
b3b6e05e | 484 | int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, |
f67539c2 | 485 | RGWObjVersionTracker *objv_tracker, |
1e59de90 | 486 | optional_yield y) const |
11fdf7f2 | 487 | { |
1e59de90 | 488 | const int ret = bucket->write_topics(topics, objv_tracker, y, dpp); |
11fdf7f2 | 489 | if (ret < 0) { |
1e59de90 | 490 | ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
491 | return ret; |
492 | } | |
493 | ||
494 | return 0; | |
495 | } | |
496 | ||
1e59de90 | 497 | int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const |
11fdf7f2 | 498 | { |
f67539c2 | 499 | rgw_pubsub_topics topics; |
1e59de90 | 500 | const int ret = read_topics(dpp, topics, nullptr, y); |
11fdf7f2 | 501 | if (ret < 0) { |
1e59de90 | 502 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
eafe8130 TL |
503 | return ret; |
504 | } | |
505 | ||
506 | auto iter = topics.topics.find(name); | |
507 | if (iter == topics.topics.end()) { | |
1e59de90 | 508 | ldpp_dout(dpp, 1) << "ERROR: topic not found" << dendl; |
eafe8130 TL |
509 | return -ENOENT; |
510 | } | |
511 | ||
1e59de90 | 512 | result = iter->second; |
eafe8130 TL |
513 | return 0; |
514 | } | |
515 | ||
aee94f69 TL |
516 | // from list of bucket topics, find the one that was auto-generated by a notification |
517 | auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string ¬ification_id) { | |
518 | auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), | |
519 | [&](const auto& val) { return notification_id == val.second.s3_id; }); | |
520 | return it != bucket_topics.topics.end() ? | |
521 | std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second): | |
522 | std::nullopt; | |
523 | } | |
524 | ||
525 | int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, | |
526 | rgw_pubsub_topic_filter& result, optional_yield y) const { | |
527 | rgw_pubsub_bucket_topics bucket_topics; | |
528 | const int ret = read_topics(dpp, bucket_topics, nullptr, y); | |
529 | if (ret < 0) { | |
530 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl; | |
531 | return ret; | |
532 | } | |
533 | ||
534 | auto iter = find_unique_topic(bucket_topics, notification_id); | |
535 | if (!iter) { | |
536 | ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl; | |
537 | return -ENOENT; | |
538 | } | |
539 | ||
540 | result = iter->get(); | |
541 | return 0; | |
542 | } | |
543 | ||
544 | ||
1e59de90 TL |
545 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, |
546 | const rgw::notify::EventTypeList& events, optional_yield y) const { | |
b3b6e05e | 547 | return create_notification(dpp, topic_name, events, std::nullopt, "", y); |
eafe8130 TL |
548 | } |
549 | ||
1e59de90 TL |
550 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, |
551 | const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const { | |
552 | rgw_pubsub_topic topic_info; | |
11fdf7f2 | 553 | |
1e59de90 | 554 | int ret = ps.get_topic(dpp, topic_name, topic_info, y); |
11fdf7f2 | 555 | if (ret < 0) { |
b3b6e05e | 556 | ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; |
11fdf7f2 TL |
557 | return ret; |
558 | } | |
b3b6e05e | 559 | ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl; |
11fdf7f2 TL |
560 | |
561 | RGWObjVersionTracker objv_tracker; | |
562 | rgw_pubsub_bucket_topics bucket_topics; | |
563 | ||
1e59de90 | 564 | ret = read_topics(dpp, bucket_topics, &objv_tracker, y); |
eafe8130 | 565 | if (ret < 0) { |
b3b6e05e | 566 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" << |
1e59de90 | 567 | bucket->get_name() << "': ret=" << ret << dendl; |
11fdf7f2 TL |
568 | return ret; |
569 | } | |
b3b6e05e | 570 | ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << |
1e59de90 | 571 | bucket->get_name() << "'" << dendl; |
11fdf7f2 TL |
572 | |
573 | auto& topic_filter = bucket_topics.topics[topic_name]; | |
1e59de90 | 574 | topic_filter.topic = topic_info; |
11fdf7f2 | 575 | topic_filter.events = events; |
eafe8130 TL |
576 | topic_filter.s3_id = notif_name; |
577 | if (s3_filter) { | |
578 | topic_filter.s3_filter = *s3_filter; | |
579 | } | |
11fdf7f2 | 580 | |
b3b6e05e | 581 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 582 | if (ret < 0) { |
1e59de90 | 583 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket->get_name() << "': ret=" << ret << dendl; |
11fdf7f2 TL |
584 | return ret; |
585 | } | |
eafe8130 | 586 | |
1e59de90 | 587 | ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket->get_name() << "'" << dendl; |
11fdf7f2 TL |
588 | |
589 | return 0; | |
590 | } | |
591 | ||
1e59de90 | 592 | int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const |
aee94f69 TL |
593 | { |
594 | return remove_notification_inner(dpp, topic_name, false, y); | |
595 | } | |
596 | ||
597 | int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id, | |
598 | bool is_notification_id, optional_yield y) const | |
11fdf7f2 | 599 | { |
11fdf7f2 TL |
600 | RGWObjVersionTracker objv_tracker; |
601 | rgw_pubsub_bucket_topics bucket_topics; | |
602 | ||
1e59de90 | 603 | auto ret = read_topics(dpp, bucket_topics, &objv_tracker, y); |
eafe8130 | 604 | if (ret < 0) { |
b3b6e05e | 605 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
606 | return ret; |
607 | } | |
608 | ||
aee94f69 TL |
609 | |
610 | std::unique_ptr<std::string> topic_name = std::make_unique<std::string>(notification_id); | |
611 | if(is_notification_id) { | |
612 | auto iter = find_unique_topic(bucket_topics, notification_id); | |
613 | if (!iter) { | |
614 | ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl; | |
615 | return -ENOENT; | |
616 | } | |
617 | topic_name = std::make_unique<std::string>(iter->get().topic.name); | |
618 | } | |
619 | ||
620 | if (bucket_topics.topics.erase(*topic_name) == 0) { | |
1e59de90 TL |
621 | ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl; |
622 | return 0; | |
623 | } | |
11fdf7f2 | 624 | |
522d829b TL |
625 | if (bucket_topics.topics.empty()) { |
626 | // no more topics - delete the notification object of the bucket | |
1e59de90 | 627 | ret = bucket->remove_topics(&objv_tracker, y, dpp); |
522d829b | 628 | if (ret < 0 && ret != -ENOENT) { |
20effc67 | 629 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
522d829b TL |
630 | return ret; |
631 | } | |
632 | return 0; | |
633 | } | |
634 | ||
635 | // write back the notifications without the deleted one | |
b3b6e05e | 636 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 637 | if (ret < 0) { |
b3b6e05e | 638 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
639 | return ret; |
640 | } | |
641 | ||
642 | return 0; | |
643 | } | |
644 | ||
aee94f69 TL |
645 | int RGWPubSub::Bucket::remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const |
646 | { | |
647 | return remove_notification_inner(dpp, notif_id, true, y); | |
648 | } | |
649 | ||
1e59de90 | 650 | int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const |
f67539c2 TL |
651 | { |
652 | // get all topics on a bucket | |
653 | rgw_pubsub_bucket_topics bucket_topics; | |
1e59de90 | 654 | auto ret = get_topics(dpp, bucket_topics, y); |
f67539c2 | 655 | if (ret < 0 && ret != -ENOENT) { |
1e59de90 | 656 | ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket->get_name() << "', ret=" << ret << dendl; |
f67539c2 TL |
657 | return ret ; |
658 | } | |
659 | ||
660 | // remove all auto-genrated topics | |
661 | for (const auto& topic : bucket_topics.topics) { | |
662 | const auto& topic_name = topic.first; | |
1e59de90 | 663 | ret = ps.remove_topic(dpp, topic_name, y); |
f67539c2 | 664 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 665 | ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl; |
f67539c2 TL |
666 | } |
667 | } | |
668 | ||
522d829b | 669 | // delete the notification object of the bucket |
1e59de90 | 670 | ret = bucket->remove_topics(nullptr, y, dpp); |
f67539c2 | 671 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 672 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
f67539c2 TL |
673 | return ret; |
674 | } | |
675 | ||
676 | return 0; | |
eafe8130 TL |
677 | } |
678 | ||
1e59de90 TL |
679 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const { |
680 | return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y); | |
f67539c2 TL |
681 | } |
682 | ||
1e59de90 TL |
683 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, |
684 | const std::string& arn, const std::string& opaque_data, optional_yield y) const { | |
11fdf7f2 | 685 | RGWObjVersionTracker objv_tracker; |
f67539c2 | 686 | rgw_pubsub_topics topics; |
11fdf7f2 | 687 | |
1e59de90 | 688 | int ret = read_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 689 | if (ret < 0 && ret != -ENOENT) { |
eafe8130 | 690 | // its not an error if not topics exist, we create one |
b3b6e05e | 691 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
692 | return ret; |
693 | } | |
eafe8130 | 694 | |
1e59de90 TL |
695 | rgw_pubsub_topic& new_topic = topics.topics[name]; |
696 | new_topic.user = rgw_user("", tenant); | |
697 | new_topic.name = name; | |
698 | new_topic.dest = dest; | |
699 | new_topic.arn = arn; | |
700 | new_topic.opaque_data = opaque_data; | |
11fdf7f2 | 701 | |
b3b6e05e | 702 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 703 | if (ret < 0) { |
b3b6e05e | 704 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
705 | return ret; |
706 | } | |
707 | ||
708 | return 0; | |
709 | } | |
710 | ||
1e59de90 | 711 | int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const |
11fdf7f2 TL |
712 | { |
713 | RGWObjVersionTracker objv_tracker; | |
f67539c2 | 714 | rgw_pubsub_topics topics; |
11fdf7f2 | 715 | |
1e59de90 | 716 | int ret = read_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 717 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 718 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 | 719 | return ret; |
eafe8130 TL |
720 | } else if (ret == -ENOENT) { |
721 | // its not an error if no topics exist, just a no-op | |
b3b6e05e | 722 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl; |
eafe8130 | 723 | return 0; |
11fdf7f2 TL |
724 | } |
725 | ||
726 | topics.topics.erase(name); | |
727 | ||
b3b6e05e | 728 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 729 | if (ret < 0) { |
b3b6e05e | 730 | ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
731 | return ret; |
732 | } | |
733 | ||
734 | return 0; | |
735 | } | |
736 |