]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 TL |
5 | #include "rgw_b64.h" |
6 | #include "rgw_rados.h" | |
7 | #include "rgw_pubsub.h" | |
8 | #include "rgw_tools.h" | |
eafe8130 TL |
9 | #include "rgw_xml.h" |
10 | #include "rgw_arn.h" | |
11 | #include "rgw_pubsub_push.h" | |
12 | #include "rgw_rados.h" | |
13 | #include <regex> | |
14 | #include <algorithm> | |
11fdf7f2 TL |
15 | |
16 | #define dout_subsys ceph_subsys_rgw | |
17 | ||
92f5a8d4 TL |
18 | void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { |
19 | char buf[64]; | |
20 | const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); | |
21 | if (len > 0) { | |
22 | id.assign(buf, len); | |
23 | } | |
24 | } | |
25 | ||
eafe8130 TL |
26 | bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { |
27 | XMLObjIter iter = obj->find("FilterRule"); | |
28 | XMLObj *o; | |
29 | ||
30 | const auto throw_if_missing = true; | |
31 | auto prefix_not_set = true; | |
32 | auto suffix_not_set = true; | |
33 | auto regex_not_set = true; | |
34 | std::string name; | |
35 | ||
36 | while ((o = iter.get_next())) { | |
37 | RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing); | |
38 | if (name == "prefix" && prefix_not_set) { | |
39 | prefix_not_set = false; | |
40 | RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing); | |
41 | } else if (name == "suffix" && suffix_not_set) { | |
42 | suffix_not_set = false; | |
43 | RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing); | |
44 | } else if (name == "regex" && regex_not_set) { | |
45 | regex_not_set = false; | |
46 | RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing); | |
47 | } else { | |
48 | throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'"); | |
49 | } | |
50 | } | |
51 | return true; | |
52 | } | |
53 | ||
54 | void rgw_s3_key_filter::dump_xml(Formatter *f) const { | |
55 | if (!prefix_rule.empty()) { | |
56 | f->open_object_section("FilterRule"); | |
57 | ::encode_xml("Name", "prefix", f); | |
58 | ::encode_xml("Value", prefix_rule, f); | |
59 | f->close_section(); | |
60 | } | |
61 | if (!suffix_rule.empty()) { | |
62 | f->open_object_section("FilterRule"); | |
63 | ::encode_xml("Name", "suffix", f); | |
64 | ::encode_xml("Value", suffix_rule, f); | |
65 | f->close_section(); | |
66 | } | |
67 | if (!regex_rule.empty()) { | |
68 | f->open_object_section("FilterRule"); | |
69 | ::encode_xml("Name", "regex", f); | |
70 | ::encode_xml("Value", regex_rule, f); | |
71 | f->close_section(); | |
72 | } | |
73 | } | |
74 | ||
75 | bool rgw_s3_key_filter::has_content() const { | |
76 | return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); | |
77 | } | |
78 | ||
79 | bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) { | |
80 | metadata.clear(); | |
81 | XMLObjIter iter = obj->find("FilterRule"); | |
82 | XMLObj *o; | |
83 | ||
84 | const auto throw_if_missing = true; | |
85 | ||
86 | std::string key; | |
87 | std::string value; | |
88 | ||
89 | while ((o = iter.get_next())) { | |
90 | RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing); | |
91 | RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing); | |
92 | metadata.emplace(key, value); | |
93 | } | |
94 | return true; | |
95 | } | |
96 | ||
97 | void rgw_s3_metadata_filter::dump_xml(Formatter *f) const { | |
98 | for (const auto& key_value : metadata) { | |
99 | f->open_object_section("FilterRule"); | |
100 | ::encode_xml("Name", key_value.first, f); | |
101 | ::encode_xml("Value", key_value.second, f); | |
102 | f->close_section(); | |
103 | } | |
104 | } | |
105 | ||
106 | bool rgw_s3_metadata_filter::has_content() const { | |
107 | return !metadata.empty(); | |
108 | } | |
109 | ||
110 | bool rgw_s3_filter::decode_xml(XMLObj* obj) { | |
111 | RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); | |
112 | RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); | |
113 | return true; | |
114 | } | |
115 | ||
116 | void rgw_s3_filter::dump_xml(Formatter *f) const { | |
117 | if (key_filter.has_content()) { | |
118 | ::encode_xml("S3Key", key_filter, f); | |
119 | } | |
120 | if (metadata_filter.has_content()) { | |
121 | ::encode_xml("S3Metadata", metadata_filter, f); | |
122 | } | |
123 | } | |
124 | ||
125 | bool rgw_s3_filter::has_content() const { | |
126 | return key_filter.has_content() || | |
127 | metadata_filter.has_content(); | |
128 | } | |
129 | ||
130 | bool match(const rgw_s3_key_filter& filter, const std::string& key) { | |
131 | const auto key_size = key.size(); | |
132 | const auto prefix_size = filter.prefix_rule.size(); | |
133 | if (prefix_size != 0) { | |
134 | // prefix rule exists | |
135 | if (prefix_size > key_size) { | |
136 | // if prefix is longer than key, we fail | |
137 | return false; | |
138 | } | |
139 | if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) { | |
140 | return false; | |
141 | } | |
142 | } | |
143 | const auto suffix_size = filter.suffix_rule.size(); | |
144 | if (suffix_size != 0) { | |
145 | // suffix rule exists | |
146 | if (suffix_size > key_size) { | |
147 | // if suffix is longer than key, we fail | |
148 | return false; | |
149 | } | |
150 | if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) { | |
151 | return false; | |
152 | } | |
153 | } | |
154 | if (!filter.regex_rule.empty()) { | |
155 | // TODO add regex chaching in the filter | |
156 | const std::regex base_regex(filter.regex_rule); | |
157 | if (!std::regex_match(key, base_regex)) { | |
158 | return false; | |
159 | } | |
160 | } | |
161 | return true; | |
162 | } | |
163 | ||
164 | bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) { | |
165 | // all filter pairs must exist with the same value in the object's metadata | |
166 | // object metadata may include items not in the filter | |
167 | return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end()); | |
168 | } | |
169 | ||
170 | bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) { | |
171 | // if event list exists, and none of the events in the list matches the event type, filter the message | |
172 | if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) { | |
173 | return false; | |
174 | } | |
175 | return true; | |
176 | } | |
177 | ||
178 | void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) { | |
179 | l.clear(); | |
180 | ||
181 | XMLObjIter iter = obj->find(name); | |
182 | XMLObj *o; | |
183 | ||
184 | while ((o = iter.get_next())) { | |
185 | std::string val; | |
186 | decode_xml_obj(val, o); | |
187 | l.push_back(rgw::notify::from_string(val)); | |
188 | } | |
189 | } | |
190 | ||
191 | bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) { | |
192 | const auto throw_if_missing = true; | |
193 | RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing); | |
194 | ||
195 | RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing); | |
196 | ||
197 | RGWXMLDecoder::decode_xml("Filter", filter, obj); | |
198 | ||
199 | do_decode_xml_obj(events, "Event", obj); | |
200 | if (events.empty()) { | |
201 | // if no events are provided, we assume all events | |
202 | events.push_back(rgw::notify::ObjectCreated); | |
203 | events.push_back(rgw::notify::ObjectRemoved); | |
204 | } | |
205 | return true; | |
206 | } | |
207 | ||
208 | void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const { | |
209 | ::encode_xml("Id", id, f); | |
210 | ::encode_xml("Topic", topic_arn.c_str(), f); | |
211 | if (filter.has_content()) { | |
212 | ::encode_xml("Filter", filter, f); | |
213 | } | |
214 | for (const auto& event : events) { | |
215 | ::encode_xml("Event", rgw::notify::to_string(event), f); | |
216 | } | |
217 | } | |
218 | ||
219 | bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) { | |
220 | do_decode_xml_obj(list, "TopicConfiguration", obj); | |
221 | if (list.empty()) { | |
222 | throw RGWXMLDecoder::err("at least one 'TopicConfiguration' must exist"); | |
223 | } | |
224 | return true; | |
225 | } | |
226 | ||
227 | rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) : | |
228 | id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} | |
229 | ||
230 | void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const { | |
231 | do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f); | |
232 | } | |
233 | ||
234 | void rgw_pubsub_s3_record::dump(Formatter *f) const { | |
235 | encode_json("eventVersion", eventVersion, f); | |
236 | encode_json("eventSource", eventSource, f); | |
237 | encode_json("awsRegion", awsRegion, f); | |
238 | utime_t ut(eventTime); | |
239 | encode_json("eventTime", ut, f); | |
240 | encode_json("eventName", eventName, f); | |
241 | { | |
242 | Formatter::ObjectSection s(*f, "userIdentity"); | |
243 | encode_json("principalId", userIdentity, f); | |
244 | } | |
245 | { | |
246 | Formatter::ObjectSection s(*f, "requestParameters"); | |
247 | encode_json("sourceIPAddress", sourceIPAddress, f); | |
248 | } | |
249 | { | |
250 | Formatter::ObjectSection s(*f, "responseElements"); | |
251 | encode_json("x-amz-request-id", x_amz_request_id, f); | |
252 | encode_json("x-amz-id-2", x_amz_id_2, f); | |
253 | } | |
254 | { | |
255 | Formatter::ObjectSection s(*f, "s3"); | |
256 | encode_json("s3SchemaVersion", s3SchemaVersion, f); | |
257 | encode_json("configurationId", configurationId, f); | |
258 | { | |
259 | Formatter::ObjectSection sub_s(*f, "bucket"); | |
260 | encode_json("name", bucket_name, f); | |
261 | { | |
262 | Formatter::ObjectSection sub_sub_s(*f, "ownerIdentity"); | |
263 | encode_json("principalId", bucket_ownerIdentity, f); | |
264 | } | |
265 | encode_json("arn", bucket_arn, f); | |
266 | encode_json("id", bucket_id, f); | |
267 | } | |
268 | { | |
269 | Formatter::ObjectSection sub_s(*f, "object"); | |
270 | encode_json("key", object_key, f); | |
271 | encode_json("size", object_size, f); | |
272 | encode_json("etag", object_etag, f); | |
273 | encode_json("versionId", object_versionId, f); | |
274 | encode_json("sequencer", object_sequencer, f); | |
275 | encode_json("metadata", x_meta_map, f); | |
276 | } | |
277 | } | |
278 | encode_json("eventId", id, f); | |
279 | } | |
11fdf7f2 TL |
280 | |
281 | void rgw_pubsub_event::dump(Formatter *f) const | |
282 | { | |
283 | encode_json("id", id, f); | |
eafe8130 | 284 | encode_json("event", event_name, f); |
11fdf7f2 TL |
285 | utime_t ut(timestamp); |
286 | encode_json("timestamp", ut, f); | |
287 | encode_json("info", info, f); | |
288 | } | |
289 | ||
290 | void rgw_pubsub_topic::dump(Formatter *f) const | |
291 | { | |
292 | encode_json("user", user, f); | |
293 | encode_json("name", name, f); | |
eafe8130 TL |
294 | encode_json("dest", dest, f); |
295 | encode_json("arn", arn, f); | |
296 | } | |
297 | ||
298 | void rgw_pubsub_topic::dump_xml(Formatter *f) const | |
299 | { | |
300 | encode_xml("User", user, f); | |
301 | encode_xml("Name", name, f); | |
302 | encode_xml("EndPoint", dest, f); | |
303 | encode_xml("TopicArn", arn, f); | |
304 | } | |
305 | ||
306 | void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f) | |
307 | { | |
308 | f->open_array_section(name); | |
309 | for (auto iter = l.cbegin(); iter != l.cend(); ++iter) { | |
310 | f->dump_string("obj", rgw::notify::to_ceph_string(*iter)); | |
311 | } | |
312 | f->close_section(); | |
11fdf7f2 TL |
313 | } |
314 | ||
315 | void rgw_pubsub_topic_filter::dump(Formatter *f) const | |
316 | { | |
317 | encode_json("topic", topic, f); | |
318 | encode_json("events", events, f); | |
319 | } | |
320 | ||
321 | void rgw_pubsub_topic_subs::dump(Formatter *f) const | |
322 | { | |
323 | encode_json("topic", topic, f); | |
324 | encode_json("subs", subs, f); | |
325 | } | |
326 | ||
327 | void rgw_pubsub_bucket_topics::dump(Formatter *f) const | |
328 | { | |
329 | Formatter::ArraySection s(*f, "topics"); | |
330 | for (auto& t : topics) { | |
331 | encode_json(t.first.c_str(), t.second, f); | |
332 | } | |
333 | } | |
334 | ||
335 | void rgw_pubsub_user_topics::dump(Formatter *f) const | |
336 | { | |
337 | Formatter::ArraySection s(*f, "topics"); | |
338 | for (auto& t : topics) { | |
339 | encode_json(t.first.c_str(), t.second, f); | |
340 | } | |
341 | } | |
342 | ||
eafe8130 TL |
343 | void rgw_pubsub_user_topics::dump_xml(Formatter *f) const |
344 | { | |
345 | for (auto& t : topics) { | |
346 | encode_xml("member", t.second.topic, f); | |
347 | } | |
348 | } | |
349 | ||
11fdf7f2 TL |
350 | void rgw_pubsub_sub_dest::dump(Formatter *f) const |
351 | { | |
352 | encode_json("bucket_name", bucket_name, f); | |
353 | encode_json("oid_prefix", oid_prefix, f); | |
354 | encode_json("push_endpoint", push_endpoint, f); | |
eafe8130 TL |
355 | encode_json("push_endpoint_args", push_endpoint_args, f); |
356 | encode_json("push_endpoint_topic", arn_topic, f); | |
357 | } | |
358 | ||
359 | void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const | |
360 | { | |
361 | encode_xml("EndpointAddress", push_endpoint, f); | |
362 | encode_xml("EndpointArgs", push_endpoint_args, f); | |
363 | encode_xml("EndpointTopic", arn_topic, f); | |
11fdf7f2 TL |
364 | } |
365 | ||
366 | void rgw_pubsub_sub_config::dump(Formatter *f) const | |
367 | { | |
368 | encode_json("user", user, f); | |
369 | encode_json("name", name, f); | |
370 | encode_json("topic", topic, f); | |
371 | encode_json("dest", dest, f); | |
eafe8130 | 372 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
373 | } |
374 | ||
375 | ||
376 | int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) | |
377 | { | |
378 | int ret = rgw_delete_system_obj(store, obj.pool, obj.oid, objv_tracker); | |
379 | if (ret < 0) { | |
380 | return ret; | |
381 | } | |
382 | ||
383 | return 0; | |
384 | } | |
385 | ||
386 | int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker) | |
387 | { | |
388 | int ret = read(user_meta_obj, result, objv_tracker); | |
eafe8130 TL |
389 | if (ret < 0) { |
390 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
391 | return ret; |
392 | } | |
393 | return 0; | |
394 | } | |
395 | ||
396 | int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker) | |
397 | { | |
398 | int ret = write(user_meta_obj, topics, objv_tracker); | |
399 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 400 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
401 | return ret; |
402 | } | |
403 | return 0; | |
404 | } | |
405 | ||
406 | int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result) | |
407 | { | |
408 | return read_user_topics(result, nullptr); | |
409 | } | |
410 | ||
411 | int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) | |
412 | { | |
413 | int ret = ps->read(bucket_meta_obj, result, objv_tracker); | |
414 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 415 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
416 | return ret; |
417 | } | |
418 | return 0; | |
419 | } | |
420 | ||
421 | int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker) | |
422 | { | |
423 | int ret = ps->write(bucket_meta_obj, topics, objv_tracker); | |
424 | if (ret < 0) { | |
eafe8130 | 425 | ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
426 | return ret; |
427 | } | |
428 | ||
429 | return 0; | |
430 | } | |
431 | ||
432 | int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) | |
433 | { | |
434 | return read_topics(result, nullptr); | |
435 | } | |
436 | ||
437 | int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) | |
438 | { | |
439 | rgw_pubsub_user_topics topics; | |
440 | int ret = get_user_topics(&topics); | |
441 | if (ret < 0) { | |
eafe8130 | 442 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
443 | return ret; |
444 | } | |
445 | ||
446 | auto iter = topics.topics.find(name); | |
447 | if (iter == topics.topics.end()) { | |
eafe8130 | 448 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; |
11fdf7f2 TL |
449 | return -ENOENT; |
450 | } | |
451 | ||
452 | *result = iter->second; | |
453 | return 0; | |
454 | } | |
455 | ||
eafe8130 | 456 | int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result) |
11fdf7f2 | 457 | { |
eafe8130 TL |
458 | rgw_pubsub_user_topics topics; |
459 | int ret = get_user_topics(&topics); | |
460 | if (ret < 0) { | |
461 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; | |
462 | return ret; | |
463 | } | |
464 | ||
465 | auto iter = topics.topics.find(name); | |
466 | if (iter == topics.topics.end()) { | |
467 | ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl; | |
468 | return -ENOENT; | |
469 | } | |
470 | ||
471 | *result = iter->second.topic; | |
472 | return 0; | |
473 | } | |
474 | ||
475 | int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) { | |
476 | return create_notification(topic_name, events, std::nullopt, ""); | |
477 | } | |
478 | ||
479 | int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) { | |
11fdf7f2 TL |
480 | rgw_pubsub_topic_subs user_topic_info; |
481 | RGWRados *store = ps->store; | |
482 | ||
483 | int ret = ps->get_topic(topic_name, &user_topic_info); | |
484 | if (ret < 0) { | |
eafe8130 | 485 | ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl; |
11fdf7f2 TL |
486 | return ret; |
487 | } | |
eafe8130 | 488 | ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl; |
11fdf7f2 TL |
489 | |
490 | RGWObjVersionTracker objv_tracker; | |
491 | rgw_pubsub_bucket_topics bucket_topics; | |
492 | ||
493 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 TL |
494 | if (ret < 0) { |
495 | ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" << | |
496 | bucket.name << "': ret=" << ret << dendl; | |
11fdf7f2 TL |
497 | return ret; |
498 | } | |
eafe8130 TL |
499 | ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" << |
500 | bucket.name << "'" << dendl; | |
11fdf7f2 TL |
501 | |
502 | auto& topic_filter = bucket_topics.topics[topic_name]; | |
503 | topic_filter.topic = user_topic_info.topic; | |
504 | topic_filter.events = events; | |
eafe8130 TL |
505 | topic_filter.s3_id = notif_name; |
506 | if (s3_filter) { | |
507 | topic_filter.s3_filter = *s3_filter; | |
508 | } | |
11fdf7f2 TL |
509 | |
510 | ret = write_topics(bucket_topics, &objv_tracker); | |
511 | if (ret < 0) { | |
eafe8130 | 512 | ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl; |
11fdf7f2 TL |
513 | return ret; |
514 | } | |
eafe8130 TL |
515 | |
516 | ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl; | |
11fdf7f2 TL |
517 | |
518 | return 0; | |
519 | } | |
520 | ||
521 | int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) | |
522 | { | |
523 | rgw_pubsub_topic_subs user_topic_info; | |
524 | RGWRados *store = ps->store; | |
525 | ||
526 | int ret = ps->get_topic(topic_name, &user_topic_info); | |
527 | if (ret < 0) { | |
eafe8130 | 528 | ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl; |
11fdf7f2 TL |
529 | return ret; |
530 | } | |
531 | ||
532 | RGWObjVersionTracker objv_tracker; | |
533 | rgw_pubsub_bucket_topics bucket_topics; | |
534 | ||
535 | ret = read_topics(&bucket_topics, &objv_tracker); | |
eafe8130 TL |
536 | if (ret < 0) { |
537 | ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
538 | return ret; |
539 | } | |
540 | ||
541 | bucket_topics.topics.erase(topic_name); | |
542 | ||
543 | ret = write_topics(bucket_topics, &objv_tracker); | |
544 | if (ret < 0) { | |
eafe8130 | 545 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
546 | return ret; |
547 | } | |
548 | ||
549 | return 0; | |
550 | } | |
551 | ||
eafe8130 TL |
552 | int RGWUserPubSub::create_topic(const string& name) { |
553 | return create_topic(name, rgw_pubsub_sub_dest(), ""); | |
554 | } | |
555 | ||
556 | int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn) { | |
11fdf7f2 TL |
557 | RGWObjVersionTracker objv_tracker; |
558 | rgw_pubsub_user_topics topics; | |
559 | ||
560 | int ret = read_user_topics(&topics, &objv_tracker); | |
561 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 TL |
562 | // its not an error if not topics exist, we create one |
563 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; | |
11fdf7f2 TL |
564 | return ret; |
565 | } | |
eafe8130 | 566 | |
11fdf7f2 TL |
567 | rgw_pubsub_topic_subs& new_topic = topics.topics[name]; |
568 | new_topic.topic.user = user; | |
569 | new_topic.topic.name = name; | |
eafe8130 TL |
570 | new_topic.topic.dest = dest; |
571 | new_topic.topic.arn = arn; | |
11fdf7f2 TL |
572 | |
573 | ret = write_user_topics(topics, &objv_tracker); | |
574 | if (ret < 0) { | |
eafe8130 | 575 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
576 | return ret; |
577 | } | |
578 | ||
579 | return 0; | |
580 | } | |
581 | ||
582 | int RGWUserPubSub::remove_topic(const string& name) | |
583 | { | |
584 | RGWObjVersionTracker objv_tracker; | |
585 | rgw_pubsub_user_topics topics; | |
586 | ||
587 | int ret = read_user_topics(&topics, &objv_tracker); | |
588 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 589 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
11fdf7f2 | 590 | return ret; |
eafe8130 TL |
591 | } else if (ret == -ENOENT) { |
592 | // its not an error if no topics exist, just a no-op | |
593 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl; | |
594 | return 0; | |
11fdf7f2 TL |
595 | } |
596 | ||
597 | topics.topics.erase(name); | |
598 | ||
599 | ret = write_user_topics(topics, &objv_tracker); | |
600 | if (ret < 0) { | |
eafe8130 | 601 | ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
602 | return ret; |
603 | } | |
604 | ||
605 | return 0; | |
606 | } | |
607 | ||
608 | int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker) | |
609 | { | |
610 | int ret = ps->read(sub_meta_obj, result, objv_tracker); | |
611 | if (ret < 0 && ret != -ENOENT) { | |
eafe8130 | 612 | ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
613 | return ret; |
614 | } | |
615 | return 0; | |
616 | } | |
617 | ||
618 | int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker) | |
619 | { | |
620 | int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker); | |
621 | if (ret < 0) { | |
eafe8130 | 622 | ldout(ps->store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
623 | return ret; |
624 | } | |
625 | ||
626 | return 0; | |
627 | } | |
628 | ||
629 | int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker) | |
630 | { | |
631 | int ret = ps->remove(sub_meta_obj, objv_tracker); | |
632 | if (ret < 0) { | |
eafe8130 | 633 | ldout(ps->store->ctx(), 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
634 | return ret; |
635 | } | |
636 | ||
637 | return 0; | |
638 | } | |
639 | ||
640 | int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) | |
641 | { | |
642 | return read_sub(result, nullptr); | |
643 | } | |
644 | ||
eafe8130 | 645 | int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, const std::string& s3_id) |
11fdf7f2 TL |
646 | { |
647 | RGWObjVersionTracker user_objv_tracker; | |
648 | rgw_pubsub_user_topics topics; | |
649 | RGWRados *store = ps->store; | |
650 | ||
651 | int ret = ps->read_user_topics(&topics, &user_objv_tracker); | |
652 | if (ret < 0) { | |
eafe8130 TL |
653 | ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl; |
654 | return ret != -ENOENT ? ret : -EINVAL; | |
11fdf7f2 TL |
655 | } |
656 | ||
657 | auto iter = topics.topics.find(topic); | |
658 | if (iter == topics.topics.end()) { | |
eafe8130 TL |
659 | ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl; |
660 | return -EINVAL; | |
11fdf7f2 TL |
661 | } |
662 | ||
663 | auto& t = iter->second; | |
664 | ||
665 | rgw_pubsub_sub_config sub_conf; | |
666 | ||
667 | sub_conf.user = ps->user; | |
668 | sub_conf.name = sub; | |
669 | sub_conf.topic = topic; | |
670 | sub_conf.dest = dest; | |
eafe8130 | 671 | sub_conf.s3_id = s3_id; |
11fdf7f2 TL |
672 | |
673 | t.subs.insert(sub); | |
674 | ||
675 | ret = ps->write_user_topics(topics, &user_objv_tracker); | |
676 | if (ret < 0) { | |
eafe8130 | 677 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
678 | return ret; |
679 | } | |
680 | ||
681 | ret = write_sub(sub_conf, nullptr); | |
682 | if (ret < 0) { | |
eafe8130 | 683 | ldout(store->ctx(), 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
684 | return ret; |
685 | } | |
686 | return 0; | |
687 | } | |
688 | ||
689 | int RGWUserPubSub::Sub::unsubscribe(const string& _topic) | |
690 | { | |
691 | string topic = _topic; | |
692 | RGWObjVersionTracker sobjv_tracker; | |
693 | RGWRados *store = ps->store; | |
694 | ||
695 | if (topic.empty()) { | |
696 | rgw_pubsub_sub_config sub_conf; | |
697 | int ret = read_sub(&sub_conf, &sobjv_tracker); | |
698 | if (ret < 0) { | |
eafe8130 | 699 | ldout(store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
700 | return ret; |
701 | } | |
702 | topic = sub_conf.topic; | |
703 | } | |
704 | ||
705 | RGWObjVersionTracker objv_tracker; | |
706 | rgw_pubsub_user_topics topics; | |
707 | ||
708 | int ret = ps->read_user_topics(&topics, &objv_tracker); | |
709 | if (ret < 0) { | |
eafe8130 TL |
710 | // not an error - could be that topic was already deleted |
711 | ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl; | |
712 | } else { | |
11fdf7f2 TL |
713 | auto iter = topics.topics.find(topic); |
714 | if (iter != topics.topics.end()) { | |
715 | auto& t = iter->second; | |
716 | ||
717 | t.subs.erase(sub); | |
718 | ||
719 | ret = ps->write_user_topics(topics, &objv_tracker); | |
720 | if (ret < 0) { | |
eafe8130 | 721 | ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl; |
11fdf7f2 TL |
722 | return ret; |
723 | } | |
724 | } | |
725 | } | |
726 | ||
727 | ret = remove_sub(&sobjv_tracker); | |
728 | if (ret < 0) { | |
eafe8130 | 729 | ldout(store->ctx(), 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl; |
11fdf7f2 TL |
730 | return ret; |
731 | } | |
732 | return 0; | |
733 | } | |
734 | ||
eafe8130 TL |
735 | template<typename EventType> |
736 | void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const | |
11fdf7f2 TL |
737 | { |
738 | encode_json("next_marker", next_marker, f); | |
739 | encode_json("is_truncated", is_truncated, f); | |
740 | ||
eafe8130 | 741 | Formatter::ArraySection s(*f, EventType::json_type_plural); |
11fdf7f2 | 742 | for (auto& event : events) { |
92f5a8d4 | 743 | encode_json("", event, f); |
11fdf7f2 TL |
744 | } |
745 | } | |
746 | ||
eafe8130 TL |
747 | template<typename EventType> |
748 | int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events) | |
11fdf7f2 TL |
749 | { |
750 | RGWRados *store = ps->store; | |
751 | rgw_pubsub_sub_config sub_conf; | |
752 | int ret = get_conf(&sub_conf); | |
753 | if (ret < 0) { | |
eafe8130 | 754 | ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
755 | return ret; |
756 | } | |
757 | ||
758 | RGWBucketInfo bucket_info; | |
759 | string tenant; | |
760 | RGWSysObjectCtx obj_ctx(store->svc.sysobj->init_obj_ctx()); | |
761 | ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr); | |
762 | if (ret == -ENOENT) { | |
eafe8130 | 763 | list.is_truncated = false; |
11fdf7f2 TL |
764 | return 0; |
765 | } | |
766 | if (ret < 0) { | |
eafe8130 | 767 | ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
768 | return ret; |
769 | } | |
770 | ||
771 | RGWRados::Bucket target(store, bucket_info); | |
772 | RGWRados::Bucket::List list_op(&target); | |
773 | ||
774 | list_op.params.prefix = sub_conf.dest.oid_prefix; | |
775 | list_op.params.marker = marker; | |
776 | ||
eafe8130 | 777 | std::vector<rgw_bucket_dir_entry> objs; |
11fdf7f2 | 778 | |
eafe8130 | 779 | ret = list_op.list_objects(max_events, &objs, nullptr, &list.is_truncated); |
11fdf7f2 | 780 | if (ret < 0) { |
eafe8130 | 781 | ldout(store->ctx(), 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
782 | return ret; |
783 | } | |
eafe8130 TL |
784 | if (list.is_truncated) { |
785 | list.next_marker = list_op.get_next_marker().name; | |
11fdf7f2 TL |
786 | } |
787 | ||
788 | for (auto& obj : objs) { | |
789 | bufferlist bl64; | |
790 | bufferlist bl; | |
791 | bl64.append(obj.meta.user_data); | |
792 | try { | |
793 | bl.decode_base64(bl64); | |
794 | } catch (buffer::error& err) { | |
eafe8130 | 795 | ldout(store->ctx(), 1) << "ERROR: failed to event (not a valid base64)" << dendl; |
11fdf7f2 TL |
796 | continue; |
797 | } | |
eafe8130 | 798 | EventType event; |
11fdf7f2 TL |
799 | |
800 | auto iter = bl.cbegin(); | |
801 | try { | |
802 | decode(event, iter); | |
803 | } catch (buffer::error& err) { | |
eafe8130 | 804 | ldout(store->ctx(), 1) << "ERROR: failed to decode event" << dendl; |
11fdf7f2 TL |
805 | continue; |
806 | }; | |
807 | ||
eafe8130 | 808 | list.events.push_back(event); |
11fdf7f2 TL |
809 | } |
810 | return 0; | |
811 | } | |
812 | ||
eafe8130 TL |
813 | template<typename EventType> |
814 | int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id) | |
11fdf7f2 TL |
815 | { |
816 | RGWRados *store = ps->store; | |
817 | rgw_pubsub_sub_config sub_conf; | |
818 | int ret = get_conf(&sub_conf); | |
819 | if (ret < 0) { | |
eafe8130 | 820 | ldout(store->ctx(), 1) << "ERROR: failed to read sub config: ret=" << ret << dendl; |
11fdf7f2 TL |
821 | return ret; |
822 | } | |
823 | ||
824 | RGWBucketInfo bucket_info; | |
825 | string tenant; | |
826 | RGWSysObjectCtx sysobj_ctx(store->svc.sysobj->init_obj_ctx()); | |
827 | ret = store->get_bucket_info(sysobj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr); | |
828 | if (ret < 0) { | |
eafe8130 | 829 | ldout(store->ctx(), 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; |
11fdf7f2 TL |
830 | return ret; |
831 | } | |
832 | ||
833 | rgw_bucket& bucket = bucket_info.bucket; | |
834 | ||
835 | RGWObjectCtx obj_ctx(store); | |
836 | rgw_obj obj(bucket, sub_conf.dest.oid_prefix + event_id); | |
837 | ||
838 | obj_ctx.set_atomic(obj); | |
839 | ||
840 | RGWRados::Object del_target(store, bucket_info, obj_ctx, obj); | |
841 | RGWRados::Object::Delete del_op(&del_target); | |
842 | ||
843 | del_op.params.bucket_owner = bucket_info.owner; | |
844 | del_op.params.versioning_status = bucket_info.versioning_status(); | |
845 | ||
846 | ret = del_op.delete_obj(); | |
847 | if (ret < 0) { | |
eafe8130 | 848 | ldout(store->ctx(), 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; |
11fdf7f2 TL |
849 | } |
850 | return 0; | |
851 | } | |
eafe8130 TL |
852 | |
853 | template<typename EventType> | |
854 | void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const { | |
855 | list.dump(f); | |
856 | } | |
857 | ||
858 | // explicit instantiation for the only two possible types | |
859 | // no need to move implementation to header | |
860 | template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>; | |
861 | template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>; | |
862 |