]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | #include "rgw_common.h" |
2 | #include "rgw_coroutine.h" | |
3 | #include "rgw_sync_module.h" | |
4 | #include "rgw_data_sync.h" | |
5 | #include "rgw_sync_module_pubsub.h" | |
6 | #include "rgw_sync_module_pubsub_rest.h" | |
7 | #include "rgw_rest_conn.h" | |
8 | #include "rgw_cr_rados.h" | |
9 | #include "rgw_cr_rest.h" | |
10 | #include "rgw_cr_tools.h" | |
11 | #include "rgw_op.h" | |
12 | #include "rgw_pubsub.h" | |
13 | #include "rgw_pubsub_push.h" | |
eafe8130 | 14 | #include "rgw_notify_event_type.h" |
11fdf7f2 | 15 | #include "rgw_perf_counters.h" |
eafe8130 TL |
16 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT |
17 | #include "rgw_amqp.h" | |
18 | #endif | |
11fdf7f2 | 19 | |
eafe8130 | 20 | #include <boost/algorithm/hex.hpp> |
11fdf7f2 TL |
21 | #include <boost/asio/yield.hpp> |
22 | ||
23 | #define dout_subsys ceph_subsys_rgw | |
24 | ||
25 | ||
26 | #define PUBSUB_EVENTS_RETENTION_DEFAULT 7 | |
27 | ||
28 | /* | |
29 | ||
30 | config: | |
31 | ||
32 | { | |
33 | "tenant": <tenant>, # default: <empty> | |
34 | "uid": <uid>, # default: "pubsub" | |
35 | "data_bucket_prefix": <prefix> # default: "pubsub-" | |
36 | "data_oid_prefix": <prefix> # | |
eafe8130 TL |
37 | "events_retention_days": <int> # default: 7 |
38 | "start_with_full_sync" <bool> # default: false | |
11fdf7f2 TL |
39 | |
40 | # non-dynamic config | |
41 | "notifications": [ | |
42 | { | |
43 | "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>, | |
44 | # or a prefix if it ends with a wildcard | |
45 | "topic": <topic-name> | |
46 | }, | |
47 | ... | |
48 | ], | |
49 | "subscriptions": [ | |
50 | { | |
51 | "name": <subscription-name>, | |
52 | "topic": <topic>, | |
53 | "push_endpoint": <endpoint>, | |
eafe8130 | 54 | "push_endpoint_args:" <arg list>. # any push endpoint specific args (include all args) |
11fdf7f2 TL |
55 | "data_bucket": <bucket>, # override name of bucket where subscription data will be store |
56 | "data_oid_prefix": <prefix> # set prefix for subscription data object ids | |
eafe8130 | 57 | "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here |
11fdf7f2 TL |
58 | }, |
59 | ... | |
60 | ] | |
61 | } | |
62 | ||
63 | */ | |
64 | ||
65 | // utility function to convert the args list from string format | |
66 | // (ampresend separated with equal sign) to prased structure | |
67 | RGWHTTPArgs string_to_args(const std::string& str_args) { | |
68 | RGWHTTPArgs args; | |
69 | args.set(str_args); | |
70 | args.parse(); | |
71 | return args; | |
72 | } | |
73 | ||
eafe8130 TL |
74 | struct PSSubConfig { |
75 | std::string name; | |
76 | std::string topic; | |
77 | std::string push_endpoint_name; | |
78 | std::string push_endpoint_args; | |
79 | std::string data_bucket_name; | |
80 | std::string data_oid_prefix; | |
81 | std::string s3_id; | |
82 | std::string arn_topic; | |
11fdf7f2 TL |
83 | RGWPubSubEndpoint::Ptr push_endpoint; |
84 | ||
11fdf7f2 TL |
85 | void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) { |
86 | name = uc.name; | |
87 | topic = uc.topic; | |
88 | push_endpoint_name = uc.dest.push_endpoint; | |
89 | data_bucket_name = uc.dest.bucket_name; | |
90 | data_oid_prefix = uc.dest.oid_prefix; | |
eafe8130 TL |
91 | s3_id = uc.s3_id; |
92 | arn_topic = uc.dest.arn_topic; | |
93 | if (!push_endpoint_name.empty()) { | |
11fdf7f2 TL |
94 | push_endpoint_args = uc.dest.push_endpoint_args; |
95 | try { | |
eafe8130 TL |
96 | push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); |
97 | ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; | |
11fdf7f2 | 98 | } catch (const RGWPubSubEndpoint::configuration_error& e) { |
eafe8130 | 99 | ldout(cct, 1) << "ERROR: failed to create push endpoint: " |
11fdf7f2 TL |
100 | << push_endpoint_name << " due to: " << e.what() << dendl; |
101 | } | |
102 | } | |
103 | } | |
104 | ||
105 | void dump(Formatter *f) const { | |
106 | encode_json("name", name, f); | |
107 | encode_json("topic", topic, f); | |
108 | encode_json("push_endpoint", push_endpoint_name, f); | |
eafe8130 | 109 | encode_json("push_endpoint_args", push_endpoint_args, f); |
11fdf7f2 TL |
110 | encode_json("data_bucket_name", data_bucket_name, f); |
111 | encode_json("data_oid_prefix", data_oid_prefix, f); | |
eafe8130 | 112 | encode_json("s3_id", s3_id, f); |
11fdf7f2 TL |
113 | } |
114 | ||
115 | void init(CephContext *cct, const JSONFormattable& config, | |
116 | const string& data_bucket_prefix, | |
117 | const string& default_oid_prefix) { | |
118 | name = config["name"]; | |
119 | topic = config["topic"]; | |
120 | push_endpoint_name = config["push_endpoint"]; | |
121 | string default_bucket_name = data_bucket_prefix + name; | |
122 | data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); | |
123 | data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str()); | |
eafe8130 TL |
124 | s3_id = config["s3_id"]; |
125 | arn_topic = config["arn_topic"]; | |
11fdf7f2 TL |
126 | if (!push_endpoint_name.empty()) { |
127 | push_endpoint_args = config["push_endpoint_args"]; | |
128 | try { | |
eafe8130 TL |
129 | push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct); |
130 | ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl; | |
11fdf7f2 | 131 | } catch (const RGWPubSubEndpoint::configuration_error& e) { |
eafe8130 | 132 | ldout(cct, 1) << "ERROR: failed to create push endpoint: " |
11fdf7f2 TL |
133 | << push_endpoint_name << " due to: " << e.what() << dendl; |
134 | } | |
135 | } | |
136 | } | |
137 | }; | |
138 | ||
139 | using PSSubConfigRef = std::shared_ptr<PSSubConfig>; | |
140 | ||
// In-memory representation of a topic: its name plus the names of the
// subscriptions bound to it (filled in by PSConfig::init()).
struct PSTopicConfig {
  std::string name;
  std::set<std::string> subs;  // subscription names feeding off this topic

