]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 | 5 | #include "rgw_b64.h" |
9f95a23c | 6 | #include "rgw_sal.h" |
f67539c2 | 7 | #include "rgw_sal_rados.h" |
11fdf7f2 TL |
8 | #include "rgw_pubsub.h" |
9 | #include "rgw_tools.h" | |
eafe8130 TL |
10 | #include "rgw_xml.h" |
11 | #include "rgw_arn.h" | |
12 | #include "rgw_pubsub_push.h" | |
eafe8130 TL |
13 | #include <regex> |
14 | #include <algorithm> | |
11fdf7f2 TL |
15 | |
16 | #define dout_subsys ceph_subsys_rgw | |
17 | ||
92f5a8d4 TL |
18 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { |
19 | char buf[64]; | |
20 | const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); | |
21 | if (len > 0) { | |
22 | id.assign(buf, len); | |
23 | } | |
24 | } | |
25 | ||
eafe8130 TL |
26 | bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { |
27 | XMLObjIter iter = obj->find("FilterRule"); | |
28 | XMLObj *o; | |
29 | ||
30 | const auto throw_if_missing = true; | |
31 | auto prefix_not_set = true; | |
32 | auto suffix_not_set = true; | |
33 | auto regex_not_set = true; | |
34 | std::string name; | |
35 | ||
36 | while ((o = iter.get_next())) { | |
37 | RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing); | |
38 | if (name == "prefix" && prefix_not_set) { | |
39 | prefix_not_set = false; | |
40 | RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing); | |
41 | } else if (name == "suffix" && suffix_not_set) { | |
42 | suffix_not_set = false; | |
43 | RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing); | |
44 | } else if (name == "regex" && regex_not_set) { | |
45 | regex_not_set = false; | |
46 | RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing); | |
47 | } else { | |
48 | throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'"); | |
49 | } | |
50 | } | |
51 | return true; | |
52 | } | |
53 | ||
54 | void rgw_s3_key_filter::dump_xml(Formatter *f) const { | |
55 | if (!prefix_rule.empty()) { | |
56 | f->open_object_section("FilterRule"); | |
57 | ::encode_xml("Name", "prefix", f); | |
58 | ::encode_xml("Value", prefix_rule, f); | |
59 | f->close_section(); | |
60 | } | |
61 | if (!suffix_rule.empty()) { | |
62 | f->open_object_section("FilterRule"); | |
63 | ::encode_xml("Name", "suffix", f); | |
64 | ::encode_xml("Value", suffix_rule, f); | |
65 | f->close_section(); | |
66 | } | |
67 | if (!regex_rule.empty()) { | |
68 | f->open_object_section("FilterRule"); | |
69 | ::encode_xml("Name", "regex", f); | |
70 | ::encode_xml("Value", regex_rule, f); | |
71 | f->close_section(); | |
72 | } | |
73 | } | |
74 | ||
75 | bool rgw_s3_key_filter::has_content() const { | |
76 | return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); | |
77 | } | |
78 | ||
9f95a23c | 79 | bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { |
f67539c2 | 80 | kv.clear(); |
eafe8130 TL |
81 | XMLObjIter iter = obj->find("FilterRule"); |
82 | XMLObj *o; | |
83 | ||
84 | const auto throw_if_missing = true; | |
85 | ||
86 | std::string key; | |
87 | std::string value; | |
88 | ||
89 | while ((o = iter.get_next())) { | |
90 | RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); | |
91 | RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); | |
f67539c2 | 92 | kv.emplace(key, value); |
eafe8130 TL |
93 | } |
94 | return true; | |
95 | } | |
96 | ||
9f95a23c | 97 | void rgw_s3_key_value_filter::dump_xml(Formatter *f) const { |
f67539c2 | 98 | for (const auto& key_value : kv) { |
eafe8130 TL |
99 | f->open_object_section("FilterRule"); |
100 | ::encode_xml("Name", key_value.first, f); | |
101 | ::encode_xml("Value", key_value.second, f); | |
102 | f->close_section(); | |
103 | } | |
104 | } | |
105 | ||
9f95a23c | 106 | bool rgw_s3_key_value_filter::has_content() const { |
f67539c2 | 107 | return !kv.empty(); |
eafe8130 TL |
108 | } |
109 | ||
110 | bool rgw_s3_filter::decode_xml(XMLObj* obj) { | |
111 | RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); | |
112 | RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); | |
9f95a23c | 113 | RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj); |
eafe8130 TL |
114 | return true; |
115 | } | |
116 | ||
117 | void rgw_s3_filter::dump_xml(Formatter *f) const { | |
118 | if (key_filter.has_content()) { | |
119 | ::encode_xml("S3Key", key_filter, f); | |
120 | } | |
121 | if (metadata_filter.has_content()) { | |
122 | ::encode_xml("S3Metadata", metadata_filter, f); | |
123 | } | |
9f95a23c TL |
124 | if (tag_filter.has_content()) { |
125 | ::encode_xml("S3Tags", tag_filter, f); | |
126 | } | |
eafe8130 TL |
127 | } |
128 | ||
129 | bool rgw_s3_filter::has_content() const { | |
130 | return key_filter.has_content() || | |
9f95a23c TL |
131 | metadata_filter.has_content() || |
132 | tag_filter.has_content(); | |
eafe8130 TL |
133 | } |
134 | ||
135 | bool match(const rgw_s3_key_filter& filter, const std::string& key) { | |
136 | const auto key_size = key.size(); | |
137 | const auto prefix_size = filter.prefix_rule.size(); | |
138 | if (prefix_size != 0) { | |
139 | // prefix rule exists | |
140 | if (prefix_size > key_size) { | |
141 | // if prefix is longer than key, we fail | |
142 | return false; | |
143 | } | |
144 | if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) { | |
145 | return false; | |
146 | } | |
147 | } | |
148 | const auto suffix_size = filter.suffix_rule.size(); | |
149 | if (suffix_size != 0) { | |
150 | // suffix rule exists | |
151 | if (suffix_size > key_size) { | |
152 | // if suffix is longer than key, we fail | |
153 | return false; | |
154 | } | |
155 | if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) { | |
156 | return false; | |
157 | } | |
158 | } | |
159 | if (!filter.regex_rule.empty()) { | |
160 | // TODO add regex chaching in the filter | |
161 | const std::regex base_regex(filter.regex_rule); | |
162 | if (!std::regex_match(key, base_regex)) { | |
163 | return false; | |
164 | } | |
165 | } | |
166 | return true; | |
167 | } | |
168 | ||
f67539c2 | 169 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) { |
9f95a23c TL |
170 | // all filter pairs must exist with the same value in the object's metadata/tags |
171 | // object metadata/tags may include items not in the filter | |
f67539c2 | 172 | return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end()); |
eafe8130 TL |
173 | } |
174 | ||
175 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { | |
176 | // if event list exists, and none of the events in the list matches the event type, filter the message | |
177 | if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) { | |
178 | return false; | |
179 | } | |
180 | return true; | |
181 | } | |
182 | ||
183 | void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) { | |
184 | l.clear(); | |
185 | ||
186 | XMLObjIter iter = obj->find(name); | |
187 | XMLObj *o; | |
188 | ||
189 | while ((o = iter.get_next())) { | |
190 | std::string val; | |
191 | decode_xml_obj(val, o); | |
192 | l.push_back(rgw::notify::from_string(val)); | |
193 | } | |
194 | } | |
195 | ||
196 | bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) { | |
197 | const auto throw_if_missing = true; | |
198 | RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); | |
199 | ||
200 | RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing); | |
201 | ||
202 | RGWXMLDecoder::decode_xml("Filter", filter, obj); | |
203 | ||
204 | do_decode_xml_obj(events, "Event", obj); | |
205 | if (events.empty()) { | |
206 | // if no events are provided, we assume all events | |
207 | events.push_back(rgw::notify::ObjectCreated); | |
208 | events.push_back(rgw::notify::ObjectRemoved); | |
209 | } | |
210 | return true; | |
211 | } | |
212 | ||
213 | void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const { | |
214 | ::encode_xml("Id", id, f); | |
215 | ::encode_xml("Topic", topic_arn.c_str(), f); | |
216 | if (filter.has_content()) { | |
217 | ::encode_xml("Filter", filter, f); | |
218 | } | |
219 | for (const auto& event : events) { | |
220 | ::encode_xml("Event", rgw::notify::to_string(event), f); | |
221 | } | |
222 | } | |
223 | ||
224 | bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) { | |
225 | do_decode_xml_obj(list, "TopicConfiguration", obj); | |
226 | if (list.empty()) { | |
227 | throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist"); | |
228 | } | |
229 | return true; | |
230 | } | |
231 | ||
232 | rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) : | |
233 | id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} | |
234 | ||
235 | void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { | |
236 | do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); | |
237 | } | |
238 | ||
f67539c2 | 239 | void rgw_pubsub_s3_event::dump(Formatter *f) const { |
eafe8130 TL |
240 | encode_json("eventVersion", eventVersion, f); |
241 | encode_json("eventSource", eventSource, f); | |
242 | encode_json("awsRegion", awsRegion, f); | |
243 | utime_t ut(eventTime); | |
244 | encode_json("eventTime", ut, f); | |
245 | encode_json("eventName", eventName, f); | |
246 | { | |
247 | Formatter::ObjectSection s(*f, "userIdentity"); | |
248 | encode_json("principalId", userIdentity, f); | |
249 | } | |
250 | { | |
251 | Formatter::ObjectSection s(*f, "requestParameters"); | |
252 | encode_json("sourceIPAddress", sourceIPAddress, f); | |
253 | } | |
254 | { | |
255 | Formatter::ObjectSection s(*f, "responseElements"); | |
256 | encode_json("x-amz-request-id", x_amz_request_id, f); | |
257 | encode_json("x-amz-id-2", x_amz_id_2, f); | |
258 | } | |
259 | { | |
260 | Formatter::ObjectSection s(*f, "s3"); | |
261 | encode_json("s3SchemaVersion", s3SchemaVersion, f); | |
262 | encode_json("configurationId", configurationId, f); | |
263 | { | |
264 | Formatter::ObjectSection sub_s(*f, "bucket"); | |
265 | encode_json("name", bucket_name, f); | |
266 | { | |
267 | Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); | |
268 | encode_json("principalId", bucket_ownerIdentity, f); | |
269 | } | |
270 | encode_json("arn", bucket_arn, f); | |
271 | encode_json("id", bucket_id, f); | |
272 | } | |
273 | { | |
274 | Formatter::ObjectSection sub_s(*f, "object"); | |
275 | encode_json("key", object_key, f); | |
276 | encode_json("size", object_size, f); | |
a4b75251 | 277 | encode_json("eTag", object_etag, f); |
eafe8130 TL |
278 | encode_json("versionId", object_versionId, f); |
279 | encode_json("sequencer", object_sequencer, f); | |
280 | encode_json("metadata", x_meta_map, f); | |
9f95a23c | 281 | encode_json("tags", tags, f); |
eafe8130 TL |
282 | } |
283 | } | |
284 | encode_json("eventId", id, f); | |
9f95a23c | 285 | encode_json("opaqueData", opaque_data, f); |
eafe8130 | 286 | } |
11fdf7f2 TL |
287 | |
288 | void rgw_pubsub_event::dump(Formatter *f) const | |
289 | { | |
290 | encode_json("id", id, f); | |
eafe8130 | 291 | encode_json("event", event_name, f); |
11fdf7f2 TL |
292 | utime_t ut(timestamp); |
293 | encode_json("timestamp", ut, f); | |
294 | encode_json("info", info, f); | |
295 | } | |
296 | ||
297 | void rgw_pubsub_topic::dump(Formatter *f) const | |
298 | { | |
299 | encode_json("user", user, f); | |
300 | encode_json("name", name, f); | |
eafe8130 TL |
301 | encode_json("dest", dest, f); |
302 | encode_json("arn", arn, f); | |
9f95a23c | 303 | encode_json("opaqueData", opaque_data, f); |
eafe8130 TL |
304 | } |
305 | ||
306 | void rgw_pubsub_topic::dump_xml(Formatter *f) const | |
307 | { | |
308 | encode_xml("User", user, f); | |
309 | encode_xml("Name", name, f); | |
310 | encode_xml("EndPoint", dest, f); | |
311 | encode_xml("TopicArn", arn, f); | |
9f95a23c | 312 | encode_xml("OpaqueData", opaque_data, f); |
eafe8130 TL |
313 | } |
314 | ||
f67539c2 TL |
315 | void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) { |
316 | f->open_object_section("entry"); | |
317 | encode_xml("key", key, f); | |
318 | encode_xml("value", value, f); | |
319 | f->close_section(); // entry | |
320 | } | |
321 | ||
322 | void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const | |
323 | { | |
324 | f->open_array_section("Attributes"); | |
325 | std::string str_user; | |
326 | user.to_str(str_user); | |
327 | encode_xml_key_value_entry("User", str_user, f); | |
328 | encode_xml_key_value_entry("Name", name, f); | |
329 | encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f); | |
330 | encode_xml_key_value_entry("TopicArn", arn, f); | |
331 | encode_xml_key_value_entry("OpaqueData", opaque_data, f); | |
332 | f->close_section(); // Attributes | |
333 | } | |
334 | ||
eafe8130 TL |
335 | void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) |
336 | { | |
337 | f->open_array_section(name); | |
338 | for (auto iter = l.cbegin(); iter != l.cend(); ++iter) { | |
339 | f->dump_string("obj", rgw::notify::to_ceph_string(*iter)); | |
340 | } | |
341 | f->close_section(); | |
11fdf7f2 TL |
342 | } |
343 | ||
344 | void rgw_pubsub_topic_filter::dump(Formatter *f) const | |
345 | { | |
346 | encode_json("topic", topic, f); | |
347 | encode_json("events", events, f); | |
348 | } | |
349 | ||
350 | void rgw_pubsub_topic_subs::dump(Formatter *f) const | |
351 | { | |
352 | encode_json("topic", topic, f); | |
353 | encode_json("subs", subs, f); | |
354 | } | |
355 | ||
356 | void rgw_pubsub_bucket_topics::dump(Formatter *f) const | |
357 | { | |
358 | Formatter::ArraySection s(*f, "topics"); | |
359 | for (auto& t : topics) { | |
360 | encode_json(t.first.c_str(), t.second, f); | |
361 | } | |
362 | } | |
363 | ||
f67539c2 | 364 | void rgw_pubsub_topics::dump(Formatter *f) const |
11fdf7f2 TL |
365 | { |
366 | Formatter::ArraySection s(*f, "topics"); | |
367 | for (auto& t : topics) { | |
368 | encode_json(t.first.c_str(), t.second, f); | |
369 | } | |
370 | } | |
371 | ||
f67539c2 | 372 | void rgw_pubsub_topics::dump_xml(Formatter *f) const |
eafe8130 TL |
373 | { |
374 | for (auto& t : topics) { | |
375 | encode_xml("member", t.second.topic, f); | |
376 | } | |
377 | } | |
378 | ||
11fdf7f2 TL |
379 | void rgw_pubsub_sub_dest::dump(Formatter *f) const |
380 | { | |
381 | encode_json("bucket_name", bucket_name, f); | |
382 | encode_json("oid_prefix", oid_prefix, f); | |
383 | encode_json("push_endpoint", push_endpoint, f); | |
eafe8130 TL |
384 | encode_json("push_endpoint_args", push_endpoint_args, f); |
385 | encode_json("push_endpoint_topic", arn_topic, f); | |
f67539c2 TL |
386 | encode_json("stored_secret", stored_secret, f); |
387 | encode_json("persistent", persistent, f); | |
eafe8130 TL |
388 | } |
389 | ||
390 | void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const | |
391 | { | |
f67539c2 TL |
392 | // first 2 members are omitted here since they |
393 | // dont apply to AWS compliant topics | |
eafe8130 TL |
394 | encode_xml("EndpointAddress", push_endpoint, f); |
395 | encode_xml("EndpointArgs", push_endpoint_args, f); | |
396 | encode_xml("EndpointTopic", arn_topic, f); | |
f67539c2 TL |
397 | encode_xml("HasStoredSecret", stored_secret, f); |
398 | encode_xml("Persistent", persistent, f); | |
399 | } | |
400 | ||
401 | std::string rgw_pubsub_sub_dest::to_json_str() const | |
402 | { | |
403 | // first 2 members are omitted here since they | |
404 | // dont apply to AWS compliant topics | |
405 | JSONFormatter f; | |
406 | f.open_object_section(""); | |
407 | encode_json("EndpointAddress", push_endpoint, &f); | |
408 | encode_json("EndpointArgs", push_endpoint_args, &f); | |
409 | encode_json("EndpointTopic", arn_topic, &f); | |
410 | encode_json("HasStoredSecret", stored_secret, &f); | |
411 | encode_json("Persistent", persistent, &f); | |
412 | f.close_section(); | |
413 | std::stringstream ss; | |
414 | f.flush(ss); | |
415 | return ss.str(); | |
11fdf7f2 TL |
416 | } |
417 | ||
418 | void rgw_pubsub_sub_config::dump(Formatter *f) const | |
419 | { | |
420 | encode_json("user", user, f); | |
421 | encode_json("name", name, f); | |
422 | encode_json("topic", topic, f); | |
423 | encode_json("dest", dest, f); | |
eafe8130 | 424 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
425 | } |
426 | ||
f67539c2 | 427 | RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) : |
9f95a23c | 428 | store(_store), |
f67539c2 | 429 | tenant(_tenant), |
9f95a23c | 430 | obj_ctx(store->svc()->sysobj->init_obj_ctx()) { |
f67539c2 | 431 | get_meta_obj(&meta_obj); |
9f95a23c | 432 | } |
11fdf7f2 | 433 | |
b3b6e05e TL |
434 | int RGWPubSub::remove(const DoutPrefixProvider *dpp, |
435 | const rgw_raw_obj& obj, | |
f67539c2 TL |
436 | RGWObjVersionTracker *objv_tracker, |
437 | optional_yield y) | |
11fdf7f2 | 438 | { |
b3b6e05e | 439 | int ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y); |
11fdf7f2 TL |
440 | if (ret < 0) { |
441 | return ret; | |
442 | } | |
443 | ||
444 | return 0; | |
445 | } | |
446 | ||
f67539c2 | 447 | int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker) |
11fdf7f2 | 448 | { |
f67539c2 | 449 | int ret = read(meta_obj, result, objv_tracker); |
eafe8130 TL |
450 | if (ret < 0) { |
451 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
452 | return ret; |
453 | } | |
454 | return 0; | |
455 | } | |
456 | ||
b3b6e05e | 457 | int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, |
f67539c2 | 458 | RGWObjVersionTracker *objv_tracker, optional_yield y) |
11fdf7f2 | 459 | { |
b3b6e05e | 460 | int ret = write(dpp, meta_obj, topics, objv_tracker, y); |
11fdf7f2 | 461 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 462 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
463 | return ret; |
464 | } | |
465 | return 0; | |
466 | } | |
467 | ||
f67539c2 | 468 | int RGWPubSub::get_topics(rgw_pubsub_topics *result) |
11fdf7f2 | 469 | { |
f67539c2 | 470 | return read_topics(result, nullptr); |
11fdf7f2 TL |
471 | } |
472 | ||
f67539c2 | 473 | int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) |
11fdf7f2 TL |
474 | { |
475 | int ret = ps->read(bucket_meta_obj, result, objv_tracker); | |
476 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 477 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
478 | return ret; |
479 | } | |
480 | return 0; | |
481 | } | |
482 | ||
b3b6e05e | 483 | int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, |
f67539c2 TL |
484 | RGWObjVersionTracker *objv_tracker, |
485 | optional_yield y) | |
11fdf7f2 | 486 | { |
b3b6e05e | 487 | int ret = ps->write(dpp, bucket_meta_obj, topics, objv_tracker, y); |
11fdf7f2 | 488 | if (ret < 0) { |
eafe8130 | 489 | ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
490 | return ret; |
491 | } | |
492 | ||
493 | return 0; | |
494 | } | |
495 | ||
f67539c2 | 496 | int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) |
11fdf7f2 TL |
497 | { |
498 | return read_topics(result, nullptr); | |
499 | } | |
500 | ||
f67539c2 | 501 | int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) |
11fdf7f2 | 502 | { |
f67539c2 TL |
503 | rgw_pubsub_topics topics; |
504 | int ret = get_topics(&topics); | |
11fdf7f2 | 505 | if (ret < 0) { |
eafe8130 | 506 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
507 | return ret; |
508 | } | |
509 | ||
510 | auto iter = topics.topics.find(name); | |
511 | if (iter == topics.topics.end()) { | |
eafe8130 | 512 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; |
11fdf7f2 TL |
513 | return -ENOENT; |
514 | } | |
515 | ||
516 | *result = iter->second; | |
517 | return 0; | |
518 | } | |
519 | ||
f67539c2 | 520 | int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result) |
11fdf7f2 | 521 | { |
f67539c2 TL |
522 | rgw_pubsub_topics topics; |
523 | int ret = get_topics(&topics); | |
eafe8130 TL |
524 | if (ret < 0) { |
525 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; | |
526 | return ret; | |
527 | } | |
528 | ||
529 | auto iter = topics.topics.find(name); | |
530 | if (iter == topics.topics.end()) { | |
531 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; | |
532 | return -ENOENT; | |
533 | } | |
534 | ||
535 | *result = iter->second.topic; | |
536 | return 0; | |
537 | } | |
538 | ||
b3b6e05e TL |
539 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) { |
540 | return create_notification(dpp, topic_name, events, std::nullopt, "", y); | |
eafe8130 TL |
541 | } |
542 | ||
b3b6e05e | 543 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) { |
f67539c2 | 544 | rgw_pubsub_topic_subs topic_info; |
11fdf7f2 | 545 | |
f67539c2 | 546 | int ret = ps->get_topic(topic_name, &topic_info); |
11fdf7f2 | 547 | if (ret < 0) { |
b3b6e05e | 548 | ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; |
11fdf7f2 TL |
549 | return ret; |
550 | } | |
b3b6e05e | 551 | ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl; |
11fdf7f2 TL |
552 | |
553 | RGWObjVersionTracker objv_tracker; | |
554 | rgw_pubsub_bucket_topics bucket_topics; | |
555 | ||
556 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 | 557 | if (ret < 0) { |
b3b6e05e | 558 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" << |
eafe8130 | 559 | bucket.name << "': ret=" << ret << dendl; |
11fdf7f2 TL |
560 | return ret; |
561 | } | |
b3b6e05e | 562 | ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << |
eafe8130 | 563 | bucket.name << "'" << dendl; |
11fdf7f2 TL |
564 | |
565 | auto& topic_filter = bucket_topics.topics[topic_name]; | |
f67539c2 | 566 | topic_filter.topic = topic_info.topic; |
11fdf7f2 | 567 | topic_filter.events = events; |
eafe8130 TL |
568 | topic_filter.s3_id = notif_name; |
569 | if (s3_filter) { | |
570 | topic_filter.s3_filter = *s3_filter; | |
571 | } | |
11fdf7f2 | 572 | |
b3b6e05e | 573 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 574 | if (ret < 0) { |
b3b6e05e | 575 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl; |
11fdf7f2 TL |
576 | return ret; |
577 | } | |
eafe8130 | 578 | |
b3b6e05e | 579 | ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl; |
11fdf7f2 TL |
580 | |
581 | return 0; | |
582 | } | |
583 | ||
b3b6e05e | 584 | int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y) |
11fdf7f2 | 585 | { |
f67539c2 | 586 | rgw_pubsub_topic_subs topic_info; |
11fdf7f2 | 587 | |
f67539c2 | 588 | int ret = ps->get_topic(topic_name, &topic_info); |
11fdf7f2 | 589 | if (ret < 0) { |
b3b6e05e | 590 | ldpp_dout(dpp, 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; |
11fdf7f2 TL |
591 | return ret; |
592 | } | |
593 | ||
594 | RGWObjVersionTracker objv_tracker; | |
595 | rgw_pubsub_bucket_topics bucket_topics; | |
596 | ||
597 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 | 598 | if (ret < 0) { |
b3b6e05e | 599 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
600 | return ret; |
601 | } | |
602 | ||
603 | bucket_topics.topics.erase(topic_name); | |
604 | ||
522d829b TL |
605 | if (bucket_topics.topics.empty()) { |
606 | // no more topics - delete the notification object of the bucket | |
607 | ret = ps->remove(dpp, bucket_meta_obj, &objv_tracker, y); | |
608 | if (ret < 0 && ret != -ENOENT) { | |
609 | ldout(ps->store->ctx(), 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; | |
610 | return ret; | |
611 | } | |
612 | return 0; | |
613 | } | |
614 | ||
615 | // write back the notifications without the deleted one | |
b3b6e05e | 616 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 617 | if (ret < 0) { |
b3b6e05e | 618 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
619 | return ret; |
620 | } | |
621 | ||
622 | return 0; | |
623 | } | |
624 | ||
b3b6e05e | 625 | int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) |
f67539c2 TL |
626 | { |
627 | // get all topics on a bucket | |
628 | rgw_pubsub_bucket_topics bucket_topics; | |
629 | auto ret = get_topics(&bucket_topics); | |
630 | if (ret < 0 && ret != -ENOENT) { | |
b3b6e05e | 631 | ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl; |
f67539c2 TL |
632 | return ret ; |
633 | } | |
634 | ||
635 | // remove all auto-genrated topics | |
636 | for (const auto& topic : bucket_topics.topics) { | |
637 | const auto& topic_name = topic.first; | |
b3b6e05e | 638 | ret = ps->remove_topic(dpp, topic_name, y); |
f67539c2 | 639 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 640 | ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl; |
f67539c2 TL |
641 | } |
642 | } | |
643 | ||
522d829b | 644 | // delete the notification object of the bucket |
b3b6e05e | 645 | ret = ps->remove(dpp, bucket_meta_obj, nullptr, y); |
f67539c2 | 646 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 647 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
f67539c2 TL |
648 | return ret; |
649 | } | |
650 | ||
651 | return 0; | |
eafe8130 TL |
652 | } |
653 | ||
b3b6e05e TL |
654 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) { |
655 | return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y); | |
f67539c2 TL |
656 | } |
657 | ||
b3b6e05e | 658 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) { |
11fdf7f2 | 659 | RGWObjVersionTracker objv_tracker; |
f67539c2 | 660 | rgw_pubsub_topics topics; |
11fdf7f2 | 661 | |
f67539c2 | 662 | int ret = read_topics(&topics, &objv_tracker); |
11fdf7f2 | 663 | if (ret < 0 && ret != -ENOENT) { |
eafe8130 | 664 | // its not an error if not topics exist, we create one |
b3b6e05e | 665 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
666 | return ret; |
667 | } | |
eafe8130 | 668 | |
11fdf7f2 | 669 | rgw_pubsub_topic_subs& new_topic = topics.topics[name]; |
f67539c2 | 670 | new_topic.topic.user = rgw_user("", tenant); |
11fdf7f2 | 671 | new_topic.topic.name = name; |
eafe8130 TL |
672 | new_topic.topic.dest = dest; |
673 | new_topic.topic.arn = arn; | |
9f95a23c | 674 | new_topic.topic.opaque_data = opaque_data; |
11fdf7f2 | 675 | |
b3b6e05e | 676 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 677 | if (ret < 0) { |
b3b6e05e | 678 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
679 | return ret; |
680 | } | |
681 | ||
682 | return 0; | |
683 | } | |
684 | ||
b3b6e05e | 685 | int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) |
11fdf7f2 TL |
686 | { |
687 | RGWObjVersionTracker objv_tracker; | |
f67539c2 | 688 | rgw_pubsub_topics topics; |
11fdf7f2 | 689 | |
f67539c2 | 690 | int ret = read_topics(&topics, &objv_tracker); |
11fdf7f2 | 691 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 692 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 | 693 | return ret; |
eafe8130 TL |
694 | } else if (ret == -ENOENT) { |
695 | // its not an error if no topics exist, just a no-op | |
b3b6e05e | 696 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl; |
eafe8130 | 697 | return 0; |
11fdf7f2 TL |
698 | } |
699 | ||
700 | topics.topics.erase(name); | |
701 | ||
b3b6e05e | 702 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 703 | if (ret < 0) { |
b3b6e05e | 704 | ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
705 | return ret; |
706 | } | |
707 | ||
708 | return 0; | |
709 | } | |
710 | ||
f67539c2 | 711 | int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker) |
11fdf7f2 TL |
712 | { |
713 | int ret = ps->read(sub_meta_obj, result, objv_tracker); | |
714 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 715 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
716 | return ret; |
717 | } | |
718 | return 0; | |
719 | } | |
720 | ||
b3b6e05e TL |
721 | int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp, |
722 | const rgw_pubsub_sub_config& sub_conf, | |
f67539c2 TL |
723 | RGWObjVersionTracker *objv_tracker, |
724 | optional_yield y) | |
11fdf7f2 | 725 | { |
b3b6e05e | 726 | int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y); |
11fdf7f2 | 727 | if (ret < 0) { |
b3b6e05e | 728 | ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
729 | return ret; |
730 | } | |
731 | ||
732 | return 0; | |
733 | } | |
734 | ||
b3b6e05e | 735 | int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker, |
f67539c2 | 736 | optional_yield y) |
11fdf7f2 | 737 | { |
b3b6e05e | 738 | int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y); |
11fdf7f2 | 739 | if (ret < 0) { |
b3b6e05e | 740 | ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
741 | return ret; |
742 | } | |
743 | ||
744 | return 0; | |
745 | } | |
746 | ||
f67539c2 | 747 | int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) |
11fdf7f2 TL |
748 | { |
749 | return read_sub(result, nullptr); | |
750 | } | |
751 | ||
b3b6e05e | 752 | int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id) |
11fdf7f2 | 753 | { |
f67539c2 TL |
754 | RGWObjVersionTracker objv_tracker; |
755 | rgw_pubsub_topics topics; | |
11fdf7f2 | 756 | |
f67539c2 | 757 | int ret = ps->read_topics(&topics, &objv_tracker); |
11fdf7f2 | 758 | if (ret < 0) { |
b3b6e05e | 759 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
eafe8130 | 760 | return ret != -ENOENT ? ret : -EINVAL; |
11fdf7f2 TL |
761 | } |
762 | ||
763 | auto iter = topics.topics.find(topic); | |
764 | if (iter == topics.topics.end()) { | |
b3b6e05e | 765 | ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; |
eafe8130 | 766 | return -EINVAL; |
11fdf7f2 TL |
767 | } |
768 | ||
769 | auto& t = iter->second; | |
770 | ||
771 | rgw_pubsub_sub_config sub_conf; | |
772 | ||
f67539c2 | 773 | sub_conf.user = rgw_user("", ps->tenant); |
11fdf7f2 TL |
774 | sub_conf.name = sub; |
775 | sub_conf.topic = topic; | |
776 | sub_conf.dest = dest; | |
eafe8130 | 777 | sub_conf.s3_id = s3_id; |
11fdf7f2 TL |
778 | |
779 | t.subs.insert(sub); | |
780 | ||
b3b6e05e | 781 | ret = ps->write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 782 | if (ret < 0) { |
b3b6e05e | 783 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
784 | return ret; |
785 | } | |
786 | ||
b3b6e05e | 787 | ret = write_sub(dpp, sub_conf, nullptr, y); |
11fdf7f2 | 788 | if (ret < 0) { |
b3b6e05e | 789 | ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
790 | return ret; |
791 | } | |
792 | return 0; | |
793 | } | |
794 | ||
b3b6e05e | 795 | int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y) |
11fdf7f2 TL |
796 | { |
797 | string topic = _topic; | |
798 | RGWObjVersionTracker sobjv_tracker; | |
11fdf7f2 TL |
799 | |
800 | if (topic.empty()) { | |
801 | rgw_pubsub_sub_config sub_conf; | |
802 | int ret = read_sub(&sub_conf, &sobjv_tracker); | |
803 | if (ret < 0) { | |
b3b6e05e | 804 | ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
805 | return ret; |
806 | } | |
807 | topic = sub_conf.topic; | |
808 | } | |
809 | ||
810 | RGWObjVersionTracker objv_tracker; | |
f67539c2 | 811 | rgw_pubsub_topics topics; |
11fdf7f2 | 812 | |
f67539c2 | 813 | int ret = ps->read_topics(&topics, &objv_tracker); |
11fdf7f2 | 814 | if (ret < 0) { |
eafe8130 | 815 | // not an error - could be that topic was already deleted |
b3b6e05e | 816 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; |
eafe8130 | 817 | } else { |
11fdf7f2 TL |
818 | auto iter = topics.topics.find(topic); |
819 | if (iter != topics.topics.end()) { | |
820 | auto& t = iter->second; | |
821 | ||
822 | t.subs.erase(sub); | |
823 | ||
b3b6e05e | 824 | ret = ps->write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 825 | if (ret < 0) { |
b3b6e05e | 826 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
827 | return ret; |
828 | } | |
829 | } | |
830 | } | |
831 | ||
b3b6e05e | 832 | ret = remove_sub(dpp, &sobjv_tracker, y); |
11fdf7f2 | 833 | if (ret < 0) { |
b3b6e05e | 834 | ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
835 | return ret; |
836 | } | |
837 | return 0; | |
838 | } | |
839 | ||
eafe8130 | 840 | template<typename EventType> |
f67539c2 | 841 | void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const |
11fdf7f2 TL |
842 | { |
843 | encode_json("next_marker", next_marker, f); | |
844 | encode_json("is_truncated", is_truncated, f); | |
845 | ||
eafe8130 | 846 | Formatter::ArraySection s(*f, EventType::json_type_plural); |
11fdf7f2 | 847 | for (auto& event : events) { |
92f5a8d4 | 848 | encode_json("", event, f); |
11fdf7f2 TL |
849 | } |
850 | } | |
851 | ||
eafe8130 | 852 | template<typename EventType> |
b3b6e05e | 853 | int RGWPubSub::SubWithEvents<EventType>::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) |
11fdf7f2 | 854 | { |
9f95a23c | 855 | RGWRados *store = ps->store->getRados(); |
11fdf7f2 TL |
856 | rgw_pubsub_sub_config sub_conf; |
857 | int ret = get_conf(&sub_conf); | |
858 | if (ret < 0) { | |
b3b6e05e | 859 | ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
860 | return ret; |
861 | } | |
862 | ||
863 | RGWBucketInfo bucket_info; | |
864 | string tenant; | |
9f95a23c | 865 | ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); |
11fdf7f2 | 866 | if (ret == -ENOENT) { |
eafe8130 | 867 | list.is_truncated = false; |
11fdf7f2 TL |
868 | return 0; |
869 | } | |
870 | if (ret < 0) { | |
b3b6e05e | 871 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
872 | return ret; |
873 | } | |
874 | ||
875 | RGWRados::Bucket target(store, bucket_info); | |
876 | RGWRados::Bucket::List list_op(&target); | |
877 | ||
878 | list_op.params.prefix = sub_conf.dest.oid_prefix; | |
879 | list_op.params.marker = marker; | |
880 | ||
eafe8130 | 881 | std::vector<rgw_bucket_dir_entry> objs; |
11fdf7f2 | 882 | |
b3b6e05e | 883 | ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield); |
11fdf7f2 | 884 | if (ret < 0) { |
b3b6e05e | 885 | ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
886 | return ret; |
887 | } | |
eafe8130 TL |
888 | if (list.is_truncated) { |
889 | list.next_marker = list_op.get_next_marker().name; | |
11fdf7f2 TL |
890 | } |
891 | ||
892 | for (auto& obj : objs) { | |
893 | bufferlist bl64; | |
894 | bufferlist bl; | |
895 | bl64.append(obj.meta.user_data); | |
896 | try { | |
897 | bl.decode_base64(bl64); | |
898 | } catch (buffer::error& err) { | |
b3b6e05e | 899 | ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl; |
11fdf7f2 TL |
900 | continue; |
901 | } | |
eafe8130 | 902 | EventType event; |
11fdf7f2 TL |
903 | |
904 | auto iter = bl.cbegin(); | |
905 | try { | |
906 | decode(event, iter); | |
907 | } catch (buffer::error& err) { | |
b3b6e05e | 908 | ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl; |
11fdf7f2 TL |
909 | continue; |
910 | }; | |
911 | ||
eafe8130 | 912 | list.events.push_back(event); |
11fdf7f2 TL |
913 | } |
914 | return 0; | |
915 | } | |
916 | ||
eafe8130 | 917 | template<typename EventType> |
b3b6e05e | 918 | int RGWPubSub::SubWithEvents<EventType>::remove_event(const DoutPrefixProvider *dpp, const string& event_id) |
11fdf7f2 | 919 | { |
9f95a23c | 920 | rgw::sal::RGWRadosStore *store = ps->store; |
11fdf7f2 TL |
921 | rgw_pubsub_sub_config sub_conf; |
922 | int ret = get_conf(&sub_conf); | |
923 | if (ret < 0) { | |
b3b6e05e | 924 | ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
925 | return ret; |
926 | } | |
927 | ||
928 | RGWBucketInfo bucket_info; | |
929 | string tenant; | |
9f95a23c | 930 | ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); |
11fdf7f2 | 931 | if (ret < 0) { |
b3b6e05e | 932 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
933 | return ret; |
934 | } | |
935 | ||
936 | rgw_bucket& bucket = bucket_info.bucket; | |
937 | ||
938 | RGWObjectCtx obj_ctx(store); | |
939 | rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id); | |
940 | ||
941 | obj_ctx.set_atomic(obj); | |
942 | ||
9f95a23c | 943 | RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj); |
11fdf7f2 TL |
944 | RGWRados::Object::Delete del_op(&del_target); |
945 | ||
946 | del_op.params.bucket_owner = bucket_info.owner; | |
947 | del_op.params.versioning_status = bucket_info.versioning_status(); | |
948 | ||
b3b6e05e | 949 | ret = del_op.delete_obj(null_yield, dpp); |
11fdf7f2 | 950 | if (ret < 0) { |
b3b6e05e | 951 | ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; |
11fdf7f2 TL |
952 | } |
953 | return 0; | |
954 | } | |
eafe8130 | 955 | |
f67539c2 TL |
956 | void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const { |
957 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid()); | |
9f95a23c TL |
958 | } |
959 | ||
f67539c2 | 960 | void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { |
9f95a23c TL |
961 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); |
962 | } | |
963 | ||
f67539c2 | 964 | void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { |
9f95a23c TL |
965 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name)); |
966 | } | |
967 | ||
eafe8130 | 968 | template<typename EventType> |
f67539c2 | 969 | void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const { |
eafe8130 TL |
970 | list.dump(f); |
971 | } | |
972 | ||
973 | // explicit instantiation for the only two possible types | |
974 | // no need to move implementation to header | |
f67539c2 TL |
975 | template class RGWPubSub::SubWithEvents<rgw_pubsub_event>; |
976 | template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>; | |
eafe8130 | 977 |