]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 | 5 | #include "rgw_b64.h" |
9f95a23c | 6 | #include "rgw_sal.h" |
f67539c2 | 7 | #include "rgw_sal_rados.h" |
11fdf7f2 TL |
8 | #include "rgw_pubsub.h" |
9 | #include "rgw_tools.h" | |
eafe8130 TL |
10 | #include "rgw_xml.h" |
11 | #include "rgw_arn.h" | |
12 | #include "rgw_pubsub_push.h" | |
eafe8130 TL |
13 | #include <regex> |
14 | #include <algorithm> | |
11fdf7f2 TL |
15 | |
16 | #define dout_subsys ceph_subsys_rgw | |
17 | ||
20effc67 | 18 | using namespace std; |
92f5a8d4 TL |
19 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { |
20 | char buf[64]; | |
21 | const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); | |
22 | if (len > 0) { | |
23 | id.assign(buf, len); | |
24 | } | |
25 | } | |
26 | ||
eafe8130 TL |
27 | bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { |
28 | XMLObjIter iter = obj->find("FilterRule"); | |
29 | XMLObj *o; | |
30 | ||
31 | const auto throw_if_missing = true; | |
32 | auto prefix_not_set = true; | |
33 | auto suffix_not_set = true; | |
34 | auto regex_not_set = true; | |
35 | std::string name; | |
36 | ||
37 | while ((o = iter.get_next())) { | |
38 | RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing); | |
39 | if (name == "prefix" && prefix_not_set) { | |
40 | prefix_not_set = false; | |
41 | RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing); | |
42 | } else if (name == "suffix" && suffix_not_set) { | |
43 | suffix_not_set = false; | |
44 | RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing); | |
45 | } else if (name == "regex" && regex_not_set) { | |
46 | regex_not_set = false; | |
47 | RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing); | |
48 | } else { | |
49 | throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'"); | |
50 | } | |
51 | } | |
52 | return true; | |
53 | } | |
54 | ||
55 | void rgw_s3_key_filter::dump_xml(Formatter *f) const { | |
56 | if (!prefix_rule.empty()) { | |
57 | f->open_object_section("FilterRule"); | |
58 | ::encode_xml("Name", "prefix", f); | |
59 | ::encode_xml("Value", prefix_rule, f); | |
60 | f->close_section(); | |
61 | } | |
62 | if (!suffix_rule.empty()) { | |
63 | f->open_object_section("FilterRule"); | |
64 | ::encode_xml("Name", "suffix", f); | |
65 | ::encode_xml("Value", suffix_rule, f); | |
66 | f->close_section(); | |
67 | } | |
68 | if (!regex_rule.empty()) { | |
69 | f->open_object_section("FilterRule"); | |
70 | ::encode_xml("Name", "regex", f); | |
71 | ::encode_xml("Value", regex_rule, f); | |
72 | f->close_section(); | |
73 | } | |
74 | } | |
75 | ||
76 | bool rgw_s3_key_filter::has_content() const { | |
77 | return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); | |
78 | } | |
79 | ||
9f95a23c | 80 | bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { |
f67539c2 | 81 | kv.clear(); |
eafe8130 TL |
82 | XMLObjIter iter = obj->find("FilterRule"); |
83 | XMLObj *o; | |
84 | ||
85 | const auto throw_if_missing = true; | |
86 | ||
87 | std::string key; | |
88 | std::string value; | |
89 | ||
90 | while ((o = iter.get_next())) { | |
91 | RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); | |
92 | RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); | |
f67539c2 | 93 | kv.emplace(key, value); |
eafe8130 TL |
94 | } |
95 | return true; | |
96 | } | |
97 | ||
9f95a23c | 98 | void rgw_s3_key_value_filter::dump_xml(Formatter *f) const { |
f67539c2 | 99 | for (const auto& key_value : kv) { |
eafe8130 TL |
100 | f->open_object_section("FilterRule"); |
101 | ::encode_xml("Name", key_value.first, f); | |
102 | ::encode_xml("Value", key_value.second, f); | |
103 | f->close_section(); | |
104 | } | |
105 | } | |
106 | ||
9f95a23c | 107 | bool rgw_s3_key_value_filter::has_content() const { |
f67539c2 | 108 | return !kv.empty(); |
eafe8130 TL |
109 | } |
110 | ||
111 | bool rgw_s3_filter::decode_xml(XMLObj* obj) { | |
112 | RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); | |
113 | RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); | |
9f95a23c | 114 | RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj); |
eafe8130 TL |
115 | return true; |
116 | } | |
117 | ||
118 | void rgw_s3_filter::dump_xml(Formatter *f) const { | |
119 | if (key_filter.has_content()) { | |
120 | ::encode_xml("S3Key", key_filter, f); | |
121 | } | |
122 | if (metadata_filter.has_content()) { | |
123 | ::encode_xml("S3Metadata", metadata_filter, f); | |
124 | } | |
9f95a23c TL |
125 | if (tag_filter.has_content()) { |
126 | ::encode_xml("S3Tags", tag_filter, f); | |
127 | } | |
eafe8130 TL |
128 | } |
129 | ||
130 | bool rgw_s3_filter::has_content() const { | |
131 | return key_filter.has_content() || | |
9f95a23c TL |
132 | metadata_filter.has_content() || |
133 | tag_filter.has_content(); | |
eafe8130 TL |
134 | } |
135 | ||
136 | bool match(const rgw_s3_key_filter& filter, const std::string& key) { | |
137 | const auto key_size = key.size(); | |
138 | const auto prefix_size = filter.prefix_rule.size(); | |
139 | if (prefix_size != 0) { | |
140 | // prefix rule exists | |
141 | if (prefix_size > key_size) { | |
142 | // if prefix is longer than key, we fail | |
143 | return false; | |
144 | } | |
145 | if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) { | |
146 | return false; | |
147 | } | |
148 | } | |
149 | const auto suffix_size = filter.suffix_rule.size(); | |
150 | if (suffix_size != 0) { | |
151 | // suffix rule exists | |
152 | if (suffix_size > key_size) { | |
153 | // if suffix is longer than key, we fail | |
154 | return false; | |
155 | } | |
156 | if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) { | |
157 | return false; | |
158 | } | |
159 | } | |
160 | if (!filter.regex_rule.empty()) { | |
161 | // TODO add regex chaching in the filter | |
162 | const std::regex base_regex(filter.regex_rule); | |
163 | if (!std::regex_match(key, base_regex)) { | |
164 | return false; | |
165 | } | |
166 | } | |
167 | return true; | |
168 | } | |
169 | ||
f67539c2 | 170 | bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv) { |
9f95a23c TL |
171 | // all filter pairs must exist with the same value in the object's metadata/tags |
172 | // object metadata/tags may include items not in the filter | |
f67539c2 | 173 | return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end()); |
eafe8130 TL |
174 | } |
175 | ||
20effc67 TL |
176 | bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv) { |
177 | // all filter pairs must exist with the same value in the object's metadata/tags | |
178 | // object metadata/tags may include items not in the filter | |
179 | for (auto& filter : filter.kv) { | |
180 | auto result = kv.equal_range(filter.first); | |
181 | if (std::any_of(result.first, result.second, [&filter](const pair<string,string>& p) { return p.second == filter.second;})) | |
182 | continue; | |
183 | else | |
184 | return false; | |
185 | } | |
186 | return true; | |
187 | } | |
188 | ||
eafe8130 TL |
189 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { |
190 | // if event list exists, and none of the events in the list matches the event type, filter the message | |
191 | if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) { | |
192 | return false; | |
193 | } | |
194 | return true; | |
195 | } | |
196 | ||
197 | void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) { | |
198 | l.clear(); | |
199 | ||
200 | XMLObjIter iter = obj->find(name); | |
201 | XMLObj *o; | |
202 | ||
203 | while ((o = iter.get_next())) { | |
204 | std::string val; | |
205 | decode_xml_obj(val, o); | |
206 | l.push_back(rgw::notify::from_string(val)); | |
207 | } | |
208 | } | |
209 | ||
210 | bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) { | |
211 | const auto throw_if_missing = true; | |
212 | RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); | |
213 | ||
214 | RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing); | |
215 | ||
216 | RGWXMLDecoder::decode_xml("Filter", filter, obj); | |
217 | ||
218 | do_decode_xml_obj(events, "Event", obj); | |
219 | if (events.empty()) { | |
220 | // if no events are provided, we assume all events | |
221 | events.push_back(rgw::notify::ObjectCreated); | |
222 | events.push_back(rgw::notify::ObjectRemoved); | |
223 | } | |
224 | return true; | |
225 | } | |
226 | ||
227 | void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const { | |
228 | ::encode_xml("Id", id, f); | |
229 | ::encode_xml("Topic", topic_arn.c_str(), f); | |
230 | if (filter.has_content()) { | |
231 | ::encode_xml("Filter", filter, f); | |
232 | } | |
233 | for (const auto& event : events) { | |
234 | ::encode_xml("Event", rgw::notify::to_string(event), f); | |
235 | } | |
236 | } | |
237 | ||
238 | bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) { | |
239 | do_decode_xml_obj(list, "TopicConfiguration", obj); | |
eafe8130 TL |
240 | return true; |
241 | } | |
242 | ||
243 | rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) : | |
244 | id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} | |
245 | ||
246 | void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { | |
247 | do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); | |
248 | } | |
249 | ||
f67539c2 | 250 | void rgw_pubsub_s3_event::dump(Formatter *f) const { |
eafe8130 TL |
251 | encode_json("eventVersion", eventVersion, f); |
252 | encode_json("eventSource", eventSource, f); | |
253 | encode_json("awsRegion", awsRegion, f); | |
254 | utime_t ut(eventTime); | |
255 | encode_json("eventTime", ut, f); | |
256 | encode_json("eventName", eventName, f); | |
257 | { | |
258 | Formatter::ObjectSection s(*f, "userIdentity"); | |
259 | encode_json("principalId", userIdentity, f); | |
260 | } | |
261 | { | |
262 | Formatter::ObjectSection s(*f, "requestParameters"); | |
263 | encode_json("sourceIPAddress", sourceIPAddress, f); | |
264 | } | |
265 | { | |
266 | Formatter::ObjectSection s(*f, "responseElements"); | |
267 | encode_json("x-amz-request-id", x_amz_request_id, f); | |
268 | encode_json("x-amz-id-2", x_amz_id_2, f); | |
269 | } | |
270 | { | |
271 | Formatter::ObjectSection s(*f, "s3"); | |
272 | encode_json("s3SchemaVersion", s3SchemaVersion, f); | |
273 | encode_json("configurationId", configurationId, f); | |
274 | { | |
275 | Formatter::ObjectSection sub_s(*f, "bucket"); | |
276 | encode_json("name", bucket_name, f); | |
277 | { | |
278 | Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); | |
279 | encode_json("principalId", bucket_ownerIdentity, f); | |
280 | } | |
281 | encode_json("arn", bucket_arn, f); | |
282 | encode_json("id", bucket_id, f); | |
283 | } | |
284 | { | |
285 | Formatter::ObjectSection sub_s(*f, "object"); | |
286 | encode_json("key", object_key, f); | |
287 | encode_json("size", object_size, f); | |
a4b75251 | 288 | encode_json("eTag", object_etag, f); |
eafe8130 TL |
289 | encode_json("versionId", object_versionId, f); |
290 | encode_json("sequencer", object_sequencer, f); | |
291 | encode_json("metadata", x_meta_map, f); | |
9f95a23c | 292 | encode_json("tags", tags, f); |
eafe8130 TL |
293 | } |
294 | } | |
295 | encode_json("eventId", id, f); | |
9f95a23c | 296 | encode_json("opaqueData", opaque_data, f); |
eafe8130 | 297 | } |
11fdf7f2 TL |
298 | |
299 | void rgw_pubsub_event::dump(Formatter *f) const | |
300 | { | |
301 | encode_json("id", id, f); | |
eafe8130 | 302 | encode_json("event", event_name, f); |
11fdf7f2 TL |
303 | utime_t ut(timestamp); |
304 | encode_json("timestamp", ut, f); | |
305 | encode_json("info", info, f); | |
306 | } | |
307 | ||
308 | void rgw_pubsub_topic::dump(Formatter *f) const | |
309 | { | |
310 | encode_json("user", user, f); | |
311 | encode_json("name", name, f); | |
eafe8130 TL |
312 | encode_json("dest", dest, f); |
313 | encode_json("arn", arn, f); | |
9f95a23c | 314 | encode_json("opaqueData", opaque_data, f); |
eafe8130 TL |
315 | } |
316 | ||
317 | void rgw_pubsub_topic::dump_xml(Formatter *f) const | |
318 | { | |
319 | encode_xml("User", user, f); | |
320 | encode_xml("Name", name, f); | |
321 | encode_xml("EndPoint", dest, f); | |
322 | encode_xml("TopicArn", arn, f); | |
9f95a23c | 323 | encode_xml("OpaqueData", opaque_data, f); |
eafe8130 TL |
324 | } |
325 | ||
f67539c2 TL |
326 | void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) { |
327 | f->open_object_section("entry"); | |
328 | encode_xml("key", key, f); | |
329 | encode_xml("value", value, f); | |
330 | f->close_section(); // entry | |
331 | } | |
332 | ||
333 | void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const | |
334 | { | |
335 | f->open_array_section("Attributes"); | |
336 | std::string str_user; | |
337 | user.to_str(str_user); | |
338 | encode_xml_key_value_entry("User", str_user, f); | |
339 | encode_xml_key_value_entry("Name", name, f); | |
340 | encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f); | |
341 | encode_xml_key_value_entry("TopicArn", arn, f); | |
342 | encode_xml_key_value_entry("OpaqueData", opaque_data, f); | |
343 | f->close_section(); // Attributes | |
344 | } | |
345 | ||
eafe8130 TL |
346 | void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) |
347 | { | |
348 | f->open_array_section(name); | |
349 | for (auto iter = l.cbegin(); iter != l.cend(); ++iter) { | |
350 | f->dump_string("obj", rgw::notify::to_ceph_string(*iter)); | |
351 | } | |
352 | f->close_section(); | |
11fdf7f2 TL |
353 | } |
354 | ||
355 | void rgw_pubsub_topic_filter::dump(Formatter *f) const | |
356 | { | |
357 | encode_json("topic", topic, f); | |
358 | encode_json("events", events, f); | |
359 | } | |
360 | ||
361 | void rgw_pubsub_topic_subs::dump(Formatter *f) const | |
362 | { | |
363 | encode_json("topic", topic, f); | |
364 | encode_json("subs", subs, f); | |
365 | } | |
366 | ||
367 | void rgw_pubsub_bucket_topics::dump(Formatter *f) const | |
368 | { | |
369 | Formatter::ArraySection s(*f, "topics"); | |
370 | for (auto& t : topics) { | |
371 | encode_json(t.first.c_str(), t.second, f); | |
372 | } | |
373 | } | |
374 | ||
f67539c2 | 375 | void rgw_pubsub_topics::dump(Formatter *f) const |
11fdf7f2 TL |
376 | { |
377 | Formatter::ArraySection s(*f, "topics"); | |
378 | for (auto& t : topics) { | |
379 | encode_json(t.first.c_str(), t.second, f); | |
380 | } | |
381 | } | |
382 | ||
f67539c2 | 383 | void rgw_pubsub_topics::dump_xml(Formatter *f) const |
eafe8130 TL |
384 | { |
385 | for (auto& t : topics) { | |
386 | encode_xml("member", t.second.topic, f); | |
387 | } | |
388 | } | |
389 | ||
11fdf7f2 TL |
390 | void rgw_pubsub_sub_dest::dump(Formatter *f) const |
391 | { | |
392 | encode_json("bucket_name", bucket_name, f); | |
393 | encode_json("oid_prefix", oid_prefix, f); | |
394 | encode_json("push_endpoint", push_endpoint, f); | |
eafe8130 TL |
395 | encode_json("push_endpoint_args", push_endpoint_args, f); |
396 | encode_json("push_endpoint_topic", arn_topic, f); | |
f67539c2 TL |
397 | encode_json("stored_secret", stored_secret, f); |
398 | encode_json("persistent", persistent, f); | |
eafe8130 TL |
399 | } |
400 | ||
401 | void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const | |
402 | { | |
f67539c2 TL |
403 | // first 2 members are omitted here since they |
404 | // dont apply to AWS compliant topics | |
eafe8130 TL |
405 | encode_xml("EndpointAddress", push_endpoint, f); |
406 | encode_xml("EndpointArgs", push_endpoint_args, f); | |
407 | encode_xml("EndpointTopic", arn_topic, f); | |
f67539c2 TL |
408 | encode_xml("HasStoredSecret", stored_secret, f); |
409 | encode_xml("Persistent", persistent, f); | |
410 | } | |
411 | ||
412 | std::string rgw_pubsub_sub_dest::to_json_str() const | |
413 | { | |
414 | // first 2 members are omitted here since they | |
415 | // dont apply to AWS compliant topics | |
416 | JSONFormatter f; | |
417 | f.open_object_section(""); | |
418 | encode_json("EndpointAddress", push_endpoint, &f); | |
419 | encode_json("EndpointArgs", push_endpoint_args, &f); | |
420 | encode_json("EndpointTopic", arn_topic, &f); | |
421 | encode_json("HasStoredSecret", stored_secret, &f); | |
422 | encode_json("Persistent", persistent, &f); | |
423 | f.close_section(); | |
424 | std::stringstream ss; | |
425 | f.flush(ss); | |
426 | return ss.str(); | |
11fdf7f2 TL |
427 | } |
428 | ||
429 | void rgw_pubsub_sub_config::dump(Formatter *f) const | |
430 | { | |
431 | encode_json("user", user, f); | |
432 | encode_json("name", name, f); | |
433 | encode_json("topic", topic, f); | |
434 | encode_json("dest", dest, f); | |
eafe8130 | 435 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
436 | } |
437 | ||
20effc67 | 438 | RGWPubSub::RGWPubSub(rgw::sal::RadosStore* _store, const std::string& _tenant) : |
9f95a23c | 439 | store(_store), |
f67539c2 | 440 | tenant(_tenant), |
9f95a23c | 441 | obj_ctx(store->svc()->sysobj->init_obj_ctx()) { |
f67539c2 | 442 | get_meta_obj(&meta_obj); |
9f95a23c | 443 | } |
11fdf7f2 | 444 | |
b3b6e05e TL |
445 | int RGWPubSub::remove(const DoutPrefixProvider *dpp, |
446 | const rgw_raw_obj& obj, | |
f67539c2 TL |
447 | RGWObjVersionTracker *objv_tracker, |
448 | optional_yield y) | |
11fdf7f2 | 449 | { |
b3b6e05e | 450 | int ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y); |
11fdf7f2 TL |
451 | if (ret < 0) { |
452 | return ret; | |
453 | } | |
454 | ||
455 | return 0; | |
456 | } | |
457 | ||
f67539c2 | 458 | int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker) |
11fdf7f2 | 459 | { |
f67539c2 | 460 | int ret = read(meta_obj, result, objv_tracker); |
eafe8130 TL |
461 | if (ret < 0) { |
462 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
463 | return ret; |
464 | } | |
465 | return 0; | |
466 | } | |
467 | ||
b3b6e05e | 468 | int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics, |
f67539c2 | 469 | RGWObjVersionTracker *objv_tracker, optional_yield y) |
11fdf7f2 | 470 | { |
b3b6e05e | 471 | int ret = write(dpp, meta_obj, topics, objv_tracker, y); |
11fdf7f2 | 472 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 473 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
474 | return ret; |
475 | } | |
476 | return 0; | |
477 | } | |
478 | ||
f67539c2 | 479 | int RGWPubSub::get_topics(rgw_pubsub_topics *result) |
11fdf7f2 | 480 | { |
f67539c2 | 481 | return read_topics(result, nullptr); |
11fdf7f2 TL |
482 | } |
483 | ||
f67539c2 | 484 | int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) |
11fdf7f2 TL |
485 | { |
486 | int ret = ps->read(bucket_meta_obj, result, objv_tracker); | |
487 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 488 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
489 | return ret; |
490 | } | |
491 | return 0; | |
492 | } | |
493 | ||
b3b6e05e | 494 | int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, |
f67539c2 TL |
495 | RGWObjVersionTracker *objv_tracker, |
496 | optional_yield y) | |
11fdf7f2 | 497 | { |
b3b6e05e | 498 | int ret = ps->write(dpp, bucket_meta_obj, topics, objv_tracker, y); |
11fdf7f2 | 499 | if (ret < 0) { |
eafe8130 | 500 | ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
501 | return ret; |
502 | } | |
503 | ||
504 | return 0; | |
505 | } | |
506 | ||
f67539c2 | 507 | int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) |
11fdf7f2 TL |
508 | { |
509 | return read_topics(result, nullptr); | |
510 | } | |
511 | ||
f67539c2 | 512 | int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) |
11fdf7f2 | 513 | { |
f67539c2 TL |
514 | rgw_pubsub_topics topics; |
515 | int ret = get_topics(&topics); | |
11fdf7f2 | 516 | if (ret < 0) { |
eafe8130 | 517 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
518 | return ret; |
519 | } | |
520 | ||
521 | auto iter = topics.topics.find(name); | |
522 | if (iter == topics.topics.end()) { | |
eafe8130 | 523 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; |
11fdf7f2 TL |
524 | return -ENOENT; |
525 | } | |
526 | ||
527 | *result = iter->second; | |
528 | return 0; | |
529 | } | |
530 | ||
f67539c2 | 531 | int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result) |
11fdf7f2 | 532 | { |
f67539c2 TL |
533 | rgw_pubsub_topics topics; |
534 | int ret = get_topics(&topics); | |
eafe8130 TL |
535 | if (ret < 0) { |
536 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; | |
537 | return ret; | |
538 | } | |
539 | ||
540 | auto iter = topics.topics.find(name); | |
541 | if (iter == topics.topics.end()) { | |
542 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; | |
543 | return -ENOENT; | |
544 | } | |
545 | ||
546 | *result = iter->second.topic; | |
547 | return 0; | |
548 | } | |
549 | ||
b3b6e05e TL |
550 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) { |
551 | return create_notification(dpp, topic_name, events, std::nullopt, "", y); | |
eafe8130 TL |
552 | } |
553 | ||
b3b6e05e | 554 | int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) { |
f67539c2 | 555 | rgw_pubsub_topic_subs topic_info; |
11fdf7f2 | 556 | |
f67539c2 | 557 | int ret = ps->get_topic(topic_name, &topic_info); |
11fdf7f2 | 558 | if (ret < 0) { |
b3b6e05e | 559 | ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; |
11fdf7f2 TL |
560 | return ret; |
561 | } | |
b3b6e05e | 562 | ldpp_dout(dpp, 20) << "successfully read topic '" << topic_name << "' info" << dendl; |
11fdf7f2 TL |
563 | |
564 | RGWObjVersionTracker objv_tracker; | |
565 | rgw_pubsub_bucket_topics bucket_topics; | |
566 | ||
567 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 | 568 | if (ret < 0) { |
b3b6e05e | 569 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics from bucket '" << |
eafe8130 | 570 | bucket.name << "': ret=" << ret << dendl; |
11fdf7f2 TL |
571 | return ret; |
572 | } | |
b3b6e05e | 573 | ldpp_dout(dpp, 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << |
eafe8130 | 574 | bucket.name << "'" << dendl; |
11fdf7f2 TL |
575 | |
576 | auto& topic_filter = bucket_topics.topics[topic_name]; | |
f67539c2 | 577 | topic_filter.topic = topic_info.topic; |
11fdf7f2 | 578 | topic_filter.events = events; |
eafe8130 TL |
579 | topic_filter.s3_id = notif_name; |
580 | if (s3_filter) { | |
581 | topic_filter.s3_filter = *s3_filter; | |
582 | } | |
11fdf7f2 | 583 | |
b3b6e05e | 584 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 585 | if (ret < 0) { |
b3b6e05e | 586 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl; |
11fdf7f2 TL |
587 | return ret; |
588 | } | |
eafe8130 | 589 | |
b3b6e05e | 590 | ldpp_dout(dpp, 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl; |
11fdf7f2 TL |
591 | |
592 | return 0; | |
593 | } | |
594 | ||
b3b6e05e | 595 | int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y) |
11fdf7f2 | 596 | { |
f67539c2 | 597 | rgw_pubsub_topic_subs topic_info; |
11fdf7f2 | 598 | |
f67539c2 | 599 | int ret = ps->get_topic(topic_name, &topic_info); |
11fdf7f2 | 600 | if (ret < 0) { |
b3b6e05e | 601 | ldpp_dout(dpp, 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; |
11fdf7f2 TL |
602 | return ret; |
603 | } | |
604 | ||
605 | RGWObjVersionTracker objv_tracker; | |
606 | rgw_pubsub_bucket_topics bucket_topics; | |
607 | ||
608 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 | 609 | if (ret < 0) { |
b3b6e05e | 610 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
611 | return ret; |
612 | } | |
613 | ||
614 | bucket_topics.topics.erase(topic_name); | |
615 | ||
522d829b TL |
616 | if (bucket_topics.topics.empty()) { |
617 | // no more topics - delete the notification object of the bucket | |
618 | ret = ps->remove(dpp, bucket_meta_obj, &objv_tracker, y); | |
619 | if (ret < 0 && ret != -ENOENT) { | |
20effc67 | 620 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
522d829b TL |
621 | return ret; |
622 | } | |
623 | return 0; | |
624 | } | |
625 | ||
626 | // write back the notifications without the deleted one | |
b3b6e05e | 627 | ret = write_topics(dpp, bucket_topics, &objv_tracker, y); |
11fdf7f2 | 628 | if (ret < 0) { |
b3b6e05e | 629 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
630 | return ret; |
631 | } | |
632 | ||
633 | return 0; | |
634 | } | |
635 | ||
b3b6e05e | 636 | int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) |
f67539c2 TL |
637 | { |
638 | // get all topics on a bucket | |
639 | rgw_pubsub_bucket_topics bucket_topics; | |
640 | auto ret = get_topics(&bucket_topics); | |
641 | if (ret < 0 && ret != -ENOENT) { | |
b3b6e05e | 642 | ldpp_dout(dpp, 1) << "ERROR: failed to get list of topics from bucket '" << bucket.name << "', ret=" << ret << dendl; |
f67539c2 TL |
643 | return ret ; |
644 | } | |
645 | ||
646 | // remove all auto-genrated topics | |
647 | for (const auto& topic : bucket_topics.topics) { | |
648 | const auto& topic_name = topic.first; | |
b3b6e05e | 649 | ret = ps->remove_topic(dpp, topic_name, y); |
f67539c2 | 650 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 651 | ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl; |
f67539c2 TL |
652 | } |
653 | } | |
654 | ||
522d829b | 655 | // delete the notification object of the bucket |
b3b6e05e | 656 | ret = ps->remove(dpp, bucket_meta_obj, nullptr, y); |
f67539c2 | 657 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 658 | ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl; |
f67539c2 TL |
659 | return ret; |
660 | } | |
661 | ||
662 | return 0; | |
eafe8130 TL |
663 | } |
664 | ||
b3b6e05e TL |
665 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) { |
666 | return create_topic(dpp, name, rgw_pubsub_sub_dest(), "", "", y); | |
f67539c2 TL |
667 | } |
668 | ||
b3b6e05e | 669 | int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) { |
11fdf7f2 | 670 | RGWObjVersionTracker objv_tracker; |
f67539c2 | 671 | rgw_pubsub_topics topics; |
11fdf7f2 | 672 | |
f67539c2 | 673 | int ret = read_topics(&topics, &objv_tracker); |
11fdf7f2 | 674 | if (ret < 0 && ret != -ENOENT) { |
eafe8130 | 675 | // its not an error if not topics exist, we create one |
b3b6e05e | 676 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
677 | return ret; |
678 | } | |
eafe8130 | 679 | |
11fdf7f2 | 680 | rgw_pubsub_topic_subs& new_topic = topics.topics[name]; |
f67539c2 | 681 | new_topic.topic.user = rgw_user("", tenant); |
11fdf7f2 | 682 | new_topic.topic.name = name; |
eafe8130 TL |
683 | new_topic.topic.dest = dest; |
684 | new_topic.topic.arn = arn; | |
9f95a23c | 685 | new_topic.topic.opaque_data = opaque_data; |
11fdf7f2 | 686 | |
b3b6e05e | 687 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 688 | if (ret < 0) { |
b3b6e05e | 689 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
690 | return ret; |
691 | } | |
692 | ||
693 | return 0; | |
694 | } | |
695 | ||
b3b6e05e | 696 | int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) |
11fdf7f2 TL |
697 | { |
698 | RGWObjVersionTracker objv_tracker; | |
f67539c2 | 699 | rgw_pubsub_topics topics; |
11fdf7f2 | 700 | |
f67539c2 | 701 | int ret = read_topics(&topics, &objv_tracker); |
11fdf7f2 | 702 | if (ret < 0 && ret != -ENOENT) { |
b3b6e05e | 703 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 | 704 | return ret; |
eafe8130 TL |
705 | } else if (ret == -ENOENT) { |
706 | // its not an error if no topics exist, just a no-op | |
b3b6e05e | 707 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl; |
eafe8130 | 708 | return 0; |
11fdf7f2 TL |
709 | } |
710 | ||
711 | topics.topics.erase(name); | |
712 | ||
b3b6e05e | 713 | ret = write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 714 | if (ret < 0) { |
b3b6e05e | 715 | ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
716 | return ret; |
717 | } | |
718 | ||
719 | return 0; | |
720 | } | |
721 | ||
f67539c2 | 722 | int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker) |
11fdf7f2 TL |
723 | { |
724 | int ret = ps->read(sub_meta_obj, result, objv_tracker); | |
725 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 726 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
727 | return ret; |
728 | } | |
729 | return 0; | |
730 | } | |
731 | ||
b3b6e05e TL |
732 | int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp, |
733 | const rgw_pubsub_sub_config& sub_conf, | |
f67539c2 TL |
734 | RGWObjVersionTracker *objv_tracker, |
735 | optional_yield y) | |
11fdf7f2 | 736 | { |
b3b6e05e | 737 | int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y); |
11fdf7f2 | 738 | if (ret < 0) { |
b3b6e05e | 739 | ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
740 | return ret; |
741 | } | |
742 | ||
743 | return 0; | |
744 | } | |
745 | ||
b3b6e05e | 746 | int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker, |
f67539c2 | 747 | optional_yield y) |
11fdf7f2 | 748 | { |
b3b6e05e | 749 | int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y); |
11fdf7f2 | 750 | if (ret < 0) { |
b3b6e05e | 751 | ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
752 | return ret; |
753 | } | |
754 | ||
755 | return 0; | |
756 | } | |
757 | ||
f67539c2 | 758 | int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) |
11fdf7f2 TL |
759 | { |
760 | return read_sub(result, nullptr); | |
761 | } | |
762 | ||
b3b6e05e | 763 | int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id) |
11fdf7f2 | 764 | { |
f67539c2 TL |
765 | RGWObjVersionTracker objv_tracker; |
766 | rgw_pubsub_topics topics; | |
11fdf7f2 | 767 | |
f67539c2 | 768 | int ret = ps->read_topics(&topics, &objv_tracker); |
11fdf7f2 | 769 | if (ret < 0) { |
b3b6e05e | 770 | ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
eafe8130 | 771 | return ret != -ENOENT ? ret : -EINVAL; |
11fdf7f2 TL |
772 | } |
773 | ||
774 | auto iter = topics.topics.find(topic); | |
775 | if (iter == topics.topics.end()) { | |
b3b6e05e | 776 | ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; |
eafe8130 | 777 | return -EINVAL; |
11fdf7f2 TL |
778 | } |
779 | ||
780 | auto& t = iter->second; | |
781 | ||
782 | rgw_pubsub_sub_config sub_conf; | |
783 | ||
f67539c2 | 784 | sub_conf.user = rgw_user("", ps->tenant); |
11fdf7f2 TL |
785 | sub_conf.name = sub; |
786 | sub_conf.topic = topic; | |
787 | sub_conf.dest = dest; | |
eafe8130 | 788 | sub_conf.s3_id = s3_id; |
11fdf7f2 TL |
789 | |
790 | t.subs.insert(sub); | |
791 | ||
b3b6e05e | 792 | ret = ps->write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 793 | if (ret < 0) { |
b3b6e05e | 794 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
795 | return ret; |
796 | } | |
797 | ||
b3b6e05e | 798 | ret = write_sub(dpp, sub_conf, nullptr, y); |
11fdf7f2 | 799 | if (ret < 0) { |
b3b6e05e | 800 | ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
801 | return ret; |
802 | } | |
803 | return 0; | |
804 | } | |
805 | ||
b3b6e05e | 806 | int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y) |
11fdf7f2 TL |
807 | { |
808 | string topic = _topic; | |
809 | RGWObjVersionTracker sobjv_tracker; | |
11fdf7f2 TL |
810 | |
811 | if (topic.empty()) { | |
812 | rgw_pubsub_sub_config sub_conf; | |
813 | int ret = read_sub(&sub_conf, &sobjv_tracker); | |
814 | if (ret < 0) { | |
b3b6e05e | 815 | ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
816 | return ret; |
817 | } | |
818 | topic = sub_conf.topic; | |
819 | } | |
820 | ||
821 | RGWObjVersionTracker objv_tracker; | |
f67539c2 | 822 | rgw_pubsub_topics topics; |
11fdf7f2 | 823 | |
f67539c2 | 824 | int ret = ps->read_topics(&topics, &objv_tracker); |
11fdf7f2 | 825 | if (ret < 0) { |
eafe8130 | 826 | // not an error - could be that topic was already deleted |
b3b6e05e | 827 | ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; |
eafe8130 | 828 | } else { |
11fdf7f2 TL |
829 | auto iter = topics.topics.find(topic); |
830 | if (iter != topics.topics.end()) { | |
831 | auto& t = iter->second; | |
832 | ||
833 | t.subs.erase(sub); | |
834 | ||
b3b6e05e | 835 | ret = ps->write_topics(dpp, topics, &objv_tracker, y); |
11fdf7f2 | 836 | if (ret < 0) { |
b3b6e05e | 837 | ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
838 | return ret; |
839 | } | |
840 | } | |
841 | } | |
842 | ||
b3b6e05e | 843 | ret = remove_sub(dpp, &sobjv_tracker, y); |
11fdf7f2 | 844 | if (ret < 0) { |
b3b6e05e | 845 | ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
846 | return ret; |
847 | } | |
848 | return 0; | |
849 | } | |
850 | ||
eafe8130 | 851 | template<typename EventType> |
f67539c2 | 852 | void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const |
11fdf7f2 TL |
853 | { |
854 | encode_json("next_marker", next_marker, f); | |
855 | encode_json("is_truncated", is_truncated, f); | |
856 | ||
eafe8130 | 857 | Formatter::ArraySection s(*f, EventType::json_type_plural); |
11fdf7f2 | 858 | for (auto& event : events) { |
92f5a8d4 | 859 | encode_json("", event, f); |
11fdf7f2 TL |
860 | } |
861 | } | |
862 | ||
eafe8130 | 863 | template<typename EventType> |
b3b6e05e | 864 | int RGWPubSub::SubWithEvents<EventType>::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events) |
11fdf7f2 | 865 | { |
9f95a23c | 866 | RGWRados *store = ps->store->getRados(); |
11fdf7f2 TL |
867 | rgw_pubsub_sub_config sub_conf; |
868 | int ret = get_conf(&sub_conf); | |
869 | if (ret < 0) { | |
b3b6e05e | 870 | ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
871 | return ret; |
872 | } | |
873 | ||
874 | RGWBucketInfo bucket_info; | |
875 | string tenant; | |
9f95a23c | 876 | ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); |
11fdf7f2 | 877 | if (ret == -ENOENT) { |
eafe8130 | 878 | list.is_truncated = false; |
11fdf7f2 TL |
879 | return 0; |
880 | } | |
881 | if (ret < 0) { | |
b3b6e05e | 882 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
883 | return ret; |
884 | } | |
885 | ||
886 | RGWRados::Bucket target(store, bucket_info); | |
887 | RGWRados::Bucket::List list_op(&target); | |
888 | ||
889 | list_op.params.prefix = sub_conf.dest.oid_prefix; | |
890 | list_op.params.marker = marker; | |
891 | ||
eafe8130 | 892 | std::vector<rgw_bucket_dir_entry> objs; |
11fdf7f2 | 893 | |
b3b6e05e | 894 | ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield); |
11fdf7f2 | 895 | if (ret < 0) { |
b3b6e05e | 896 | ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
897 | return ret; |
898 | } | |
eafe8130 TL |
899 | if (list.is_truncated) { |
900 | list.next_marker = list_op.get_next_marker().name; | |
11fdf7f2 TL |
901 | } |
902 | ||
903 | for (auto& obj : objs) { | |
904 | bufferlist bl64; | |
905 | bufferlist bl; | |
906 | bl64.append(obj.meta.user_data); | |
907 | try { | |
908 | bl.decode_base64(bl64); | |
909 | } catch (buffer::error& err) { | |
b3b6e05e | 910 | ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl; |
11fdf7f2 TL |
911 | continue; |
912 | } | |
eafe8130 | 913 | EventType event; |
11fdf7f2 TL |
914 | |
915 | auto iter = bl.cbegin(); | |
916 | try { | |
917 | decode(event, iter); | |
918 | } catch (buffer::error& err) { | |
b3b6e05e | 919 | ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl; |
11fdf7f2 TL |
920 | continue; |
921 | }; | |
922 | ||
eafe8130 | 923 | list.events.push_back(event); |
11fdf7f2 TL |
924 | } |
925 | return 0; | |
926 | } | |
927 | ||
eafe8130 | 928 | template<typename EventType> |
b3b6e05e | 929 | int RGWPubSub::SubWithEvents<EventType>::remove_event(const DoutPrefixProvider *dpp, const string& event_id) |
11fdf7f2 | 930 | { |
20effc67 | 931 | rgw::sal::RadosStore* store = ps->store; |
11fdf7f2 TL |
932 | rgw_pubsub_sub_config sub_conf; |
933 | int ret = get_conf(&sub_conf); | |
934 | if (ret < 0) { | |
b3b6e05e | 935 | ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
936 | return ret; |
937 | } | |
938 | ||
939 | RGWBucketInfo bucket_info; | |
940 | string tenant; | |
9f95a23c | 941 | ret = store->getRados()->get_bucket_info(store->svc(), tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr); |
11fdf7f2 | 942 | if (ret < 0) { |
b3b6e05e | 943 | ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
944 | return ret; |
945 | } | |
946 | ||
947 | rgw_bucket& bucket = bucket_info.bucket; | |
948 | ||
949 | RGWObjectCtx obj_ctx(store); | |
950 | rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id); | |
951 | ||
952 | obj_ctx.set_atomic(obj); | |
953 | ||
9f95a23c | 954 | RGWRados::Object del_target(store->getRados(), bucket_info, obj_ctx, obj); |
11fdf7f2 TL |
955 | RGWRados::Object::Delete del_op(&del_target); |
956 | ||
957 | del_op.params.bucket_owner = bucket_info.owner; | |
958 | del_op.params.versioning_status = bucket_info.versioning_status(); | |
959 | ||
b3b6e05e | 960 | ret = del_op.delete_obj(null_yield, dpp); |
11fdf7f2 | 961 | if (ret < 0) { |
b3b6e05e | 962 | ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; |
11fdf7f2 TL |
963 | } |
964 | return 0; | |
965 | } | |
eafe8130 | 966 | |
f67539c2 TL |
967 | void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const { |
968 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid()); | |
9f95a23c TL |
969 | } |
970 | ||
f67539c2 | 971 | void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const { |
9f95a23c TL |
972 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket)); |
973 | } | |
974 | ||
f67539c2 | 975 | void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const { |
9f95a23c TL |
976 | *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name)); |
977 | } | |
978 | ||
eafe8130 | 979 | template<typename EventType> |
f67539c2 | 980 | void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const { |
eafe8130 TL |
981 | list.dump(f); |
982 | } | |
983 | ||
984 | // explicit instantiation for the only two possible types | |
985 | // no need to move implementation to header | |
f67539c2 TL |
986 | template class RGWPubSub::SubWithEvents<rgw_pubsub_event>; |
987 | template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>; | |
eafe8130 | 988 |