  void dump(Formatter *f) const {
    encode_json("name", name, f);
    encode_json("subs", subs, f);
  }
};
150 | ||
151 | struct PSNotificationConfig { | |
152 | uint64_t id{0}; | |
153 | string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */ | |
154 | string topic; | |
155 | bool is_prefix{false}; | |
156 | ||
157 | ||
158 | void dump(Formatter *f) const { | |
159 | encode_json("id", id, f); | |
160 | encode_json("path", path, f); | |
161 | encode_json("topic", topic, f); | |
162 | encode_json("is_prefix", is_prefix, f); | |
163 | } | |
164 | ||
165 | void init(CephContext *cct, const JSONFormattable& config) { | |
166 | path = config["path"]; | |
167 | if (!path.empty() && path[path.size() - 1] == '*') { | |
168 | path = path.substr(0, path.size() - 1); | |
169 | is_prefix = true; | |
170 | } | |
171 | topic = config["topic"]; | |
172 | } | |
173 | }; | |
174 | ||
175 | template<class T> | |
176 | static string json_str(const char *name, const T& obj, bool pretty = false) | |
177 | { | |
178 | stringstream ss; | |
179 | JSONFormatter f(pretty); | |
180 | ||
181 | encode_json(name, obj, &f); | |
182 | f.flush(ss); | |
183 | ||
184 | return ss.str(); | |
185 | } | |
186 | ||
187 | using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>; | |
eafe8130 | 188 | using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>; |
11fdf7f2 TL |
189 | |
// Parsed pubsub sync-module configuration: the synthetic user owning all
// pubsub data, the bucket/oid naming conventions, and the statically
// configured topics, notifications and subscriptions (see the JSON
// layout documented at the top of this file).
struct PSConfig {
  string id{"pubsub"};
  rgw_user user;              // owner of the pubsub data buckets/objects
  string data_bucket_prefix;  // prefix for per-subscription data buckets
  string data_oid_prefix;     // default prefix for stored event oids

  int events_retention_days{0};  // lifecycle expiration for stored events

  uint64_t sync_instance{0};  // set by init_instance()
  uint64_t max_id{0};         // running counter used to assign notification ids

  /* FIXME: no hard coded buckets, we'll have configurable topics */
  std::map<std::string, PSSubConfigRef> subs;      // keyed by subscription name
  std::map<std::string, PSTopicConfigRef> topics;  // keyed by topic name
  // keyed by (possibly prefix) path; the multimap's ordering is what
  // makes the backwards prefix walk in get_topics() work
  std::multimap<std::string, PSNotificationConfig> notifications;

  bool start_with_full_sync{false};

  // dump the full parsed configuration (used for logging in init())
  void dump(Formatter *f) const {
    encode_json("id", id, f);
    encode_json("user", user, f);
    encode_json("data_bucket_prefix", data_bucket_prefix, f);
    encode_json("data_oid_prefix", data_oid_prefix, f);
    encode_json("events_retention_days", events_retention_days, f);
    encode_json("sync_instance", sync_instance, f);
    encode_json("max_id", max_id, f);
    {
      Formatter::ArraySection section(*f, "subs");
      for (auto& sub : subs) {
        encode_json("sub", *sub.second, f);
      }
    }
    {
      Formatter::ArraySection section(*f, "topics");
      for (auto& topic : topics) {
        encode_json("topic", *topic.second, f);
      }
    }
    {
      Formatter::ObjectSection section(*f, "notifications");
      string last;
      // consecutive entries sharing a path are grouped into one array
      for (auto& notif : notifications) {
        const string& n = notif.first;
        if (n != last) {
          if (!last.empty()) {
            f->close_section();
          }
          f->open_array_section(n.c_str());
        }
        last = n;
        encode_json("notifications", notif.second, f);
      }
      if (!last.empty()) {
        f->close_section();
      }
    }
    encode_json("start_with_full_sync", start_with_full_sync, f);
  }

  // parse the zone-level JSON module config
  void init(CephContext *cct, const JSONFormattable& config) {
    string uid = config["uid"]("pubsub");
    user = rgw_user(config["tenant"], uid);
    data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
    data_oid_prefix = config["data_oid_prefix"];
    events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);

