]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | #include "services/svc_zone.h" | |
11fdf7f2 TL |
5 | #include "rgw_common.h" |
6 | #include "rgw_coroutine.h" | |
7 | #include "rgw_sync_module.h" | |
8 | #include "rgw_data_sync.h" | |
9 | #include "rgw_sync_module_pubsub.h" | |
10 | #include "rgw_sync_module_pubsub_rest.h" | |
11 | #include "rgw_rest_conn.h" | |
12 | #include "rgw_cr_rados.h" | |
13 | #include "rgw_cr_rest.h" | |
14 | #include "rgw_cr_tools.h" | |
15 | #include "rgw_op.h" | |
16 | #include "rgw_pubsub.h" | |
17 | #include "rgw_pubsub_push.h" | |
eafe8130 | 18 | #include "rgw_notify_event_type.h" |
11fdf7f2 | 19 | #include "rgw_perf_counters.h" |
eafe8130 TL |
20 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT |
21 | #include "rgw_amqp.h" | |
22 | #endif | |
9f95a23c TL |
23 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
24 | #include "rgw_kafka.h" | |
25 | #endif | |
11fdf7f2 | 26 | |
eafe8130 | 27 | #include <boost/algorithm/hex.hpp> |
11fdf7f2 TL |
28 | #include <boost/asio/yield.hpp> |
29 | ||
30 | #define dout_subsys ceph_subsys_rgw | |
31 | ||
32 | ||
33 | #define PUBSUB_EVENTS_RETENTION_DEFAULT 7 | |
34 | ||
35 | /* | |
36 | ||
37 | config: | |
38 | ||
39 | { | |
40 | "tenant": <tenant>, # default: <empty> | |
41 | "uid": <uid>, # default: "pubsub" | |
42 | "data_bucket_prefix": <prefix> # default: "pubsub-" | |
43 | "data_oid_prefix": <prefix> # | |
eafe8130 TL |
44 | "events_retention_days": <int> # default: 7 |
45 | "start_with_full_sync" <bool> # default: false | |
11fdf7f2 TL |
46 | |
47 | # non-dynamic config | |
48 | "notifications": [ | |
49 | { | |
50 | "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>, | |
51 | # or a prefix if it ends with a wildcard | |
52 | "topic": <topic-name> | |
53 | }, | |
54 | ... | |
55 | ], | |
56 | "subscriptions": [ | |
57 | { | |
58 | "name": <subscription-name>, | |
59 | "topic": <topic>, | |
60 | "push_endpoint": <endpoint>, | |
eafe8130 | 61 | "push_endpoint_args:" <arg list>. # any push endpoint specific args (include all args) |
11fdf7f2 TL |
62 | "data_bucket": <bucket>, # override name of bucket where subscription data will be store |
63 | "data_oid_prefix": <prefix> # set prefix for subscription data object ids | |
eafe8130 | 64 | "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here |
11fdf7f2 TL |
65 | }, |
66 | ... | |
67 | ] | |
68 | } | |
69 | ||
70 | */ | |
71 | ||
72 | // utility function to convert the args list from string format | |
73 | // (ampresend separated with equal sign) to prased structure | |
74 | RGWHTTPArgs string_to_args(const std::string& str_args) { | |
75 | RGWHTTPArgs args; | |
76 | args.set(str_args); | |
77 | args.parse(); | |
78 | return args; | |
79 | } | |
80 | ||
eafe8130 TL |
81 | struct PSSubConfig { |
82 | std::string name; | |
83 | std::string topic; | |
84 | std::string push_endpoint_name; | |
85 | std::string push_endpoint_args; | |
86 | std::string data_bucket_name; | |
87 | std::string data_oid_prefix; | |
88 | std::string s3_id; | |
89 | std::string arn_topic; | |
11fdf7f2 TL |
90 | RGWPubSubEndpoint::Ptr push_endpoint; |
91 | ||
11fdf7f2 TL |
92 | void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) { |
93 | name = uc.name; | |
94 | topic = uc.topic; | |
95 | push_endpoint_name = uc.dest.push_endpoint; | |
96 | data_bucket_name = uc.dest.bucket_name; | |
97 | data_oid_prefix = uc.dest.oid_prefix; | |
eafe8130 TL |
98 | s3_id = uc.s3_id; |
99 | arn_topic = uc.dest.arn_topic; | |
100 | if (!push_endpoint_name.empty()) { | |
11fdf7f2 TL |
101 | push_endpoint_args = uc.dest.push_endpoint_args; |
102 | try { | |
eafe8130 TL |
103 | push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); |
104 | ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; | |
11fdf7f2 | 105 | } catch (const RGWPubSubEndpoint::configuration_error& e) { |
eafe8130 | 106 | ldout(cct, 1) << "ERROR: failed to create push endpoint: " |
11fdf7f2 TL |
107 | << push_endpoint_name << " due to: " << e.what() << dendl; |
108 | } | |
109 | } | |
110 | } | |
111 | ||
112 | void dump(Formatter *f) const { | |
113 | encode_json("name", name, f); | |
114 | encode_json("topic", topic, f); | |
115 | encode_json("push_endpoint", push_endpoint_name, f); | |
eafe8130 | 116 | encode_json("push_endpoint_args", push_endpoint_args, f); |
11fdf7f2 TL |
117 | encode_json("data_bucket_name", data_bucket_name, f); |
118 | encode_json("data_oid_prefix", data_oid_prefix, f); | |
eafe8130 | 119 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
120 | } |
121 | ||
122 | void init(CephContext *cct, const JSONFormattable& config, | |
123 | const string& data_bucket_prefix, | |
124 | const string& default_oid_prefix) { | |
125 | name = config["name"]; | |
126 | topic = config["topic"]; | |
127 | push_endpoint_name = config["push_endpoint"]; | |
128 | string default_bucket_name = data_bucket_prefix + name; | |
129 | data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); | |
130 | data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str()); | |
eafe8130 TL |
131 | s3_id = config["s3_id"]; |
132 | arn_topic = config["arn_topic"]; | |
11fdf7f2 TL |
133 | if (!push_endpoint_name.empty()) { |
134 | push_endpoint_args = config["push_endpoint_args"]; | |
135 | try { | |
eafe8130 TL |
136 | push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); |
137 | ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; | |
11fdf7f2 | 138 | } catch (const RGWPubSubEndpoint::configuration_error& e) { |
eafe8130 | 139 | ldout(cct, 1) << "ERROR: failed to create push endpoint: " |
11fdf7f2 TL |
140 | << push_endpoint_name << " due to: " << e.what() << dendl; |
141 | } | |
142 | } | |
143 | } | |
144 | }; | |
145 | ||
146 | using PSSubConfigRef = std::shared_ptr<PSSubConfig>; | |
147 | ||
148 | struct PSTopicConfig { | |
eafe8130 TL |
149 | std::string name; |
150 | std::set<std::string> subs; | |
9f95a23c | 151 | std::string opaque_data; |
11fdf7f2 TL |
152 | |
153 | void dump(Formatter *f) const { | |
154 | encode_json("name", name, f); | |
155 | encode_json("subs", subs, f); | |
9f95a23c | 156 | encode_json("opaque", opaque_data, f); |
11fdf7f2 TL |
157 | } |
158 | }; | |
159 | ||
160 | struct PSNotificationConfig { | |
161 | uint64_t id{0}; | |
162 | string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */ | |
163 | string topic; | |
164 | bool is_prefix{false}; | |
165 | ||
166 | ||
167 | void dump(Formatter *f) const { | |
168 | encode_json("id", id, f); | |
169 | encode_json("path", path, f); | |
170 | encode_json("topic", topic, f); | |
171 | encode_json("is_prefix", is_prefix, f); | |
172 | } | |
173 | ||
174 | void init(CephContext *cct, const JSONFormattable& config) { | |
175 | path = config["path"]; | |
176 | if (!path.empty() && path[path.size() - 1] == '*') { | |
177 | path = path.substr(0, path.size() - 1); | |
178 | is_prefix = true; | |
179 | } | |
180 | topic = config["topic"]; | |
181 | } | |
182 | }; | |
183 | ||
184 | template<class T> | |
185 | static string json_str(const char *name, const T& obj, bool pretty = false) | |
186 | { | |
187 | stringstream ss; | |
188 | JSONFormatter f(pretty); | |
189 | ||
190 | encode_json(name, obj, &f); | |
191 | f.flush(ss); | |
192 | ||
193 | return ss.str(); | |
194 | } | |
195 | ||
196 | using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>; | |
eafe8130 | 197 | using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>; |
11fdf7f2 TL |
198 | |
199 | struct PSConfig { | |
9f95a23c | 200 | const std::string id{"pubsub"}; |
11fdf7f2 | 201 | rgw_user user; |
9f95a23c TL |
202 | std::string data_bucket_prefix; |
203 | std::string data_oid_prefix; | |
11fdf7f2 TL |
204 | |
205 | int events_retention_days{0}; | |
206 | ||
207 | uint64_t sync_instance{0}; | |
208 | uint64_t max_id{0}; | |
209 | ||
210 | /* FIXME: no hard coded buckets, we'll have configurable topics */ | |
eafe8130 TL |
211 | std::map<std::string, PSSubConfigRef> subs; |
212 | std::map<std::string, PSTopicConfigRef> topics; | |
213 | std::multimap<std::string, PSNotificationConfig> notifications; | |
214 | ||
215 | bool start_with_full_sync{false}; | |
11fdf7f2 TL |
216 | |
217 | void dump(Formatter *f) const { | |
218 | encode_json("id", id, f); | |
219 | encode_json("user", user, f); | |
220 | encode_json("data_bucket_prefix", data_bucket_prefix, f); | |
221 | encode_json("data_oid_prefix", data_oid_prefix, f); | |
222 | encode_json("events_retention_days", events_retention_days, f); | |
223 | encode_json("sync_instance", sync_instance, f); | |
224 | encode_json("max_id", max_id, f); | |
225 | { | |
226 | Formatter::ArraySection section(*f, "subs"); | |
227 | for (auto& sub : subs) { | |
228 | encode_json("sub", *sub.second, f); | |
229 | } | |
230 | } | |
231 | { | |
232 | Formatter::ArraySection section(*f, "topics"); | |
233 | for (auto& topic : topics) { | |
234 | encode_json("topic", *topic.second, f); | |
235 | } | |
236 | } | |
237 | { | |
238 | Formatter::ObjectSection section(*f, "notifications"); | |
9f95a23c | 239 | std::string last; |
11fdf7f2 TL |
240 | for (auto& notif : notifications) { |
241 | const string& n = notif.first; | |
242 | if (n != last) { | |
243 | if (!last.empty()) { | |
244 | f->close_section(); | |
245 | } | |
246 | f->open_array_section(n.c_str()); | |
247 | } | |
248 | last = n; | |
249 | encode_json("notifications", notif.second, f); | |
250 | } | |
251 | if (!last.empty()) { | |
252 | f->close_section(); | |
253 | } | |
254 | } | |
eafe8130 | 255 | encode_json("start_with_full_sync", start_with_full_sync, f); |
11fdf7f2 TL |
256 | } |
257 | ||
258 | void init(CephContext *cct, const JSONFormattable& config) { | |
259 | string uid = config["uid"]("pubsub"); | |
260 | user = rgw_user(config["tenant"], uid); | |
261 | data_bucket_prefix = config["data_bucket_prefix"]("pubsub-"); | |
262 | data_oid_prefix = config["data_oid_prefix"]; | |
263 | events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT); | |
264 | ||
265 | for (auto& c : config["notifications"].array()) { | |
266 | PSNotificationConfig nc; | |
267 | nc.id = ++max_id; | |
268 | nc.init(cct, c); | |
269 | notifications.insert(std::make_pair(nc.path, nc)); | |
270 | ||
271 | PSTopicConfig topic_config = { .name = nc.topic }; | |
272 | topics[nc.topic] = make_shared<PSTopicConfig>(topic_config); | |
273 | } | |
274 | for (auto& c : config["subscriptions"].array()) { | |
275 | auto sc = std::make_shared<PSSubConfig>(); | |
276 | sc->init(cct, c, data_bucket_prefix, data_oid_prefix); | |
277 | subs[sc->name] = sc; | |
278 | auto iter = topics.find(sc->topic); | |
279 | if (iter != topics.end()) { | |
280 | iter->second->subs.insert(sc->name); | |
281 | } | |
282 | } | |
eafe8130 | 283 | start_with_full_sync = config["start_with_full_sync"](false); |
11fdf7f2 TL |
284 | |
285 | ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl; | |
286 | } | |
287 | ||
288 | void init_instance(const RGWRealm& realm, uint64_t instance_id) { | |
289 | sync_instance = instance_id; | |
290 | } | |
291 | ||
292 | void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) { | |
9f95a23c | 293 | const std::string path = bucket.name + "/" + key.name; |
11fdf7f2 TL |
294 | |
295 | auto iter = notifications.upper_bound(path); | |
296 | if (iter == notifications.begin()) { | |
297 | return; | |
298 | } | |
299 | ||
300 | do { | |
301 | --iter; | |
302 | if (iter->first.size() > path.size()) { | |
303 | break; | |
304 | } | |
305 | if (path.compare(0, iter->first.size(), iter->first) != 0) { | |
306 | break; | |
307 | } | |
308 | ||
309 | PSNotificationConfig& target = iter->second; | |
310 | ||
311 | if (!target.is_prefix && | |
312 | path.size() != iter->first.size()) { | |
313 | continue; | |
314 | } | |
315 | ||
316 | auto topic = topics.find(target.topic); | |
317 | if (topic == topics.end()) { | |
318 | continue; | |
319 | } | |
320 | ||
eafe8130 TL |
321 | ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << |
322 | " target_path=" << target.path << ", topic=" << target.topic << dendl; | |
11fdf7f2 TL |
323 | (*result)->push_back(topic->second); |
324 | } while (iter != notifications.begin()); | |
325 | } | |
326 | ||
327 | bool find_sub(const string& name, PSSubConfigRef *ref) { | |
328 | auto iter = subs.find(name); | |
329 | if (iter != subs.end()) { | |
330 | *ref = iter->second; | |
331 | return true; | |
332 | } | |
333 | return false; | |
334 | } | |
335 | }; | |
336 | ||
11fdf7f2 | 337 | using PSConfigRef = std::shared_ptr<PSConfig>; |
eafe8130 TL |
338 | template<typename EventType> |
339 | using EventRef = std::shared_ptr<EventType>; | |
11fdf7f2 TL |
340 | |
341 | struct objstore_event { | |
342 | string id; | |
343 | const rgw_bucket& bucket; | |
344 | const rgw_obj_key& key; | |
345 | const ceph::real_time& mtime; | |
346 | const std::vector<std::pair<std::string, std::string> > *attrs; | |
347 | ||
348 | objstore_event(const rgw_bucket& _bucket, | |
349 | const rgw_obj_key& _key, | |
350 | const ceph::real_time& _mtime, | |
351 | const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket), | |
352 | key(_key), | |
353 | mtime(_mtime), | |
354 | attrs(_attrs) {} | |
355 | ||
356 | string get_hash() { | |
357 | string etag; | |
358 | RGWMD5Etag hash; | |
359 | hash.update(bucket.bucket_id); | |
360 | hash.update(key.name); | |
361 | hash.update(key.instance); | |
362 | hash.finish(&etag); | |
363 | ||
364 | assert(etag.size() > 8); | |
365 | ||
366 | return etag.substr(0, 8); | |
367 | } | |
368 | ||
369 | void dump(Formatter *f) const { | |
370 | { | |
371 | Formatter::ObjectSection s(*f, "bucket"); | |
372 | encode_json("name", bucket.name, f); | |
373 | encode_json("tenant", bucket.tenant, f); | |
374 | encode_json("bucket_id", bucket.bucket_id, f); | |
375 | } | |
376 | { | |
377 | Formatter::ObjectSection s(*f, "key"); | |
378 | encode_json("name", key.name, f); | |
379 | encode_json("instance", key.instance, f); | |
380 | } | |
381 | utime_t mt(mtime); | |
382 | encode_json("mtime", mt, f); | |
383 | Formatter::ObjectSection s(*f, "attrs"); | |
384 | if (attrs) { | |
385 | for (auto& attr : *attrs) { | |
386 | encode_json(attr.first.c_str(), attr.second.c_str(), f); | |
387 | } | |
388 | } | |
389 | } | |
390 | }; | |
391 | ||
392 | static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, | |
393 | const rgw_obj_key& key, | |
394 | const ceph::real_time& mtime, | |
395 | const std::vector<std::pair<std::string, std::string> > *attrs, | |
eafe8130 TL |
396 | rgw::notify::EventType event_type, |
397 | EventRef<rgw_pubsub_event> *event) { | |
11fdf7f2 TL |
398 | *event = std::make_shared<rgw_pubsub_event>(); |
399 | ||
eafe8130 TL |
400 | EventRef<rgw_pubsub_event>& e = *event; |
401 | e->event_name = rgw::notify::to_ceph_string(event_type); | |
11fdf7f2 TL |
402 | e->source = bucket.name + "/" + key.name; |
403 | e->timestamp = real_clock::now(); | |
404 | ||
405 | objstore_event oevent(bucket, key, mtime, attrs); | |
406 | ||
eafe8130 TL |
407 | const utime_t ts(e->timestamp); |
408 | set_event_id(e->id, oevent.get_hash(), ts); | |
11fdf7f2 TL |
409 | |
410 | encode_json("info", oevent, &e->info); | |
411 | } | |
412 | ||
eafe8130 TL |
413 | static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket, |
414 | const rgw_user& owner, | |
415 | const rgw_obj_key& key, | |
416 | const ceph::real_time& mtime, | |
417 | const std::vector<std::pair<std::string, std::string> > *attrs, | |
418 | rgw::notify::EventType event_type, | |
419 | EventRef<rgw_pubsub_s3_record> *record) { | |
420 | *record = std::make_shared<rgw_pubsub_s3_record>(); | |
421 | ||
422 | EventRef<rgw_pubsub_s3_record>& r = *record; | |
eafe8130 TL |
423 | r->eventTime = mtime; |
424 | r->eventName = rgw::notify::to_string(event_type); | |
92f5a8d4 TL |
425 | // userIdentity: not supported in sync module |
426 | // x_amz_request_id: not supported in sync module | |
427 | // x_amz_id_2: not supported in sync module | |
eafe8130 TL |
428 | // configurationId is filled from subscription configuration |
429 | r->bucket_name = bucket.name; | |
430 | r->bucket_ownerIdentity = owner.to_str(); | |
431 | r->bucket_arn = to_string(rgw::ARN(bucket)); | |
432 | r->bucket_id = bucket.bucket_id; // rgw extension | |
433 | r->object_key = key.name; | |
92f5a8d4 | 434 | // object_size not supported in sync module |
eafe8130 TL |
435 | objstore_event oevent(bucket, key, mtime, attrs); |
436 | r->object_etag = oevent.get_hash(); | |
437 | r->object_versionId = key.instance; | |
438 | ||
439 | // use timestamp as per key sequence id (hex encoded) | |
440 | const utime_t ts(real_clock::now()); | |
441 | boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), | |
442 | std::back_inserter(r->object_sequencer)); | |
443 | ||
eafe8130 TL |
444 | set_event_id(r->id, r->object_etag, ts); |
445 | } | |
446 | ||
11fdf7f2 TL |
447 | class PSManager; |
448 | using PSManagerRef = std::shared_ptr<PSManager>; | |
449 | ||
450 | struct PSEnv { | |
451 | PSConfigRef conf; | |
452 | shared_ptr<RGWUserInfo> data_user_info; | |
453 | PSManagerRef manager; | |
454 | ||
455 | PSEnv() : conf(make_shared<PSConfig>()), | |
456 | data_user_info(make_shared<RGWUserInfo>()) {} | |
457 | ||
458 | void init(CephContext *cct, const JSONFormattable& config) { | |
459 | conf->init(cct, config); | |
460 | } | |
461 | ||
462 | void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr); | |
463 | }; | |
464 | ||
465 | using PSEnvRef = std::shared_ptr<PSEnv>; | |
466 | ||
eafe8130 | 467 | template<typename EventType> |
11fdf7f2 | 468 | class PSEvent { |
eafe8130 | 469 | const EventRef<EventType> event; |
11fdf7f2 TL |
470 | |
471 | public: | |
eafe8130 | 472 | PSEvent(const EventRef<EventType>& _event) : event(_event) {} |
11fdf7f2 TL |
473 | |
474 | void format(bufferlist *bl) const { | |
92f5a8d4 | 475 | bl->append(json_str("", *event)); |
11fdf7f2 TL |
476 | } |
477 | ||
478 | void encode_event(bufferlist& bl) const { | |
479 | encode(*event, bl); | |
480 | } | |
481 | ||
482 | const string& id() const { | |
483 | return event->id; | |
484 | } | |
485 | }; | |
486 | ||
487 | template <class T> | |
488 | class RGWSingletonCR : public RGWCoroutine { | |
489 | friend class WrapperCR; | |
490 | ||
491 | boost::asio::coroutine wrapper_state; | |
492 | bool started{false}; | |
493 | int operate_ret{0}; | |
494 | ||
495 | struct WaiterInfo { | |
496 | RGWCoroutine *cr{nullptr}; | |
497 | T *result; | |
498 | }; | |
499 | using WaiterInfoRef = std::shared_ptr<WaiterInfo>; | |
500 | ||
501 | deque<WaiterInfoRef> waiters; | |
502 | ||
503 | void add_waiter(RGWCoroutine *cr, T *result) { | |
504 | auto waiter = std::make_shared<WaiterInfo>(); | |
505 | waiter->cr = cr; | |
506 | waiter->result = result; | |
507 | waiters.push_back(waiter); | |
508 | }; | |
509 | ||
510 | bool get_next_waiter(WaiterInfoRef *waiter) { | |
511 | if (waiters.empty()) { | |
512 | waiter->reset(); | |
513 | return false; | |
514 | } | |
515 | ||
516 | *waiter = waiters.front(); | |
517 | waiters.pop_front(); | |
518 | return true; | |
519 | } | |
520 | ||
521 | int operate_wrapper() override { | |
522 | reenter(&wrapper_state) { | |
523 | while (!is_done()) { | |
524 | ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl; | |
525 | operate_ret = operate(); | |
526 | if (operate_ret < 0) { | |
527 | ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl; | |
528 | } | |
529 | if (!is_done()) { | |
530 | yield; | |
531 | } | |
532 | } | |
533 | ||
534 | ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl; | |
535 | /* we're done, can't yield anymore */ | |
536 | ||
537 | WaiterInfoRef waiter; | |
538 | while (get_next_waiter(&waiter)) { | |
539 | ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl; | |
540 | waiter->cr->set_retcode(retcode); | |
541 | waiter->cr->set_sleeping(false); | |
542 | return_result(waiter->result); | |
543 | put(); | |
544 | } | |
545 | ||
546 | return retcode; | |
547 | } | |
548 | return 0; | |
549 | } | |
550 | ||
551 | virtual void return_result(T *result) {} | |
552 | ||
553 | public: | |
554 | RGWSingletonCR(CephContext *_cct) | |
555 | : RGWCoroutine(_cct) {} | |
556 | ||
557 | int execute(RGWCoroutine *caller, T *result = nullptr) { | |
558 | if (!started) { | |
559 | ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl; | |
560 | started = true; | |
561 | caller->call(this); | |
562 | return 0; | |
563 | } else if (!is_done()) { | |
564 | ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl; | |
565 | get(); | |
566 | add_waiter(caller, result); | |
567 | caller->set_sleeping(true); | |
568 | return 0; | |
569 | } | |
570 | ||
571 | ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl; | |
572 | caller->set_retcode(retcode); | |
573 | return_result(result); | |
574 | return retcode; | |
575 | } | |
576 | }; | |
577 | ||
578 | ||
579 | class PSSubscription; | |
580 | using PSSubscriptionRef = std::shared_ptr<PSSubscription>; | |
581 | ||
582 | class PSSubscription { | |
583 | class InitCR; | |
584 | friend class InitCR; | |
585 | friend class RGWPSHandleObjEventCR; | |
586 | ||
9f95a23c | 587 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
588 | RGWDataSyncEnv *sync_env; |
589 | PSEnvRef env; | |
590 | PSSubConfigRef sub_conf; | |
eafe8130 | 591 | std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result; |
11fdf7f2 TL |
592 | RGWBucketInfo *bucket_info{nullptr}; |
593 | RGWDataAccessRef data_access; | |
594 | RGWDataAccess::BucketRef bucket; | |
595 | ||
11fdf7f2 TL |
596 | InitCR *init_cr{nullptr}; |
597 | ||
598 | class InitBucketLifecycleCR : public RGWCoroutine { | |
9f95a23c | 599 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
600 | RGWDataSyncEnv *sync_env; |
601 | PSConfigRef& conf; | |
602 | LCRule rule; | |
603 | ||
604 | int retention_days; | |
605 | ||
606 | rgw_bucket_lifecycle_config_params lc_config; | |
607 | ||
608 | public: | |
9f95a23c | 609 | InitBucketLifecycleCR(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
610 | PSConfigRef& _conf, |
611 | RGWBucketInfo& _bucket_info, | |
9f95a23c TL |
612 | std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sc->cct), |
613 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
614 | conf(_conf) { |
615 | lc_config.bucket_info = _bucket_info; | |
616 | lc_config.bucket_attrs = _bucket_attrs; | |
617 | retention_days = conf->events_retention_days; | |
618 | } | |
619 | ||
620 | int operate() override { | |
621 | reenter(this) { | |
622 | ||
623 | rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days); | |
624 | ||
625 | { | |
626 | /* maybe we already have it configured? */ | |
627 | RGWLifecycleConfiguration old_config; | |
628 | auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC); | |
629 | if (aiter != lc_config.bucket_attrs.end()) { | |
630 | bufferlist::const_iterator iter{&aiter->second}; | |
631 | try { | |
632 | old_config.decode(iter); | |
633 | } catch (const buffer::error& e) { | |
9f95a23c | 634 | ldpp_dout(sync_env->dpp, 0) << __func__ << "(): decode life cycle config failed" << dendl; |
11fdf7f2 TL |
635 | } |
636 | } | |
637 | ||
638 | auto old_rules = old_config.get_rule_map(); | |
639 | for (auto ori : old_rules) { | |
640 | auto& old_rule = ori.second; | |
641 | ||
642 | if (old_rule.get_prefix().empty() && | |
643 | old_rule.get_expiration().get_days() == retention_days && | |
644 | old_rule.is_enabled()) { | |
9f95a23c | 645 | ldpp_dout(sync_env->dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl; |
11fdf7f2 TL |
646 | return set_cr_done(); |
647 | } | |
648 | } | |
649 | } | |
650 | ||
651 | lc_config.config.add_rule(rule); | |
652 | yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados, | |
653 | sync_env->store, | |
9f95a23c TL |
654 | lc_config, |
655 | sync_env->dpp)); | |
11fdf7f2 | 656 | if (retcode < 0) { |
9f95a23c | 657 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl; |
11fdf7f2 TL |
658 | return set_cr_error(retcode); |
659 | } | |
660 | ||
661 | return set_cr_done(); | |
662 | } | |
663 | return 0; | |
664 | } | |
665 | }; | |
666 | ||
667 | class InitCR : public RGWSingletonCR<bool> { | |
9f95a23c | 668 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
669 | RGWDataSyncEnv *sync_env; |
670 | PSSubscriptionRef sub; | |
671 | rgw_get_bucket_info_params get_bucket_info; | |
672 | rgw_bucket_create_local_params create_bucket; | |
673 | PSConfigRef& conf; | |
674 | PSSubConfigRef& sub_conf; | |
675 | int i; | |
676 | ||
677 | public: | |
9f95a23c TL |
678 | InitCR(RGWDataSyncCtx *_sc, |
679 | PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sc->cct), | |
680 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
681 | sub(_sub), conf(sub->env->conf), |
682 | sub_conf(sub->sub_conf) { | |
683 | } | |
684 | ||
685 | int operate() override { | |
686 | reenter(this) { | |
687 | get_bucket_info.tenant = conf->user.tenant; | |
688 | get_bucket_info.bucket_name = sub_conf->data_bucket_name; | |
689 | sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>(); | |
690 | ||
691 | for (i = 0; i < 2; ++i) { | |
692 | yield call(new RGWGetBucketInfoCR(sync_env->async_rados, | |
693 | sync_env->store, | |
694 | get_bucket_info, | |
695 | sub->get_bucket_info_result)); | |
696 | if (retcode < 0 && retcode != -ENOENT) { | |
9f95a23c | 697 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant=" |
11fdf7f2 TL |
698 | << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; |
699 | } | |
700 | if (retcode == 0) { | |
701 | { | |
702 | auto& result = sub->get_bucket_info_result; | |
703 | sub->bucket_info = &result->bucket_info; | |
704 | ||
705 | int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); | |
706 | if (ret < 0) { | |
9f95a23c | 707 | ldpp_dout(sync_env->dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; |
11fdf7f2 TL |
708 | return set_cr_error(ret); |
709 | } | |
710 | } | |
711 | ||
9f95a23c | 712 | yield call(new InitBucketLifecycleCR(sc, conf, |
11fdf7f2 TL |
713 | sub->get_bucket_info_result->bucket_info, |
714 | sub->get_bucket_info_result->attrs)); | |
715 | if (retcode < 0) { | |
9f95a23c | 716 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; |
11fdf7f2 TL |
717 | return set_cr_error(retcode); |
718 | } | |
719 | ||
720 | return set_cr_done(); | |
721 | } | |
722 | ||
723 | create_bucket.user_info = sub->env->data_user_info; | |
724 | create_bucket.bucket_name = sub_conf->data_bucket_name; | |
9f95a23c | 725 | ldpp_dout(sync_env->dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl; |
11fdf7f2 TL |
726 | yield call(new RGWBucketCreateLocalCR(sync_env->async_rados, |
727 | sync_env->store, | |
9f95a23c TL |
728 | create_bucket, |
729 | sync_env->dpp)); | |
11fdf7f2 | 730 | if (retcode < 0) { |
9f95a23c | 731 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket: " << "tenant=" |
11fdf7f2 TL |
732 | << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; |
733 | return set_cr_error(retcode); | |
734 | } | |
735 | ||
736 | /* second iteration: we got -ENOENT and created a bucket */ | |
737 | } | |
738 | ||
739 | /* failed twice on -ENOENT, unexpected */ | |
9f95a23c | 740 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant |
11fdf7f2 TL |
741 | << " name=" << get_bucket_info.bucket_name << dendl; |
742 | return set_cr_error(-EIO); | |
743 | } | |
744 | return 0; | |
745 | } | |
746 | }; | |
747 | ||
eafe8130 | 748 | template<typename EventType> |
11fdf7f2 | 749 | class StoreEventCR : public RGWCoroutine { |
9f95a23c | 750 | RGWDataSyncCtx* const sc; |
11fdf7f2 TL |
751 | RGWDataSyncEnv* const sync_env; |
752 | const PSSubscriptionRef sub; | |
eafe8130 | 753 | const PSEvent<EventType> pse; |
11fdf7f2 TL |
754 | const string oid_prefix; |
755 | ||
756 | public: | |
9f95a23c | 757 | StoreEventCR(RGWDataSyncCtx* const _sc, |
11fdf7f2 | 758 | const PSSubscriptionRef& _sub, |
9f95a23c TL |
759 | const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct), |
760 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
761 | sub(_sub), |
762 | pse(_event), | |
763 | oid_prefix(sub->sub_conf->data_oid_prefix) { | |
764 | } | |
765 | ||
766 | int operate() override { | |
11fdf7f2 TL |
767 | rgw_object_simple_put_params put_obj; |
768 | reenter(this) { | |
769 | ||
770 | put_obj.bucket = sub->bucket; | |
771 | put_obj.key = rgw_obj_key(oid_prefix + pse.id()); | |
772 | ||
773 | pse.format(&put_obj.data); | |
774 | ||
775 | { | |
776 | bufferlist bl; | |
777 | pse.encode_event(bl); | |
778 | bufferlist bl64; | |
779 | bl.encode_base64(bl64); | |
780 | put_obj.user_data = bl64.to_str(); | |
781 | } | |
782 | ||
783 | yield call(new RGWObjectSimplePutCR(sync_env->async_rados, | |
784 | sync_env->store, | |
9f95a23c TL |
785 | put_obj, |
786 | sync_env->dpp)); | |
11fdf7f2 | 787 | if (retcode < 0) { |
eafe8130 | 788 | ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; |
11fdf7f2 TL |
789 | return set_cr_error(retcode); |
790 | } else { | |
eafe8130 | 791 | ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl; |
11fdf7f2 TL |
792 | } |
793 | ||
794 | return set_cr_done(); | |
795 | } | |
796 | return 0; | |
797 | } | |
798 | }; | |
799 | ||
eafe8130 | 800 | template<typename EventType> |
11fdf7f2 | 801 | class PushEventCR : public RGWCoroutine { |
9f95a23c | 802 | RGWDataSyncCtx* const sc; |
11fdf7f2 | 803 | RGWDataSyncEnv* const sync_env; |
eafe8130 | 804 | const EventRef<EventType> event; |
11fdf7f2 TL |
805 | const PSSubConfigRef& sub_conf; |
806 | ||
807 | public: | |
9f95a23c | 808 | PushEventCR(RGWDataSyncCtx* const _sc, |
11fdf7f2 | 809 | const PSSubscriptionRef& _sub, |
9f95a23c TL |
810 | const EventRef<EventType>& _event) : RGWCoroutine(_sc->cct), |
811 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
812 | event(_event), |
813 | sub_conf(_sub->sub_conf) { | |
814 | } | |
815 | ||
816 | int operate() override { | |
817 | reenter(this) { | |
818 | ceph_assert(sub_conf->push_endpoint); | |
819 | yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env)); | |
820 | ||
821 | if (retcode < 0) { | |
eafe8130 | 822 | ldout(sync_env->cct, 10) << "failed to push event: " << event->id << |
11fdf7f2 | 823 | " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl; |
11fdf7f2 TL |
824 | return set_cr_error(retcode); |
825 | } | |
826 | ||
eafe8130 | 827 | ldout(sync_env->cct, 20) << "event: " << event->id << |
11fdf7f2 | 828 | " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl; |
11fdf7f2 TL |
829 | return set_cr_done(); |
830 | } | |
831 | return 0; | |
832 | } | |
833 | }; | |
834 | ||
835 | public: | |
9f95a23c | 836 | PSSubscription(RGWDataSyncCtx *_sc, |
11fdf7f2 | 837 | PSEnvRef _env, |
9f95a23c | 838 | PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env), |
11fdf7f2 TL |
839 | env(_env), |
840 | sub_conf(_sub_conf), | |
841 | data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {} | |
842 | ||
9f95a23c | 843 | PSSubscription(RGWDataSyncCtx *_sc, |
11fdf7f2 | 844 | PSEnvRef _env, |
9f95a23c | 845 | rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env), |
11fdf7f2 TL |
846 | env(_env), |
847 | sub_conf(std::make_shared<PSSubConfig>()), | |
848 | data_access(std::make_shared<RGWDataAccess>(sync_env->store)) { | |
849 | sub_conf->from_user_conf(sync_env->cct, user_sub_conf); | |
850 | } | |
851 | virtual ~PSSubscription() { | |
852 | if (init_cr) { | |
853 | init_cr->put(); | |
854 | } | |
855 | } | |
856 | ||
857 | template <class C> | |
9f95a23c | 858 | static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
859 | PSEnvRef _env, |
860 | C& _sub_conf) { | |
9f95a23c TL |
861 | auto sub = std::make_shared<PSSubscription>(_sc, _env, _sub_conf); |
862 | sub->init_cr = new InitCR(_sc, sub); | |
11fdf7f2 TL |
863 | sub->init_cr->get(); |
864 | return sub; | |
865 | } | |
866 | ||
867 | int call_init_cr(RGWCoroutine *caller) { | |
868 | return init_cr->execute(caller); | |
869 | } | |
870 | ||
eafe8130 | 871 | template<typename EventType> |
9f95a23c TL |
872 | static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) { |
873 | return new StoreEventCR<EventType>(sc, sub, event); | |
11fdf7f2 TL |
874 | } |
875 | ||
eafe8130 | 876 | template<typename EventType> |
9f95a23c TL |
877 | static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef<EventType>& event) { |
878 | return new PushEventCR<EventType>(sc, sub, event); | |
11fdf7f2 TL |
879 | } |
880 | friend class InitCR; | |
881 | }; | |
882 | ||
11fdf7f2 TL |
883 | class PSManager |
884 | { | |
9f95a23c | 885 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
886 | RGWDataSyncEnv *sync_env; |
887 | PSEnvRef env; | |
888 | ||
eafe8130 | 889 | std::map<string, PSSubscriptionRef> subs; |
11fdf7f2 TL |
890 | |
891 | class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> { | |
9f95a23c | 892 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
893 | RGWDataSyncEnv *sync_env; |
894 | PSManagerRef mgr; | |
895 | rgw_user owner; | |
896 | string sub_name; | |
897 | string sub_id; | |
898 | PSSubscriptionRef *ref; | |
899 | ||
900 | PSConfigRef conf; | |
901 | ||
902 | PSSubConfigRef sub_conf; | |
903 | rgw_pubsub_sub_config user_sub_conf; | |
eafe8130 | 904 | |
11fdf7f2 | 905 | public: |
9f95a23c | 906 | GetSubCR(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
907 | PSManagerRef& _mgr, |
908 | const rgw_user& _owner, | |
909 | const string& _sub_name, | |
9f95a23c TL |
910 | PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sc->cct), |
911 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
912 | mgr(_mgr), |
913 | owner(_owner), | |
914 | sub_name(_sub_name), | |
915 | ref(_ref), | |
916 | conf(mgr->env->conf) { | |
917 | } | |
eafe8130 | 918 | ~GetSubCR() { } |
11fdf7f2 TL |
919 | |
920 | int operate() override { | |
921 | reenter(this) { | |
922 | if (owner.empty()) { | |
923 | if (!conf->find_sub(sub_name, &sub_conf)) { | |
eafe8130 | 924 | ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl; |
11fdf7f2 TL |
925 | mgr->remove_get_sub(owner, sub_name); |
926 | return set_cr_error(-ENOENT); | |
927 | } | |
928 | ||
9f95a23c | 929 | *ref = PSSubscription::get_shared(sc, mgr->env, sub_conf); |
11fdf7f2 TL |
930 | } else { |
931 | using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>; | |
932 | yield { | |
933 | RGWUserPubSub ups(sync_env->store, owner); | |
934 | rgw_raw_obj obj; | |
935 | ups.get_sub_meta_obj(sub_name, &obj); | |
936 | bool empty_on_enoent = false; | |
9f95a23c | 937 | call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, |
11fdf7f2 TL |
938 | obj, |
939 | &user_sub_conf, empty_on_enoent)); | |
940 | } | |
941 | if (retcode < 0) { | |
942 | mgr->remove_get_sub(owner, sub_name); | |
943 | return set_cr_error(retcode); | |
944 | } | |
945 | ||
9f95a23c | 946 | *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf); |
11fdf7f2 TL |
947 | } |
948 | ||
949 | yield (*ref)->call_init_cr(this); | |
950 | if (retcode < 0) { | |
eafe8130 | 951 | ldout(sync_env->cct, 10) << "failed to init subscription" << dendl; |
11fdf7f2 TL |
952 | mgr->remove_get_sub(owner, sub_name); |
953 | return set_cr_error(retcode); | |
954 | } | |
955 | ||
956 | if (owner.empty()) { | |
957 | mgr->subs[sub_name] = *ref; | |
958 | } | |
959 | mgr->remove_get_sub(owner, sub_name); | |
960 | ||
961 | return set_cr_done(); | |
962 | } | |
963 | return 0; | |
964 | } | |
965 | ||
966 | void return_result(PSSubscriptionRef *result) override { | |
967 | ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl; | |
968 | if (retcode >= 0) { | |
969 | *result = *ref; | |
970 | } | |
971 | } | |
972 | }; | |
973 | ||
974 | string sub_id(const rgw_user& owner, const string& sub_name) { | |
975 | string owner_prefix; | |
976 | if (!owner.empty()) { | |
977 | owner_prefix = owner.to_str() + "/"; | |
978 | } | |
979 | ||
980 | return owner_prefix + sub_name; | |
981 | } | |
982 | ||
eafe8130 | 983 | std::map<std::string, GetSubCR *> get_subs; |
11fdf7f2 TL |
984 | |
985 | GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) { | |
986 | return get_subs[sub_id(owner, name)]; | |
987 | } | |
988 | ||
989 | void remove_get_sub(const rgw_user& owner, const string& name) { | |
990 | get_subs.erase(sub_id(owner, name)); | |
991 | } | |
992 | ||
993 | bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) { | |
994 | auto iter = subs.find(sub_id(owner, sub_name)); | |
995 | if (iter != subs.end()) { | |
996 | *sub = iter->second; | |
997 | return true; | |
998 | } | |
999 | return false; | |
1000 | } | |
1001 | ||
9f95a23c TL |
1002 | PSManager(RGWDataSyncCtx *_sc, |
1003 | PSEnvRef _env) : sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
1004 | env(_env) {} |
1005 | ||
1006 | public: | |
9f95a23c | 1007 | static PSManagerRef get_shared(RGWDataSyncCtx *_sc, |
11fdf7f2 | 1008 | PSEnvRef _env) { |
9f95a23c | 1009 | return std::shared_ptr<PSManager>(new PSManager(_sc, _env)); |
11fdf7f2 TL |
1010 | } |
1011 | ||
9f95a23c | 1012 | static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr, |
11fdf7f2 TL |
1013 | RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { |
1014 | if (mgr->find_sub_instance(owner, sub_name, ref)) { | |
1015 | /* found it! nothing to execute */ | |
9f95a23c | 1016 | ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl; |
11fdf7f2 TL |
1017 | } |
1018 | auto& gs = mgr->get_get_subs(owner, sub_name); | |
1019 | if (!gs) { | |
9f95a23c TL |
1020 | ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl; |
1021 | gs = new GetSubCR(sc, mgr, owner, sub_name, ref); | |
11fdf7f2 | 1022 | } |
9f95a23c | 1023 | ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl; |
11fdf7f2 TL |
1024 | return gs->execute(caller, ref); |
1025 | } | |
1026 | ||
1027 | friend class GetSubCR; | |
1028 | }; | |
1029 | ||
1030 | void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) { | |
1031 | manager = mgr; | |
1032 | conf->init_instance(realm, instance_id); | |
1033 | } | |
1034 | ||
1035 | class RGWPSInitEnvCBCR : public RGWCoroutine { | |
9f95a23c | 1036 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
1037 | RGWDataSyncEnv *sync_env; |
1038 | PSEnvRef env; | |
1039 | PSConfigRef& conf; | |
1040 | ||
1041 | rgw_user_create_params create_user; | |
1042 | rgw_get_user_info_params get_user_info; | |
1043 | public: | |
9f95a23c TL |
1044 | RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc, |
1045 | PSEnvRef& _env) : RGWCoroutine(_sc->cct), | |
1046 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
1047 | env(_env), conf(env->conf) {} |
1048 | int operate() override { | |
1049 | reenter(this) { | |
9f95a23c | 1050 | ldpp_dout(sync_env->dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl; |
11fdf7f2 TL |
1051 | |
1052 | /* nothing to do here right now */ | |
1053 | create_user.user = conf->user; | |
1054 | create_user.max_buckets = 0; /* unlimited */ | |
1055 | create_user.display_name = "pubsub"; | |
1056 | create_user.generate_key = false; | |
9f95a23c | 1057 | yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, sync_env->dpp)); |
1911f103 | 1058 | if (retcode < 0 && retcode != -ERR_USER_EXIST) { |
9f95a23c | 1059 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; |
11fdf7f2 TL |
1060 | return set_cr_error(retcode); |
1061 | } | |
1062 | ||
1063 | get_user_info.user = conf->user; | |
1064 | yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info)); | |
1065 | if (retcode < 0) { | |
9f95a23c | 1066 | ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; |
11fdf7f2 TL |
1067 | return set_cr_error(retcode); |
1068 | } | |
1069 | ||
9f95a23c | 1070 | ldpp_dout(sync_env->dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl; |
11fdf7f2 TL |
1071 | |
1072 | ||
1073 | return set_cr_done(); | |
1074 | } | |
1075 | return 0; | |
1076 | } | |
1077 | }; | |
1078 | ||
eafe8130 TL |
1079 | bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) { |
1080 | if (!match(filter.events, event_type)) { | |
1081 | return false; | |
1082 | } | |
1083 | if (!match(filter.s3_filter.key_filter, key_name)) { | |
1084 | return false; | |
1085 | } | |
1086 | return true; | |
1087 | } | |
1088 | ||
11fdf7f2 | 1089 | class RGWPSFindBucketTopicsCR : public RGWCoroutine { |
9f95a23c | 1090 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
1091 | RGWDataSyncEnv *sync_env; |
1092 | PSEnvRef env; | |
1093 | rgw_user owner; | |
1094 | rgw_bucket bucket; | |
1095 | rgw_obj_key key; | |
eafe8130 | 1096 | rgw::notify::EventType event_type; |
11fdf7f2 TL |
1097 | |
1098 | RGWUserPubSub ups; | |
1099 | ||
1100 | rgw_raw_obj bucket_obj; | |
1101 | rgw_raw_obj user_obj; | |
1102 | rgw_pubsub_bucket_topics bucket_topics; | |
1103 | rgw_pubsub_user_topics user_topics; | |
1104 | TopicsRef *topics; | |
1105 | public: | |
9f95a23c | 1106 | RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc, |
11fdf7f2 TL |
1107 | PSEnvRef& _env, |
1108 | const rgw_user& _owner, | |
1109 | const rgw_bucket& _bucket, | |
1110 | const rgw_obj_key& _key, | |
eafe8130 | 1111 | rgw::notify::EventType _event_type, |
9f95a23c TL |
1112 | TopicsRef *_topics) : RGWCoroutine(_sc->cct), |
1113 | sc(_sc), sync_env(_sc->env), | |
11fdf7f2 TL |
1114 | env(_env), |
1115 | owner(_owner), | |
1116 | bucket(_bucket), | |
1117 | key(_key), | |
eafe8130 | 1118 | event_type(_event_type), |
9f95a23c | 1119 | ups(sync_env->store, owner), |
11fdf7f2 TL |
1120 | topics(_topics) { |
1121 | *topics = std::make_shared<vector<PSTopicConfigRef> >(); | |
1122 | } | |
1123 | int operate() override { | |
1124 | reenter(this) { | |
1125 | ups.get_bucket_meta_obj(bucket, &bucket_obj); | |
1126 | ups.get_user_meta_obj(&user_obj); | |
1127 | ||
1128 | using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>; | |
1129 | yield { | |
1130 | bool empty_on_enoent = true; | |
9f95a23c | 1131 | call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, |
11fdf7f2 TL |
1132 | bucket_obj, |
1133 | &bucket_topics, empty_on_enoent)); | |
1134 | } | |
1135 | if (retcode < 0 && retcode != -ENOENT) { | |
1136 | return set_cr_error(retcode); | |
1137 | } | |
1138 | ||
1139 | ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl; | |
1140 | ||
1141 | if (!bucket_topics.topics.empty()) { | |
1142 | using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>; | |
1143 | yield { | |
1144 | bool empty_on_enoent = true; | |
9f95a23c | 1145 | call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj, |
11fdf7f2 TL |
1146 | user_obj, |
1147 | &user_topics, empty_on_enoent)); | |
1148 | } | |
1149 | if (retcode < 0 && retcode != -ENOENT) { | |
1150 | return set_cr_error(retcode); | |
1151 | } | |
1152 | } | |
1153 | ||
1154 | for (auto& titer : bucket_topics.topics) { | |
1155 | auto& topic_filter = titer.second; | |
1156 | auto& info = topic_filter.topic; | |
eafe8130 | 1157 | if (!match(topic_filter, key.name, event_type)) { |
11fdf7f2 TL |
1158 | continue; |
1159 | } | |
eafe8130 | 1160 | std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>(); |
11fdf7f2 TL |
1161 | tc->name = info.name; |
1162 | tc->subs = user_topics.topics[info.name].subs; | |
9f95a23c | 1163 | tc->opaque_data = info.opaque_data; |
11fdf7f2 TL |
1164 | (*topics)->push_back(tc); |
1165 | } | |
1166 | ||
1167 | env->conf->get_topics(sync_env->cct, bucket, key, topics); | |
1168 | return set_cr_done(); | |
1169 | } | |
1170 | return 0; | |
1171 | } | |
1172 | }; | |
1173 | ||
1174 | class RGWPSHandleObjEventCR : public RGWCoroutine { | |
9f95a23c | 1175 | RGWDataSyncCtx* const sc; |
11fdf7f2 TL |
1176 | const PSEnvRef env; |
1177 | const rgw_user& owner; | |
eafe8130 TL |
1178 | const EventRef<rgw_pubsub_event> event; |
1179 | const EventRef<rgw_pubsub_s3_record> record; | |
11fdf7f2 TL |
1180 | const TopicsRef topics; |
1181 | const std::array<rgw_user, 2> owners; | |
1182 | bool has_subscriptions; | |
1183 | bool event_handled; | |
1184 | bool sub_conf_found; | |
1185 | PSSubscriptionRef sub; | |
1186 | std::array<rgw_user, 2>::const_iterator oiter; | |
eafe8130 | 1187 | std::vector<PSTopicConfigRef>::const_iterator titer; |
9f95a23c | 1188 | std::set<std::string>::const_iterator siter; |
11fdf7f2 TL |
1189 | int last_sub_conf_error; |
1190 | ||
1191 | public: | |
9f95a23c | 1192 | RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc, |
11fdf7f2 TL |
1193 | const PSEnvRef _env, |
1194 | const rgw_user& _owner, | |
eafe8130 TL |
1195 | const EventRef<rgw_pubsub_event>& _event, |
1196 | const EventRef<rgw_pubsub_s3_record>& _record, | |
9f95a23c TL |
1197 | const TopicsRef& _topics) : RGWCoroutine(_sc->cct), |
1198 | sc(_sc), | |
11fdf7f2 TL |
1199 | env(_env), |
1200 | owner(_owner), | |
1201 | event(_event), | |
eafe8130 | 1202 | record(_record), |
11fdf7f2 TL |
1203 | topics(_topics), |
1204 | owners({owner, rgw_user{}}), | |
1205 | has_subscriptions(false), | |
1206 | event_handled(false) {} | |
1207 | ||
1208 | int operate() override { | |
1209 | reenter(this) { | |
9f95a23c | 1210 | ldout(sc->cct, 20) << ": handle event: obj: z=" << sc->source_zone |
11fdf7f2 TL |
1211 | << " event=" << json_str("event", *event, false) |
1212 | << " owner=" << owner << dendl; | |
1213 | ||
9f95a23c | 1214 | ldout(sc->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; |
eafe8130 TL |
1215 | |
1216 | // outside caller should check that | |
1217 | ceph_assert(!topics->empty()); | |
11fdf7f2 TL |
1218 | |
1219 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered); | |
1220 | ||
eafe8130 | 1221 | // loop over all topics related to the bucket/object |
11fdf7f2 | 1222 | for (titer = topics->begin(); titer != topics->end(); ++titer) { |
9f95a23c | 1223 | ldout(sc->cct, 20) << ": notification for " << event->source << ": topic=" << |
11fdf7f2 | 1224 | (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; |
eafe8130 | 1225 | // loop over all subscriptions of the topic |
11fdf7f2 | 1226 | for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { |
9f95a23c | 1227 | ldout(sc->cct, 20) << ": subscription: " << *siter << dendl; |
11fdf7f2 TL |
1228 | has_subscriptions = true; |
1229 | sub_conf_found = false; | |
eafe8130 TL |
1230 | // try to read subscription configuration from global/user cond |
1231 | // configuration is considered missing only if does not exist in either | |
11fdf7f2 | 1232 | for (oiter = owners.begin(); oiter != owners.end(); ++oiter) { |
9f95a23c | 1233 | yield PSManager::call_get_subscription_cr(sc, env->manager, this, *oiter, *siter, &sub); |
11fdf7f2 | 1234 | if (retcode < 0) { |
eafe8130 TL |
1235 | if (sub_conf_found) { |
1236 | // not a real issue, sub conf already found | |
1237 | retcode = 0; | |
1238 | } | |
11fdf7f2 TL |
1239 | last_sub_conf_error = retcode; |
1240 | continue; | |
1241 | } | |
1242 | sub_conf_found = true; | |
eafe8130 TL |
1243 | if (sub->sub_conf->s3_id.empty()) { |
1244 | // subscription was not made by S3 compatible API | |
9f95a23c TL |
1245 | ldout(sc->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; |
1246 | yield call(PSSubscription::store_event_cr(sc, sub, event)); | |
eafe8130 TL |
1247 | if (retcode < 0) { |
1248 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); | |
9f95a23c | 1249 | ldout(sc->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; |
eafe8130 TL |
1250 | } else { |
1251 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); | |
1252 | event_handled = true; | |
1253 | } | |
1254 | if (sub->sub_conf->push_endpoint) { | |
9f95a23c TL |
1255 | ldout(sc->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; |
1256 | yield call(PSSubscription::push_event_cr(sc, sub, event)); | |
eafe8130 TL |
1257 | if (retcode < 0) { |
1258 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
9f95a23c | 1259 | ldout(sc->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl; |
eafe8130 TL |
1260 | } else { |
1261 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); | |
1262 | event_handled = true; | |
1263 | } | |
1264 | } | |
1265 | } else { | |
1266 | // subscription was made by S3 compatible API | |
9f95a23c | 1267 | ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; |
eafe8130 | 1268 | record->configurationId = sub->sub_conf->s3_id; |
9f95a23c TL |
1269 | record->opaque_data = (*titer)->opaque_data; |
1270 | yield call(PSSubscription::store_event_cr(sc, sub, record)); | |
11fdf7f2 | 1271 | if (retcode < 0) { |
eafe8130 | 1272 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail); |
9f95a23c | 1273 | ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl; |
11fdf7f2 | 1274 | } else { |
eafe8130 | 1275 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok); |
11fdf7f2 TL |
1276 | event_handled = true; |
1277 | } | |
eafe8130 | 1278 | if (sub->sub_conf->push_endpoint) { |
9f95a23c TL |
1279 | ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl; |
1280 | yield call(PSSubscription::push_event_cr(sc, sub, record)); | |
eafe8130 TL |
1281 | if (retcode < 0) { |
1282 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
9f95a23c | 1283 | ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl; |
eafe8130 TL |
1284 | } else { |
1285 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); | |
1286 | event_handled = true; | |
1287 | } | |
1288 | } | |
11fdf7f2 TL |
1289 | } |
1290 | } | |
1291 | if (!sub_conf_found) { | |
1292 | // could not find conf for subscription at user or global levels | |
eafe8130 | 1293 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); |
9f95a23c | 1294 | ldout(sc->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter |
11fdf7f2 | 1295 | << " ret=" << last_sub_conf_error << dendl; |
eafe8130 TL |
1296 | if (retcode == -ENOENT) { |
1297 | // missing subscription info should be reflected back as invalid argument | |
1298 | // and not as missing object | |
1299 | retcode = -EINVAL; | |
1300 | } | |
11fdf7f2 TL |
1301 | } |
1302 | } | |
1303 | } | |
1304 | if (has_subscriptions && !event_handled) { | |
1305 | // event is considered "lost" of it has subscriptions on any of its topics | |
1306 | // but it was not stored in, or pushed to, any of them | |
1307 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost); | |
1308 | } | |
1309 | if (retcode < 0) { | |
1310 | return set_cr_error(retcode); | |
1311 | } | |
1312 | return set_cr_done(); | |
1313 | } | |
1314 | return 0; | |
1315 | } | |
1316 | }; | |
1317 | ||
eafe8130 | 1318 | // coroutine invoked on remote object creation |
11fdf7f2 | 1319 | class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { |
9f95a23c TL |
1320 | RGWDataSyncCtx *sc; |
1321 | rgw_bucket_sync_pipe sync_pipe; | |
11fdf7f2 TL |
1322 | PSEnvRef env; |
1323 | std::optional<uint64_t> versioned_epoch; | |
eafe8130 TL |
1324 | EventRef<rgw_pubsub_event> event; |
1325 | EventRef<rgw_pubsub_s3_record> record; | |
11fdf7f2 TL |
1326 | TopicsRef topics; |
1327 | public: | |
9f95a23c TL |
1328 | RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc, |
1329 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
11fdf7f2 | 1330 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch, |
9f95a23c TL |
1331 | TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key), |
1332 | sc(_sc), | |
1333 | sync_pipe(_sync_pipe), | |
11fdf7f2 TL |
1334 | env(_env), |
1335 | versioned_epoch(_versioned_epoch), | |
1336 | topics(_topics) { | |
1337 | } | |
1338 | int operate() override { | |
1339 | reenter(this) { | |
9f95a23c TL |
1340 | ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone |
1341 | << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime | |
11fdf7f2 TL |
1342 | << " attrs=" << attrs << dendl; |
1343 | { | |
1344 | std::vector<std::pair<std::string, std::string> > attrs; | |
1345 | for (auto& attr : attrs) { | |
9f95a23c | 1346 | std::string k = attr.first; |
11fdf7f2 TL |
1347 | if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { |
1348 | k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); | |
1349 | } | |
1350 | attrs.push_back(std::make_pair(k, attr.second)); | |
eafe8130 TL |
1351 | } |
1352 | // at this point we don't know whether we need the ceph event or S3 record | |
1353 | // this is why both are created here, once we have information about the | |
1354 | // subscription, we will store/push only the relevant ones | |
9f95a23c TL |
1355 | make_event_ref(sc->cct, |
1356 | sync_pipe.info.source_bs.bucket, key, | |
11fdf7f2 | 1357 | mtime, &attrs, |
eafe8130 | 1358 | rgw::notify::ObjectCreated, &event); |
9f95a23c TL |
1359 | make_s3_record_ref(sc->cct, |
1360 | sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, | |
eafe8130 TL |
1361 | mtime, &attrs, |
1362 | rgw::notify::ObjectCreated, &record); | |
11fdf7f2 TL |
1363 | } |
1364 | ||
9f95a23c | 1365 | yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics)); |
11fdf7f2 TL |
1366 | if (retcode < 0) { |
1367 | return set_cr_error(retcode); | |
1368 | } | |
1369 | return set_cr_done(); | |
1370 | } | |
1371 | return 0; | |
1372 | } | |
1373 | }; | |
1374 | ||
1375 | class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
9f95a23c | 1376 | rgw_bucket_sync_pipe sync_pipe; |
11fdf7f2 TL |
1377 | PSEnvRef env; |
1378 | std::optional<uint64_t> versioned_epoch; | |
1379 | TopicsRef topics; | |
1380 | public: | |
9f95a23c TL |
1381 | RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc, |
1382 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
11fdf7f2 | 1383 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch, |
9f95a23c TL |
1384 | TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key), |
1385 | sync_pipe(_sync_pipe), | |
11fdf7f2 TL |
1386 | env(_env), versioned_epoch(_versioned_epoch), |
1387 | topics(_topics) { | |
1388 | } | |
1389 | ||
1390 | ~RGWPSHandleRemoteObjCR() override {} | |
1391 | ||
1392 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
9f95a23c | 1393 | return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics); |
11fdf7f2 TL |
1394 | } |
1395 | }; | |
1396 | ||
1397 | class RGWPSHandleObjCreateCR : public RGWCoroutine { | |
9f95a23c TL |
1398 | RGWDataSyncCtx *sc; |
1399 | rgw_bucket_sync_pipe sync_pipe; | |
11fdf7f2 TL |
1400 | rgw_obj_key key; |
1401 | PSEnvRef env; | |
1402 | std::optional<uint64_t> versioned_epoch; | |
1403 | TopicsRef topics; | |
1404 | public: | |
9f95a23c TL |
1405 | RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc, |
1406 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, | |
1407 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sc->cct), | |
1408 | sc(_sc), | |
1409 | sync_pipe(_sync_pipe), | |
11fdf7f2 TL |
1410 | key(_key), |
1411 | env(_env), | |
1412 | versioned_epoch(_versioned_epoch) { | |
1413 | } | |
1414 | ||
1415 | ~RGWPSHandleObjCreateCR() override {} | |
1416 | ||
1417 | int operate() override { | |
1418 | reenter(this) { | |
9f95a23c TL |
1419 | yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner, |
1420 | sync_pipe.info.source_bs.bucket, key, | |
eafe8130 | 1421 | rgw::notify::ObjectCreated, |
11fdf7f2 TL |
1422 | &topics)); |
1423 | if (retcode < 0) { | |
9f95a23c | 1424 | ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; |
11fdf7f2 TL |
1425 | return set_cr_error(retcode); |
1426 | } | |
1427 | if (topics->empty()) { | |
9f95a23c | 1428 | ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl; |
11fdf7f2 TL |
1429 | return set_cr_done(); |
1430 | } | |
9f95a23c | 1431 | yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics)); |
11fdf7f2 TL |
1432 | if (retcode < 0) { |
1433 | return set_cr_error(retcode); | |
1434 | } | |
1435 | return set_cr_done(); | |
1436 | } | |
1437 | return 0; | |
1438 | } | |
1439 | }; | |
1440 | ||
eafe8130 | 1441 | // coroutine invoked on remote object deletion |
11fdf7f2 | 1442 | class RGWPSGenericObjEventCBCR : public RGWCoroutine { |
9f95a23c | 1443 | RGWDataSyncCtx *sc; |
11fdf7f2 TL |
1444 | PSEnvRef env; |
1445 | rgw_user owner; | |
1446 | rgw_bucket bucket; | |
1447 | rgw_obj_key key; | |
1448 | ceph::real_time mtime; | |
eafe8130 TL |
1449 | rgw::notify::EventType event_type; |
1450 | EventRef<rgw_pubsub_event> event; | |
1451 | EventRef<rgw_pubsub_s3_record> record; | |
11fdf7f2 TL |
1452 | TopicsRef topics; |
1453 | public: | |
9f95a23c | 1454 | RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc, |
11fdf7f2 | 1455 | PSEnvRef _env, |
9f95a23c TL |
1456 | rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, |
1457 | rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct), | |
1458 | sc(_sc), | |
11fdf7f2 | 1459 | env(_env), |
9f95a23c TL |
1460 | owner(_sync_pipe.dest_bucket_info.owner), |
1461 | bucket(_sync_pipe.dest_bucket_info.bucket), | |
11fdf7f2 | 1462 | key(_key), |
eafe8130 | 1463 | mtime(_mtime), event_type(_event_type) {} |
11fdf7f2 TL |
1464 | int operate() override { |
1465 | reenter(this) { | |
9f95a23c | 1466 | ldout(sc->cct, 20) << ": remove remote obj: z=" << sc->source_zone |
11fdf7f2 | 1467 | << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; |
9f95a23c | 1468 | yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics)); |
11fdf7f2 | 1469 | if (retcode < 0) { |
9f95a23c | 1470 | ldout(sc->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; |
11fdf7f2 TL |
1471 | return set_cr_error(retcode); |
1472 | } | |
1473 | if (topics->empty()) { | |
9f95a23c | 1474 | ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; |
11fdf7f2 TL |
1475 | return set_cr_done(); |
1476 | } | |
eafe8130 TL |
1477 | // at this point we don't know whether we need the ceph event or S3 record |
1478 | // this is why both are created here, once we have information about the | |
1479 | // subscription, we will store/push only the relevant ones | |
9f95a23c | 1480 | make_event_ref(sc->cct, |
eafe8130 TL |
1481 | bucket, key, |
1482 | mtime, nullptr, | |
1483 | event_type, &event); | |
9f95a23c | 1484 | make_s3_record_ref(sc->cct, |
eafe8130 TL |
1485 | bucket, owner, key, |
1486 | mtime, nullptr, | |
1487 | event_type, &record); | |
9f95a23c | 1488 | yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics)); |
11fdf7f2 TL |
1489 | if (retcode < 0) { |
1490 | return set_cr_error(retcode); | |
1491 | } | |
1492 | return set_cr_done(); | |
1493 | } | |
1494 | return 0; | |
1495 | } | |
1496 | ||
1497 | }; | |
1498 | ||
1499 | class RGWPSDataSyncModule : public RGWDataSyncModule { | |
1500 | PSEnvRef env; | |
1501 | PSConfigRef& conf; | |
eafe8130 | 1502 | |
11fdf7f2 TL |
1503 | public: |
1504 | RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) { | |
1505 | env->init(cct, config); | |
1506 | } | |
eafe8130 | 1507 | |
11fdf7f2 TL |
1508 | ~RGWPSDataSyncModule() override {} |
1509 | ||
9f95a23c TL |
1510 | void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { |
1511 | auto sync_env = sc->env; | |
1512 | PSManagerRef mgr = PSManager::get_shared(sc, env); | |
1513 | env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr); | |
11fdf7f2 TL |
1514 | } |
1515 | ||
9f95a23c TL |
1516 | RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { |
1517 | ldout(sc->cct, 5) << conf->id << ": start" << dendl; | |
1518 | return new RGWPSInitEnvCBCR(sc, env); | |
11fdf7f2 | 1519 | } |
eafe8130 | 1520 | |
9f95a23c | 1521 | RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, |
eafe8130 | 1522 | rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 1523 | ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << |
eafe8130 | 1524 | " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; |
9f95a23c | 1525 | return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch); |
11fdf7f2 | 1526 | } |
eafe8130 | 1527 | |
9f95a23c | 1528 | RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, |
eafe8130 | 1529 | rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 1530 | ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << |
eafe8130 | 1531 | " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
9f95a23c | 1532 | return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); |
11fdf7f2 | 1533 | } |
eafe8130 | 1534 | |
9f95a23c | 1535 | RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, |
eafe8130 | 1536 | rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { |
9f95a23c | 1537 | ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << |
eafe8130 | 1538 | " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; |
9f95a23c | 1539 | return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); |
11fdf7f2 TL |
1540 | } |
1541 | ||
1542 | PSConfigRef& get_conf() { return conf; } | |
1543 | }; | |
1544 | ||
1545 | RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) | |
1546 | { | |
1547 | data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config)); | |
9f95a23c | 1548 | const std::string jconf = json_str("conf", *data_handler->get_conf()); |
11fdf7f2 TL |
1549 | JSONParser p; |
1550 | if (!p.parse(jconf.c_str(), jconf.size())) { | |
eafe8130 | 1551 | ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; |
11fdf7f2 TL |
1552 | effective_conf = config; |
1553 | } else { | |
1554 | effective_conf.decode_json(&p); | |
1555 | } | |
eafe8130 TL |
1556 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT |
1557 | if (!rgw::amqp::init(cct)) { | |
1558 | ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl; | |
1559 | } | |
1560 | #endif | |
9f95a23c TL |
1561 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
1562 | if (!rgw::kafka::init(cct)) { | |
1563 | ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl; | |
1564 | } | |
1565 | #endif | |
eafe8130 TL |
1566 | } |
1567 | ||
1568 | RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() { | |
1569 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT | |
1570 | rgw::amqp::shutdown(); | |
1571 | #endif | |
9f95a23c TL |
1572 | #ifdef WITH_RADOSGW_KAFKA_ENDPOINT |
1573 | rgw::kafka::shutdown(); | |
1574 | #endif | |
11fdf7f2 TL |
1575 | } |
1576 | ||
1577 | RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler() | |
1578 | { | |
1579 | return data_handler.get(); | |
1580 | } | |
1581 | ||
1582 | RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { | |
1583 | if (dialect != RGW_REST_S3) { | |
1584 | return orig; | |
1585 | } | |
eafe8130 TL |
1586 | return new RGWRESTMgr_PubSub(); |
1587 | } | |
1588 | ||
1589 | bool RGWPSSyncModuleInstance::should_full_sync() const { | |
1590 | return data_handler->get_conf()->start_with_full_sync; | |
11fdf7f2 TL |
1591 | } |
1592 | ||
1593 | int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { | |
1594 | instance->reset(new RGWPSSyncModuleInstance(cct, config)); | |
1595 | return 0; | |
1596 | } | |
1597 | ||
eafe8130 | 1598 |