]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 | 5 | #include "rgw_b64.h" |
9f95a23c | 6 | #include "rgw_sal.h" |
11fdf7f2 TL |
7 | #include "rgw_pubsub.h" |
8 | #include "rgw_tools.h" | |
eafe8130 TL |
9 | #include "rgw_xml.h" |
10 | #include "rgw_arn.h" | |
11 | #include "rgw_pubsub_push.h" | |
eafe8130 TL |
12 | #include <regex> |
13 | #include <algorithm> | |
11fdf7f2 TL |
14 | |
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
92f5a8d4 TL |
17 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { |
18 | char buf[64]; | |
19 | const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); | |
20 | if (len > 0) { | |
21 | id.assign(buf, len); | |
22 | } | |
23 | } | |
24 | ||
eafe8130 TL |
25 | bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { |
26 | XMLObjIter iter = obj->find("FilterRule"); | |
27 | XMLObj *o; | |
28 | ||
29 | const auto throw_if_missing = true; | |
30 | auto prefix_not_set = true; | |
31 | auto suffix_not_set = true; | |
32 | auto regex_not_set = true; | |
33 | std::string name; | |
34 | ||
35 | while ((o = iter.get_next())) { | |
36 | RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing); | |
37 | if (name == "prefix" && prefix_not_set) { | |
38 | prefix_not_set = false; | |
39 | RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing); | |
40 | } else if (name == "suffix" && suffix_not_set) { | |
41 | suffix_not_set = false; | |
42 | RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing); | |
43 | } else if (name == "regex" && regex_not_set) { | |
44 | regex_not_set = false; | |
45 | RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing); | |
46 | } else { | |
47 | throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'"); | |
48 | } | |
49 | } | |
50 | return true; | |
51 | } | |
52 | ||
53 | void rgw_s3_key_filter::dump_xml(Formatter *f) const { | |
54 | if (!prefix_rule.empty()) { | |
55 | f->open_object_section("FilterRule"); | |
56 | ::encode_xml("Name", "prefix", f); | |
57 | ::encode_xml("Value", prefix_rule, f); | |
58 | f->close_section(); | |
59 | } | |
60 | if (!suffix_rule.empty()) { | |
61 | f->open_object_section("FilterRule"); | |
62 | ::encode_xml("Name", "suffix", f); | |
63 | ::encode_xml("Value", suffix_rule, f); | |
64 | f->close_section(); | |
65 | } | |
66 | if (!regex_rule.empty()) { | |
67 | f->open_object_section("FilterRule"); | |
68 | ::encode_xml("Name", "regex", f); | |
69 | ::encode_xml("Value", regex_rule, f); | |
70 | f->close_section(); | |
71 | } | |
72 | } | |
73 | ||
74 | bool rgw_s3_key_filter::has_content() const { | |
75 | return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); | |
76 | } | |
77 | ||
9f95a23c | 78 | bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { |
f67539c2 | 79 | kv.clear(); |
eafe8130 TL |
80 | XMLObjIter iter = obj->find("FilterRule"); |
81 | XMLObj *o; | |
82 | ||
83 | const auto throw_if_missing = true; | |
84 | ||
85 | std::string key; | |
86 | std::string value; | |
87 | ||
88 | while ((o = iter.get_next())) { | |
89 | RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); | |
90 | RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); | |
f67539c2 | 91 | kv.emplace(key, value); |
eafe8130 TL |
92 | } |
93 | return true; | |
94 | } | |
95 | ||
9f95a23c | 96 | void rgw_s3_key_value_filter::dump_xml(Formatter *f) const { |
f67539c2 | 97 | for (const auto& key_value : kv) { |
eafe8130 TL |
98 | f->open_object_section("FilterRule"); |
99 | ::encode_xml("Name", key_value.first, f); | |
100 | ::encode_xml("Value", key_value.second, f); | |
101 | f->close_section(); | |
102 | } | |
103 | } | |
104 | ||
9f95a23c | 105 | bool rgw_s3_key_value_filter::has_content() const { |
f67539c2 | 106 | return !kv.empty(); |
eafe8130 TL |
107 | } |
108 | ||
109 | bool rgw_s3_filter::decode_xml(XMLObj* obj) { | |
110 | RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); | |
111 | RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); | |
9f95a23c | 112 | RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj); |
eafe8130 TL |
113 | return true; |
114 | } | |
115 | ||
116 | void rgw_s3_filter::dump_xml(Formatter *f) const { | |
117 | if (key_filter.has_content()) { | |
118 | ::encode_xml("S3Key", key_filter, f); | |
119 | } | |
120 | if (metadata_filter.has_content()) { | |
121 | ::encode_xml("S3Metadata", metadata_filter, f); | |
122 | } | |
9f95a23c TL |
123 | if (tag_filter.has_content()) { |
124 | ::encode_xml("S3Tags", tag_filter, f); | |
125 | } | |
eafe8130 TL |
126 | } |
127 | ||
128 | bool rgw_s3_filter::has_content() const { | |
129 | return key_filter.has_content() || | |
9f95a23c TL |
130 | metadata_filter.has_content() || |
131 | tag_filter.has_content(); | |
eafe8130 TL |
132 | } |
133 | ||
134 | bool match(const rgw_s3_key_filter& filter, const std::string& key) { | |
135 | const auto key_size = key.size(); | |
136 | const auto prefix_size = filter.prefix_rule.size(); | |
137 | if (prefix_size != 0) { | |
138 | // prefix rule exists | |
139 | if (prefix_size > key_size) { | |
140 | // if prefix is longer than key, we fail | |
141 | return false; | |
142 | } | |
143 | if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) { | |
144 | return false; | |
145 | } | |
146 | } | |
147 | const auto suffix_size = filter.suffix_rule.size(); | |
148 | if (suffix_size != 0) { | |
149 | // suffix rule exists | |
150 | if (suffix_size > key_size) { | |
151 | // if suffix is longer than key, we fail | |
152 | return false; | |
153 | } | |
154 | if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) { | |
155 | return false; | |
156 | } | |
157 | } | |
158 | if (!filter.regex_rule.empty()) { | |
159 | // TODO add regex chaching in the filter | |
160 | const std::regex base_regex(filter.regex_rule); | |
161 | if (!std::regex_match(key, base_regex)) { | |
162 | return false; | |
163 | } | |
164 | } | |
165 | return true; | |
166 | } | |
167 | ||
f67539c2 | 168 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) { |
9f95a23c TL |
169 | // all filter pairs must exist with the same value in the object's metadata/tags |
170 | // object metadata/tags may include items not in the filter | |
f67539c2 | 171 | return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end()); |
eafe8130 TL |
172 | } |
173 | ||
20effc67 TL |
174 | bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv) { |
175 | // all filter pairs must exist with the same value in the object's metadata/tags | |
176 | // object metadata/tags may include items not in the filter | |
177 | for (auto& filter : filter.kv) { | |
178 | auto result = kv.equal_range(filter.first); | |
1e59de90 | 179 | if (std::any_of(result.first, result.second, [&filter](const std::pair<std::string, std::string>& p) { return p.second == filter.second;})) |
20effc67 TL |
180 | continue; |
181 | else | |
182 | return false; | |
183 | } | |
184 | return true; | |
185 | } | |
186 | ||
eafe8130 TL |
187 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { |
188 | // if event list exists, and none of the events in the list matches the event type, filter the message | |
189 | if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) { | |
190 | return false; | |
191 | } | |
192 | return true; | |
193 | } | |
194 | ||
1e59de90 | 195 | void do_decode_xml_obj(rgw::notify::EventTypeList& l, const std::string& name, XMLObj *obj) { |
eafe8130 TL |
196 | l.clear(); |
197 | ||
198 | XMLObjIter iter = obj->find(name); | |
199 | XMLObj *o; | |
200 | ||
201 | while ((o = iter.get_next())) { | |
202 | std::string val; | |
203 | decode_xml_obj(val, o); | |
204 | l.push_back(rgw::notify::from_string(val)); | |
205 | } | |
206 | } | |
207 | ||
208 | bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) { | |
209 | const auto throw_if_missing = true; | |
210 | RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); | |
211 | ||
212 | RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing); | |
213 | ||
214 | RGWXMLDecoder::decode_xml("Filter", filter, obj); | |
215 | ||
216 | do_decode_xml_obj(events, "Event", obj); | |
217 | if (events.empty()) { | |
218 | // if no events are provided, we assume all events | |
219 | events.push_back(rgw::notify::ObjectCreated); | |
220 | events.push_back(rgw::notify::ObjectRemoved); | |
221 | } | |
222 | return true; | |
223 | } | |
224 | ||
225 | void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const { | |
226 | ::encode_xml("Id", id, f); | |
227 | ::encode_xml("Topic", topic_arn.c_str(), f); | |
228 | if (filter.has_content()) { | |
229 | ::encode_xml("Filter", filter, f); | |
230 | } | |
231 | for (const auto& event : events) { | |
232 | ::encode_xml("Event", rgw::notify::to_string(event), f); | |
233 | } | |
234 | } | |
235 | ||
236 | bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) { | |
237 | do_decode_xml_obj(list, "TopicConfiguration", obj); | |
eafe8130 TL |
238 | return true; |
239 | } | |
240 | ||
241 | rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) : | |
242 | id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} | |
243 | ||
244 | void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { | |
245 | do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); | |
246 | } | |
247 | ||
f67539c2 | 248 | void rgw_pubsub_s3_event::dump(Formatter *f) const { |
eafe8130 TL |
249 | encode_json("eventVersion", eventVersion, f); |
250 | encode_json("eventSource", eventSource, f); | |
251 | encode_json("awsRegion", awsRegion, f); | |
252 | utime_t ut(eventTime); | |
253 | encode_json("eventTime", ut, f); | |
254 | encode_json("eventName", eventName, f); | |
255 | { | |
256 | Formatter::ObjectSection s(*f, "userIdentity"); | |
257 | encode_json("principalId", userIdentity, f); | |
258 | } | |
259 | { | |
260 | Formatter::ObjectSection s(*f, "requestParameters"); | |
261 | encode_json("sourceIPAddress", sourceIPAddress, f); | |
262 | } | |
263 | { | |
264 | Formatter::ObjectSection s(*f, "responseElements"); | |
265 | encode_json("x-amz-request-id", x_amz_request_id, f); | |
266 | encode_json("x-amz-id-2", x_amz_id_2, f); | |
267 | } | |
268 | { | |
269 | Formatter::ObjectSection s(*f, "s3"); | |
270 | encode_json("s3SchemaVersion", s3SchemaVersion, f); | |
271 | encode_json("configurationId", configurationId, f); | |
272 | { | |
273 | Formatter::ObjectSection sub_s(*f, "bucket"); | |
274 | encode_json("name", bucket_name, f); | |
275 | { | |
276 | Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); | |
277 | encode_json("principalId", bucket_ownerIdentity, f); | |
278 | } | |
279 | encode_json("arn", bucket_arn, f); | |
280 | encode_json("id", bucket_id, f); | |
281 | } | |
282 | { | |
283 | Formatter::ObjectSection sub_s(*f, "object"); | |
284 | encode_json("key", object_key, f); | |
285 | encode_json("size", object_size, f); | |
a4b75251 | 286 | encode_json("eTag", object_etag, f); |
eafe8130 TL |
287 | encode_json("versionId", object_versionId, f); |
288 | encode_json("sequencer", object_sequencer, f); | |
289 | encode_json("metadata", x_meta_map, f); | |
9f95a23c | 290 | encode_json("tags", tags, f); |
eafe8130 TL |
291 | } |
292 | } | |
293 | encode_json("eventId", id, f); | |
9f95a23c | 294 | encode_json("opaqueData", opaque_data, f); |
eafe8130 | 295 | } |
11fdf7f2 | 296 | |
11fdf7f2 TL |
297 | void rgw_pubsub_topic::dump(Formatter *f) const |
298 | { | |
299 | encode_json("user", user, f); | |
300 | encode_json("name", name, f); | |
eafe8130 TL |
301 | encode_json("dest", dest, f); |
302 | encode_json("arn", arn, f); | |
9f95a23c | 303 | encode_json("opaqueData", opaque_data, f); |
eafe8130 TL |
304 | } |
305 | ||
306 | void rgw_pubsub_topic::dump_xml(Formatter *f) const | |
307 | { | |
308 | encode_xml("User", user, f); | |
309 | encode_xml("Name", name, f); | |
310 | encode_xml("EndPoint", dest, f); | |
311 | encode_xml("TopicArn", arn, f); | |
9f95a23c | 312 | encode_xml("OpaqueData", opaque_data, f); |
eafe8130 TL |
313 | } |
314 | ||
f67539c2 TL |
315 | void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) { |
316 | f->open_object_section("entry"); | |
317 | encode_xml("key", key, f); | |
318 | encode_xml("value", value, f); | |
319 | f->close_section(); // entry | |
320 | } | |
321 | ||
322 | void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const | |
323 | { | |
324 | f->open_array_section("Attributes"); | |
325 | std::string str_user; | |
326 | user.to_str(str_user); | |
327 | encode_xml_key_value_entry("User", str_user, f); | |
328 | encode_xml_key_value_entry("Name", name, f); | |
329 | encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f); | |
330 | encode_xml_key_value_entry("TopicArn", arn, f); | |
331 | encode_xml_key_value_entry("OpaqueData", opaque_data, f); | |
332 | f->close_section(); // Attributes | |
333 | } | |
334 | ||
eafe8130 TL |
335 | void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) |
336 | { | |
337 | f->open_array_section(name); | |
338 | for (auto iter = l.cbegin(); iter != l.cend(); ++iter) { | |
1e59de90 | 339 | f->dump_string("obj", rgw::notify::to_string(*iter)); |
eafe8130 TL |
340 | } |
341 | f->close_section(); | |
11fdf7f2 TL |
342 | } |
343 | ||
344 | void rgw_pubsub_topic_filter::dump(Formatter *f) const | |
345 | { | |
346 | encode_json("topic", topic, f); | |
347 | encode_json("events", events, f); | |
348 | } | |
349 | ||
11fdf7f2 TL |
350 | void rgw_pubsub_bucket_topics::dump(Formatter *f) const |
351 | { | |
352 | Formatter::ArraySection s(*f, "topics"); | |
353 | for (auto& t : topics) { | |
354 | encode_json(t.first.c_str(), t.second, f); | |
355 | } | |
356 | } | |
357 | ||
f67539c2 | 358 | void rgw_pubsub_topics::dump(Formatter *f) const |
11fdf7f2 TL |
359 | { |
360 | Formatter::ArraySection s(*f, "topics"); | |
361 | for (auto& t : topics) { | |
362 | encode_json(t.first.c_str(), t.second, f); | |
363 | } | |
364 | } | |
365 | ||
f67539c2 | 366 | void rgw_pubsub_topics::dump_xml(Formatter *f) const |
eafe8130 TL |
367 | { |
368 | for (auto& t : topics) { | |
1e59de90 | 369 | encode_xml("member", t.second, f); |
eafe8130 TL |
370 | } |
371 | } | |
372 | ||
1e59de90 | 373 | void rgw_pubsub_dest::dump(Formatter *f) const |
11fdf7f2 | 374 | { |
11fdf7f2 | 375 | encode_json("push_endpoint", push_endpoint, f); |
eafe8130 TL |
376 | encode_json("push_endpoint_args", push_endpoint_args, f); |
377 | encode_json("push_endpoint_topic", arn_topic, f); | |
f67539c2 TL |
378 | encode_json("stored_secret", stored_secret, f); |
379 | encode_json("persistent", persistent, f); | |
eafe8130 TL |
380 | } |
381 | ||
1e59de90 | 382 | void rgw_pubsub_dest::dump_xml(Formatter *f) const |
eafe8130 TL |
383 | { |
384 | encode_xml("EndpointAddress", push_endpoint, f); | |
385 | encode_xml("EndpointArgs", push_endpoint_args, f); | |
386 | encode_xml("EndpointTopic", arn_topic, f); | |
f67539c2 TL |
387 | encode_xml("HasStoredSecret", stored_secret, f); |
388 | encode_xml("Persistent", persistent, f); | |
389 | } | |
390 | ||
1e59de90 | 391 | std::string rgw_pubsub_dest::to_json_str() const |
f67539c2 | 392 | { |
f67539c2 TL |
393 | JSONFormatter f; |
394 | f.open_object_section(""); | |
395 | encode_json("EndpointAddress", push_endpoint, &f); | |
396 | encode_json("EndpointArgs", push_endpoint_args, &f); | |
397 | encode_json("EndpointTopic", arn_topic, &f); | |
398 | encode_json("HasStoredSecret", stored_secret, &f); | |
399 | encode_json("Persistent", persistent, &f); | |
400 | f.close_section(); | |
401 | std::stringstream ss; | |
402 | f.flush(ss); | |
403 | return ss.str(); | |
11fdf7f2 TL |
404 | } |
405 | ||
1e59de90 TL |
406 | RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver, const std::string& _tenant) |
407 | : driver(_driver), tenant(_tenant) | |
408 | {} | |
11fdf7f2 | 409 | |
1e59de90 TL |
410 | int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, |
411 | RGWObjVersionTracker *objv_tracker, optional_yield y) const | |
11fdf7f2 | 412 | { |
1e59de90 | 413 | const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp); |
eafe8130 | 414 | if (ret < 0) { |
1e59de90 | 415 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
416 | return ret; |
417 | } | |
418 | return 0; | |
419 | } | |
420 | ||
b3b6e05e | 421 | int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, |
1e59de90 | 422 | RGWObjVersionTracker *objv_tracker, optional_yield y) const |
11fdf7f2 | 423 | { |
1e59de90 | 424 | const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp); |
11fdf7f2 | 425 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 426 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
427 | return ret; |
428 | } | |
429 | return 0; | |
430 | } | |
431 | ||
1e59de90 TL |
432 | int RGWPubSub::Bucket::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, |
433 | RGWObjVersionTracker *objv_tracker, optional_yield y) const | |
11fdf7f2 | 434 | { |
1e59de90 | 435 | const int ret = bucket->read_topics(result, objv_tracker, y, dpp); |
11fdf7f2 | 436 | if (ret < 0 && ret != -ENOENT) { |
1e59de90 | 437 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
438 | return ret; |
439 | } | |
440 | return 0; | |
441 | } | |
442 | ||
b3b6e05e | 443 | int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, |
f67539c2 | 444 | RGWObjVersionTracker *objv_tracker, |
1e59de90 | 445 | optional_yield y) const |
11fdf7f2 | 446 | { |
1e59de90 | 447 | const int ret = bucket->write_topics(topics, objv_tracker, y, dpp); |
11fdf7f2 | 448 | if (ret < 0) { |
1e59de90 | 449 | ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
450 | return ret; |
451 | } | |
452 | ||
453 | return 0; | |
454 | } | |
455 | ||
1e59de90 | 456 | int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, rgw_pubsub_topic& result, optional_yield y) const |
11fdf7f2 | 457 | { |
f67539c2 | 458 | rgw_pubsub_topics topics; |
1e59de90 | 459 | const int ret = read_topics(dpp, topics, nullptr, y); |
11fdf7f2 | 460 | if (ret < 0) { |
1e59de90 | 461 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
eafe8130 TL |
462 | return ret; |
463 | } | |
464 | ||
465 | auto iter = topics.topics.find(name); | |
466 | if (iter == topics.topics.end()) { | |
1e59de90 | 467 | ldpp_dout(dpp, 1) << "ERROR: topic not found" << dendl; |
eafe8130 TL |
468 | return -ENOENT; |
469 | } | |
470 | ||
1e59de90 | 471 | result = iter->second; |
eafe8130 TL |
472 | return 0; |
473 | } | |
474 | ||
1e59de90 TL |
475 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, |
476 | const rgw::notify::EventTypeList& events, optional_yield y) const { | |
b3b6e05e | 477 | return create_notification(dpp, topic_name, events, std::nullopt, "", y); |
eafe8130 TL |
478 | } |
479 | ||
1e59de90 TL |
480 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, |
481 | const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const { | |
482 | rgw_pubsub_topic topic_info; | |
11fdf7f2 | 483 | |
1e59de90 | 484 | int ret = ps.get_topic(dpp, topic_name, topic_info, y); |
11fdf7f2 | 485 | if (ret < 0) { |
b3b6e05e | 486 | ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; |
11fdf7f2 TL |
487 | return ret; |
488 | } | |
b3b6e05e | 489 | ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl; |
11fdf7f2 TL |
490 | |
491 | RGWObjVersionTracker objv_tracker; | |
492 | rgw_pubsub_bucket_topics bucket_topics; | |
493 | ||
1e59de90 | 494 | ret = read_topics(dpp, bucket_topics, &objv_tracker, y); |
eafe8130 | 495 | if (ret < 0) { |
b3b6e05e | 496 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" << |
1e59de90 | 497 | bucket->get_name() << "': ret=" << ret << dendl; |
11fdf7f2 TL |
498 | return ret; |
499 | } | |
b3b6e05e | 500 | ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << |
1e59de90 | 501 | bucket->get_name() << "'" << dendl; |
11fdf7f2 TL |
502 | |
503 | auto& topic_filter = bucket_topics.topics[topic_name]; | |
1e59de90 | 504 | topic_filter.topic = topic_info; |
11fdf7f2 | 505 | topic_filter.events = events; |
eafe8130 TL |
506 | topic_filter.s3_id = notif_name; |
507 | if (s3_filter) { | |
508 | topic_filter.s3_filter = *s3_filter; | |
509 | } | |
11fdf7f2 | 510 | |
b3b6e05e | 511 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 512 | if (ret < 0) { |
1e59de90 | 513 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket->get_name() << "': ret=" << ret << dendl; |
11fdf7f2 TL |
514 | return ret; |
515 | } | |
eafe8130 | 516 | |
1e59de90 | 517 | ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket->get_name() << "'" << dendl; |
11fdf7f2 TL |
518 | |
519 | return 0; | |
520 | } | |
521 | ||
1e59de90 | 522 | int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const |
11fdf7f2 | 523 | { |
11fdf7f2 TL |
524 | RGWObjVersionTracker objv_tracker; |
525 | rgw_pubsub_bucket_topics bucket_topics; | |
526 | ||
1e59de90 | 527 | auto ret = read_topics(dpp, bucket_topics, &objv_tracker, y); |
eafe8130 | 528 | if (ret < 0) { |
b3b6e05e | 529 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
530 | return ret; |
531 | } | |
532 | ||
1e59de90 TL |
533 | if (bucket_topics.topics.erase(topic_name) == 0) { |
534 | ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl; | |
535 | return 0; | |
536 | } | |
11fdf7f2 | 537 | |
522d829b TL |
538 | if (bucket_topics.topics.empty()) { |
539 | // no more topics - delete the notification object of the bucket | |
1e59de90 | 540 | ret = bucket->remove_topics(&objv_tracker, y, dpp); |
522d829b | 541 | if (ret < 0 && ret != -ENOENT) { |
20effc67 | 542 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
522d829b TL |
543 | return ret; |
544 | } | |
545 | return 0; | |
546 | } | |
547 | ||
548 | // write back the notifications without the deleted one | |
b3b6e05e | 549 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 550 | if (ret < 0) { |
b3b6e05e | 551 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
552 | return ret; |
553 | } | |
554 | ||
555 | return 0; | |
556 | } | |
557 | ||
1e59de90 | 558 | int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const |
f67539c2 TL |
559 | { |
560 | // get all topics on a bucket | |
561 | rgw_pubsub_bucket_topics bucket_topics; | |
1e59de90 | 562 | auto ret = get_topics(dpp, bucket_topics, y); |
f67539c2 | 563 | if (ret < 0 && ret != -ENOENT) { |
1e59de90 | 564 | ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket->get_name() << "', ret=" << ret << dendl; |
f67539c2 TL |
565 | return ret ; |
566 | } | |
567 | ||
568 | // remove all auto-genrated topics | |
569 | for (const auto& topic : bucket_topics.topics) { | |
570 | const auto& topic_name = topic.first; | |
1e59de90 | 571 | ret = ps.remove_topic(dpp, topic_name, y); |
f67539c2 | 572 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 573 | ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl; |
f67539c2 TL |
574 | } |
575 | } | |
576 | ||
522d829b | 577 | // delete the notification object of the bucket |
1e59de90 | 578 | ret = bucket->remove_topics(nullptr, y, dpp); |
f67539c2 | 579 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 580 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
f67539c2 TL |
581 | return ret; |
582 | } | |
583 | ||
584 | return 0; | |
eafe8130 TL |
585 | } |
586 | ||
1e59de90 TL |
587 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const { |
588 | return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y); | |
f67539c2 TL |
589 | } |
590 | ||
1e59de90 TL |
591 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, |
592 | const std::string& arn, const std::string& opaque_data, optional_yield y) const { | |
11fdf7f2 | 593 | RGWObjVersionTracker objv_tracker; |
f67539c2 | 594 | rgw_pubsub_topics topics; |
11fdf7f2 | 595 | |
1e59de90 | 596 | int ret = read_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 597 | if (ret < 0 && ret != -ENOENT) { |
eafe8130 | 598 | // its not an error if not topics exist, we create one |
b3b6e05e | 599 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
600 | return ret; |
601 | } | |
eafe8130 | 602 | |
1e59de90 TL |
603 | rgw_pubsub_topic& new_topic = topics.topics[name]; |
604 | new_topic.user = rgw_user("", tenant); | |
605 | new_topic.name = name; | |
606 | new_topic.dest = dest; | |
607 | new_topic.arn = arn; | |
608 | new_topic.opaque_data = opaque_data; | |
11fdf7f2 | 609 | |
b3b6e05e | 610 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 611 | if (ret < 0) { |
b3b6e05e | 612 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
613 | return ret; |
614 | } | |
615 | ||
616 | return 0; | |
617 | } | |
618 | ||
1e59de90 | 619 | int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const |
11fdf7f2 TL |
620 | { |
621 | RGWObjVersionTracker objv_tracker; | |
f67539c2 | 622 | rgw_pubsub_topics topics; |
11fdf7f2 | 623 | |
1e59de90 | 624 | int ret = read_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 625 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 626 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 | 627 | return ret; |
eafe8130 TL |
628 | } else if (ret == -ENOENT) { |
629 | // its not an error if no topics exist, just a no-op | |
b3b6e05e | 630 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl; |
eafe8130 | 631 | return 0; |
11fdf7f2 TL |
632 | } |
633 | ||
634 | topics.topics.erase(name); | |
635 | ||
b3b6e05e | 636 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 637 | if (ret < 0) { |
b3b6e05e | 638 | ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
639 | return ret; |
640 | } | |
641 | ||
642 | return 0; | |
643 | } | |
644 |