    for (auto& c : config["notifications"].array()) {
      PSNotificationConfig nc;
      nc.id = ++max_id;
      nc.init(cct, c);
      notifications.insert(std::make_pair(nc.path, nc));

      // every topic referenced by a notification gets a (fresh, empty)
      // topic entry; subscriptions are linked in below
      PSTopicConfig topic_config = { .name = nc.topic };
      topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
    }
    for (auto& c : config["subscriptions"].array()) {
      auto sc = std::make_shared<PSSubConfig>();
      sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
      subs[sc->name] = sc;
      // attach the subscription to its topic, if that topic exists
      auto iter = topics.find(sc->topic);
      if (iter != topics.end()) {
        iter->second->subs.insert(sc->name);
      }
    }
    start_with_full_sync = config["start_with_full_sync"](false);

    ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
  }

  void init_instance(const RGWRealm& realm, uint64_t instance_id) {
    sync_instance = instance_id;
  }

  // Collect the topics whose notification rules match "<bucket>/<key>".
  // Exact-path rules must match the full path; prefix rules (trailing
  // wildcard in config) match any path they are a prefix of.
  void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
    string path = bucket.name + "/" + key.name;

    auto iter = notifications.upper_bound(path);
    if (iter == notifications.begin()) {
      return;
    }

    // walk backwards from the first entry ordered after `path`; any
    // rule path that is a prefix of `path` sorts at or before it
    do {
      --iter;
      if (iter->first.size() > path.size()) {
        break;
      }
      if (path.compare(0, iter->first.size(), iter->first) != 0) {
        break;
      }

      PSNotificationConfig& target = iter->second;

      // non-prefix rules require an exact-length match
      if (!target.is_prefix &&
          path.size() != iter->first.size()) {
        continue;
      }

      auto topic = topics.find(target.topic);
      if (topic == topics.end()) {
        continue;
      }

      ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
        " target_path=" << target.path << ", topic=" << target.topic << dendl;
      (*result)->push_back(topic->second);
    } while (iter != notifications.begin());
  }

  // look up a subscription config by name; false when not configured
  bool find_sub(const string& name, PSSubConfigRef *ref) {
    auto iter = subs.find(name);
    if (iter != subs.end()) {
      *ref = iter->second;
      return true;
    }
    return false;
  }
};
327 | ||
11fdf7f2 | 328 | using PSConfigRef = std::shared_ptr<PSConfig>; |
eafe8130 TL |
329 | template<typename EventType> |
330 | using EventRef = std::shared_ptr<EventType>; | |
11fdf7f2 TL |
331 | |
332 | struct objstore_event { | |
333 | string id; | |
334 | const rgw_bucket& bucket; | |
335 | const rgw_obj_key& key; | |
336 | const ceph::real_time& mtime; | |
337 | const std::vector<std::pair<std::string, std::string> > *attrs; | |
338 | ||
339 | objstore_event(const rgw_bucket& _bucket, | |
340 | const rgw_obj_key& _key, | |
341 | const ceph::real_time& _mtime, | |
342 | const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket), | |
343 | key(_key), | |
344 | mtime(_mtime), | |
345 | attrs(_attrs) {} | |
346 | ||
347 | string get_hash() { | |
348 | string etag; | |
349 | RGWMD5Etag hash; | |
350 | hash.update(bucket.bucket_id); | |
351 | hash.update(key.name); | |
352 | hash.update(key.instance); | |
353 | hash.finish(&etag); | |
354 | ||
355 | assert(etag.size() > 8); | |
356 | ||
357 | return etag.substr(0, 8); | |
358 | } | |
359 | ||
360 | void dump(Formatter *f) const { | |
361 | { | |
362 | Formatter::ObjectSection s(*f, "bucket"); | |
363 | encode_json("name", bucket.name, f); | |
364 | encode_json("tenant", bucket.tenant, f); | |
365 | encode_json("bucket_id", bucket.bucket_id, f); | |
366 | } | |
367 | { | |
368 | Formatter::ObjectSection s(*f, "key"); | |
369 | encode_json("name", key.name, f); | |
370 | encode_json("instance", key.instance, f); | |
371 | } | |
372 | utime_t mt(mtime); | |
373 | encode_json("mtime", mt, f); | |
374 | Formatter::ObjectSection s(*f, "attrs"); | |
375 | if (attrs) { | |
376 | for (auto& attr : *attrs) { | |
377 | encode_json(attr.first.c_str(), attr.second.c_str(), f); | |
378 | } | |
379 | } | |
380 | } | |
381 | }; | |
382 | ||
383 | static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, | |
384 | const rgw_obj_key& key, | |
385 | const ceph::real_time& mtime, | |
386 | const std::vector<std::pair<std::string, std::string> > *attrs, | |
eafe8130 TL |
387 | rgw::notify::EventType event_type, |
388 | EventRef<rgw_pubsub_event> *event) { | |
11fdf7f2 TL |
389 | *event = std::make_shared<rgw_pubsub_event>(); |
390 | ||
eafe8130 TL |
391 | EventRef<rgw_pubsub_event>& e = *event; |
392 | e->event_name = rgw::notify::to_ceph_string(event_type); | |
11fdf7f2 TL |
393 | e->source = bucket.name + "/" + key.name; |
394 | e->timestamp = real_clock::now(); | |
395 | ||
396 | objstore_event oevent(bucket, key, mtime, attrs); | |
397 | ||
eafe8130 TL |
398 | const utime_t ts(e->timestamp); |
399 | set_event_id(e->id, oevent.get_hash(), ts); | |
11fdf7f2 TL |
400 | |
401 | encode_json("info", oevent, &e->info); | |
402 | } | |
403 | ||
eafe8130 TL |
404 | static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket, |
405 | const rgw_user& owner, | |
406 | const rgw_obj_key& key, | |
407 | const ceph::real_time& mtime, | |
408 | const std::vector<std::pair<std::string, std::string> > *attrs, | |
409 | rgw::notify::EventType event_type, | |
410 | EventRef<rgw_pubsub_s3_record> *record) { | |
411 | *record = std::make_shared<rgw_pubsub_s3_record>(); | |
412 | ||
413 | EventRef<rgw_pubsub_s3_record>& r = *record; | |
eafe8130 TL |
414 | r->eventTime = mtime; |
415 | r->eventName = rgw::notify::to_string(event_type); | |
92f5a8d4 TL |
416 | // userIdentity: not supported in sync module |
417 | // x_amz_request_id: not supported in sync module | |
418 | // x_amz_id_2: not supported in sync module | |
eafe8130 TL |
419 | // configurationId is filled from subscription configuration |
420 | r->bucket_name = bucket.name; | |
421 | r->bucket_ownerIdentity = owner.to_str(); | |
422 | r->bucket_arn = to_string(rgw::ARN(bucket)); | |
423 | r->bucket_id = bucket.bucket_id; // rgw extension | |
424 | r->object_key = key.name; | |
92f5a8d4 | 425 | // object_size not supported in sync module |
eafe8130 TL |
426 | objstore_event oevent(bucket, key, mtime, attrs); |
427 | r->object_etag = oevent.get_hash(); | |
428 | r->object_versionId = key.instance; | |
429 | ||
430 | // use timestamp as per key sequence id (hex encoded) | |
431 | const utime_t ts(real_clock::now()); | |
432 | boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), | |
433 | std::back_inserter(r->object_sequencer)); | |
434 | ||
eafe8130 TL |
435 | set_event_id(r->id, r->object_etag, ts); |
436 | } | |
437 | ||
11fdf7f2 TL |
438 | class PSManager; |
439 | using PSManagerRef = std::shared_ptr<PSManager>; | |
440 | ||
// Shared environment for the pubsub sync module: the parsed module
// configuration, the pubsub data user's info, and the subscription
// manager.
struct PSEnv {
  PSConfigRef conf;
  shared_ptr<RGWUserInfo> data_user_info;
  PSManagerRef manager;

  PSEnv() : conf(make_shared<PSConfig>()),
            data_user_info(make_shared<RGWUserInfo>()) {}

  void init(CephContext *cct, const JSONFormattable& config) {
    conf->init(cct, config);
  }

  // defined out of line (needs the full PSManager type)
  void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
};
455 | ||
456 | using PSEnvRef = std::shared_ptr<PSEnv>; | |
457 | ||
eafe8130 | 458 | template<typename EventType> |
11fdf7f2 | 459 | class PSEvent { |
eafe8130 | 460 | const EventRef<EventType> event; |
11fdf7f2 TL |
461 | |
462 | public: | |
eafe8130 | 463 | PSEvent(const EventRef<EventType>& _event) : event(_event) {} |
11fdf7f2 TL |
464 | |
465 | void format(bufferlist *bl) const { | |
92f5a8d4 | 466 | bl->append(json_str("", *event)); |
11fdf7f2 TL |
467 | } |
468 | ||
469 | void encode_event(bufferlist& bl) const { | |
470 | encode(*event, bl); | |
471 | } | |
472 | ||
473 | const string& id() const { | |
474 | return event->id; | |
475 | } | |
476 | }; | |
477 | ||
// A coroutine whose work runs at most once, however many callers
// execute() it. The first caller spawns the CR; callers arriving while
// it is still running are put to sleep as waiters and woken with the
// shared retcode (and, via return_result(), the shared result) when it
// completes; callers arriving after completion get the cached outcome
// synchronously. get()/put() refcounting keeps the CR alive for as long
// as waiters are registered.
template <class T>
class RGWSingletonCR : public RGWCoroutine {
  friend class WrapperCR;

  boost::asio::coroutine wrapper_state;
  bool started{false};      // has any caller spawned this CR yet?
  int operate_ret{0};

  // a caller parked until the singleton run completes
  struct WaiterInfo {
    RGWCoroutine *cr{nullptr};
    T *result;  // where to copy the singleton's result for this waiter
  };
  using WaiterInfoRef = std::shared_ptr<WaiterInfo>;

  deque<WaiterInfoRef> waiters;

  void add_waiter(RGWCoroutine *cr, T *result) {
    auto waiter = std::make_shared<WaiterInfo>();
    waiter->cr = cr;
    waiter->result = result;
    waiters.push_back(waiter);
  };

  // pop the next registered waiter; false (and a cleared ref) when none
  bool get_next_waiter(WaiterInfoRef *waiter) {
    if (waiters.empty()) {
      waiter->reset();
      return false;
    }

    *waiter = waiters.front();
    waiters.pop_front();
    return true;
  }

  // drives operate() until the CR is done, then wakes every waiter:
  // propagates retcode, hands over the result, and drops the reference
  // the waiter's registration took
  int operate_wrapper() override {
    reenter(&wrapper_state) {
      while (!is_done()) {
        ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
        operate_ret = operate();
        if (operate_ret < 0) {
          ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
        }
        if (!is_done()) {
          yield;
        }
      }

      ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
      /* we're done, can't yield anymore */

      WaiterInfoRef waiter;
      while (get_next_waiter(&waiter)) {
        ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
        waiter->cr->set_retcode(retcode);
        waiter->cr->set_sleeping(false);
        return_result(waiter->result);
        put();
      }

      return retcode;
    }
    return 0;
  }

  // subclass hook: copy the singleton's result to a caller's result slot
  virtual void return_result(T *result) {}

public:
  RGWSingletonCR(CephContext *_cct)
    : RGWCoroutine(_cct) {}

  // Execute on behalf of `caller`:
  // - first call ever: spawn this CR under the caller
  // - still running: take a ref, register caller as a sleeping waiter
  // - already done: deliver the cached retcode/result immediately
  int execute(RGWCoroutine *caller, T *result = nullptr) {
    if (!started) {
      ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
      started = true;
      caller->call(this);
      return 0;
    } else if (!is_done()) {
      ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
      get();
      add_waiter(caller, result);
      caller->set_sleeping(true);
      return 0;
    }

    ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
    caller->set_retcode(retcode);
    return_result(result);
    return retcode;
  }
};
568 | ||
569 | ||
570 | class PSSubscription; | |
571 | using PSSubscriptionRef = std::shared_ptr<PSSubscription>; | |
572 | ||
573 | class PSSubscription { | |
574 | class InitCR; | |
575 | friend class InitCR; | |
576 | friend class RGWPSHandleObjEventCR; | |
577 | ||
578 | RGWDataSyncEnv *sync_env; | |
579 | PSEnvRef env; | |
580 | PSSubConfigRef sub_conf; | |
eafe8130 | 581 | std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result; |
11fdf7f2 TL |
582 | RGWBucketInfo *bucket_info{nullptr}; |
583 | RGWDataAccessRef data_access; | |
584 | RGWDataAccess::BucketRef bucket; | |
585 | ||
11fdf7f2 TL |
586 | InitCR *init_cr{nullptr}; |
587 | ||
588 | class InitBucketLifecycleCR : public RGWCoroutine { | |
589 | RGWDataSyncEnv *sync_env; | |
590 | PSConfigRef& conf; | |
591 | LCRule rule; | |
592 | ||
593 | int retention_days; | |
594 | ||
595 | rgw_bucket_lifecycle_config_params lc_config; | |
596 | ||
597 | public: | |
598 | InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env, | |
599 | PSConfigRef& _conf, | |
600 | RGWBucketInfo& _bucket_info, | |
eafe8130 | 601 | std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct), |
11fdf7f2 TL |
602 | sync_env(_sync_env), |
603 | conf(_conf) { | |
604 | lc_config.bucket_info = _bucket_info; | |
605 | lc_config.bucket_attrs = _bucket_attrs; | |
606 | retention_days = conf->events_retention_days; | |
607 | } | |
608 | ||
609 | int operate() override { | |
610 | reenter(this) { | |
611 | ||
612 | rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days); | |
613 | ||
614 | { | |
615 | /* maybe we already have it configured? */ | |
616 | RGWLifecycleConfiguration old_config; | |
617 | auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC); | |
618 | if (aiter != lc_config.bucket_attrs.end()) { | |
619 | bufferlist::const_iterator iter{&aiter->second}; | |
620 | try { | |
621 | old_config.decode(iter); | |
622 | } catch (const buffer::error& e) { | |
623 | ldout(cct, 0) << __func__ << "(): decode life cycle config failed" << dendl; | |
624 | } | |
625 | } | |
626 | ||
627 | auto old_rules = old_config.get_rule_map(); | |
628 | for (auto ori : old_rules) { | |
629 | auto& old_rule = ori.second; | |
630 | ||
631 | if (old_rule.get_prefix().empty() && | |
632 | old_rule.get_expiration().get_days() == retention_days && | |
633 | old_rule.is_enabled()) { | |
634 | ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl; | |
635 | return set_cr_done(); | |
636 | } | |
637 | } | |
638 | } | |
639 | ||
640 | lc_config.config.add_rule(rule); | |
641 | yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados, | |
642 | sync_env->store, | |
643 | lc_config)); | |
644 | if (retcode < 0) { | |
645 | ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl; | |
646 | return set_cr_error(retcode); | |
647 | } | |
648 | ||
649 | return set_cr_done(); | |
650 | } | |
651 | return 0; | |
652 | } | |
653 | }; | |
654 | ||
655 | class InitCR : public RGWSingletonCR<bool> { | |
656 | RGWDataSyncEnv *sync_env; | |
657 | PSSubscriptionRef sub; | |
658 | rgw_get_bucket_info_params get_bucket_info; | |
659 | rgw_bucket_create_local_params create_bucket; | |
660 | PSConfigRef& conf; | |
661 | PSSubConfigRef& sub_conf; | |
662 | int i; | |
663 | ||
664 | public: | |
665 | InitCR(RGWDataSyncEnv *_sync_env, | |
666 | PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct), | |
667 | sync_env(_sync_env), | |
668 | sub(_sub), conf(sub->env->conf), | |
669 | sub_conf(sub->sub_conf) { | |
670 | } | |
671 | ||
672 | int operate() override { | |
673 | reenter(this) { | |
674 | get_bucket_info.tenant = conf->user.tenant; | |
675 | get_bucket_info.bucket_name = sub_conf->data_bucket_name; | |
676 | sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>(); | |
677 | ||
678 | for (i = 0; i < 2; ++i) { | |
679 | yield call(new RGWGetBucketInfoCR(sync_env->async_rados, | |
680 | sync_env->store, | |
681 | get_bucket_info, | |
682 | sub->get_bucket_info_result)); | |
683 | if (retcode < 0 && retcode != -ENOENT) { | |
684 | ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant=" | |
685 | << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; | |
686 | } | |
687 | if (retcode == 0) { | |
688 | { | |
689 | auto& result = sub->get_bucket_info_result; | |
690 | sub->bucket_info = &result->bucket_info; | |
691 | ||
692 | int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); | |
693 | if (ret < 0) { | |
694 | ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; | |
695 | return set_cr_error(ret); | |
696 | } | |
697 | } | |
698 | ||
699 | yield call(new InitBucketLifecycleCR(sync_env, conf, | |
700 | sub->get_bucket_info_result->bucket_info, | |
701 | sub->get_bucket_info_result->attrs)); | |
702 | if (retcode < 0) { | |
703 | ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; | |
704 | return set_cr_error(retcode); | |
705 | } | |
706 | ||
707 | return set_cr_done(); | |
708 | } | |
709 | ||
710 | create_bucket.user_info = sub->env->data_user_info; | |
711 | create_bucket.bucket_name = sub_conf->data_bucket_name; | |
712 | ldout(sync_env->cct, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl; | |
713 | yield call(new RGWBucketCreateLocalCR(sync_env->async_rados, | |
714 | sync_env->store, | |
715 | create_bucket)); | |
716 | if (retcode < 0) { | |
717 | ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant=" | |
718 | << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; | |
719 | return set_cr_error(retcode); | |
720 | } | |
721 | ||
722 | /* second iteration: we got -ENOENT and created a bucket */ | |
723 | } | |
724 | ||
725 | /* failed twice on -ENOENT, unexpected */ | |
726 | ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant | |
727 | << " name=" << get_bucket_info.bucket_name << dendl; | |
728 | return set_cr_error(-EIO); | |
729 | } | |
730 | return 0; | |
731 | } | |
732 | }; | |
733 | ||
eafe8130 | 734 | template<typename EventType> |
11fdf7f2 TL |
735 | class StoreEventCR : public RGWCoroutine { |
736 | RGWDataSyncEnv* const sync_env; | |
737 | const PSSubscriptionRef sub; | |
eafe8130 | 738 | const PSEvent<EventType> pse; |
11fdf7f2 TL |
739 | const string oid_prefix; |
740 | ||
741 | public: | |
742 | StoreEventCR(RGWDataSyncEnv* const _sync_env, | |
743 | const PSSubscriptionRef& _sub, | |
eafe8130 | 744 | const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct), |
11fdf7f2 TL |
745 | sync_env(_sync_env), |
746 | sub(_sub), | |
747 | pse(_event), | |
748 | oid_prefix(sub->sub_conf->data_oid_prefix) { | |
749 | } | |
750 | ||
751 | int operate() override { | |
11fdf7f2 TL |
752 | rgw_object_simple_put_params put_obj; |
753 | reenter(this) { | |
754 | ||
755 | put_obj.bucket = sub->bucket; | |
756 | put_obj.key = rgw_obj_key(oid_prefix + pse.id()); | |
757 | ||
758 | pse.format(&put_obj.data); | |
759 | ||
760 | { | |
761 | bufferlist bl; | |
762 | pse.encode_event(bl); | |
763 | bufferlist bl64; | |
764 | bl.encode_base64(bl64); | |
765 | put_obj.user_data = bl64.to_str(); | |
766 | } | |
767 | ||
768 | yield call(new RGWObjectSimplePutCR(sync_env->async_rados, | |
769 | sync_env->store, | |
770 | put_obj)); | |
771 | if (retcode < 0) { | |
eafe8130 | 772 | ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl; |
11fdf7f2 TL |
773 | return set_cr_error(retcode); |
774 | } else { | |
eafe8130 | 775 | ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl; |
11fdf7f2 TL |
776 | } |
777 | ||
778 | return set_cr_done(); | |
779 | } | |
780 | return 0; | |
781 | } | |
782 | }; | |
783 | ||
eafe8130 | 784 | template<typename EventType> |
11fdf7f2 TL |
785 | class PushEventCR : public RGWCoroutine { |
786 | RGWDataSyncEnv* const sync_env; | |
eafe8130 | 787 | const EventRef<EventType> event; |
11fdf7f2 TL |
788 | const PSSubConfigRef& sub_conf; |
789 | ||
790 | public: | |
791 | PushEventCR(RGWDataSyncEnv* const _sync_env, | |
792 | const PSSubscriptionRef& _sub, | |
eafe8130 | 793 | const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct), |
11fdf7f2 TL |
794 | sync_env(_sync_env), |
795 | event(_event), | |
796 | sub_conf(_sub->sub_conf) { | |
797 | } | |
798 | ||
799 | int operate() override { | |
800 | reenter(this) { | |
801 | ceph_assert(sub_conf->push_endpoint); | |
802 | yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env)); | |
803 | ||
804 | if (retcode < 0) { | |
eafe8130 | 805 | ldout(sync_env->cct, 10) << "failed to push event: " << event->id << |
11fdf7f2 | 806 | " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl; |
11fdf7f2 TL |
807 | return set_cr_error(retcode); |
808 | } | |
809 | ||
eafe8130 | 810 | ldout(sync_env->cct, 20) << "event: " << event->id << |
11fdf7f2 | 811 | " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl; |
11fdf7f2 TL |
812 | return set_cr_done(); |
813 | } | |
814 | return 0; | |
815 | } | |
816 | }; | |
817 | ||
818 | public: | |
819 | PSSubscription(RGWDataSyncEnv *_sync_env, | |
820 | PSEnvRef _env, | |
821 | PSSubConfigRef& _sub_conf) : sync_env(_sync_env), | |
822 | env(_env), | |
823 | sub_conf(_sub_conf), | |
824 | data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {} | |
825 | ||
826 | PSSubscription(RGWDataSyncEnv *_sync_env, | |
827 | PSEnvRef _env, | |
828 | rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env), | |
829 | env(_env), | |
830 | sub_conf(std::make_shared<PSSubConfig>()), | |
831 | data_access(std::make_shared<RGWDataAccess>(sync_env->store)) { | |
832 | sub_conf->from_user_conf(sync_env->cct, user_sub_conf); | |
833 | } | |
834 | virtual ~PSSubscription() { | |
835 | if (init_cr) { | |
836 | init_cr->put(); | |
837 | } | |
838 | } | |
839 | ||
840 | template <class C> | |
841 | static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env, | |
842 | PSEnvRef _env, | |
843 | C& _sub_conf) { | |
844 | auto sub = std::make_shared<PSSubscription>(_sync_env, _env, _sub_conf); | |
845 | sub->init_cr = new InitCR(_sync_env, sub); | |
846 | sub->init_cr->get(); | |
847 | return sub; | |
848 | } | |
849 | ||
850 | int call_init_cr(RGWCoroutine *caller) { | |
851 | return init_cr->execute(caller); | |
852 | } | |
853 | ||
eafe8130 TL |
854 | template<typename EventType> |
855 | static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) { | |
856 | return new StoreEventCR<EventType>(sync_env, sub, event); | |
11fdf7f2 TL |
857 | } |
858 | ||
eafe8130 TL |
859 | template<typename EventType> |
860 | static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) { | |
861 | return new PushEventCR<EventType>(sync_env, sub, event); | |
11fdf7f2 TL |
862 | } |
863 | friend class InitCR; | |
864 | }; | |
865 | ||
11fdf7f2 TL |
// Tracks pubsub subscriptions for one sync-module instance and hands out
// shared PSSubscription objects, deduplicating concurrent lookups of the
// same subscription through singleton GetSubCR coroutines.
class PSManager
{
  RGWDataSyncEnv *sync_env;
  PSEnvRef env;

  // cache of already-initialized module-level subscriptions, keyed by sub_id()
  std::map<string, PSSubscriptionRef> subs;

  // Coroutine resolving one subscription: reads its configuration (from the
  // static module conf when ownerless, otherwise from the user's rados
  // object), creates the PSSubscription and runs its init coroutine.
  // Derives from RGWSingletonCR so concurrent callers share one execution.
  class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
    RGWDataSyncEnv *sync_env;
    PSManagerRef mgr;
    rgw_user owner;
    string sub_name;
    string sub_id;
    PSSubscriptionRef *ref;   // output: the resolved subscription

    PSConfigRef conf;

    PSSubConfigRef sub_conf;              // filled for module-level (ownerless) subs
    rgw_pubsub_sub_config user_sub_conf;  // filled for user-owned subs

  public:
    GetSubCR(RGWDataSyncEnv *_sync_env,
             PSManagerRef& _mgr,
             const rgw_user& _owner,
             const string& _sub_name,
             PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
                                        sync_env(_sync_env),
                                        mgr(_mgr),
                                        owner(_owner),
                                        sub_name(_sub_name),
                                        ref(_ref),
                                        conf(mgr->env->conf) {
    }
    ~GetSubCR() { }

    int operate() override {
      reenter(this) {
        if (owner.empty()) {
          // ownerless: look the subscription up in the static module config
          if (!conf->find_sub(sub_name, &sub_conf)) {
            ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
            mgr->remove_get_sub(owner, sub_name);
            return set_cr_error(-ENOENT);
          }

          *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
        } else {
          // user-owned: read the subscription metadata object from rados
          using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
          yield {
            RGWUserPubSub ups(sync_env->store, owner);
            rgw_raw_obj obj;
            ups.get_sub_meta_obj(sub_name, &obj);
            bool empty_on_enoent = false;  // a missing object is an error here
            call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                obj,
                                &user_sub_conf, empty_on_enoent));
          }
          if (retcode < 0) {
            mgr->remove_get_sub(owner, sub_name);
            return set_cr_error(retcode);
          }

          *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf);
        }

        // initialize the subscription (creates its data bucket etc.)
        yield (*ref)->call_init_cr(this);
        if (retcode < 0) {
          ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
          mgr->remove_get_sub(owner, sub_name);
          return set_cr_error(retcode);
        }

        if (owner.empty()) {
          // only ownerless subscriptions are cached on the manager
          mgr->subs[sub_name] = *ref;
        }
        // lookup finished: drop this in-flight coroutine from the dedup map
        mgr->remove_get_sub(owner, sub_name);

        return set_cr_done();
      }
      return 0;
    }

    void return_result(PSSubscriptionRef *result) override {
      ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
      if (retcode >= 0) {
        *result = *ref;
      }
    }
  };

  // canonical key for a subscription: "<owner>/<name>", or "<name>" when
  // there is no owner
  string sub_id(const rgw_user& owner, const string& sub_name) {
    string owner_prefix;
    if (!owner.empty()) {
      owner_prefix = owner.to_str() + "/";
    }

    return owner_prefix + sub_name;
  }

  // in-flight lookup coroutines, so concurrent requests for the same
  // subscription share a single GetSubCR
  std::map<std::string, GetSubCR *> get_subs;

  GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
    return get_subs[sub_id(owner, name)];
  }

  void remove_get_sub(const rgw_user& owner, const string& name) {
    get_subs.erase(sub_id(owner, name));
  }

  // check the cache of already-initialized subscriptions
  bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
    auto iter = subs.find(sub_id(owner, sub_name));
    if (iter != subs.end()) {
      *sub = iter->second;
      return true;
    }
    return false;
  }

  PSManager(RGWDataSyncEnv *_sync_env,
            PSEnvRef _env) : sync_env(_sync_env),
                             env(_env) {}

public:
  static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env,
                                 PSEnvRef _env) {
    return std::shared_ptr<PSManager>(new PSManager(_sync_env, _env));
  }

  // Resolve a subscription on behalf of 'caller', creating or joining the
  // shared GetSubCR as needed.
  static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr,
                                      RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
    if (mgr->find_sub_instance(owner, sub_name, ref)) {
      /* found it! nothing to execute */
      // NOTE(review): despite the comment, control falls through and a
      // GetSubCR is still created/executed below even on a cache hit —
      // confirm whether an early return was intended here
      ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
    }
    auto& gs = mgr->get_get_subs(owner, sub_name);
    if (!gs) {
      ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
      gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref);
    }
    ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
    return gs->execute(caller, ref);
  }

  friend class GetSubCR;
};
1010 | ||
1011 | void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) { | |
1012 | manager = mgr; | |
1013 | conf->init_instance(realm, instance_id); | |
1014 | } | |
1015 | ||
1016 | class RGWPSInitEnvCBCR : public RGWCoroutine { | |
1017 | RGWDataSyncEnv *sync_env; | |
1018 | PSEnvRef env; | |
1019 | PSConfigRef& conf; | |
1020 | ||
1021 | rgw_user_create_params create_user; | |
1022 | rgw_get_user_info_params get_user_info; | |
1023 | public: | |
1024 | RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env, | |
1025 | PSEnvRef& _env) : RGWCoroutine(_sync_env->cct), | |
1026 | sync_env(_sync_env), | |
1027 | env(_env), conf(env->conf) {} | |
1028 | int operate() override { | |
1029 | reenter(this) { | |
1030 | ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl; | |
1031 | ||
1032 | /* nothing to do here right now */ | |
1033 | create_user.user = conf->user; | |
1034 | create_user.max_buckets = 0; /* unlimited */ | |
1035 | create_user.display_name = "pubsub"; | |
1036 | create_user.generate_key = false; | |
1037 | yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user)); | |
1038 | if (retcode < 0) { | |
1039 | ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; | |
1040 | return set_cr_error(retcode); | |
1041 | } | |
1042 | ||
1043 | get_user_info.user = conf->user; | |
1044 | yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info)); | |
1045 | if (retcode < 0) { | |
1046 | ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; | |
1047 | return set_cr_error(retcode); | |
1048 | } | |
1049 | ||
1050 | ldout(sync_env->cct, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl; | |
1051 | ||
1052 | ||
1053 | return set_cr_done(); | |
1054 | } | |
1055 | return 0; | |
1056 | } | |
1057 | }; | |
1058 | ||
eafe8130 TL |
1059 | bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) { |
1060 | if (!match(filter.events, event_type)) { | |
1061 | return false; | |
1062 | } | |
1063 | if (!match(filter.s3_filter.key_filter, key_name)) { | |
1064 | return false; | |
1065 | } | |
1066 | return true; | |
1067 | } | |
1068 | ||
11fdf7f2 TL |
// Coroutine that collects into *topics every topic configuration relevant to
// a given bucket/object/event: bucket-level topics (filtered by event type
// and key), enriched with the user-level subscription lists, plus any topics
// from the static sync-module notification config.
class RGWPSFindBucketTopicsCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  PSEnvRef env;
  rgw_user owner;
  rgw_bucket bucket;
  rgw_obj_key key;
  rgw::notify::EventType event_type;

  RGWUserPubSub ups;

  rgw_raw_obj bucket_obj;  // rados object holding the bucket's topic list
  rgw_raw_obj user_obj;    // rados object holding the user's topic list
  rgw_pubsub_bucket_topics bucket_topics;
  rgw_pubsub_user_topics user_topics;
  TopicsRef *topics;       // output: matching topic configs
public:
  RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
                        PSEnvRef& _env,
                        const rgw_user& _owner,
                        const rgw_bucket& _bucket,
                        const rgw_obj_key& _key,
                        rgw::notify::EventType _event_type,
                        TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
                                              sync_env(_sync_env),
                                              env(_env),
                                              owner(_owner),
                                              bucket(_bucket),
                                              key(_key),
                                              event_type(_event_type),
                                              ups(_sync_env->store, owner),
                                              topics(_topics) {
    *topics = std::make_shared<vector<PSTopicConfigRef> >();
  }
  int operate() override {
    reenter(this) {
      ups.get_bucket_meta_obj(bucket, &bucket_obj);
      ups.get_user_meta_obj(&user_obj);

      // read the bucket's topic metadata; absence just means "no topics"
      using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
      yield {
        bool empty_on_enoent = true;
        call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                            bucket_obj,
                            &bucket_topics, empty_on_enoent));
      }
      if (retcode < 0 && retcode != -ENOENT) {
        return set_cr_error(retcode);
      }

      ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;

      if (!bucket_topics.topics.empty()) {
        // also read the user-level topics to learn each topic's subscriptions
        using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
        yield {
          bool empty_on_enoent = true;
          call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                        user_obj,
                                        &user_topics, empty_on_enoent));
        }
        if (retcode < 0 && retcode != -ENOENT) {
          return set_cr_error(retcode);
        }
      }

      // keep only topics whose filters match this key and event type
      for (auto& titer : bucket_topics.topics) {
        auto& topic_filter = titer.second;
        auto& info = topic_filter.topic;
        if (!match(topic_filter, key.name, event_type)) {
          continue;
        }
        std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
        tc->name = info.name;
        tc->subs = user_topics.topics[info.name].subs;
        (*topics)->push_back(tc);
      }

      // append topics from the static (sync-module) notification config
      env->conf->get_topics(sync_env->cct, bucket, key, topics);
      return set_cr_done();
    }
    return 0;
  }
};
1151 | ||
// Coroutine that dispatches one object event to every subscription of every
// matching topic: stores the event (or S3 record) in the subscription's data
// bucket, and pushes it to the subscription's push endpoint when one is
// configured. Subscription config is looked up first under the bucket owner,
// then under the global (ownerless) scope.
class RGWPSHandleObjEventCR : public RGWCoroutine {
  RGWDataSyncEnv* const sync_env;
  const PSEnvRef env;
  const rgw_user& owner;
  const EventRef<rgw_pubsub_event> event;       // ceph-style payload
  const EventRef<rgw_pubsub_s3_record> record;  // S3-style payload
  const TopicsRef topics;
  const std::array<rgw_user, 2> owners;  // lookup order: bucket owner, then ownerless
  bool has_subscriptions;
  bool event_handled;   // true once at least one store/push succeeded
  bool sub_conf_found;
  PSSubscriptionRef sub;
  std::array<rgw_user, 2>::const_iterator oiter;
  std::vector<PSTopicConfigRef>::const_iterator titer;
  std::set<string>::const_iterator siter;
  int last_sub_conf_error;
  
  public:
  RGWPSHandleObjEventCR(RGWDataSyncEnv* const _sync_env,
                      const PSEnvRef _env,
                      const rgw_user& _owner,
                      const EventRef<rgw_pubsub_event>& _event,
                      const EventRef<rgw_pubsub_s3_record>& _record,
                      const TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
                                          sync_env(_sync_env),
                                          env(_env),
                                          owner(_owner),
                                          event(_event),
                                          record(_record),
                                          topics(_topics),
                                          owners({owner, rgw_user{}}),
                                          has_subscriptions(false),
                                          event_handled(false) {}

  int operate() override {
    reenter(this) {
      ldout(sync_env->cct, 20) << ": handle event: obj: z=" << sync_env->source_zone
                               << " event=" << json_str("event", *event, false)
                               << " owner=" << owner << dendl;

      ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
     
      // outside caller should check that
      ceph_assert(!topics->empty());

      if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);

      // loop over all topics related to the bucket/object
      for (titer = topics->begin(); titer != topics->end(); ++titer) {
        ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" <<
          (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
        // loop over all subscriptions of the topic
        for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
          ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl;
          has_subscriptions = true;
          sub_conf_found = false;
          // try to read subscription configuration from global/user cond
          // configuration is considered missing only if does not exist in either
          for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
            yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
            if (retcode < 0) {
              if (sub_conf_found) {
                // not a real issue, sub conf already found
                retcode = 0;
              }
              // NOTE(review): overwritten on every failed owner lookup,
              // including ones neutralized to 0 above — confirm intended
              last_sub_conf_error = retcode;
              continue;
            }
            sub_conf_found = true;
            if (sub->sub_conf->s3_id.empty()) {
              // subscription was not made by S3 compatible API
              ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
              yield call(PSSubscription::store_event_cr(sync_env, sub, event));
              if (retcode < 0) {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
                ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
              } else {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
                event_handled = true;
              }
              if (sub->sub_conf->push_endpoint) {
                ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
                yield call(PSSubscription::push_event_cr(sync_env, sub, event));
                if (retcode < 0) {
                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
                  ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
                } else {
                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                  event_handled = true;
                }
              }
            } else {
              // subscription was made by S3 compatible API
              ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
              record->configurationId = sub->sub_conf->s3_id;
              yield call(PSSubscription::store_event_cr(sync_env, sub, record));
              if (retcode < 0) {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
                ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
              } else {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
                event_handled = true;
              }
              if (sub->sub_conf->push_endpoint) {
                ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
                yield call(PSSubscription::push_event_cr(sync_env, sub, record));
                if (retcode < 0) {
                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
                  ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
                } else {
                  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                  event_handled = true;
                }
              }
            }
          }
          if (!sub_conf_found) {
            // could not find conf for subscription at user or global levels
            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
            ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
              << " ret=" << last_sub_conf_error << dendl;
            if (retcode == -ENOENT) {
              // missing subscription info should be reflected back as invalid argument
              // and not as missing object
              retcode = -EINVAL;
            }
          }
        }
      }
      if (has_subscriptions && !event_handled) {
        // event is considered "lost" if it has subscriptions on any of its topics
        // but it was not stored in, or pushed to, any of them
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
1294 | ||
eafe8130 | 1295 | // coroutine invoked on remote object creation |
11fdf7f2 TL |
1296 | class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { |
1297 | RGWDataSyncEnv *sync_env; | |
1298 | PSEnvRef env; | |
1299 | std::optional<uint64_t> versioned_epoch; | |
eafe8130 TL |
1300 | EventRef<rgw_pubsub_event> event; |
1301 | EventRef<rgw_pubsub_s3_record> record; | |
11fdf7f2 TL |
1302 | TopicsRef topics; |
1303 | public: | |
1304 | RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, | |
1305 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
1306 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch, | |
1307 | TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), | |
1308 | sync_env(_sync_env), | |
1309 | env(_env), | |
1310 | versioned_epoch(_versioned_epoch), | |
1311 | topics(_topics) { | |
1312 | } | |
1313 | int operate() override { | |
1314 | reenter(this) { | |
eafe8130 | 1315 | ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone |
11fdf7f2 TL |
1316 | << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime |
1317 | << " attrs=" << attrs << dendl; | |
1318 | { | |
1319 | std::vector<std::pair<std::string, std::string> > attrs; | |
1320 | for (auto& attr : attrs) { | |
1321 | string k = attr.first; | |
1322 | if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { | |
1323 | k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); | |
1324 | } | |
1325 | attrs.push_back(std::make_pair(k, attr.second)); | |
eafe8130 TL |
1326 | } |
1327 | // at this point we don't know whether we need the ceph event or S3 record | |
1328 | // this is why both are created here, once we have information about the | |
1329 | // subscription, we will store/push only the relevant ones | |
11fdf7f2 TL |
1330 | make_event_ref(sync_env->cct, |
1331 | bucket_info.bucket, key, | |
1332 | mtime, &attrs, | |
eafe8130 TL |
1333 | rgw::notify::ObjectCreated, &event); |
1334 | make_s3_record_ref(sync_env->cct, | |
1335 | bucket_info.bucket, bucket_info.owner, key, | |
1336 | mtime, &attrs, | |
1337 | rgw::notify::ObjectCreated, &record); | |
11fdf7f2 TL |
1338 | } |
1339 | ||
eafe8130 | 1340 | yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics)); |
11fdf7f2 TL |
1341 | if (retcode < 0) { |
1342 | return set_cr_error(retcode); | |
1343 | } | |
1344 | return set_cr_done(); | |
1345 | } | |
1346 | return 0; | |
1347 | } | |
1348 | }; | |
1349 | ||
1350 | class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { | |
1351 | PSEnvRef env; | |
1352 | std::optional<uint64_t> versioned_epoch; | |
1353 | TopicsRef topics; | |
1354 | public: | |
1355 | RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, | |
1356 | RGWBucketInfo& _bucket_info, rgw_obj_key& _key, | |
1357 | PSEnvRef _env, std::optional<uint64_t> _versioned_epoch, | |
1358 | TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), | |
1359 | env(_env), versioned_epoch(_versioned_epoch), | |
1360 | topics(_topics) { | |
1361 | } | |
1362 | ||
1363 | ~RGWPSHandleRemoteObjCR() override {} | |
1364 | ||
1365 | RGWStatRemoteObjCBCR *allocate_callback() override { | |
1366 | return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics); | |
1367 | } | |
1368 | }; | |
1369 | ||
// Coroutine handling creation of a remote object: finds the topics matching
// an ObjectCreated event on this bucket/key and, if there are any, stats the
// remote object and raises the event.
class RGWPSHandleObjCreateCR : public RGWCoroutine {
  
  RGWDataSyncEnv *sync_env;
  RGWBucketInfo bucket_info;
  rgw_obj_key key;
  PSEnvRef env;
  std::optional<uint64_t> versioned_epoch;
  TopicsRef topics;
public:
  RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
                       RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
                       PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sync_env->cct),
                                                                                  sync_env(_sync_env),
                                                                                  bucket_info(_bucket_info),
                                                                                  key(_key),
                                                                                  env(_env),
                                                                                  versioned_epoch(_versioned_epoch) {
  }

  ~RGWPSHandleObjCreateCR() override {}

  int operate() override {
    reenter(this) {
      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
                                             bucket_info.bucket, key,
                                             rgw::notify::ObjectCreated,
                                             &topics));
      if (retcode < 0) {
        ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
        return set_cr_error(retcode);
      }
      if (topics->empty()) {
        // no one is listening: skip the remote stat entirely
        ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
        return set_cr_done();
      }
      yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
1414 | ||
eafe8130 | 1415 | // coroutine invoked on remote object deletion |
11fdf7f2 TL |
// Coroutine raising a non-creation object event (deletion / delete-marker):
// finds the matching topics and, if any, builds both event payloads and
// dispatches them. No remote stat is needed since there is no object data.
class RGWPSGenericObjEventCBCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  PSEnvRef env;
  rgw_user owner;
  rgw_bucket bucket;
  rgw_obj_key key;
  ceph::real_time mtime;
  rgw::notify::EventType event_type;
  EventRef<rgw_pubsub_event> event;
  EventRef<rgw_pubsub_s3_record> record;
  TopicsRef topics;
public:
  RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
                           PSEnvRef _env,
                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
                           rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
                                                                 sync_env(_sync_env),
                                                                 env(_env),
                                                                 owner(_bucket_info.owner),
                                                                 bucket(_bucket_info.bucket),
                                                                 key(_key),
                                                                 mtime(_mtime), event_type(_event_type) {}
  int operate() override {
    reenter(this) {
      ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
                               << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_type, &topics));
      if (retcode < 0) {
        ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
        return set_cr_error(retcode);
      }
      if (topics->empty()) {
        ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
        return set_cr_done();
      }
      // at this point we don't know whether we need the ceph event or S3 record
      // this is why both are created here, once we have information about the
      // subscription, we will store/push only the relevant ones
      make_event_ref(sync_env->cct,
                     bucket, key,
                     mtime, nullptr,
                     event_type, &event);
      make_s3_record_ref(sync_env->cct,
                         bucket, owner, key,
                         mtime, nullptr,
                         event_type, &record);
      yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }

};
1472 | ||
1473 | class RGWPSDataSyncModule : public RGWDataSyncModule { | |
1474 | PSEnvRef env; | |
1475 | PSConfigRef& conf; | |
eafe8130 | 1476 | |
11fdf7f2 TL |
1477 | public: |
1478 | RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) { | |
1479 | env->init(cct, config); | |
1480 | } | |
eafe8130 | 1481 | |
11fdf7f2 TL |
1482 | ~RGWPSDataSyncModule() override {} |
1483 | ||
1484 | void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override { | |
1485 | PSManagerRef mgr = PSManager::get_shared(sync_env, env); | |
1486 | env->init_instance(sync_env->store->svc.zone->get_realm(), instance_id, mgr); | |
1487 | } | |
1488 | ||
1489 | RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override { | |
1490 | ldout(sync_env->cct, 5) << conf->id << ": start" << dendl; | |
1491 | return new RGWPSInitEnvCBCR(sync_env, env); | |
1492 | } | |
eafe8130 TL |
1493 | |
1494 | RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, | |
1495 | rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override { | |
1496 | ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << | |
1497 | " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; | |
11fdf7f2 TL |
1498 | return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch); |
1499 | } | |
eafe8130 TL |
1500 | |
1501 | RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, | |
1502 | rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { | |
1503 | ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << | |
1504 | " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; | |
1505 | return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete); | |
11fdf7f2 | 1506 | } |
eafe8130 TL |
1507 | |
1508 | RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, | |
1509 | rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { | |
1510 | ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << | |
1511 | " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; | |
1512 | return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); | |
11fdf7f2 TL |
1513 | } |
1514 | ||
1515 | PSConfigRef& get_conf() { return conf; } | |
1516 | }; | |
1517 | ||
1518 | RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) | |
1519 | { | |
1520 | data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config)); | |
1521 | string jconf = json_str("conf", *data_handler->get_conf()); | |
1522 | JSONParser p; | |
1523 | if (!p.parse(jconf.c_str(), jconf.size())) { | |
eafe8130 | 1524 | ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; |
11fdf7f2 TL |
1525 | effective_conf = config; |
1526 | } else { | |
1527 | effective_conf.decode_json(&p); | |
1528 | } | |
eafe8130 TL |
1529 | #ifdef WITH_RADOSGW_AMQP_ENDPOINT |
1530 | if (!rgw::amqp::init(cct)) { | |
1531 | ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl; | |
1532 | } | |
1533 | #endif | |
1534 | } | |
1535 | ||
RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
  // tear down the AMQP manager started in the constructor
  rgw::amqp::shutdown();
#endif
}
1540 | } |
1541 | ||
1542 | RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler() | |
1543 | { | |
1544 | return data_handler.get(); | |
1545 | } | |
1546 | ||
1547 | RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { | |
1548 | if (dialect != RGW_REST_S3) { | |
1549 | return orig; | |
1550 | } | |
eafe8130 TL |
1551 | return new RGWRESTMgr_PubSub(); |
1552 | } | |
1553 | ||
1554 | bool RGWPSSyncModuleInstance::should_full_sync() const { | |
1555 | return data_handler->get_conf()->start_with_full_sync; | |
11fdf7f2 TL |
1556 | } |
1557 | ||
1558 | int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { | |
1559 | instance->reset(new RGWPSSyncModuleInstance(cct, config)); | |
1560 | return 0; | |
1561 | } | |
1562 | ||
eafe8130 | 1563 |